Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert suspending inside handle_exec_sql #685

Merged
merged 1 commit into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ static void raft_connect(struct conn *c)
/* Close the connection without actually closing the transport, since
* the stream will be used by raft */
c->closed = true;
gateway__close(&c->gateway);
closeCb(&c->transport);
}

Expand Down Expand Up @@ -312,7 +311,7 @@ int conn__start(struct conn *c,
c->transport.data = c;
c->uv_transport = uv_transport;
c->close_cb = close_cb;
gateway__init(&c->gateway, config, loop, registry, raft, seed);
gateway__init(&c->gateway, config, registry, raft, seed);
rv = buffer__init(&c->read);
if (rv != 0) {
goto err_after_transport_init;
Expand Down
39 changes: 3 additions & 36 deletions src/gateway.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

void gateway__init(struct gateway *g,
struct config *config,
uv_loop_t *loop,
struct registry *registry,
struct raft *raft,
struct id_state seed)
Expand All @@ -35,10 +34,6 @@ void gateway__init(struct gateway *g,
g->protocol = DQLITE_PROTOCOL_VERSION;
g->client_id = 0;
g->random_state = seed;
g->defer = (uv_timer_t){};
if (loop != NULL) {
uv_timer_init(loop, &g->defer);
}
}

void gateway__leader_close(struct gateway *g, int reason)
Expand Down Expand Up @@ -101,11 +96,6 @@ void gateway__leader_close(struct gateway *g, int reason)
void gateway__close(struct gateway *g)
{
tracef("gateway close");

if (g->defer.loop != NULL) {
uv_close((uv_handle_t *)&g->defer, NULL);
}

if (g->leader == NULL) {
stmt__registry_close(&g->stmts);
return;
Expand Down Expand Up @@ -728,14 +718,6 @@ static void handle_exec_sql_next(struct gateway *g,
struct handle *req,
bool done);

static void handle_exec_sql_next_defer_cb(uv_timer_t *t)
{
struct gateway *g = t->data;
PRE(g != NULL && g->req != NULL);
PRE(g->req->type == DQLITE_REQUEST_EXEC_SQL);
handle_exec_sql_next(g, g->req, true);
}

static void handle_exec_sql_cb(struct exec *exec, int status)
{
tracef("handle exec sql cb status %d", status);
Expand All @@ -745,27 +727,12 @@ static void handle_exec_sql_cb(struct exec *exec, int status)
req->exec_count += 1;
sqlite3_finalize(exec->stmt);

if (status != SQLITE_DONE) {
if (status == SQLITE_DONE) {
handle_exec_sql_next(g, req, true);
} else {
assert(g->leader != NULL);
failure(req, status, error_message(g->leader->conn, status));
g->req = NULL;
return;
}

/* It would be valid to always invoke handle_exec_sql_next directly
* here. But that can lead to bounded recursion when we have several
* `;`-separated statements in a row that do not generate rows. To make
* sure the stack depth stays under control, we defer have the event
* loop invoke handle_exec_sql_next itself on the next iteration, but
* only if there is a prior call to handle_exec_sql_next above us on
* the stack. We also invoke handle_exec_sql_next directly if the
* gateway doesn't have access to an event loop (this is only the case
* in the unit tests). */
if (exec->async || g->defer.loop == NULL) {
handle_exec_sql_next(g, req, true);
} else {
g->defer.data = g;
uv_timer_start(&g->defer, handle_exec_sql_next_defer_cb, 0, 0);
}
}

Expand Down
12 changes: 0 additions & 12 deletions src/gateway.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,22 +35,10 @@ struct gateway {
uint64_t protocol; /* Protocol format version */
uint64_t client_id;
struct id_state random_state; /* For generating IDs */
/* The EXEC_SQL request handler uses this to defer work to the next
* loop iteration, to avoid recursion when processing multi-statement
* SQL strings. */
uv_timer_t defer;
};

/**
* Initialize a gateway.
*
* Passing NULL for the `loop` is permitted. Currently the loop is only used
* optionally to break potential recursion when handling an EXEC_SQL request
* that containss multiple `;`-separated statements.
*/
void gateway__init(struct gateway *g,
struct config *config,
uv_loop_t *loop,
struct registry *registry,
struct raft *raft,
struct id_state seed);
Expand Down
5 changes: 0 additions & 5 deletions src/leader.c
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,6 @@ static void leaderApplyFramesCb(struct raft_apply *req,
finish:
l->inflight = NULL;
l->db->tx_id = 0;
l->exec->async = true;
leaderExecDone(l->exec);
}

Expand Down Expand Up @@ -440,8 +439,6 @@ static void execBarrierCb(struct barrier *barrier, int status)
struct exec *req = barrier->data;
struct leader *l = req->leader;

req->async = req->barrier.async;

if (status != 0) {
l->exec->status = status;
leaderExecDone(l->exec);
Expand Down Expand Up @@ -508,7 +505,6 @@ static void raftBarrierCb(struct raft_barrier *req, int status)
return;
}
barrier->cb = NULL;
barrier->async = true;
cb(barrier, rv);
}

Expand All @@ -518,7 +514,6 @@ int leader__barrier(struct leader *l, struct barrier *barrier, barrier_cb cb)
int rv;
if (!needsBarrier(l)) {
tracef("not needed");
barrier->async = false;
cb(barrier, 0);
return 0;
}
Expand Down
9 changes: 1 addition & 8 deletions src/leader.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,6 @@ struct barrier {
void *data;
struct leader *leader;
struct raft_barrier req;
/* When the callback is invoked, this field is `true` if raft_barrier
* was called and `false` if the callback was invoked immediately. */
bool async;
barrier_cb cb;
};

Expand All @@ -66,12 +63,8 @@ struct exec {
uint64_t id;
int status;
queue queue;
pool_work_t work;
/* When the callback is invoked, this field is `true` if raft_barrier
* or raft_apply was called and `false` if the callback was invoked
* immediately. */
bool async;
exec_cb cb;
pool_work_t work;
};

/**
Expand Down
26 changes: 0 additions & 26 deletions test/integration/test_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -145,29 +145,3 @@ TEST(client, querySql, setUp, tearDown, 0, client_params)

return MUNIT_OK;
}

/* Stress test of an EXEC_SQL with many ';'-separated statements. */
TEST(client, semicolons, setUp, tearDown, 0, NULL)
{
struct fixture *f = data;
(void)params;

static const char trivial_stmt[] = "CREATE TABLE IF NOT EXISTS foo (n INT);";

size_t n = 1000;
size_t unit = sizeof(trivial_stmt) - 1;
char *sql = munit_malloc(n * unit);
char *p = sql;
for (size_t i = 0; i < n; i++) {
memcpy(p, trivial_stmt, unit);
p += unit;
}
sql[n * unit - 1] = '\0';

uint64_t last_insert_id;
uint64_t rows_affected;
EXEC_SQL(sql, &last_insert_id, &rows_affected);

free(sql);
return MUNIT_OK;
}
8 changes: 2 additions & 6 deletions test/unit/test_concurrency.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,8 @@ struct connection {
struct request_open open; \
struct response_db db; \
struct id_state seed = { { 1 } }; \
gateway__init(&c->gateway, \
CLUSTER_CONFIG(0), \
NULL, \
CLUSTER_REGISTRY(0), \
CLUSTER_RAFT(0), \
seed); \
gateway__init(&c->gateway, CLUSTER_CONFIG(0), \
CLUSTER_REGISTRY(0), CLUSTER_RAFT(0), seed); \
c->handle.data = &c->context; \
rc = buffer__init(&c->request); \
munit_assert_int(rc, ==, 0); \
Expand Down
8 changes: 2 additions & 6 deletions test/unit/test_gateway.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,8 @@ struct connection {
struct id_state seed = { { 1 } }; \
config = CLUSTER_CONFIG(i); \
config->page_size = 512; \
gateway__init(&c->gateway, \
config, \
NULL, \
CLUSTER_REGISTRY(i), \
CLUSTER_RAFT(i), \
seed); \
gateway__init(&c->gateway, config, CLUSTER_REGISTRY(i), \
CLUSTER_RAFT(i), seed); \
c->handle.data = &c->context; \
rc = buffer__init(&c->buf1); \
munit_assert_int(rc, ==, 0); \
Expand Down
Loading