Skip to content

Commit

Permalink
Merge pull request #687 from cole-miller/exec-sql-suspend-v2
Browse files Browse the repository at this point in the history
Break recursion in handle_exec_sql_next (second attempt)
  • Loading branch information
cole-miller authored Aug 26, 2024
2 parents 6035a35 + 4236cd3 commit 296fab9
Show file tree
Hide file tree
Showing 9 changed files with 184 additions and 31 deletions.
64 changes: 55 additions & 9 deletions src/conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -100,16 +100,27 @@ static void gateway_handle_cb(struct handle *req,
conn__stop(c);
}

static void closeCb(struct transport *transport)
static void conn_defer_close_cb(uv_handle_t *t)
{
struct conn *c = transport->data;
buffer__close(&c->write);
buffer__close(&c->read);
struct conn *c = t->data;
if (c->close_cb != NULL) {
c->close_cb(c);
}
}

static void conn_fini(struct conn *c)
{
buffer__close(&c->write);
buffer__close(&c->read);
c->defer.data = c;
uv_close((uv_handle_t *)&c->defer, conn_defer_close_cb);
}

/**
* This is called when raft wants to take over the connection to use for
* node-to-node communication. It consumes the conn and gives ownership of the
* underlying stream to raft.
*/
static void raft_connect(struct conn *c)
{
struct cursor *cursor = &c->handle.cursor;
Expand All @@ -124,10 +135,14 @@ static void raft_connect(struct conn *c)
}
raftProxyAccept(c->uv_transport, request.id, request.address,
c->transport.stream);
/* Close the connection without actually closing the transport, since
* the stream will be used by raft */
/* Run the closing sequence to release resources held by the conn,
* since raft doesn't use the conn object or its buffers to handle
* incoming messages. However, don't close c->transport, because its
* uv_stream_t has been stolen by raft. */
PRE(!c->closed);
c->closed = true;
closeCb(&c->transport);
gateway__close(&c->gateway);
conn_fini(c);
}

static void read_request_cb(struct transport *transport, int status)
Expand Down Expand Up @@ -289,6 +304,28 @@ static int read_protocol(struct conn *c)
return 0;
}

/**
 * Timer callback for the conn's deferral timer: invokes the callback that
 * conn_defer stored on the conn, passing it the stored argument.
 *
 * The stored callback and argument are cleared *before* the callback is
 * invoked, so the slot is already empty if the callback asks for another
 * deferral (conn_defer requires both fields to be NULL).
 */
static void conn_defer_cb(uv_timer_t *defer)
{
struct conn *c = defer->data;
void (*cb)(void *) = c->defer_cb;
void *arg = c->defer_arg;
/* Empty the slot first; see the note above. */
c->defer_cb = NULL;
c->defer_arg = NULL;
/* The timer is only started by conn_defer, which always sets a callback. */
PRE(cb != NULL);
cb(arg);
}

/**
 * defer_impl implementation that conn__start hands to gateway__init.
 *
 * @cb   Callback to run later.
 * @arg  Argument to pass to @cb.
 * @data The conn that owns the gateway (passed as defer_data).
 *
 * Records @cb and @arg on the conn and starts the conn's timer with a zero
 * timeout and no repeat; conn_defer_cb then runs the callback from the event
 * loop. At most one deferred callback may be pending per conn at any time.
 */
static void conn_defer(void (*cb)(void *arg), void *arg, void *data)
{
struct conn *c = data;
/* A previous deferral must already have fired (or never been made). */
PRE(c->defer_cb == NULL);
PRE(c->defer_arg == NULL);
c->defer_cb = cb;
c->defer_arg = arg;
c->defer.data = c;
/* NOTE(review): the return value of uv_timer_start is ignored; confirm that
 * this can never be reached once the timer handle is closing. */
uv_timer_start(&c->defer, conn_defer_cb, 0, 0);
}

