Skip to content

Commit

Permalink
Merge pull request #681 from cole-miller/exec-sql-suspend
Browse files Browse the repository at this point in the history
gateway: Break recursion in handle_exec_sql
  • Loading branch information
cole-miller authored Aug 14, 2024
2 parents d5631a9 + 5f69b54 commit 7dea935
Show file tree
Hide file tree
Showing 8 changed files with 101 additions and 9 deletions.
3 changes: 2 additions & 1 deletion src/conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ static void raft_connect(struct conn *c)
/* Close the connection without actually closing the transport, since
* the stream will be used by raft */
c->closed = true;
gateway__close(&c->gateway);
closeCb(&c->transport);
}

Expand Down Expand Up @@ -311,7 +312,7 @@ int conn__start(struct conn *c,
c->transport.data = c;
c->uv_transport = uv_transport;
c->close_cb = close_cb;
gateway__init(&c->gateway, config, registry, raft, seed);
gateway__init(&c->gateway, config, loop, registry, raft, seed);
rv = buffer__init(&c->read);
if (rv != 0) {
goto err_after_transport_init;
Expand Down
39 changes: 36 additions & 3 deletions src/gateway.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

void gateway__init(struct gateway *g,
struct config *config,
uv_loop_t *loop,
struct registry *registry,
struct raft *raft,
struct id_state seed)
Expand All @@ -34,6 +35,10 @@ void gateway__init(struct gateway *g,
g->protocol = DQLITE_PROTOCOL_VERSION;
g->client_id = 0;
g->random_state = seed;
g->defer = (uv_timer_t){};
if (loop != NULL) {
uv_timer_init(loop, &g->defer);
}
}

void gateway__leader_close(struct gateway *g, int reason)
Expand Down Expand Up @@ -96,6 +101,11 @@ void gateway__leader_close(struct gateway *g, int reason)
void gateway__close(struct gateway *g)
{
tracef("gateway close");

if (g->defer.loop != NULL) {
uv_close((uv_handle_t *)&g->defer, NULL);
}

if (g->leader == NULL) {
stmt__registry_close(&g->stmts);
return;
Expand Down Expand Up @@ -718,6 +728,14 @@ static void handle_exec_sql_next(struct gateway *g,
struct handle *req,
bool done);

static void handle_exec_sql_next_defer_cb(uv_timer_t *t)
{
struct gateway *g = t->data;
PRE(g != NULL && g->req != NULL);
PRE(g->req->type == DQLITE_REQUEST_EXEC_SQL);
handle_exec_sql_next(g, g->req, true);
}

static void handle_exec_sql_cb(struct exec *exec, int status)
{
tracef("handle exec sql cb status %d", status);
Expand All @@ -727,12 +745,27 @@ static void handle_exec_sql_cb(struct exec *exec, int status)
req->exec_count += 1;
sqlite3_finalize(exec->stmt);

if (status == SQLITE_DONE) {
handle_exec_sql_next(g, req, true);
} else {
if (status != SQLITE_DONE) {
assert(g->leader != NULL);
failure(req, status, error_message(g->leader->conn, status));
g->req = NULL;
return;
}

/* It would be valid to always invoke handle_exec_sql_next directly
* here. But that can lead to bounded recursion when we have several
* `;`-separated statements in a row that do not generate rows. To make
* sure the stack depth stays under control, we defer have the event
* loop invoke handle_exec_sql_next itself on the next iteration, but
* only if there is a prior call to handle_exec_sql_next above us on
* the stack. We also invoke handle_exec_sql_next directly if the
* gateway doesn't have access to an event loop (this is only the case
* in the unit tests). */
if (exec->async || g->defer.loop == NULL) {
handle_exec_sql_next(g, req, true);
} else {
g->defer.data = g;
uv_timer_start(&g->defer, handle_exec_sql_next_defer_cb, 0, 0);
}
}

Expand Down
12 changes: 12 additions & 0 deletions src/gateway.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,22 @@ struct gateway {
uint64_t protocol; /* Protocol format version */
uint64_t client_id;
struct id_state random_state; /* For generating IDs */
/* The EXEC_SQL request handler uses this to defer work to the next
* loop iteration, to avoid recursion when processing multi-statement
* SQL strings. */
uv_timer_t defer;
};

/**
* Initialize a gateway.
*
* Passing NULL for the `loop` is permitted. Currently the loop is only used
* optionally to break potential recursion when handling an EXEC_SQL request
* that containss multiple `;`-separated statements.
*/
void gateway__init(struct gateway *g,
struct config *config,
uv_loop_t *loop,
struct registry *registry,
struct raft *raft,
struct id_state seed);
Expand Down
5 changes: 5 additions & 0 deletions src/leader.c
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ static void leaderApplyFramesCb(struct raft_apply *req,
finish:
l->inflight = NULL;
l->db->tx_id = 0;
l->exec->async = true;
leaderExecDone(l->exec);
}

Expand Down Expand Up @@ -439,6 +440,8 @@ static void execBarrierCb(struct barrier *barrier, int status)
struct exec *req = barrier->data;
struct leader *l = req->leader;

req->async = req->barrier.async;

if (status != 0) {
l->exec->status = status;
leaderExecDone(l->exec);
Expand Down Expand Up @@ -505,6 +508,7 @@ static void raftBarrierCb(struct raft_barrier *req, int status)
return;
}
barrier->cb = NULL;
barrier->async = true;
cb(barrier, rv);
}

Expand All @@ -514,6 +518,7 @@ int leader__barrier(struct leader *l, struct barrier *barrier, barrier_cb cb)
int rv;
if (!needsBarrier(l)) {
tracef("not needed");
barrier->async = false;
cb(barrier, 0);
return 0;
}
Expand Down
9 changes: 8 additions & 1 deletion src/leader.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ struct barrier {
void *data;
struct leader *leader;
struct raft_barrier req;
/* When the callback is invoked, this field is `true` if raft_barrier
* was called and `false` if the callback was invoked immediately. */
bool async;
barrier_cb cb;
};

Expand All @@ -63,8 +66,12 @@ struct exec {
uint64_t id;
int status;
queue queue;
exec_cb cb;
pool_work_t work;
/* When the callback is invoked, this field is `true` if raft_barrier
* or raft_apply was called and `false` if the callback was invoked
* immediately. */
bool async;
exec_cb cb;
};

/**
Expand Down
26 changes: 26 additions & 0 deletions test/integration/test_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -145,3 +145,29 @@ TEST(client, querySql, setUp, tearDown, 0, client_params)

return MUNIT_OK;
}

/* Stress test of an EXEC_SQL with many ';'-separated statements. */
TEST(client, semicolons, setUp, tearDown, 0, NULL)
{
struct fixture *f = data;
(void)params;

static const char trivial_stmt[] = "CREATE TABLE IF NOT EXISTS foo (n INT);";

size_t n = 1000;
size_t unit = sizeof(trivial_stmt) - 1;
char *sql = munit_malloc(n * unit);
char *p = sql;
for (size_t i = 0; i < n; i++) {
memcpy(p, trivial_stmt, unit);
p += unit;
}
sql[n * unit - 1] = '\0';

uint64_t last_insert_id;
uint64_t rows_affected;
EXEC_SQL(sql, &last_insert_id, &rows_affected);

free(sql);
return MUNIT_OK;
}
8 changes: 6 additions & 2 deletions test/unit/test_concurrency.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,12 @@ struct connection {
struct request_open open; \
struct response_db db; \
struct id_state seed = { { 1 } }; \
gateway__init(&c->gateway, CLUSTER_CONFIG(0), \
CLUSTER_REGISTRY(0), CLUSTER_RAFT(0), seed); \
gateway__init(&c->gateway, \
CLUSTER_CONFIG(0), \
NULL, \
CLUSTER_REGISTRY(0), \
CLUSTER_RAFT(0), \
seed); \
c->handle.data = &c->context; \
rc = buffer__init(&c->request); \
munit_assert_int(rc, ==, 0); \
Expand Down
8 changes: 6 additions & 2 deletions test/unit/test_gateway.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,12 @@ struct connection {
struct id_state seed = { { 1 } }; \
config = CLUSTER_CONFIG(i); \
config->page_size = 512; \
gateway__init(&c->gateway, config, CLUSTER_REGISTRY(i), \
CLUSTER_RAFT(i), seed); \
gateway__init(&c->gateway, \
config, \
NULL, \
CLUSTER_REGISTRY(i), \
CLUSTER_RAFT(i), \
seed); \
c->handle.data = &c->context; \
rc = buffer__init(&c->buf1); \
munit_assert_int(rc, ==, 0); \
Expand Down

0 comments on commit 7dea935

Please sign in to comment.