view fix_command_pipe_read_loop.patch @ 250:a0066335c86e

final state before finishing fixes
author Louis Opter <kalessin@kalessin.fr>
date Sat, 15 Aug 2015 16:36:28 -0700
parents 848fe96c58a5
children
line wrap: on
line source

# HG changeset patch
# Parent  870e94c0386d9595949a21c3486875759c4a1ff6
Fix infinite loop in the command pipe read callback

Looks like I suck at Unix, EOF & EAGAIN weren't handled properly. The
pipe is now re-opened on EOF.

We could probably make the command pipe bi-directional at this point.

I wish conky wasn't fucked nowadays, that would have let me spot this
earlier.

diff --git a/core/pipe.c b/core/pipe.c
--- a/core/pipe.c
+++ b/core/pipe.c
@@ -41,7 +41,7 @@
     SLIST_HEAD_INITIALIZER(&lgtd_command_pipes);
 
 static void
-lgtd_command_pipe_close(struct lgtd_command_pipe *pipe)
+_lgtd_command_pipe_close(struct lgtd_command_pipe *pipe)
 {
     assert(pipe);
 
@@ -49,16 +49,24 @@
     if (pipe->fd != -1) {
         close(pipe->fd);
     }
-    unlink(pipe->path);
     SLIST_REMOVE(&lgtd_command_pipes, pipe, lgtd_command_pipe, link);
     evbuffer_free(pipe->read_buf);
     event_free(pipe->read_ev);
-
-    lgtd_info("closed command pipe %s", pipe->path);
     free(pipe);
 }
 
 static void
+lgtd_command_pipe_close(struct lgtd_command_pipe *pipe)
+{
+    const char *path = pipe->path;
+    _lgtd_command_pipe_close(pipe);
+    unlink(path);
+    lgtd_info("closed command pipe %s", path);
+}
+
+static void lgtd_command_pipe_reset(struct lgtd_command_pipe *);
+
+static void
 lgtd_command_pipe_read_callback(evutil_socket_t socket, short events, void *ctx)
 {
     assert(ctx);
@@ -70,7 +78,6 @@
     struct lgtd_command_pipe *pipe = ctx;
 
     bool drain = false;
-    int read = 0;
     for (int nbytes = evbuffer_read(pipe->read_buf, pipe->fd, -1);
          nbytes;
          nbytes = evbuffer_read(pipe->read_buf, pipe->fd, -1)) {
@@ -80,16 +87,12 @@
             }
             if (errno != EAGAIN) {
                 lgtd_warn("read error on command pipe %s", pipe->path);
-                const char *path = pipe->path;
-                lgtd_command_pipe_close(pipe);
-                lgtd_command_pipe_open(path);
-                return;
+                break;
             }
-            continue;
+            return; // EAGAIN, go back to the event loop
         }
 
         if (!drain) {
-            read += nbytes;
         next_request:
             (void)0;
             const char *buf = (char *)evbuffer_pullup(pipe->read_buf, -1);
@@ -127,25 +130,22 @@
                 jsmn_init(&pipe->client.jsmn_ctx);
                 int request_size = pipe->client.jsmn_tokens[0].end;
                 evbuffer_drain(pipe->read_buf, request_size);
-                read -= request_size;
-                if (read) {
+                if (request_size < bufsz) {
                     goto next_request;
                 }
                 break;
             }
-        } else {
-            evbuffer_drain(pipe->read_buf, read + nbytes);
-            read = 0;
+        }
+
+        if (drain) {
+            ssize_t bufsz = evbuffer_get_length(pipe->read_buf);
+            evbuffer_drain(pipe->read_buf, bufsz);
+            drain = false;
+            jsmn_init(&pipe->client.jsmn_ctx);
         }
     }
 
-    if (read) {
-        lgtd_debug(
-            "pipe %s: discarding %d bytes of unusable data", pipe->path, read
-        );
-        evbuffer_drain(pipe->read_buf, read);
-    }
-    jsmn_init(&pipe->client.jsmn_ctx);
+    lgtd_command_pipe_reset(pipe);
 }
 
 static mode_t
@@ -156,8 +156,8 @@
     return mask;
 }
 
-bool
-lgtd_command_pipe_open(const char *path)
+static bool
+_lgtd_command_pipe_open(const char *path)
 {
     assert(path);
 
@@ -218,8 +218,6 @@
         goto error;
     }
 
-    lgtd_info("command pipe ready at %s", pipe->path);
-
     SLIST_INSERT_HEAD(&lgtd_command_pipes, pipe, link);
 
     return true;
@@ -239,6 +237,27 @@
     return false;
 }
 
