Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: first iteration of C client library #636

Closed
wants to merge 12 commits into from
2 changes: 2 additions & 0 deletions Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ include_HEADERS = include/dqlite.h

basic_dqlite_sources = \
src/bind.c \
src/client.c \
src/client/protocol.c \
src/command.c \
src/conn.c \
Expand Down Expand Up @@ -179,6 +180,7 @@ endif

integration_test_SOURCES = \
test/integration/test_client.c \
test/integration/test_client_protocol.c \
test/integration/test_cluster.c \
test/integration/test_fsm.c \
test/integration/test_membership.c \
Expand Down
67 changes: 67 additions & 0 deletions include/dqlite.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ DQLITE_API int dqlite_version_number(void);
typedef unsigned long long dqlite_node_id;

DQLITE_EXPERIMENTAL typedef struct dqlite_server dqlite_server;
DQLITE_EXPERIMENTAL typedef struct dqlite dqlite;
DQLITE_EXPERIMENTAL typedef struct dqlite_stmt dqlite_stmt;

/**
* Signature of a custom callback used to establish network connections
Expand Down Expand Up @@ -730,4 +732,69 @@ DQLITE_API int dqlite_vfs_restore_disk(sqlite3_vfs *vfs,
const void *data,
size_t main_size,
size_t wal_size);


/**
* Open a database connection on the dqlite cluster.
*
* This request will be transparently forwarded to the cluster leader as needed.
*
* This is the analogue of sqlite3_open. @name is the name of the database
* to open, which will be created if it does not exist. All servers in the
* cluster share a "namespace" for databases.
*
* The @flags argument is currently ignored.
*/
DQLITE_API DQLITE_EXPERIMENTAL int dqlite_open(dqlite_server *server,
const char *name,
dqlite **db,
int flags);

/**
* Close a database after all associated prepared statements have been
* finalized.
*
* This is analogous to sqlite3_close (note, not sqlite3_close_v2). In
* particular, it will fail with SQLITE_BUSY if some dqlite_stmt objects
* associated with this database have not yet been finalized.
*/
DQLITE_API DQLITE_EXPERIMENTAL int dqlite_close(dqlite *db);

/**
* Create a prepared statement to be executed on the cluster.
*
* This is the analogue of sqlite3_prepare_v2.
*/
DQLITE_API DQLITE_EXPERIMENTAL int dqlite_prepare(dqlite *db,
const char *sql,
int sql_len,
dqlite_stmt **stmt,
const char **tail);

/**
* Create a prepared statement to be executed on the cluster.
*
* This is the analogue of sqlite3_prepare_v2.
*/
DQLITE_API DQLITE_EXPERIMENTAL int dqlite_prepare(dqlite *db,
const char *sql,
int sql_len,
dqlite_stmt **stmt,
const char **tail);

/**
* Execute a prepared statement for one "step".
*
* This is the analogue of sqlite3_step.
*/
DQLITE_API DQLITE_EXPERIMENTAL int dqlite_step(dqlite_stmt *stmt);

/**
* Release all resources associated with a prepared statement.
*
* This ends the statement's lifecycle, rendering it invalid for further use. It
* is the analogue of sqlite3_finalize.
*/
DQLITE_API DQLITE_EXPERIMENTAL int dqlite_finalize(dqlite_stmt *stmt);

#endif /* DQLITE_H */
184 changes: 184 additions & 0 deletions src/client.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
#include <pthread.h>
#include <stdint.h>
#include <stdlib.h>
#include <time.h>

#include "client.h"
#include "client/protocol.h"
#include "src/tracing.h"

static int client_connect_to_some_server(struct client_proto *proto,
struct node_store_cache *cache,
struct client_context *context)
{
int rv;

for (unsigned int i = 0; i < cache->len; i++) {
letFunny marked this conversation as resolved.
Show resolved Hide resolved
struct client_node_info node = cache->nodes[i];
if (clientOpen(proto, node.addr, node.id) != 0) {
continue;

Check warning on line 19 in src/client.c

View check run for this annotation

Codecov / codecov/patch

src/client.c#L19

Added line #L19 was not covered by tests
}
rv = clientSendHandshake(proto, context);
if (rv != SQLITE_OK) {
clientClose(proto);
continue;

Check warning on line 24 in src/client.c

View check run for this annotation

Codecov / codecov/patch

src/client.c#L23-L24

Added lines #L23 - L24 were not covered by tests
} else {
return SQLITE_OK;
}
}

return SQLITE_ERROR;
}

static int client_get_leader_and_open(struct client_proto *proto,
char *db_name,
struct client_context *context)
{
int rv;
// Get the leader from the server we connected to.
letFunny marked this conversation as resolved.
Show resolved Hide resolved
rv = clientSendLeader(proto, context);
if (rv != SQLITE_OK) {
return rv;
}
uint64_t server_id;
char *address;
rv = clientRecvServer(proto, &server_id, &address, context);
if (rv != SQLITE_OK) {
return rv;
}
clientClose(proto);

// Connect to the leader and open the db.
rv = clientOpen(proto, address, server_id);
if (rv != SQLITE_OK) {
free(address);
return rv;

Check warning on line 55 in src/client.c

View check run for this annotation

Codecov / codecov/patch

src/client.c#L54-L55

Added lines #L54 - L55 were not covered by tests
}
free(address);
address = NULL;
rv = clientSendHandshake(proto, context);
if (rv != SQLITE_OK) {
return rv;
}
rv = clientSendOpen(proto, db_name, context);
if (rv != SQLITE_OK) {
return rv;
}
rv = clientRecvDb(proto, context);
if (rv != SQLITE_OK) {
return rv;

Check warning on line 69 in src/client.c

View check run for this annotation

Codecov / codecov/patch

src/client.c#L69

Added line #L69 was not covered by tests
}

return SQLITE_OK;
}