int conn__start(struct conn *c,
struct config *config,
struct uv_loop_s *loop,
Expand All @@ -311,7 +348,10 @@ int conn__start(struct conn *c,
c->transport.data = c;
c->uv_transport = uv_transport;
c->close_cb = close_cb;
gateway__init(&c->gateway, config, registry, raft, seed);
uv_timer_init(loop, &c->defer);
c->defer_cb = NULL;
c->defer_arg = NULL;
gateway__init(&c->gateway, config, registry, raft, seed, conn_defer, c);
rv = buffer__init(&c->read);
if (rv != 0) {
goto err_after_transport_init;
Expand Down Expand Up @@ -339,6 +379,12 @@ int conn__start(struct conn *c,
return rv;
}

/**
 * Close callback that conn__stop passes to transport__close: finishes tearing
 * down the conn that owns the transport.
 */
static void conn_transport_close_cb(struct transport *transport)
{
	conn_fini(transport->data);
}

void conn__stop(struct conn *c)
{
tracef("conn stop");
Expand All @@ -347,5 +393,5 @@ void conn__stop(struct conn *c)
}
c->closed = true;
gateway__close(&c->gateway);
transport__close(&c->transport, closeCb);
transport__close(&c->transport, conn_transport_close_cb);
}
5 changes: 5 additions & 0 deletions src/conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ struct conn
struct message request; /* Request message meta data */
struct message response; /* Response message meta data */
struct handle handle;
/* Callback that the gateway has asked to be invoked on the next loop
* iteration, and its argument. */
void (*defer_cb)(void *);
void *defer_arg;
uv_timer_t defer;
bool closed;
queue queue;
};
Expand Down
33 changes: 29 additions & 4 deletions src/gateway.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ void gateway__init(struct gateway *g,
struct config *config,
struct registry *registry,
struct raft *raft,
struct id_state seed)
struct id_state seed,
defer_impl defer,
void *defer_data)
{
tracef("gateway init");
g->config = config;
Expand All @@ -34,6 +36,8 @@ void gateway__init(struct gateway *g,
g->protocol = DQLITE_PROTOCOL_VERSION;
g->client_id = 0;
g->random_state = seed;
g->defer = defer;
g->defer_data = defer_data;
}

/* FIXME: This function becomes unsound when using the new thread pool, since
Expand Down Expand Up @@ -98,6 +102,7 @@ void gateway__leader_close(struct gateway *g, int reason)
void gateway__close(struct gateway *g)
{
tracef("gateway close");

if (g->leader == NULL) {
stmt__registry_close(&g->stmts);
return;
Expand Down Expand Up @@ -720,6 +725,14 @@ static void handle_exec_sql_next(struct gateway *g,
struct handle *req,
bool done);

/**
 * Deferred continuation of an EXEC_SQL request, scheduled through g->defer
 * by handle_exec_sql_cb.
 *
 * @arg The gateway handling the request.
 *
 * Resumes handle_exec_sql_next, with `done` set to true, for the request
 * currently stored in the gateway.
 */
static void handle_exec_sql_next_defer_cb(void *arg)
{
struct gateway *g = arg;
/* The request being continued must still be the gateway's active one. */
PRE(g != NULL && g->req != NULL);
PRE(g->req->type == DQLITE_REQUEST_EXEC_SQL);
handle_exec_sql_next(g, g->req, true);
}

static void handle_exec_sql_cb(struct exec *exec, int status)
{
tracef("handle exec sql cb status %d", status);
Expand All @@ -729,12 +742,24 @@ static void handle_exec_sql_cb(struct exec *exec, int status)
req->exec_count += 1;
sqlite3_finalize(exec->stmt);

if (status == SQLITE_DONE) {
handle_exec_sql_next(g, req, true);
} else {
if (status != SQLITE_DONE) {
assert(g->leader != NULL);
failure(req, status, error_message(g->leader->conn, status));
g->req = NULL;
return;
}

/* It would be valid to always invoke handle_exec_sql_next directly
* here. But that can lead to deep recursion (bounded only by the number
* of statements) when we have several `;`-separated statements in a row
* that do not generate rows. To make sure the stack depth stays under
* control, we have the event loop invoke handle_exec_sql_next itself on
* the next iteration, but only if there is a prior call to
* handle_exec_sql_next above us on the stack. */
if (exec->async) {
handle_exec_sql_next(g, req, true);
} else {
g->defer(handle_exec_sql_next_defer_cb, g, g->defer_data);
}
}

Expand Down
24 changes: 23 additions & 1 deletion src/gateway.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,21 @@

struct handle;

/**
 * Interface for requesting that a callback be invoked "later".
 *
 * When running on a libuv event loop, "later" means "on the next loop
 * iteration". In the unit tests, where no loop is available, "later" means
 * "right now".
 *
 * The gateway uses this interface to prevent recursion in the handler for
 * EXEC_SQL requests.
 *
 * `arg` is provided by the gateway to be passed to `cb`, while `data` is
 * the userdata passed by the owner of the gateway into gateway__init.
 *
 * @cb   Function to invoke later; it receives @arg as its only argument.
 * @arg  Opaque argument chosen by the gateway and forwarded to @cb.
 * @data Opaque pointer registered by the gateway's owner (the `defer_data`
 *       argument of gateway__init).
 */
typedef void (*defer_impl)(void (*cb)(void *arg), void *arg, void *data);

/**
* Handle requests from a single connected client and forward them to
* SQLite.
Expand All @@ -35,13 +50,20 @@ struct gateway {
uint64_t protocol; /* Protocol format version */
uint64_t client_id;
struct id_state random_state; /* For generating IDs */
defer_impl defer;
void *defer_data;
};

/**
* Initialize a gateway.
*/
void gateway__init(struct gateway *g,
struct config *config,
struct registry *registry,
struct raft *raft,
struct id_state seed);
struct id_state seed,
defer_impl defer,
void *defer_data);