+static void
+lgtd_command_pipe_reset(struct lgtd_command_pipe *pipe)
+{
+    const char *path = pipe->path;
+    // we could optimize a bit to avoid re-allocations here:
+    _lgtd_command_pipe_close(pipe);
+    if (!_lgtd_command_pipe_open(path)) {
+        lgtd_warn("can't re-open pipe %s", path);
+    }
+}
+
+bool
+lgtd_command_pipe_open(const char *path)
+{
+    if (_lgtd_command_pipe_open(path)) {
+        lgtd_info("command pipe ready at %s", path);
+        return true;
+    }
+    return false;
+}
+
 void
 lgtd_command_pipe_close_all(void)
 {
diff --git a/tests/core/pipe/test_pipe_read_callback.c b/tests/core/pipe/test_pipe_read_callback.c
--- a/tests/core/pipe/test_pipe_read_callback.c
+++ b/tests/core/pipe/test_pipe_read_callback.c
@@ -7,6 +7,7 @@
 #include "lifx/wire_proto.h"
 
 #define MOCKED_EVENT_NEW
+#define MOCKED_EVENT_DEL
 #define MOCKED_EVBUFFER_NEW
 #define MOCKED_EVBUFFER_READ
 #define MOCKED_EVBUFFER_PULLUP
@@ -69,6 +70,16 @@
     return (void *)1;
 }
 
+static int event_del_call_count = 0;
+
+int
+event_del(struct event *ev)
+{
+    (void)ev;
+    event_del_call_count++;
+    return 0;
+}
+
 static int
 get_nbytes_read(int call_count)
 {
@@ -186,8 +197,22 @@
     }
 
     struct lgtd_command_pipe *pipe = SLIST_FIRST(&lgtd_command_pipes);
+    lgtd_command_pipe_read_callback(pipe->fd, EV_READ, pipe);
+    if (event_del_call_count != 1) {
+        errx(1, "the pipe wasn't reset");
+    }
 
+    jsonrpc_dispatch_request_call_count = 0;
+    evbuffer_drain_call_count = 0;
+    evbuffer_read_call_count = 0;
+    evbuffer_pullup_call_count = 0;
+    evbuffer_get_length_call_count = 0;
+    event_del_call_count = 0;
+    pipe = SLIST_FIRST(&lgtd_command_pipes);
     lgtd_command_pipe_read_callback(pipe->fd, EV_READ, pipe);
+    if (event_del_call_count != 1) {
+        errx(1, "the pipe wasn't reset");
+    }
 
     return 0;
 }
