Skip to content

Commit

Permalink
Merge pull request #560 from MathieuBordere/interrupt
Browse files Browse the repository at this point in the history
Interrupt query yielding rows.
  • Loading branch information
Mathieu Borderé authored Feb 7, 2024
2 parents 71562ee + 0976158 commit 4ca4dae
Show file tree
Hide file tree
Showing 8 changed files with 127 additions and 43 deletions.
27 changes: 22 additions & 5 deletions src/conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
#include "tracing.h"
#include "transport.h"

#include <stdlib.h>

/* Initialize the given buffer for reading, ensure it has the given size. */
static int init_read(struct conn *c, uv_buf_t *buf, size_t size)
{
Expand All @@ -18,34 +20,46 @@ static int init_read(struct conn *c, uv_buf_t *buf, size_t size)
}

static int read_message(struct conn *c);
static void conn_write_cb(struct transport *transport, int status)
static void conn_write_cb(uv_write_t *req, int status)
{
struct conn *c = transport->data;
struct transport *t = req->data;
assert(t != NULL);
struct conn *c = t->data;
assert(c != NULL);
bool finished;
int rv;
if (status != 0) {
tracef("write cb status %d", status);
goto abort;
}
if (c->closed) {
tracef("connection closing");
goto abort;
}

buffer__reset(&c->write);
buffer__advance(&c->write, message__sizeof(&c->response)); /* Header */

rv = gateway__resume(&c->gateway, &finished);
tracef("request finished: %d", finished);
if (rv != 0) {
goto abort;
}
if (!finished) {

/* Start reading the next message if we're not doing that already. */
if (c->reading_message) {
free(req);
return;
}

/* Start reading the next request */
rv = read_message(c);
if (rv != 0) {
goto abort;
}

free(req);
return;
abort:
free(req);
conn__stop(c);
}

Expand Down Expand Up @@ -191,6 +205,7 @@ static void read_message_cb(struct transport *transport, int status)
struct cursor cursor;
int rv;

c->reading_message = false;
if (status != 0) {
// errorf(c->logger, "read error");
tracef("read error %d", status);
Expand Down Expand Up @@ -222,6 +237,7 @@ static int read_message(struct conn *c)
tracef("init read failed %d", rv);
return rv;
}
c->reading_message = true;
rv = transport__read(&c->transport, &buf, read_message_cb);
if (rv != 0) {
tracef("transport read failed %d", rv);
Expand Down Expand Up @@ -320,6 +336,7 @@ int conn__start(struct conn *c,
}
c->handle.data = c;
c->closed = false;
c->reading_message = false;
/* First, we expect the client to send us the protocol version. */
rv = read_protocol(c);
if (rv != 0) {
Expand Down
1 change: 1 addition & 0 deletions src/conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ struct conn
struct message response; /* Response message meta data */
struct handle handle;
bool closed;
bool reading_message; /* Conn is waiting for a message */
queue queue;
};

Expand Down
14 changes: 4 additions & 10 deletions src/gateway.c
Original file line number Diff line number Diff line change
Expand Up @@ -1394,11 +1394,8 @@ int gateway__handle(struct gateway *g,
goto handle;
}

/* Request in progress. TODO The current implementation doesn't allow
* reading a new request while a query is yielding rows, in that case
* gateway__resume in write_cb will indicate it has not finished
* returning results and a new request (in this case, the interrupt)
* will not be read. */
/* Request in progress, the only time we allow interleaving requests is
* when the second request tries to interrupt a query yielding rows. */
if (g->req->type == DQLITE_REQUEST_QUERY &&
type == DQLITE_REQUEST_INTERRUPT) {
goto handle;
Expand All @@ -1410,9 +1407,8 @@ int gateway__handle(struct gateway *g,
}

/* Receiving a request when one is ongoing on the same connection
* is a hard error. The connection will be stopped due to the non-0
* return code in case asserts are off. */
assert(false);
* is an error, unless it's an interrupt request. The connection will be
* stopped due to the non-0 return value. */
return SQLITE_BUSY;

handle:
Expand Down Expand Up @@ -1445,11 +1441,9 @@ int gateway__resume(struct gateway *g, bool *finished)
{
if (g->req == NULL || (g->req->type != DQLITE_REQUEST_QUERY &&
g->req->type != DQLITE_REQUEST_QUERY_SQL)) {
tracef("gateway resume - finished");
*finished = true;
return 0;
}
tracef("gateway resume - not finished");
*finished = false;
query_batch(g);
return 0;
Expand Down
26 changes: 9 additions & 17 deletions src/lib/transport.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#include <raft.h>

#include <stdlib.h>

#include "../../include/dqlite.h"

#include "assert.h"
Expand Down Expand Up @@ -122,9 +124,7 @@ int transport__init(struct transport *t, struct uv_stream_s *stream)
t->stream->data = t;
t->read.base = NULL;
t->read.len = 0;
t->write.data = t;
t->read_cb = NULL;
t->write_cb = NULL;
t->close_cb = NULL;

return 0;
Expand Down Expand Up @@ -161,23 +161,15 @@ int transport__read(struct transport *t, uv_buf_t *buf, transport_read_cb cb)
return 0;
}

static void write_cb(uv_write_t *req, int status)
{
struct transport *t = req->data;
transport_write_cb cb = t->write_cb;

assert(cb != NULL);
t->write_cb = NULL;

cb(t, status);
}

int transport__write(struct transport *t, uv_buf_t *buf, transport_write_cb cb)
int transport__write(struct transport *t, uv_buf_t *buf, uv_write_cb cb)
{
int rv;
assert(t->write_cb == NULL);
t->write_cb = cb;
rv = uv_write(&t->write, t->stream, buf, 1, write_cb);
uv_write_t *req = malloc(sizeof(*req));
if (req == NULL) {
return DQLITE_NOMEM;
}
req->data = t;
rv = uv_write(req, t->stream, buf, 1, cb);
if (rv != 0) {
return rv;
}
Expand Down
7 changes: 3 additions & 4 deletions src/lib/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
*/
struct transport;
typedef void (*transport_read_cb)(struct transport *t, int status);
typedef void (*transport_write_cb)(struct transport *t, int status);
typedef void (*transport_close_cb)(struct transport *t);

/**
Expand All @@ -28,7 +27,6 @@ struct transport
uv_buf_t read; /* Read buffer */
uv_write_t write; /* Write request */
transport_read_cb read_cb; /* Read callback */
transport_write_cb write_cb; /* Write callback */
transport_close_cb close_cb; /* Close callback */
};

Expand All @@ -49,9 +47,10 @@ void transport__close(struct transport *t, transport_close_cb cb);
int transport__read(struct transport *t, uv_buf_t *buf, transport_read_cb cb);

/**
* Write the given buffer to the transport.
* Write the given buffer to the transport. The @cb gains ownership of
* uv_write_t and must `free` it at its own convenience.
*/
int transport__write(struct transport *t, uv_buf_t *buf, transport_write_cb cb);
int transport__write(struct transport *t, uv_buf_t *buf, uv_write_cb cb);

/* Create an UV stream object from the given fd. */
int transport__stream(struct uv_loop_s *loop,
Expand Down
72 changes: 72 additions & 0 deletions test/integration/test_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,46 @@ TEST(client, query, setUp, tearDown, 0, client_params)
return MUNIT_OK;
}

TEST(client, queryReuseStmtIdAferInterrupt, setUp, tearDown, 0, client_params)
{
struct fixture *f = data;
uint32_t stmt_id;
uint64_t last_insert_id;
uint64_t rows_affected;
unsigned i;
struct rows rows;
(void)params;
PREPARE("CREATE TABLE test (n INT)", &stmt_id);
EXEC(stmt_id, &last_insert_id, &rows_affected);

PREPARE("BEGIN", &stmt_id);
EXEC(stmt_id, &last_insert_id, &rows_affected);

PREPARE("INSERT INTO test (n) VALUES(123)", &stmt_id);
for (i = 0; i < 4098; i++) {
EXEC(stmt_id, &last_insert_id, &rows_affected);
}

PREPARE("COMMIT", &stmt_id);
EXEC(stmt_id, &last_insert_id, &rows_affected);

/* More than 1 response buffer will be needed to return all the rows, so
* we are able to interrupt the query. */
PREPARE("SELECT * FROM test", &stmt_id);
bool done = true;
QUERY_DONE(stmt_id, &rows, &done);
munit_assert_false(done);

clientSendInterrupt(f->client, NULL);
clientCloseRows(&rows);

/* Ensure stmt_id is still valid after interrupt. */
QUERY(stmt_id, &rows);

clientCloseRows(&rows);
return MUNIT_OK;
}

TEST(client, querySql, setUp, tearDown, 0, client_params)
{
struct fixture *f = data;
Expand All @@ -149,3 +189,35 @@ TEST(client, querySql, setUp, tearDown, 0, client_params)

return MUNIT_OK;
}

TEST(client, querySqlInterrupt, setUp, tearDown, 0, client_params)
{
struct fixture *f = data;
uint32_t stmt_id;
uint64_t last_insert_id;
uint64_t rows_affected;
unsigned i;
struct rows rows;
bool done = true;
(void)params;
EXEC_SQL("CREATE TABLE test (n INT)", &last_insert_id, &rows_affected);

EXEC_SQL("BEGIN", &last_insert_id, &rows_affected);

PREPARE("INSERT INTO test (n) VALUES(123)", &stmt_id);
for (i = 0; i < 4098; i++) {
EXEC(stmt_id, &last_insert_id, &rows_affected);
}

EXEC_SQL("COMMIT", &last_insert_id, &rows_affected);

/* More than 1 response buffer will be needed to return all the rows, so
* we are able to interrupt the query. */
QUERY_SQL_DONE("SELECT * FROM test", &rows, &done);
munit_assert_false(done);

clientSendInterrupt(f->client, NULL);
clientCloseRows(&rows);

return MUNIT_OK;
}
16 changes: 11 additions & 5 deletions test/lib/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,23 +150,29 @@
munit_assert_int(rv_, ==, 0); \
}

/* Perform a query. */
#define QUERY(STMT_ID, ROWS) \
/* Perform a query, DONE is a pointer to a bool that will be true when the query
* is done. */
#define QUERY_DONE(STMT_ID, ROWS, DONE) \
{ \
int rv_; \
rv_ = clientSendQuery(f->client, STMT_ID, NULL, 0, NULL); \
munit_assert_int(rv_, ==, 0); \
rv_ = clientRecvRows(f->client, ROWS, NULL, NULL); \
rv_ = clientRecvRows(f->client, ROWS, DONE, NULL); \
munit_assert_int(rv_, ==, 0); \
}

#define QUERY_SQL(SQL, ROWS) \
/* Perform a query. */
#define QUERY(STMT_ID, ROWS) QUERY_DONE(STMT_ID, ROWS, NULL)

#define QUERY_SQL_DONE(SQL, ROWS, DONE) \
{ \
int rv_; \
rv_ = clientSendQuerySQL(f->client, SQL, NULL, 0, NULL); \
munit_assert_int(rv_, ==, 0); \
rv_ = clientRecvRows(f->client, ROWS, NULL, NULL); \
rv_ = clientRecvRows(f->client, ROWS, DONE, NULL); \
munit_assert_int(rv_, ==, 0); \
}

#define QUERY_SQL(SQL, ROWS) QUERY_SQL_DONE(SQL, ROWS, NULL)

#endif /* TEST_CLIENT_H */
7 changes: 5 additions & 2 deletions test/unit/lib/test_transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,14 @@ static void read_cb(struct transport *transport, int status)
f->read.status = status;
}

static void write_cb(struct transport *transport, int status)
static void write_cb(uv_write_t *req, int status)
{
struct fixture *f = transport->data;
struct transport *t = req->data;
munit_assert_ptr_not_null(t);
struct fixture *f = t->data;
f->write.invoked = true;
f->write.status = status;
free(req);
}

static void *setup(const MunitParameter params[], void *user_data)
Expand Down

0 comments on commit 4ca4dae

Please sign in to comment.