void gateway__close(struct gateway *g);

Expand Down
5 changes: 5 additions & 0 deletions src/leader.c
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ static void leaderApplyFramesCb(struct raft_apply *req,
finish:
l->inflight = NULL;
l->db->tx_id = 0;
l->exec->async = true;
leaderExecDone(l->exec);
}

Expand Down Expand Up @@ -439,6 +440,8 @@ static void execBarrierCb(struct barrier *barrier, int status)
struct exec *req = barrier->data;
struct leader *l = req->leader;

req->async = req->barrier.async;

if (status != 0) {
l->exec->status = status;
leaderExecDone(l->exec);
Expand Down Expand Up @@ -505,6 +508,7 @@ static void raftBarrierCb(struct raft_barrier *req, int status)
return;
}
barrier->cb = NULL;
barrier->async = true;
cb(barrier, rv);
}

Expand All @@ -514,6 +518,7 @@ int leader__barrier(struct leader *l, struct barrier *barrier, barrier_cb cb)
int rv;
if (!needsBarrier(l)) {
tracef("not needed");
barrier->async = false;
cb(barrier, 0);
return 0;
}
Expand Down
28 changes: 15 additions & 13 deletions src/leader.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ struct barrier {
void *data;
struct leader *leader;
struct raft_barrier req;
/* Whether we yielded control to the event loop at least once while
* handling the request. */
bool async;
barrier_cb cb;
};

Expand All @@ -63,8 +66,11 @@ struct exec {
uint64_t id;
int status;
queue queue;
exec_cb cb;
pool_work_t work;
/* Whether we yielded control to the event loop at least once while
* handling the request. */
bool async;
exec_cb cb;
};

