Skip to content

Commit

Permalink
Replaced macro-based QUEUE__* interfaces by function-based in raft
Browse files Browse the repository at this point in the history
Signed-off-by: Anatoliy Bilenko <[email protected]>
  • Loading branch information
just-now committed Mar 1, 2024
1 parent 4643bf3 commit 27e7835
Show file tree
Hide file tree
Showing 30 changed files with 236 additions and 286 deletions.
2 changes: 1 addition & 1 deletion bt/request
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ struct request
void *data;
int type;
raft_index index;
void *queue[2];
queue queue;
};
uprobe:$libraft_path:lifecycleRequestStart
Expand Down
9 changes: 7 additions & 2 deletions src/lib/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ struct queue

typedef struct queue queue;

#define QUEUE_DATA(pointer, type, field) \
((type *)((char *)(pointer)-offsetof(type, field)))
#define QUEUE_DATA(e, type, field) \
((type *)((void *)((char *)(e)-offsetof(type, field))))

#define QUEUE_FOREACH(q, h) for ((q) = (h)->next; (q) != (h); (q) = (q)->next)

Expand All @@ -37,6 +37,11 @@ static inline struct queue *queue_next(const struct queue *q)
return q->next;
}

static inline struct queue *queue_tail(const struct queue *q)
{
return q->prev;
}

static inline void queue_add(struct queue *h, struct queue *n)
{
h->prev->next = n->next;
Expand Down
24 changes: 12 additions & 12 deletions src/lib/threadpool.c
Original file line number Diff line number Diff line change
Expand Up @@ -143,25 +143,25 @@ static inline void w_unregister(pool_t *pool, pool_work_t *w)

static bool empty(const queue *q)
{
return QUEUE__IS_EMPTY(q);
return queue_empty(q);
}

static queue *head(const queue *q)
{
return QUEUE__HEAD(q);
return queue_head(q);
}

static void push(queue *to, queue *what)
{
QUEUE__INSERT_TAIL(to, what);
queue_insert_tail(to, what);
}

static queue *pop(queue *from)
{
queue *q = QUEUE__HEAD(from);
queue *q = queue_head(from);
PRE(q != NULL);
QUEUE__REMOVE(q);
QUEUE__INIT(q);
queue_remove(q);
queue_init(q);
return q;
}

Expand All @@ -180,7 +180,7 @@ static queue *qos_pop(pool_impl_t *pi, queue *first, queue *second)

static pool_work_t *q_to_w(const queue *q)
{
return QUEUE__DATA(q, pool_work_t, link);
return QUEUE_DATA(q, pool_work_t, link);
}

static enum pool_work_type q_type(const queue *q)
Expand Down Expand Up @@ -413,7 +413,7 @@ static void pool_threads_init(pool_t *pool)
.idx = i,
};

QUEUE__INIT(&ts[i].inq);
queue_init(&ts[i].inq);
if (uv_cond_init(&ts[i].cond)) {
abort();
}
Expand Down Expand Up @@ -462,7 +462,7 @@ void work_done(uv_async_t *handle)
pool_impl_t *pi = CONTAINER_OF(handle, pool_impl_t, outq_async);

uv_mutex_lock(&pi->outq_mutex);
QUEUE__MOVE(&pi->outq, &q);
queue_move(&pi->outq, &q);
uv_mutex_unlock(&pi->outq_mutex);