diff --git a/tests/core/pipe/test_pipe_read_callback_yield_on_eagain.c b/tests/core/pipe/test_pipe_read_callback_yield_on_eagain.c
new file mode 100644
--- /dev/null
+++ b/tests/core/pipe/test_pipe_read_callback_yield_on_eagain.c
@@ -0,0 +1,269 @@
+#include "pipe.c"
+
+#include <sys/tree.h>
+#include <endian.h>
+#include <limits.h>
+
+#include "lifx/wire_proto.h"
+
+#define MOCKED_EVENT_NEW
+#define MOCKED_EVBUFFER_NEW
+#define MOCKED_EVBUFFER_READ
+#define MOCKED_EVBUFFER_PULLUP
+#define MOCKED_EVBUFFER_GET_LENGTH
+#define MOCKED_EVBUFFER_DRAIN
+#include "mock_event2.h"
+#include "mock_gateway.h"
+
+#include "tests_utils.h"
+#define MOCKED_JSONRPC_DISPATCH_REQUEST
+#include "tests_pipe_utils.h"
+
+#define REQUEST_1 "{"                   \
+    "\"jsonrpc\": \"2.0\","             \
+    "\"method\": \"get_light_state\","  \
+    "\"params\": [\"*\"],"              \
+    "\"id\": 42"                        \
+"}"
+
+#define REQUEST_2 "{"           \
+    "\"jsonrpc\": \"2.0\","     \
+    "\"method\": \"power_on\"," \
+    "\"params\": [\"*\"],"      \
+    "\"id\": 43"                \
+"}"
+
+static unsigned char request[] = (
+    REQUEST_1
+    REQUEST_2
+);
+
+static char *tmpdir = NULL;
+
+void
+cleanup_tmpdir(void)
+{
+    lgtd_tests_remove_temp_dir(tmpdir);
+}
+
+static int jsonrpc_dispatch_request_call_count = 0;
+
+void
+lgtd_jsonrpc_dispatch_request(struct lgtd_client *client, int parsed)
+{
+    (void)client;
+    (void)parsed;
+
+    if (!parsed) {
+        errx(1, "number of parsed json tokens not passed in");
+    }
+
+    switch (jsonrpc_dispatch_request_call_count) {
+    case 0:
+        if (memcmp(client->json, request, sizeof(request) - 1)) {
+            errx(
+                1, "got unexpected json %s (expected %s)",
+                client->json, request
+            );
+        }
+        break;
+    case 1:
+        if (memcmp(client->json, REQUEST_2, sizeof(REQUEST_2) - 1)) {
+            errx(
+                1, "got unexpected json %s (expected %s)",
+                client->json, REQUEST_2
+            );
+        }
+        break;
+    default:
+        errx(
+            1, "jsonrpc_dispatch_request_call_count = %d",
+            jsonrpc_dispatch_request_call_count
+        );
+        break;
+    }
+
+    jsonrpc_dispatch_request_call_count++;
+}
+
+struct event *
+event_new(struct event_base *base,
+          evutil_socket_t fd,
+          short events,
+          event_callback_fn cb,
+          void *ctx)
+{
+    (void)base;
+    (void)fd;
+    (void)events;
+    (void)cb;
+    (void)ctx;
+
+    return (void *)1;
+}
+
+struct evbuffer *
+evbuffer_new(void)
+{
+    return (void *)2;
+}
+
+static int evbuffer_drain_call_count = 0;
+
+int
+evbuffer_drain(struct evbuffer *buf, size_t len)
+{
+    if (buf != (void *)2) {
+        errx(1, "got unexpected buf %p (expected %p)", buf, (void *)2);
+    }
+
+    jsmn_parser jsmn_ctx;
+    jsmn_init(&jsmn_ctx);
+    struct lgtd_command_pipe *pipe = SLIST_FIRST(&lgtd_command_pipes);
+    if (memcmp(&pipe->client.jsmn_ctx, &jsmn_ctx, sizeof(jsmn_ctx))) {
+        errx(1, "the client json parser context wasn't re-initialized");
+    }
+
+    switch (evbuffer_drain_call_count) {
+    case 0:
+        if (len != sizeof(REQUEST_1) - 1) {
+            errx(
+                1, "trying to drain %ju bytes (expected %ju)",
+                (uintmax_t)len, (uintmax_t)sizeof(REQUEST_1) - 1
+            );
+        }
+        break;
+    case 1:
+        if (len != sizeof(REQUEST_2) - 1) {
+            errx(
+                1, "trying to drain %ju bytes (expected %ju)",
+                (uintmax_t)len, (uintmax_t)sizeof(REQUEST_2) - 1
+            );
+        }
+        break;
+    default:
+        errx(1, "evbuffer_drain_call_count = %d", evbuffer_drain_call_count);
+        break;
+    }
+    evbuffer_drain_call_count++;
+
+    return 0;
+}
+
+static int evbuffer_pullup_call_count = 0;
+
+unsigned char *
+evbuffer_pullup(struct evbuffer *buf, ev_ssize_t size)
+{
+    if (buf != (void *)2) {
+        errx(1, "got unexpected buf %p (expected %p)", buf, (void *)2);
+    }
+
+    if (size != -1) {
+        errx(
+            1, "got unexpected size %jd in pullup (expected -1)", (intmax_t)size
+        );
+    }
+
+    int offset;
+    switch (evbuffer_pullup_call_count) {
+    case 0:
+        offset = 0;
+        break;
+    case 1:
+        offset = sizeof(REQUEST_1) - 1;
+        break;
+    default:
+        offset = sizeof(request);
+        break;
+    }
+    evbuffer_pullup_call_count++;
+
+    return &request[offset];
+}
+
+static int evbuffer_get_length_call_count = 0;
+
+size_t
+evbuffer_get_length(const struct evbuffer *buf)
+{
+    if (buf != (void *)2) {
+        errx(1, "got unexpected buf %p (expected %p)", buf, (void *)2);
+    }
+
+    size_t len;
+    switch (evbuffer_get_length_call_count) {
+    case 0:
+        len = sizeof(REQUEST_1) - 1;
+        break;
+    case 1:
+        len = sizeof(request) - sizeof(REQUEST_1);
+        break;
+    default:
+        len = 0;
+        break;
+    }
+    evbuffer_get_length_call_count++;
+
+    return len;
+}
+
+static int evbuffer_read_call_count = 0;
+
+int
+evbuffer_read(struct evbuffer *buf, evutil_socket_t fd, int howmuch)
+{
+    if (buf != (void *)2) {
+        errx(1, "got unexpected buf %p (expected %p)", buf, (void *)2);
+    }
+
+    struct lgtd_command_pipe *pipe = SLIST_FIRST(&lgtd_command_pipes);
+    if (fd != pipe->fd) {
+        errx(1, "got unexpected fd %d (expected %d)", fd, pipe->fd);
+    }
+
+    if (howmuch != -1) {
+        errx(
+            1, "got unexpected howmuch bytes to read %d (expected -1)", howmuch
+        );
+    }
+
+    int rv = 0;
+    switch (evbuffer_read_call_count) {
+    case 0:
+        rv = sizeof(REQUEST_1) - 1;
+        break;
+    case 1:
+        rv = -1;
+        errno = EAGAIN;
+        break;
+    case 2:
+        rv = sizeof(request) - sizeof(REQUEST_1);
+        break;
+    default:
+        break;
+    }
+    evbuffer_read_call_count++;
+
+    return rv;
+}
+
+int
+main(void)
+{
+    tmpdir = lgtd_tests_make_temp_dir();
+    atexit(cleanup_tmpdir);
+
+    char path[PATH_MAX] = { 0 };
+    snprintf(path, sizeof(path), "%s/lightsd.pipe", tmpdir);
+    if (!lgtd_command_pipe_open(path)) {
+        errx(1, "couldn't open pipe");
+    }
+
+    struct lgtd_command_pipe *pipe = SLIST_FIRST(&lgtd_command_pipes);
+
+    lgtd_command_pipe_read_callback(pipe->fd, EV_READ, pipe);
+    lgtd_command_pipe_read_callback(pipe->fd, EV_READ, pipe);
+
+    return 0;
+}