diff --git a/src/conn.c b/src/conn.c index 4b9a990d8..1b031bc5b 100644 --- a/src/conn.c +++ b/src/conn.c @@ -5,8 +5,6 @@ #include "tracing.h" #include "transport.h" -#include - /* Initialize the given buffer for reading, ensure it has the given size. */ static int init_read(struct conn *c, uv_buf_t *buf, size_t size) { @@ -20,46 +18,34 @@ static int init_read(struct conn *c, uv_buf_t *buf, size_t size) } static int read_message(struct conn *c); -static void conn_write_cb(uv_write_t *req, int status) +static void conn_write_cb(struct transport *transport, int status) { - struct transport *t = req->data; - assert(t != NULL); - struct conn *c = t->data; - assert(c != NULL); + struct conn *c = transport->data; bool finished; int rv; if (status != 0) { tracef("write cb status %d", status); goto abort; } - if (c->closed) { - tracef("connection closing"); - goto abort; - } buffer__reset(&c->write); buffer__advance(&c->write, message__sizeof(&c->response)); /* Header */ rv = gateway__resume(&c->gateway, &finished); - tracef("request finished: %d", finished); if (rv != 0) { goto abort; } - - /* Start reading the next message if we're not doing that already. */ - if (c->reading_message) { - free(req); + if (!finished) { return; } + + /* Start reading the next request */ rv = read_message(c); if (rv != 0) { goto abort; } - - free(req); return; abort: - free(req); conn__stop(c); } @@ -205,7 +191,6 @@ static void read_message_cb(struct transport *transport, int status) struct cursor cursor; int rv; - c->reading_message = false; if (status != 0) { // errorf(c->logger, "read error"); tracef("read error %d", status); @@ -237,7 +222,6 @@ static int read_message(struct conn *c) tracef("init read failed %d", rv); return rv; } - c->reading_message = true; rv = transport__read(&c->transport, &buf, read_message_cb); if (rv != 0) { tracef("transport read failed %d", rv); @@ -336,7 +320,6 @@ int conn__start(struct conn *c, } c->handle.data = c; c->closed = false; - c->reading_message = false; /* First, we expect the client to send us the protocol version. */ rv = read_protocol(c); if (rv != 0) { diff --git a/src/conn.h b/src/conn.h index 2ce383dc8..fb93c8f27 100644 --- a/src/conn.h +++ b/src/conn.h @@ -35,7 +35,6 @@ struct conn struct message response; /* Response message meta data */ struct handle handle; bool closed; - bool reading_message; /* Conn is waiting for a message */ queue queue; }; diff --git a/src/gateway.c b/src/gateway.c index cc8bb873b..61288a1c1 100644 --- a/src/gateway.c +++ b/src/gateway.c @@ -1394,8 +1394,11 @@ int gateway__handle(struct gateway *g, goto handle; } - /* Request in progress, the only time we allow interleaving requests is - * when the second request tries to interrupt a query yielding rows. */ + /* Request in progress. TODO The current implementation doesn't allow + * reading a new request while a query is yielding rows, in that case + * gateway__resume in write_cb will indicate it has not finished + * returning results and a new request (in this case, the interrupt) + * will not be read. */ if (g->req->type == DQLITE_REQUEST_QUERY && type == DQLITE_REQUEST_INTERRUPT) { goto handle; @@ -1407,8 +1410,9 @@ int gateway__handle(struct gateway *g, } /* Receiving a request when one is ongoing on the same connection - * is an error, unless it's an interrupt request. The connection will be - * stopped due to the non-0 return value. */ + * is a hard error. The connection will be stopped due to the non-0 + * return code in case asserts are off. */ + assert(false); return SQLITE_BUSY; handle: @@ -1441,9 +1445,11 @@ int gateway__resume(struct gateway *g, bool *finished) { if (g->req == NULL || (g->req->type != DQLITE_REQUEST_QUERY && g->req->type != DQLITE_REQUEST_QUERY_SQL)) { + tracef("gateway resume - finished"); *finished = true; return 0; } + tracef("gateway resume - not finished"); *finished = false; query_batch(g); return 0; diff --git a/src/lib/transport.c b/src/lib/transport.c index 2a81cd52c..f833266cb 100644 --- a/src/lib/transport.c +++ b/src/lib/transport.c @@ -1,7 +1,5 @@ #include -#include - #include "../../include/dqlite.h" #include "assert.h" @@ -124,7 +122,9 @@ int transport__init(struct transport *t, struct uv_stream_s *stream) t->stream->data = t; t->read.base = NULL; t->read.len = 0; + t->write.data = t; t->read_cb = NULL; + t->write_cb = NULL; t->close_cb = NULL; return 0; @@ -161,15 +161,23 @@ int transport__read(struct transport *t, uv_buf_t *buf, transport_read_cb cb) return 0; } -int transport__write(struct transport *t, uv_buf_t *buf, uv_write_cb cb) +static void write_cb(uv_write_t *req, int status) +{ + struct transport *t = req->data; + transport_write_cb cb = t->write_cb; + + assert(cb != NULL); + t->write_cb = NULL; + + cb(t, status); +} + +int transport__write(struct transport *t, uv_buf_t *buf, transport_write_cb cb) { int rv; - uv_write_t *req = malloc(sizeof(*req)); - if (req == NULL) { - return DQLITE_NOMEM; - } - req->data = t; - rv = uv_write(req, t->stream, buf, 1, cb); + assert(t->write_cb == NULL); + t->write_cb = cb; + rv = uv_write(&t->write, t->stream, buf, 1, write_cb); if (rv != 0) { return rv; } diff --git a/src/lib/transport.h b/src/lib/transport.h index 8512754ba..314d7824c 100644 --- a/src/lib/transport.h +++ b/src/lib/transport.h @@ -14,6 +14,7 @@ */ struct transport; typedef void (*transport_read_cb)(struct transport *t, int status); +typedef void (*transport_write_cb)(struct transport *t, int status); typedef void (*transport_close_cb)(struct transport *t); /** @@ -27,6 +28,7 @@ struct transport uv_buf_t read; /* Read buffer */ uv_write_t write; /* Write request */ transport_read_cb read_cb; /* Read callback */ + transport_write_cb write_cb; /* Write callback */ transport_close_cb close_cb; /* Close callback */ }; @@ -47,10 +49,9 @@ void transport__close(struct transport *t, transport_close_cb cb); int transport__read(struct transport *t, uv_buf_t *buf, transport_read_cb cb); /** - * Write the given buffer to the transport. The @cb gains ownership of - * uv_write_t and must `free` it at its own convenience. + * Write the given buffer to the transport. */ -int transport__write(struct transport *t, uv_buf_t *buf, uv_write_cb cb); +int transport__write(struct transport *t, uv_buf_t *buf, transport_write_cb cb); /* Create an UV stream object from the given fd. */ int transport__stream(struct uv_loop_s *loop, diff --git a/test/integration/test_client.c b/test/integration/test_client.c index 4fa0c4668..d919cc5db 100644 --- a/test/integration/test_client.c +++ b/test/integration/test_client.c @@ -123,46 +123,6 @@ TEST(client, query, setUp, tearDown, 0, client_params) return MUNIT_OK; } -TEST(client, queryReuseStmtIdAferInterrupt, setUp, tearDown, 0, client_params) -{ - struct fixture *f = data; - uint32_t stmt_id; - uint64_t last_insert_id; - uint64_t rows_affected; - unsigned i; - struct rows rows; - (void)params; - PREPARE("CREATE TABLE test (n INT)", &stmt_id); - EXEC(stmt_id, &last_insert_id, &rows_affected); - - PREPARE("BEGIN", &stmt_id); - EXEC(stmt_id, &last_insert_id, &rows_affected); - - PREPARE("INSERT INTO test (n) VALUES(123)", &stmt_id); - for (i = 0; i < 4098; i++) { - EXEC(stmt_id, &last_insert_id, &rows_affected); - } - - PREPARE("COMMIT", &stmt_id); - EXEC(stmt_id, &last_insert_id, &rows_affected); - - /* More than 1 response buffer will be needed to return all the rows, so - * we are able to interrupt the query. */ - PREPARE("SELECT * FROM test", &stmt_id); - bool done = true; - QUERY_DONE(stmt_id, &rows, &done); - munit_assert_false(done); - - clientSendInterrupt(f->client, NULL); - clientCloseRows(&rows); - - /* Ensure stmt_id is still valid after interrupt. */ - QUERY(stmt_id, &rows); - - clientCloseRows(&rows); - return MUNIT_OK; -} - TEST(client, querySql, setUp, tearDown, 0, client_params) { struct fixture *f = data; @@ -189,35 +149,3 @@ TEST(client, querySql, setUp, tearDown, 0, client_params) return MUNIT_OK; } - -TEST(client, querySqlInterrupt, setUp, tearDown, 0, client_params) -{ - struct fixture *f = data; - uint32_t stmt_id; - uint64_t last_insert_id; - uint64_t rows_affected; - unsigned i; - struct rows rows; - bool done = true; - (void)params; - EXEC_SQL("CREATE TABLE test (n INT)", &last_insert_id, &rows_affected); - - EXEC_SQL("BEGIN", &last_insert_id, &rows_affected); - - PREPARE("INSERT INTO test (n) VALUES(123)", &stmt_id); - for (i = 0; i < 4098; i++) { - EXEC(stmt_id, &last_insert_id, &rows_affected); - } - - EXEC_SQL("COMMIT", &last_insert_id, &rows_affected); - - /* More than 1 response buffer will be needed to return all the rows, so - * we are able to interrupt the query. */ - QUERY_SQL_DONE("SELECT * FROM test", &rows, &done); - munit_assert_false(done); - - clientSendInterrupt(f->client, NULL); - clientCloseRows(&rows); - - return MUNIT_OK; -} diff --git a/test/lib/client.h b/test/lib/client.h index d21ef65c9..03eab5634 100644 --- a/test/lib/client.h +++ b/test/lib/client.h @@ -150,29 +150,23 @@ munit_assert_int(rv_, ==, 0); \ } -/* Perform a query, DONE is a pointer to a bool that will be true when the query - * is done. */ -#define QUERY_DONE(STMT_ID, ROWS, DONE) \ +/* Perform a query. */ +#define QUERY(STMT_ID, ROWS) \ { \ int rv_; \ rv_ = clientSendQuery(f->client, STMT_ID, NULL, 0, NULL); \ munit_assert_int(rv_, ==, 0); \ - rv_ = clientRecvRows(f->client, ROWS, DONE, NULL); \ + rv_ = clientRecvRows(f->client, ROWS, NULL, NULL); \ munit_assert_int(rv_, ==, 0); \ } -/* Perform a query. */ -#define QUERY(STMT_ID, ROWS) QUERY_DONE(STMT_ID, ROWS, NULL) - -#define QUERY_SQL_DONE(SQL, ROWS, DONE) \ +#define QUERY_SQL(SQL, ROWS) \ { \ int rv_; \ rv_ = clientSendQuerySQL(f->client, SQL, NULL, 0, NULL); \ munit_assert_int(rv_, ==, 0); \ - rv_ = clientRecvRows(f->client, ROWS, DONE, NULL); \ + rv_ = clientRecvRows(f->client, ROWS, NULL, NULL); \ munit_assert_int(rv_, ==, 0); \ } -#define QUERY_SQL(SQL, ROWS) QUERY_SQL_DONE(SQL, ROWS, NULL) - #endif /* TEST_CLIENT_H */ diff --git a/test/unit/lib/test_transport.c b/test/unit/lib/test_transport.c index 1961e9755..2a66ea94f 100644 --- a/test/unit/lib/test_transport.c +++ b/test/unit/lib/test_transport.c @@ -39,14 +39,11 @@ static void read_cb(struct transport *transport, int status) f->read.status = status; } -static void write_cb(uv_write_t *req, int status) +static void write_cb(struct transport *transport, int status) { - struct transport *t = req->data; - munit_assert_ptr_not_null(t); - struct fixture *f = t->data; + struct fixture *f = transport->data; f->write.invoked = true; f->write.status = status; - free(req); } static void *setup(const MunitParameter params[], void *user_data)