Mercurial > louis > mq > lightsd
view fix_command_pipe_read_loop.patch @ 250:a0066335c86e
final state before finishing fixes
author | Louis Opter <kalessin@kalessin.fr> |
---|---|
date | Sat, 15 Aug 2015 16:36:28 -0700 |
parents | 848fe96c58a5 |
children |
line wrap: on
line source
# HG changeset patch # Parent 870e94c0386d9595949a21c3486875759c4a1ff6 Fix infinite loop in the command pipe read callback Looks like I suck at Unix, EOF & EAGAIN weren't handled properly. The pipe is now re-opened on EOF. We could probably make the command pipe bi-directional at this point. I wish conky wasn't fucked nowadays, that would have let me spot this earlier. diff --git a/core/pipe.c b/core/pipe.c --- a/core/pipe.c +++ b/core/pipe.c @@ -41,7 +41,7 @@ SLIST_HEAD_INITIALIZER(&lgtd_command_pipes); static void -lgtd_command_pipe_close(struct lgtd_command_pipe *pipe) +_lgtd_command_pipe_close(struct lgtd_command_pipe *pipe) { assert(pipe); @@ -49,16 +49,24 @@ if (pipe->fd != -1) { close(pipe->fd); } - unlink(pipe->path); SLIST_REMOVE(&lgtd_command_pipes, pipe, lgtd_command_pipe, link); evbuffer_free(pipe->read_buf); event_free(pipe->read_ev); - - lgtd_info("closed command pipe %s", pipe->path); free(pipe); } static void +lgtd_command_pipe_close(struct lgtd_command_pipe *pipe) +{ + const char *path = pipe->path; + _lgtd_command_pipe_close(pipe); + unlink(path); + lgtd_info("closed command pipe %s", path); +} + +static void lgtd_command_pipe_reset(struct lgtd_command_pipe *); + +static void lgtd_command_pipe_read_callback(evutil_socket_t socket, short events, void *ctx) { assert(ctx); @@ -70,7 +78,6 @@ struct lgtd_command_pipe *pipe = ctx; bool drain = false; - int read = 0; for (int nbytes = evbuffer_read(pipe->read_buf, pipe->fd, -1); nbytes; nbytes = evbuffer_read(pipe->read_buf, pipe->fd, -1)) { @@ -80,16 +87,12 @@ } if (errno != EAGAIN) { lgtd_warn("read error on command pipe %s", pipe->path); - const char *path = pipe->path; - lgtd_command_pipe_close(pipe); - lgtd_command_pipe_open(path); - return; + break; } - continue; + return; // EAGAIN, go back to the event loop } if (!drain) { - read += nbytes; next_request: (void)0; const char *buf = (char *)evbuffer_pullup(pipe->read_buf, -1); @@ -127,25 +130,22 @@ jsmn_init(&pipe->client.jsmn_ctx); int request_size = pipe->client.jsmn_tokens[0].end; evbuffer_drain(pipe->read_buf, request_size); - read -= request_size; - if (read) { + if (request_size < bufsz) { goto next_request; } break; } - } else { - evbuffer_drain(pipe->read_buf, read + nbytes); - read = 0; + } + + if (drain) { + ssize_t bufsz = evbuffer_get_length(pipe->read_buf); + evbuffer_drain(pipe->read_buf, bufsz); + drain = false; + jsmn_init(&pipe->client.jsmn_ctx); } } - if (read) { - lgtd_debug( - "pipe %s: discarding %d bytes of unusable data", pipe->path, read - ); - evbuffer_drain(pipe->read_buf, read); - } - jsmn_init(&pipe->client.jsmn_ctx); + lgtd_command_pipe_reset(pipe); } static mode_t @@ -156,8 +156,8 @@ return mask; } -bool -lgtd_command_pipe_open(const char *path) +static bool +_lgtd_command_pipe_open(const char *path) { assert(path); @@ -218,8 +218,6 @@ goto error; } - lgtd_info("command pipe ready at %s", pipe->path); - SLIST_INSERT_HEAD(&lgtd_command_pipes, pipe, link); return true; @@ -239,6 +237,27 @@ return false; } +static void +lgtd_command_pipe_reset(struct lgtd_command_pipe *pipe) +{ + const char *path = pipe->path; + // we could optimize a bit to avoid re-allocations here: + _lgtd_command_pipe_close(pipe); + if (!_lgtd_command_pipe_open(path)) { + lgtd_warn("can't re-open pipe %s", path); + } +} + +bool +lgtd_command_pipe_open(const char *path) +{ + if (_lgtd_command_pipe_open(path)) { + lgtd_info("command pipe ready at %s", path); + return true; + } + return false; +} + void lgtd_command_pipe_close_all(void) { diff --git a/tests/core/pipe/test_pipe_read_callback.c b/tests/core/pipe/test_pipe_read_callback.c --- a/tests/core/pipe/test_pipe_read_callback.c +++ b/tests/core/pipe/test_pipe_read_callback.c @@ -7,6 +7,7 @@ #include "lifx/wire_proto.h" #define MOCKED_EVENT_NEW +#define MOCKED_EVENT_DEL #define MOCKED_EVBUFFER_NEW #define MOCKED_EVBUFFER_READ #define MOCKED_EVBUFFER_PULLUP @@ -69,6 +70,16 @@ return (void *)1; } +static int event_del_call_count = 0; + +int +event_del(struct event *ev) +{ + (void)ev; + event_del_call_count++; + return 0; +} + static int get_nbytes_read(int call_count) { @@ -186,8 +197,22 @@ } struct lgtd_command_pipe *pipe = SLIST_FIRST(&lgtd_command_pipes); + lgtd_command_pipe_read_callback(pipe->fd, EV_READ, pipe); + if (event_del_call_count != 1) { + errx(1, "the pipe wasn't reset"); + } + jsonrpc_dispatch_request_call_count = 0; + evbuffer_drain_call_count = 0; + evbuffer_read_call_count = 0; + evbuffer_pullup_call_count = 0; + evbuffer_get_length_call_count = 0; + event_del_call_count = 0; + pipe = SLIST_FIRST(&lgtd_command_pipes); lgtd_command_pipe_read_callback(pipe->fd, EV_READ, pipe); + if (event_del_call_count != 1) { + errx(1, "the pipe wasn't reset"); + } return 0; } diff --git a/tests/core/pipe/test_pipe_read_callback_yield_on_eagain.c b/tests/core/pipe/test_pipe_read_callback_yield_on_eagain.c new file mode 100644 --- /dev/null +++ b/tests/core/pipe/test_pipe_read_callback_yield_on_eagain.c @@ -0,0 +1,269 @@ +#include "pipe.c" + +#include <sys/tree.h> +#include <endian.h> +#include <limits.h> + +#include "lifx/wire_proto.h" + +#define MOCKED_EVENT_NEW +#define MOCKED_EVBUFFER_NEW +#define MOCKED_EVBUFFER_READ +#define MOCKED_EVBUFFER_PULLUP +#define MOCKED_EVBUFFER_GET_LENGTH +#define MOCKED_EVBUFFER_DRAIN +#include "mock_event2.h" +#include "mock_gateway.h" + +#include "tests_utils.h" +#define MOCKED_JSONRPC_DISPATCH_REQUEST +#include "tests_pipe_utils.h" + +#define REQUEST_1 "{" \ + "\"jsonrpc\": \"2.0\"," \ + "\"method\": \"get_light_state\"," \ + "\"params\": [\"*\"]," \ + "\"id\": 42" \ +"}" + +#define REQUEST_2 "{" \ + "\"jsonrpc\": \"2.0\"," \ + "\"method\": \"power_on\"," \ + "\"params\": [\"*\"]," \ + "\"id\": 43" \ +"}" + +static unsigned char request[] = ( + REQUEST_1 + REQUEST_2 +); + +static char *tmpdir = NULL; + +void +cleanup_tmpdir(void) +{ + lgtd_tests_remove_temp_dir(tmpdir); +} + +static int jsonrpc_dispatch_request_call_count = 0; + +void +lgtd_jsonrpc_dispatch_request(struct lgtd_client *client, int parsed) +{ + (void)client; + (void)parsed; + + if (!parsed) { + errx(1, "number of parsed json tokens not passed in"); + } + + switch (jsonrpc_dispatch_request_call_count) { + case 0: + if (memcmp(client->json, request, sizeof(request) - 1)) { + errx( + 1, "got unexpected json %s (expected %s)", + client->json, request + ); + } + break; + case 1: + if (memcmp(client->json, REQUEST_2, sizeof(REQUEST_2) - 1)) { + errx( + 1, "got unexpected json %s (expected %s)", + client->json, REQUEST_2 + ); + } + break; + default: + errx( + 1, "jsonrpc_dispatch_request_call_count = %d", + jsonrpc_dispatch_request_call_count + ); + break; + } + + jsonrpc_dispatch_request_call_count++; +} + +struct event * +event_new(struct event_base *base, + evutil_socket_t fd, + short events, + event_callback_fn cb, + void *ctx) +{ + (void)base; + (void)fd; + (void)events; + (void)cb; + (void)ctx; + + return (void *)1; +} + +struct evbuffer * +evbuffer_new(void) +{ + return (void *)2; +} + +static int evbuffer_drain_call_count = 0; + +int +evbuffer_drain(struct evbuffer *buf, size_t len) +{ + if (buf != (void *)2) { + errx(1, "got unexpected buf %p (expected %p)", buf, (void *)2); + } + + jsmn_parser jsmn_ctx; + jsmn_init(&jsmn_ctx); + struct lgtd_command_pipe *pipe = SLIST_FIRST(&lgtd_command_pipes); + if (memcmp(&pipe->client.jsmn_ctx, &jsmn_ctx, sizeof(jsmn_ctx))) { + errx(1, "the client json parser context wasn't re-initialized"); + } + + switch (evbuffer_drain_call_count) { + case 0: + if (len != sizeof(REQUEST_1) - 1) { + errx( + 1, "trying to drain %ju bytes (expected %ju)", + (uintmax_t)len, (uintmax_t)sizeof(REQUEST_1) - 1 + ); + } + break; + case 1: + if (len != sizeof(REQUEST_2) - 1) { + errx( + 1, "trying to drain %ju bytes (expected %ju)", + (uintmax_t)len, (uintmax_t)sizeof(REQUEST_2) - 1 + ); + } + break; + default: + errx(1, "evbuffer_drain_call_count = %d", evbuffer_drain_call_count); + break; + } + evbuffer_drain_call_count++; + + return 0; +} + +static int evbuffer_pullup_call_count = 0; + +unsigned char * +evbuffer_pullup(struct evbuffer *buf, ev_ssize_t size) +{ + if (buf != (void *)2) { + errx(1, "got unexpected buf %p (expected %p)", buf, (void *)2); + } + + if (size != -1) { + errx( + 1, "got unexpected size %jd in pullup (expected -1)", (intmax_t)size + ); + } + + int offset; + switch (evbuffer_pullup_call_count) { + case 0: + offset = 0; + break; + case 1: + offset = sizeof(REQUEST_1) - 1; + break; + default: + offset = sizeof(request); + break; + } + evbuffer_pullup_call_count++; + + return &request[offset]; +} + +static int evbuffer_get_length_call_count = 0; + +size_t +evbuffer_get_length(const struct evbuffer *buf) +{ + if (buf != (void *)2) { + errx(1, "got unexpected buf %p (expected %p)", buf, (void *)2); + } + + size_t len; + switch (evbuffer_get_length_call_count) { + case 0: + len = sizeof(REQUEST_1) - 1; + break; + case 1: + len = sizeof(request) - sizeof(REQUEST_1); + break; + default: + len = 0; + break; + } + evbuffer_get_length_call_count++; + + return len; +} + +static int evbuffer_read_call_count = 0; + +int +evbuffer_read(struct evbuffer *buf, evutil_socket_t fd, int howmuch) +{ + if (buf != (void *)2) { + errx(1, "got unexpected buf %p (expected %p)", buf, (void *)2); + } + + struct lgtd_command_pipe *pipe = SLIST_FIRST(&lgtd_command_pipes); + if (fd != pipe->fd) { + errx(1, "got unexpected fd %d (expected %d)", fd, pipe->fd); + } + + if (howmuch != -1) { + errx( + 1, "got unexpected howmuch bytes to read %d (expected -1)", howmuch + ); + } + + int rv = 0; + switch (evbuffer_read_call_count) { + case 0: + rv = sizeof(REQUEST_1) - 1; + break; + case 1: + rv = -1; + errno = EAGAIN; + break; + case 2: + rv = sizeof(request) - sizeof(REQUEST_1); + break; + default: + break; + } + evbuffer_read_call_count++; + + return rv; +} + +int +main(void) +{ + tmpdir = lgtd_tests_make_temp_dir(); + atexit(cleanup_tmpdir); + + char path[PATH_MAX] = { 0 }; + snprintf(path, sizeof(path), "%s/lightsd.pipe", tmpdir); + if (!lgtd_command_pipe_open(path)) { + errx(1, "couldn't open pipe"); + } + + struct lgtd_command_pipe *pipe = SLIST_FIRST(&lgtd_command_pipes); + + lgtd_command_pipe_read_callback(pipe->fd, EV_READ, pipe); + lgtd_command_pipe_read_callback(pipe->fd, EV_READ, pipe); + + return 0; +}