+/*
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
+ *
+ * Copyright 2025 Olaf Wintermann. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+
+#include "message.h"
+
+UiMessageHandler* uic_simple_msg_handler(int in, int out, msg_received_callback callback) {
+ UiSimpleMessageHandler *handler = malloc(sizeof(UiSimpleMessageHandler));
+ handler->handler.start = uic_simple_msg_handler_start;
+ handler->handler.stop = uic_simple_msg_handler_stop;
+ handler->handler.send = uic_simple_msg_handler_send;
+ handler->handler.callback = callback;
+ handler->in = in;
+ handler->out = out;
+ handler->outbuf = cxBufferCreate(NULL, 4096, NULL, CX_BUFFER_FREE_CONTENTS | CX_BUFFER_AUTO_EXTEND);
+ handler->stop = 0;
+ pthread_mutex_init(&handler->queue_lock, NULL);
+ pthread_mutex_init(&handler->avlbl_lock, NULL);
+ pthread_cond_init(&handler->available, NULL);
+ return (UiMessageHandler*)handler;
+}
+
+int uic_simple_msg_handler_start(UiMessageHandler *handler) {
+ UiSimpleMessageHandler *sh = (UiSimpleMessageHandler*)handler;
+ if(pthread_create(&sh->in_thread, NULL, uic_simple_msg_handler_in_thread, sh)) {
+ return 1;
+ }
+ if(pthread_create(&sh->out_thread, NULL, uic_simple_msg_handler_out_thread, sh)) {
+ return 1;
+ }
+ return 0;
+}
+
+int uic_simple_msg_handler_stop(UiMessageHandler *handler) {
+ UiSimpleMessageHandler *sh = (UiSimpleMessageHandler*)handler;
+ pthread_mutex_lock(&sh->queue_lock);
+ sh->stop = 0;
+ pthread_cond_signal(&sh->available);
+ pthread_mutex_unlock(&sh->queue_lock);
+ close(sh->in);
+ sh->in = -1;
+
+ pthread_join(sh->in_thread, NULL);
+ pthread_join(sh->out_thread, NULL);
+
+ return 0;
+}
+
+int uic_simple_msg_handler_send(UiMessageHandler *handler, cxstring msg) {
+ UiSimpleMessageHandler *sh = (UiSimpleMessageHandler*)handler;
+ pthread_mutex_lock(&sh->queue_lock);
+ cxBufferWrite(msg.ptr, 1, msg.length, sh->outbuf);
+ pthread_cond_signal(&sh->available);
+ pthread_mutex_unlock(&sh->queue_lock);
+ return 0;
+}
+
+#define HEADERBUF_SIZE 64
+
+void* uic_simple_msg_handler_in_thread(void *data) {
+ UiSimpleMessageHandler *handler = data;
+
+ char *msg = NULL;
+ size_t msg_size = 0;
+ size_t msg_pos = 0; // currently received message length
+
+ char headerbuf[HEADERBUF_SIZE];
+ size_t headerpos = 0;
+
+ char buf[2048];
+ ssize_t r;
+ while((r = read(handler->in, buf, 2024)) > 0) {
+ char *buffer = buf;
+ size_t available = r;
+
+ while(available > 0) {
+ if(msg) {
+ // read message
+ size_t need = msg_size - msg_pos;
+ size_t cplen = r > need ? need : available;
+ memcpy(msg+msg_pos, buffer, cplen);
+ buffer += cplen;
+ available -= cplen;
+ msg_pos += cplen;
+ if(msg_pos == msg_size) {
+ // message complete
+ //fprintf(stderr, "send: %.*s\n", (int)msg_size, msg);
+ if(handler->handler.callback) {
+ handler->handler.callback(cx_mutstrn(msg, msg_size));
+ }
+ msg = NULL;
+ msg_size = 0;
+ msg_pos = 0;
+ }
+ } else {
+ size_t header_max = HEADERBUF_SIZE - headerpos - 1;
+ if(header_max > available) {
+ header_max = available;
+ }
+ // search for line break
+ int i;
+ int header_complete = 0;
+ for(i=0;i<header_max;i++) {
+ if(buffer[i] == '\n') {
+ header_complete = 1;
+ break;
+ }
+ }
+ i++;
+ memcpy(headerbuf+headerpos, buffer, i);
+ headerpos += i;
+ buffer += i;
+ available -= i;
+
+ if(header_complete) {
+ headerbuf[headerpos-1] = 0; // terminate buffer
+ char *end;
+ long length = strtol(headerbuf, &end, 10);
+ if(*end == '\0') {
+ //fprintf(stderr, "header: %d\n", (int)length);
+ msg = malloc(length);
+ msg_size = length;
+ headerpos = 0;
+ } else {
+ fprintf(stderr, "Error: invalid message {%s}\n", headerbuf);
+ }
+ } else if(headerpos+1 >= HEADERBUF_SIZE) {
+ fprintf(stderr, "Error: message header too big\n");
+ exit(-1);
+ }
+ }
+ }
+
+
+ }
+ perror("error");
+ fprintf(stderr, "stop simple_msg_handler_in_thread\n");
+
+ return NULL;
+}
+
+void* uic_simple_msg_handler_out_thread(void *data) {
+ UiSimpleMessageHandler *handler = data;
+ CxBuffer *buffer = handler->outbuf;
+
+ pthread_mutex_lock(&handler->queue_lock);
+
+ for(;;) {
+ if(buffer->pos == 0) {
+ pthread_cond_wait(&handler->available, &handler->queue_lock);
+ continue;
+ } else {
+ size_t n = buffer->pos;
+ size_t pos = 0;
+ while(n > 0) {
+ ssize_t w = write(handler->out, buffer->space + pos, n);
+ if(w <= 0) {
+ fprintf(stderr, "Error: output error\n");
+ break;
+ }
+ n -= w;
+ pos += w;
+ }
+ if(n > 0) {
+ break; // error
+ }
+ buffer->pos = 0;
+ }
+ }
+
+ pthread_mutex_unlock(&handler->queue_lock);
+
+ return NULL;
+}