// TODO remove DQLITE_VISIBLE_TO_TESTS from the client* functions.
// TODO it acceps the dqlite_server in order to tie the lifetime of the client
// and server. Why though? If we are not freeing any of them when we finish.
// TODO why have here flags if they are not used?
int dqlite_open(dqlite_server *server, const char *name, dqlite **db, int flags)
{
(void)flags;
*db = callocChecked(1, sizeof(dqlite));
letFunny marked this conversation as resolved.
Show resolved Hide resolved
(*db)->name = strdupChecked(name);
(*db)->server = server;
return SQLITE_OK;
}

int dqlite_close(dqlite *db)
{
free(db->name);
free(db);
return SQLITE_OK;
}

// TODO what happens if the leader changes, do we have a retry strategy?
int dqlite_prepare(dqlite *db,
const char *sql,
int sql_len,
dqlite_stmt **stmt,
const char **tail)
{
int rv;
struct client_proto proto = { 0 };
// TODO update client_proto in db->server?
proto.connect = db->server->connect;
proto.connect_arg = db->server->connect_arg;
struct client_context context = { 0 };
// TODO Why 5000? Eventually add function to configure it. Maybe add a
// dqlite_options argument.
// TODO CLOCK_MONOTONIC
clientContextMillis(&context, 5000);

bool connected = false;
while (!connected) {
struct timespec now = { 0 };
rv = clock_gettime(CLOCK_REALTIME, &now);
assert(rv == 0);
long long millis =
(context.deadline.tv_sec - now.tv_sec) * 1000 +
(context.deadline.tv_nsec - now.tv_nsec) / 1000000;
if (millis <= 0) {
break;
}
rv = pthread_mutex_lock(&db->server->mutex);
assert(rv == 0);
// Connect to any server.
connected =
client_connect_to_some_server(&proto, &db->server->cache,
&context) == SQLITE_OK;
rv = pthread_mutex_unlock(&db->server->mutex);
assert(rv == 0);
if (client_get_leader_and_open(&proto, db->name, &context) !=
SQLITE_OK) {
clientClose(&proto);
return SQLITE_ERROR;

Check warning on line 135 in src/client.c

View check run for this annotation

Codecov / codecov/patch

src/client.c#L135

Added line #L135 was not covered by tests
}
}

// Run the statement in the leader node.
// TODO check zero length.
size_t sql_owned_len = sql_len >= 0 ? (size_t)sql_len : strlen(sql);
const char *sql_owned = strndupChecked(sql, sql_owned_len);
if (tail != NULL) {
*tail = sql + sql_owned_len;

Check warning on line 144 in src/client.c

View check run for this annotation

Codecov / codecov/patch

src/client.c#L144

Added line #L144 was not covered by tests
}
rv = clientSendPrepare(&proto, sql_owned, &context);
free((void *)sql_owned);
if (rv != SQLITE_OK) {
clientClose(&proto);
return SQLITE_ERROR;
}
*stmt = callocChecked(1, sizeof(**stmt));
rv = clientRecvStmt(&proto, &(*stmt)->stmt_id, &(*stmt)->n_params,
&(*stmt)->offset, &context);
if (rv != SQLITE_OK) {
free(*stmt);

Check warning on line 156 in src/client.c

View check run for this annotation

Codecov / codecov/patch

src/client.c#L156

Added line #L156 was not covered by tests
clientClose(&proto);
return SQLITE_ERROR;
}
(*stmt)->proto = proto;

return SQLITE_OK;
}

int dqlite_finalize(dqlite_stmt *stmt)
{
struct client_context context;

if (stmt == NULL) {
return SQLITE_OK;
}
// TODO add options.
clientContextMillis(&context, 5000);
if (clientSendFinalize(&stmt->proto, stmt->stmt_id, &context) !=
letFunny marked this conversation as resolved.
Show resolved Hide resolved
SQLITE_OK) {
return SQLITE_ERROR;
}
if (clientRecvEmpty(&stmt->proto, &context) != SQLITE_OK) {
return SQLITE_ERROR;
}
clientClose(&stmt->proto);
free(stmt);
return SQLITE_OK;
}
20 changes: 20 additions & 0 deletions src/client.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#ifndef DQLITE_CLIENT_H_
#define DQLITE_CLIENT_H_

#include "client/protocol.h"
#include "server.h"

struct dqlite {
struct dqlite_server *server;
char *name; /* owned */
};

struct dqlite_stmt {
uint32_t stmt_id;
uint64_t n_params;
struct client_proto proto;

uint64_t offset;
};

#endif /* DQLITE_CLIENT_H_ */
1 change: 1 addition & 0 deletions src/client/protocol.c
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ static ssize_t doWrite(int fd,

n = write(fd, (char *)buf + (size_t)total,
buf_len - (size_t)total);
tracef("write n == %zd", n);
if (n < 0) {
if (errno == EINTR) {
continue;
Expand Down
2 changes: 1 addition & 1 deletion src/client/protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ struct client_file
void *mallocChecked(size_t n);
void *callocChecked(size_t nmemb, size_t size);
char *strdupChecked(const char *s);
char *strndupCheck(const char *s, size_t n);
char *strndupChecked(const char *s, size_t n);

/* Initialize a context whose deadline will fall after the given duration
* in milliseconds. */
Expand Down
Loading
Loading