#184 untested implementation of the flush feature

2022-05-01

author
Mike Becker <universe@uap-core.de>
date
Sun, 01 May 2022 13:14:31 +0200 (2022-05-01)
changeset 544
2e73456e5f84
parent 543
7b9114030ca4
child 545
3700ac4bd9a3

#184 untested implementation of the flush feature

src/buffer.c file | annotate | diff | comparison | revisions
--- a/src/buffer.c	Sun May 01 11:54:10 2022 +0200
+++ b/src/buffer.c	Sun May 01 13:14:31 2022 +0200
@@ -134,6 +134,41 @@
     }
 }
 
+/**
+ * Helps flushing data to the flush target of a buffer.
+ *
+ * @param buffer the buffer containing the config
+ * @param space the data to flush
+ * @param size the element size
+ * @param nitems the number of items
+ * @return the number of items flushed
+ */
+static size_t cx_buffer_write_flush_helper(
+        CxBuffer *buffer,
+        unsigned char const *space,
+        size_t size,
+        size_t nitems
+) {
+    size_t pos = 0;
+    size_t remaining = nitems;
+    size_t max_items = buffer->flush_blksize / size;
+    while (remaining > 0) {
+        size_t items = remaining > max_items ? max_items : remaining;
+        size_t flushed = buffer->flush_func(
+                space + pos,
+                size, items,
+                buffer->flush_target);
+        if (flushed > 0) {
+            pos += (flushed * size);
+            remaining -= flushed;
+        } else {
+            // if no bytes can be flushed out anymore, we give up
+            break;
+        }
+    }
+    return nitems - remaining;
+}
+
 size_t cxBufferWrite(
         void const *ptr,
         size_t size,
@@ -151,6 +186,7 @@
     }
 
     size_t len;
+    size_t nitems_out = nitems;
     if (cx_szmul(size, nitems, &len)) {
         return 0;
     }
@@ -178,7 +214,7 @@
                 if (size > 1) {
                     len -= len % size;
                 }
-                nitems = len / size;
+                nitems_out = len / size;
             }
         }
     }
@@ -188,24 +224,52 @@
     }
 
     if (perform_flush) {
-        // TODO: implement flushing
-        // (1) determine how many bytes to flush (use flushmax = blkmax * blksize)
-        // (2) if len is larger than the number computed in (1) we need more flush cycles, compute how many
-        // (3) determine how many bytes from the buffer shall be flushed
-        // (4) if something remains in the buffer, shift the buffer to the left
-        // (4a) if buffer was shifted, append the new data to the buffer
-        // (4b) if the buffer was flushed entirely AND the new data also fits into flushmax,
-        //      directly write the new data to the flush sink
-        return 0; // remove this after implementation
+        size_t flush_max;
+        if (cx_szmul(buffer->flush_blkmax, buffer->flush_blksize, &flush_max)) {
+            return 0;
+        }
+        size_t flush_pos = buffer->flush_func == NULL || buffer->flush_target == NULL
+                           ? buffer->pos
+                           : cx_buffer_write_flush_helper(buffer, buffer->bytes, 1, buffer->pos);
+        if (flush_pos == buffer->pos) {
+            // entire buffer has been flushed, we can reset
+            buffer->size = buffer->pos = 0;
+
+            size_t items_flush; // how many items can also be directly flushed
+            size_t items_keep; // how many items have to be written to the buffer
+
+            items_flush = flush_max >= required ? nitems : (flush_max - flush_pos) / size;
+            if (items_flush > 0) {
+                items_flush = cx_buffer_write_flush_helper(buffer, ptr, size, items_flush / size);
+                // in case we could not flush everything, keep the rest
+            }
+            items_keep = nitems - items_flush;
+            if (items_keep > 0) {
+                // try again with the remaining stuff
+                unsigned char const *new_ptr = ptr;
+                new_ptr += items_flush * size;
+                return cxBufferWrite(new_ptr, size, items_keep, buffer);
+            } else {
+                // all items have been flushed - report them as written
+                return nitems;
+            }
+        } else if (flush_pos == 0) {
+            // nothing could be flushed at all, we immediately give up without writing any data
+            return 0;
+        } else {
+            // we were partially successful, we have shift left and try again
+            cxBufferShiftLeft(buffer, flush_pos);
+            return cxBufferWrite(ptr, size, nitems, buffer);
+        }
     } else {
         memcpy(buffer->bytes + buffer->pos, ptr, len);
         buffer->pos += len;
         if (buffer->pos > buffer->size) {
             buffer->size = buffer->pos;
         }
+        return nitems_out;
     }
 
-    return nitems;
 }
 
 int cxBufferPut(

mercurial