#184 start implementation of the flush feature

2022-04-30

author
Mike Becker <universe@uap-core.de>
date
Sat, 30 Apr 2022 09:47:20 +0200 (2022-04-30)
changeset 539
9cd98da9ee17
parent 538
2cfbcbe86a7c
child 540
47e0f2237a94

#184 start implementation of the flush feature

src/buffer.c file | annotate | diff | comparison | revisions
src/cx/buffer.h file | annotate | diff | comparison | revisions
test/test_buffer.cpp file | annotate | diff | comparison | revisions
--- a/src/buffer.c	Sat Apr 30 09:03:17 2022 +0200
+++ b/src/buffer.c	Sat Apr 30 09:47:20 2022 +0200
@@ -32,6 +32,7 @@
 #include <stdlib.h>
 #include <stdio.h>
 #include <string.h>
+#include <stdint.h>
 
 int cxBufferInit(
         CxBuffer *buffer,
@@ -53,8 +54,12 @@
     }
     buffer->capacity = capacity;
     buffer->size = 0;
+    buffer->pos = 0;
 
-    buffer->pos = 0;
+    buffer->flush_func = NULL;
+    buffer->flush_blkmax = 0;
+    buffer->flush_blksize = 4096;
+    buffer->flush_threshold = SIZE_MAX;
 
     return 0;
 }
