diff --git a/src/conn.c b/src/conn.c index f6adfa55d..ea019c1ae 100644 --- a/src/conn.c +++ b/src/conn.c @@ -127,6 +127,7 @@ static void raft_connect(struct conn *c) /* Close the connection without actually closing the transport, since * the stream will be used by raft */ c->closed = true; + gateway__close(&c->gateway); closeCb(&c->transport); } @@ -311,7 +312,7 @@ int conn__start(struct conn *c, c->transport.data = c; c->uv_transport = uv_transport; c->close_cb = close_cb; - gateway__init(&c->gateway, config, registry, raft, seed); + gateway__init(&c->gateway, config, loop, registry, raft, seed); rv = buffer__init(&c->read); if (rv != 0) { goto err_after_transport_init; diff --git a/src/gateway.c b/src/gateway.c index 845e992ed..31320ba1a 100644 --- a/src/gateway.c +++ b/src/gateway.c @@ -16,6 +16,7 @@ void gateway__init(struct gateway *g, struct config *config, + uv_loop_t *loop, struct registry *registry, struct raft *raft, struct id_state seed) @@ -34,6 +35,10 @@ void gateway__init(struct gateway *g, g->protocol = DQLITE_PROTOCOL_VERSION; g->client_id = 0; g->random_state = seed; + g->defer = (uv_timer_t){}; + if (loop != NULL) { + uv_timer_init(loop, &g->defer); + } } void gateway__leader_close(struct gateway *g, int reason) @@ -96,6 +101,11 @@ void gateway__leader_close(struct gateway *g, int reason) void gateway__close(struct gateway *g) { tracef("gateway close"); + + if (g->defer.loop != NULL) { + uv_close((uv_handle_t *)&g->defer, NULL); + } + if (g->leader == NULL) { stmt__registry_close(&g->stmts); return; @@ -718,6 +728,14 @@ static void handle_exec_sql_next(struct gateway *g, struct handle *req, bool done); +static void handle_exec_sql_next_defer_cb(uv_timer_t *t) +{ + struct gateway *g = t->data; + PRE(g != NULL && g->req != NULL); + PRE(g->req->type == DQLITE_REQUEST_EXEC_SQL); + handle_exec_sql_next(g, g->req, true); +} + static void handle_exec_sql_cb(struct exec *exec, int status) { tracef("handle exec sql cb status %d", status); @@ -727,12 +745,27 @@ static void handle_exec_sql_cb(struct exec *exec, int status) req->exec_count += 1; sqlite3_finalize(exec->stmt); - if (status == SQLITE_DONE) { - handle_exec_sql_next(g, req, true); - } else { + if (status != SQLITE_DONE) { assert(g->leader != NULL); failure(req, status, error_message(g->leader->conn, status)); g->req = NULL; + return; + } + + /* It would be valid to always invoke handle_exec_sql_next directly + * here. But that can lead to bounded recursion when we have several + * `;`-separated statements in a row that do not generate rows. To make + * sure the stack depth stays under control, we defer have the event + * loop invoke handle_exec_sql_next itself on the next iteration, but + * only if there is a prior call to handle_exec_sql_next above us on + * the stack. We also invoke handle_exec_sql_next directly if the + * gateway doesn't have access to an event loop (this is only the case + * in the unit tests). */ + if (exec->async || g->defer.loop == NULL) { + handle_exec_sql_next(g, req, true); + } else { + g->defer.data = g; + uv_timer_start(&g->defer, handle_exec_sql_next_defer_cb, 0, 0); } } diff --git a/src/gateway.h b/src/gateway.h index 2a5805084..e3f198990 100644 --- a/src/gateway.h +++ b/src/gateway.h @@ -35,10 +35,22 @@ struct gateway { uint64_t protocol; /* Protocol format version */ uint64_t client_id; struct id_state random_state; /* For generating IDs */ + /* The EXEC_SQL request handler uses this to defer work to the next + * loop iteration, to avoid recursion when processing multi-statement + * SQL strings. */ + uv_timer_t defer; }; +/** + * Initialize a gateway. + * + * Passing NULL for the `loop` is permitted. Currently the loop is only used + * optionally to break potential recursion when handling an EXEC_SQL request + * that containss multiple `;`-separated statements. + */ void gateway__init(struct gateway *g, struct config *config, + uv_loop_t *loop, struct registry *registry, struct raft *raft, struct id_state seed); diff --git a/src/leader.c b/src/leader.c index 61ef91639..0b4a4f23e 100644 --- a/src/leader.c +++ b/src/leader.c @@ -295,6 +295,7 @@ static void leaderApplyFramesCb(struct raft_apply *req, finish: l->inflight = NULL; l->db->tx_id = 0; + l->exec->async = true; leaderExecDone(l->exec); } @@ -439,6 +440,8 @@ static void execBarrierCb(struct barrier *barrier, int status) struct exec *req = barrier->data; struct leader *l = req->leader; + req->async = req->barrier.async; + if (status != 0) { l->exec->status = status; leaderExecDone(l->exec); @@ -505,6 +508,7 @@ static void raftBarrierCb(struct raft_barrier *req, int status) return; } barrier->cb = NULL; + barrier->async = true; cb(barrier, rv); } @@ -514,6 +518,7 @@ int leader__barrier(struct leader *l, struct barrier *barrier, barrier_cb cb) int rv; if (!needsBarrier(l)) { tracef("not needed"); + barrier->async = false; cb(barrier, 0); return 0; } diff --git a/src/leader.h b/src/leader.h index 9d022d3e9..d2ea80602 100644 --- a/src/leader.h +++ b/src/leader.h @@ -49,6 +49,9 @@ struct barrier { void *data; struct leader *leader; struct raft_barrier req; + /* When the callback is invoked, this field is `true` if raft_barrier + * was called and `false` if the callback was invoked immediately. */ + bool async; barrier_cb cb; }; @@ -63,8 +66,12 @@ struct exec { uint64_t id; int status; queue queue; - exec_cb cb; pool_work_t work; + /* When the callback is invoked, this field is `true` if raft_barrier + * or raft_apply was called and `false` if the callback was invoked + * immediately. */ + bool async; + exec_cb cb; }; /** diff --git a/test/integration/test_client.c b/test/integration/test_client.c index 98fccd428..c2a1cfec2 100644 --- a/test/integration/test_client.c +++ b/test/integration/test_client.c @@ -145,3 +145,29 @@ TEST(client, querySql, setUp, tearDown, 0, client_params) return MUNIT_OK; } + +/* Stress test of an EXEC_SQL with many ';'-separated statements. */ +TEST(client, semicolons, setUp, tearDown, 0, NULL) +{ + struct fixture *f = data; + (void)params; + + static const char trivial_stmt[] = "CREATE TABLE IF NOT EXISTS foo (n INT);"; + + size_t n = 1000; + size_t unit = sizeof(trivial_stmt) - 1; + char *sql = munit_malloc(n * unit); + char *p = sql; + for (size_t i = 0; i < n; i++) { + memcpy(p, trivial_stmt, unit); + p += unit; + } + sql[n * unit - 1] = '\0'; + + uint64_t last_insert_id; + uint64_t rows_affected; + EXEC_SQL(sql, &last_insert_id, &rows_affected); + + free(sql); + return MUNIT_OK; +} diff --git a/test/unit/test_concurrency.c b/test/unit/test_concurrency.c index 6008efacc..c3a1afee3 100644 --- a/test/unit/test_concurrency.c +++ b/test/unit/test_concurrency.c @@ -49,8 +49,12 @@ struct connection { struct request_open open; \ struct response_db db; \ struct id_state seed = { { 1 } }; \ - gateway__init(&c->gateway, CLUSTER_CONFIG(0), \ - CLUSTER_REGISTRY(0), CLUSTER_RAFT(0), seed); \ + gateway__init(&c->gateway, \ + CLUSTER_CONFIG(0), \ + NULL, \ + CLUSTER_REGISTRY(0), \ + CLUSTER_RAFT(0), \ + seed); \ c->handle.data = &c->context; \ rc = buffer__init(&c->request); \ munit_assert_int(rc, ==, 0); \ diff --git a/test/unit/test_gateway.c b/test/unit/test_gateway.c index d0e1fb7c6..467cdb1ae 100644 --- a/test/unit/test_gateway.c +++ b/test/unit/test_gateway.c @@ -55,8 +55,12 @@ struct connection { struct id_state seed = { { 1 } }; \ config = CLUSTER_CONFIG(i); \ config->page_size = 512; \ - gateway__init(&c->gateway, config, CLUSTER_REGISTRY(i), \ - CLUSTER_RAFT(i), seed); \ + gateway__init(&c->gateway, \ + config, \ + NULL, \ + CLUSTER_REGISTRY(i), \ + CLUSTER_RAFT(i), \ + seed); \ c->handle.data = &c->context; \ rc = buffer__init(&c->buf1); \ munit_assert_int(rc, ==, 0); \