while (!empty(&q)) {
Expand Down Expand Up @@ -513,9 +513,9 @@ int pool_init(pool_t *pool,
.threads_nr = threads_nr,
.ord_in_flight = 0,
};
QUEUE__INIT(&pi->outq);
QUEUE__INIT(&pi->ordered);
QUEUE__INIT(&pi->unordered);
queue_init(&pi->outq);
queue_init(&pi->ordered);
queue_init(&pi->unordered);

rc = uv_mutex_init(&pi->outq_mutex);
if (rc != 0) {
Expand Down
6 changes: 4 additions & 2 deletions src/raft.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

#include <uv.h>

#include "lib/queue.h"

#ifndef RAFT_API
#define RAFT_API __attribute__((visibility("default")))
#endif
Expand Down Expand Up @@ -802,7 +804,7 @@ struct raft
raft_index
round_index; /* Target of the current round. */
raft_time round_start; /* Start of current round. */
void *requests[2]; /* Outstanding client requests. */
queue requests; /* Outstanding client requests. */
uint32_t
voter_contacts; /* Current number of voting nodes we
are in contact with */
Expand Down Expand Up @@ -1029,7 +1031,7 @@ RAFT_API int raft_voter_contacts(struct raft *r);
void *data; \
int type; \
raft_index index; \
void *queue[2]; \
queue queue; \
uint8_t req_id[16]; \
uint8_t client_id[16]; \
uint8_t unique_id[16]; \
Expand Down
6 changes: 3 additions & 3 deletions src/raft/client.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
#include "log.h"
#include "membership.h"
#include "progress.h"
#include "queue.h"
#include "../lib/queue.h"
#include "replication.h"
#include "request.h"

Expand Down Expand Up @@ -57,7 +57,7 @@ int raft_apply(struct raft *r,

err_after_log_append:
logDiscard(r->log, index);
QUEUE_REMOVE(&req->queue);
queue_remove(&req->queue);
err:
assert(rv != 0);
return rv;
Expand Down Expand Up @@ -106,7 +106,7 @@ int raft_barrier(struct raft *r, struct raft_barrier *req, raft_barrier_cb cb)

err_after_log_append:
logDiscard(r->log, index);
QUEUE_REMOVE(&req->queue);
queue_remove(&req->queue);
err_after_buf_alloc:
raft_free(buf.base);
err:
Expand Down
10 changes: 5 additions & 5 deletions src/raft/convert.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
#include "log.h"
#include "membership.h"
#include "progress.h"
#include "queue.h"
#include "../lib/queue.h"
#include "replication.h"
#include "request.h"

Expand Down Expand Up @@ -93,11 +93,11 @@ static void convertClearLeader(struct raft *r)
}

/* Fail all outstanding requests */
while (!QUEUE_IS_EMPTY(&r->leader_state.requests)) {
while (!queue_empty(&r->leader_state.requests)) {
struct request *req;
queue *head;
head = QUEUE_HEAD(&r->leader_state.requests);
QUEUE_REMOVE(head);
head = queue_head(&r->leader_state.requests);
queue_remove(head);
req = QUEUE_DATA(head, struct request, queue);
assert(req->type == RAFT_COMMAND || req->type == RAFT_BARRIER);
switch (req->type) {
Expand Down Expand Up @@ -209,7 +209,7 @@ int convertToLeader(struct raft *r)
r->election_timer_start = r->io->time(r->io);

/* Reset apply requests queue */
QUEUE_INIT(&r->leader_state.requests);
queue_init(&r->leader_state.requests);

/* Allocate and initialize the progress array. */
rv = progressBuildArray(r);
Expand Down
24 changes: 12 additions & 12 deletions src/raft/fixture.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#include "convert.h"
#include "entry.h"
#include "log.h"
#include "queue.h"
#include "../lib/queue.h"
#include "snapshot.h"

/* Defaults */
Expand Down Expand Up @@ -365,7 +365,7 @@ static void ioFlushSend(struct io *io, struct send *send)
src = &send->message;
dst = &transmit->message;

QUEUE_PUSH(&io->requests, &transmit->queue);
queue_insert_tail(&io->requests, &transmit->queue);

*dst = *src;
switch (dst->type) {
Expand Down Expand Up @@ -416,12 +416,12 @@ static void ioDestroyTransmit(struct transmit *transmit)
/* Flush all requests in the queue. */
static void ioFlushAll(struct io *io)
{
while (!QUEUE_IS_EMPTY(&io->requests)) {
while (!queue_empty(&io->requests)) {
queue *head;
struct ioRequest *r;

head = QUEUE_HEAD(&io->requests);
QUEUE_REMOVE(head);
head = queue_head(&io->requests);
queue_remove(head);

r = QUEUE_DATA(head, struct ioRequest, queue);
switch (r->type) {
Expand Down Expand Up @@ -592,7 +592,7 @@ static int ioMethodAppend(struct raft_io *raft_io,

req->cb = cb;

QUEUE_PUSH(&io->requests, &r->queue);
queue_insert_tail(&io->requests, &r->queue);

return 0;
}
Expand Down Expand Up @@ -660,7 +660,7 @@ static int ioMethodSnapshotPut(struct raft_io *raft_io,
r->completion_time = *io->time + io->disk_latency;
r->trailing = trailing;

QUEUE_PUSH(&io->requests, &r->queue);
queue_insert_tail(&io->requests, &r->queue);

return 0;
}
Expand All @@ -680,7 +680,7 @@ static int ioMethodAsyncWork(struct raft_io *raft_io,
r->req->cb = cb;
r->completion_time = *io->time + io->work_duration;

QUEUE_PUSH(&io->requests, &r->queue);
queue_insert_tail(&io->requests, &r->queue);
return 0;
}

Expand All @@ -699,7 +699,7 @@ static int ioMethodSnapshotGet(struct raft_io *raft_io,
r->req->cb = cb;
r->completion_time = *io->time + io->disk_latency;

QUEUE_PUSH(&io->requests, &r->queue);
queue_insert_tail(&io->requests, &r->queue);

return 0;
}
Expand Down Expand Up @@ -749,7 +749,7 @@ static int ioMethodSend(struct raft_io *raft_io,
peer = ioGetPeer(io, message->server_id);
r->completion_time = *io->time + peer->send_latency;

QUEUE_PUSH(&io->requests, &r->queue);
queue_insert_tail(&io->requests, &r->queue);

return 0;
}
Expand Down Expand Up @@ -881,7 +881,7 @@ static int ioInit(struct raft_io *raft_io, unsigned index, raft_time *time)
io->snapshot = NULL;
io->entries = NULL;
io->n = 0;
QUEUE_INIT(&io->requests);
queue_init(&io->requests);
io->n_peers = 0;
io->randomized_election_timeout = ELECTION_TIMEOUT + index * 100;
io->network_latency = NETWORK_LATENCY;
Expand Down Expand Up @@ -1400,7 +1400,7 @@ static void completeRequest(struct raft_fixture *f, unsigned i, raft_time t)
}
}
assert(found);
QUEUE_REMOVE(head);
queue_remove(head);
switch (r->type) {
case APPEND:
ioFlushAppend(io, (struct append *)r);
Expand Down
6 changes: 3 additions & 3 deletions src/raft/lifecycle.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#include "lifecycle.h"
#include "../tracing.h"
#include "queue.h"
#include "../lib/queue.h"

#include <inttypes.h>
#include <stdlib.h>
Expand All @@ -23,7 +23,7 @@ void lifecycleRequestStart(struct raft *r, struct request *req)
if (reqIdIsSet(req)) {
tracef("request start id:%" PRIu64, extractReqId(req));
}
QUEUE_PUSH(&r->leader_state.requests, &req->queue);
queue_insert_tail(&r->leader_state.requests, &req->queue);
}

void lifecycleRequestEnd(struct raft *r, struct request *req)
Expand All @@ -32,5 +32,5 @@ void lifecycleRequestEnd(struct raft *r, struct request *req)
if (reqIdIsSet(req)) {
tracef("request end id:%" PRIu64, extractReqId(req));
}
QUEUE_REMOVE(&req->queue);
queue_remove(&req->queue);
}
57 changes: 0 additions & 57 deletions src/raft/queue.h

This file was deleted.

2 changes: 1 addition & 1 deletion src/raft/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#include "log.h"
#include "membership.h"
#include "progress.h"
#include "queue.h"
#include "../lib/queue.h"
#include "replication.h"
#include "request.h"
#include "snapshot.h"
Expand Down
2 changes: 1 addition & 1 deletion src/raft/request.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ struct request
void *data;
int type;
raft_index index;
void *queue[2];
queue queue;
uint8_t req_id[16];
uint8_t client_id[16];
uint8_t unique_id[16];
Expand Down
2 changes: 1 addition & 1 deletion src/raft/state.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#include "configuration.h"
#include "election.h"
#include "log.h"
#include "queue.h"
#include "../lib/queue.h"

int raft_state(struct raft *r)
{
Expand Down
Loading

0 comments on commit 27e7835

Please sign in to comment.