/**
Expand All @@ -81,17 +87,10 @@ void leader__close(struct leader *l);
/**
* Submit a request to step a SQLite statement.
*
* The request will be dispatched to the leader loop coroutine, which will be
* resumed and will invoke sqlite_step(). If the statement triggers the
* replication hooks and one or more new Raft log entries need to be appended,
* then the loop coroutine will be paused and control will be transferred back
* to the main coroutine. In this state the leader loop coroutine call stack
* will be "blocked" on the xFrames() replication hook call triggered by the top
* sqlite_step() call. The leader loop coroutine will be resumed once the Raft
* append request completes (either successfully or not) and at that point the
* stack will rewind back to the @sqlite_step() call, returning to the leader
* loop which will then have completed the request and transfer control back to
* the main coroutine, pausing until the next request.
* The callback may be invoked synchronously (i.e. while leader__exec is still
* on the stack) or asynchronously after suspending to wait for a raft request
* to finish. The callback can read the `async` field of the exec object to
* determine how it's being invoked.
*/
int leader__exec(struct leader *l,
struct exec *req,
Expand All @@ -103,7 +102,10 @@ int leader__exec(struct leader *l,
* Submit a raft barrier request if there is no transaction in progress in the
* underlying database and the FSM is behind the last log index.
*
* Otherwise, just invoke the given @cb immediately.
* If a barrier is needed, the callback is invoked asynchronously after the
* raft barrier request has completed. Otherwise, the callback is invoked
* synchronously. The callback can read the `async` field of the barrier object
* to determine how it's being invoked.
*/
int leader__barrier(struct leader *l, struct barrier *barrier, barrier_cb cb);

Expand Down
26 changes: 26 additions & 0 deletions test/integration/test_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -145,3 +145,29 @@ TEST(client, querySql, setUp, tearDown, 0, client_params)

return MUNIT_OK;
}

/* Stress test: submit a single EXEC_SQL request whose text is the same
 * trivial statement repeated many times, separated by ';'. */
TEST(client, semicolons, setUp, tearDown, 0, NULL)
{
	struct fixture *f = data;
	(void)params;

	static const char trivial_stmt[] = "CREATE TABLE IF NOT EXISTS foo (n INT);";

	const size_t n = 1000;
	/* Length of one statement, without the literal's NUL terminator. */
	const size_t unit = sizeof(trivial_stmt) - 1;
	const size_t total = n * unit;
	char *sql = munit_malloc(total);

	for (size_t off = 0; off < total; off += unit) {
		memcpy(sql + off, trivial_stmt, unit);
	}
	/* The buffer has no room for a terminator, so the final ';' is
	 * overwritten with the NUL byte. */
	sql[total - 1] = '\0';

	uint64_t last_insert_id;
	uint64_t rows_affected;
	EXEC_SQL(sql, &last_insert_id, &rows_affected);

	free(sql);
	return MUNIT_OK;
}
15 changes: 13 additions & 2 deletions test/unit/test_concurrency.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ struct connection {
struct context context;
};

/* defer_impl used by the unit tests: the callback is simply invoked on the
 * spot with its argument. The owner data is not needed. */
static void defer_run_now(void (*cb)(void *arg), void *arg, void *data)
{
	(*cb)(arg);
	(void)data;
}

#define FIXTURE \
FIXTURE_CLUSTER; \
struct connection connections[N_GATEWAYS]
Expand All @@ -49,8 +55,13 @@ struct connection {
struct request_open open; \
struct response_db db; \
struct id_state seed = { { 1 } }; \
gateway__init(&c->gateway, CLUSTER_CONFIG(0), \
CLUSTER_REGISTRY(0), CLUSTER_RAFT(0), seed); \
gateway__init(&c->gateway, \
CLUSTER_CONFIG(0), \
CLUSTER_REGISTRY(0), \
CLUSTER_RAFT(0), \
seed, \
defer_run_now, \
NULL); \
c->handle.data = &c->context; \
rc = buffer__init(&c->request); \
munit_assert_int(rc, ==, 0); \
Expand Down
Loading

0 comments on commit 296fab9

Please sign in to comment.