@@ -134,6 +139,7 @@
         size_t nitems,
         CxBuffer *buffer
 ) {
+    // TODO: optimize for special case size == nitems == 1
     size_t len;
     if (cx_szmul(size, nitems, &len)) {
         return 0;
@@ -143,15 +149,27 @@
         return 0;
     }
 
+    bool perform_flush = false;
     if (required > buffer->capacity) {
-        if ((buffer->flags & CX_BUFFER_AUTO_EXTEND) == CX_BUFFER_AUTO_EXTEND) {
-            if (cxBufferMinimumCapacity(buffer, required)) {
-                return 0;
+        if ((buffer->flags & CX_BUFFER_AUTO_EXTEND) == CX_BUFFER_AUTO_EXTEND && required) {
+            if (buffer->flush_blkmax > 0 && required > buffer->flush_threshold) {
+                perform_flush = true;
+            } else {
+                if (cxBufferMinimumCapacity(buffer, required)) {
+                    return 0;
+                }
             }
         } else {
-            len = buffer->capacity - buffer->pos;
-            if (size > 1) {
-                len -= len % size;
+            if (buffer->flush_blkmax > 0) {
+                perform_flush = true;
+            } else {
+                // truncate data to be written, if we can neither extend nor flush
+                len = buffer->capacity - buffer->pos;
+                if (size > 1) {
+                    // TODO: this is bugged - it would only discard one element and not as many as required
+                    len -= len % size;
+                    nitems = len / size;
+                }
             }
         }
     }
@@ -160,13 +178,25 @@
         return len;
     }
 
-    memcpy(buffer->bytes + buffer->pos, ptr, len);
-    buffer->pos += len;
-    if (buffer->pos > buffer->size) {
-        buffer->size = buffer->pos;
+    if (perform_flush) {
+        // TODO: implement flushing
+        // (1) determine how many bytes to flush (use flushmax = blkmax * blksize)
+        // (2) if len is larger than the number computed in (1) we need more flush cycles, compute how many
+        // (3) determine how many bytes from the buffer shall be flushed
+        // (4) if something remains in the buffer, shift the buffer to the left
+        // (4a) if buffer was shifted, append the new data to the buffer
+        // (4b) if the buffer was flushed entirely AND the new data also fits into flushmax,
+        //      directly write the new data to the flush sink
+        return 0; // remove this after implementation
+    } else {
+        memcpy(buffer->bytes + buffer->pos, ptr, len);
+        buffer->pos += len;
+        if (buffer->pos > buffer->size) {
+            buffer->size = buffer->pos;
+        }
     }
 
-    return len / size;
+    return nitems;
 }
 
 int cxBufferPut(
--- a/src/cx/buffer.h	Sat Apr 30 09:03:17 2022 +0200
+++ b/src/cx/buffer.h	Sat Apr 30 09:47:20 2022 +0200
@@ -91,6 +91,40 @@
     /** Current size of the buffer content. */
     size_t size;
     /**
+     * The buffer may not extend beyond this threshold before starting to flush.
+     * Default is \c SIZE_MAX (flushing disabled when auto extension is enabled).
+     */
+    size_t flush_threshold;
+    /**
+     * The block size for the elements to flush.
+     * Default is 4096 bytes.
+     */
+    size_t flush_blksize;
+    /**
+     * The maximum number of blocks to flush in one cycle.
+     * Zero disables flushing entirely (this is the default).
+     * Set this to \c SIZE_MAX to flush the entire buffer.
+     *
+     * @attention if the maximum number of blocks multiplied with the block size
+     * is smaller than the expected contents written to this buffer within one write
+     * operation, multiple flush cycles are performed after that write.
+     * That means the total number of blocks flushed after one write to this buffer may
+     * be larger than \c flush_blkmax.
+     */
+    size_t flush_blkmax;
+
+    /**
+     * The write function used for flushing.
+     * If NULL, the flushed content gets discarded.
+     */
+    size_t (*flush_func)(
+            void const *,
+            size_t,
+            size_t,
+            void *
+    );
+
+    /**
      * Flag register for buffer features.
      * @see #CX_BUFFER_DEFAULT
      * @see #CX_BUFFER_FREE_CONTENTS
--- a/test/test_buffer.cpp	Sat Apr 30 09:03:17 2022 +0200
+++ b/test/test_buffer.cpp	Sat Apr 30 09:47:20 2022 +0200
@@ -31,11 +31,19 @@
 #include <gtest/gtest.h>
 #include "util_allocator.h"
 
+static void expect_default_flush_config(CxBuffer *buf) {
+    EXPECT_EQ(buf->flush_blkmax, 0);
+    EXPECT_EQ(buf->flush_blksize, 4096);
+    EXPECT_EQ(buf->flush_threshold, SIZE_MAX);
+    EXPECT_EQ(buf->flush_func, nullptr);
+}
+
 TEST(BufferInit, WrapSpace) {
     CxTestingAllocator alloc;
     CxBuffer buf;
     void *space = cxMalloc(&alloc, 16);
     cxBufferInit(&buf, space, 16, &alloc, CX_BUFFER_DEFAULT);
+    expect_default_flush_config(&buf);
     EXPECT_EQ(buf.space, space);
     EXPECT_EQ(buf.flags & CX_BUFFER_AUTO_EXTEND, 0);
     EXPECT_EQ(buf.flags & CX_BUFFER_FREE_CONTENTS, 0);
@@ -49,11 +57,31 @@
     EXPECT_TRUE(alloc.verify());
 }
 
+TEST(BufferInit, WrapSpaceAutoExtend) {
+    CxTestingAllocator alloc;
+    CxBuffer buf;
+    void *space = cxMalloc(&alloc, 16);
+    cxBufferInit(&buf, space, 16, &alloc, CX_BUFFER_AUTO_EXTEND);
+    expect_default_flush_config(&buf);
+    EXPECT_EQ(buf.space, space);
+    EXPECT_EQ(buf.flags & CX_BUFFER_AUTO_EXTEND, CX_BUFFER_AUTO_EXTEND);
+    EXPECT_EQ(buf.flags & CX_BUFFER_FREE_CONTENTS, 0);
+    EXPECT_EQ(buf.pos, 0);
+    EXPECT_EQ(buf.size, 0);
+    EXPECT_EQ(buf.capacity, 16);
+    EXPECT_EQ(buf.allocator, &alloc);
+    cxBufferDestroy(&buf);
+    EXPECT_FALSE(alloc.verify());
+    cxFree(&alloc, space);
+    EXPECT_TRUE(alloc.verify());
+}
+
 TEST(BufferInit, WrapSpaceAutoFree) {
     CxTestingAllocator alloc;
     CxBuffer buf;
     void *space = cxMalloc(&alloc, 16);
     cxBufferInit(&buf, space, 16, &alloc, CX_BUFFER_FREE_CONTENTS);
+    expect_default_flush_config(&buf);
     EXPECT_EQ(buf.space, space);
     EXPECT_EQ(buf.flags & CX_BUFFER_AUTO_EXTEND, 0);
     EXPECT_EQ(buf.flags & CX_BUFFER_FREE_CONTENTS, CX_BUFFER_FREE_CONTENTS);
@@ -70,6 +98,7 @@
     CxTestingAllocator alloc;
     CxBuffer buf;
     cxBufferInit(&buf, nullptr, 8, &alloc, CX_BUFFER_DEFAULT);
+    expect_default_flush_config(&buf);
     EXPECT_NE(buf.space, nullptr);
     EXPECT_EQ(buf.flags & CX_BUFFER_AUTO_EXTEND, 0);
     EXPECT_EQ(buf.flags & CX_BUFFER_FREE_CONTENTS, CX_BUFFER_FREE_CONTENTS);

mercurial