From 27e78352cf736105fec664c3b7d7b1068201546f Mon Sep 17 00:00:00 2001 From: Anatoliy Bilenko Date: Thu, 22 Feb 2024 10:46:28 +0000 Subject: [PATCH] Replaced macro-based QUEUE__* interfaces by function-based in raft Signed-off-by: Anatoliy Bilenko --- bt/request | 2 +- src/lib/queue.h | 9 +++- src/lib/threadpool.c | 24 +++++----- src/raft.h | 6 ++- src/raft/client.c | 6 +-- src/raft/convert.c | 10 ++-- src/raft/fixture.c | 24 +++++----- src/raft/lifecycle.c | 6 +-- src/raft/queue.h | 57 ---------------------- src/raft/replication.c | 2 +- src/raft/request.h | 2 +- src/raft/state.c | 2 +- src/raft/uv.c | 32 ++++++------- src/raft/uv.h | 2 +- src/raft/uv_append.c | 94 ++++++++++++++++++------------------- src/raft/uv_finalize.c | 10 ++-- src/raft/uv_prepare.c | 42 ++++++++--------- src/raft/uv_recv.c | 12 ++--- src/raft/uv_send.c | 32 ++++++------- src/raft/uv_snapshot.c | 6 +-- src/raft/uv_tcp.c | 12 ++--- src/raft/uv_tcp.h | 2 +- src/raft/uv_tcp_connect.c | 14 +++--- src/raft/uv_tcp_listen.c | 16 +++---- src/raft/uv_truncate.c | 4 +- src/raft/uv_work.c | 6 +-- src/raft/uv_writer.c | 30 ++++++------ src/raft/uv_writer.h | 2 +- test/raft/unit/test_queue.c | 52 ++++++++++---------- test/unit/test_conn.c | 4 +- 30 files changed, 236 insertions(+), 286 deletions(-) delete mode 100644 src/raft/queue.h diff --git a/bt/request b/bt/request index 8f855fdc6..129f91cd8 100755 --- a/bt/request +++ b/bt/request @@ -11,7 +11,7 @@ struct request void *data; int type; raft_index index; - void *queue[2]; + queue queue; }; uprobe:$libraft_path:lifecycleRequestStart diff --git a/src/lib/queue.h b/src/lib/queue.h index 84fb6c022..10217213c 100644 --- a/src/lib/queue.h +++ b/src/lib/queue.h @@ -11,8 +11,8 @@ struct queue typedef struct queue queue; -#define QUEUE_DATA(pointer, type, field) \ - ((type *)((char *)(pointer)-offsetof(type, field))) +#define QUEUE_DATA(e, type, field) \ + ((type *)((void *)((char *)(e)-offsetof(type, field)))) #define QUEUE_FOREACH(q, h) for ((q) = (h)->next; (q) != (h); (q) = (q)->next) @@ -37,6 +37,11 @@ static inline struct queue *queue_next(const struct queue *q) return q->next; } +static inline struct queue *queue_tail(const struct queue *q) +{ + return q->prev; +} + static inline void queue_add(struct queue *h, struct queue *n) { h->prev->next = n->next; diff --git a/src/lib/threadpool.c b/src/lib/threadpool.c index 3321a5615..28275036d 100644 --- a/src/lib/threadpool.c +++ b/src/lib/threadpool.c @@ -143,25 +143,25 @@ static inline void w_unregister(pool_t *pool, pool_work_t *w) static bool empty(const queue *q) { - return QUEUE__IS_EMPTY(q); + return queue_empty(q); } static queue *head(const queue *q) { - return QUEUE__HEAD(q); + return queue_head(q); } static void push(queue *to, queue *what) { - QUEUE__INSERT_TAIL(to, what); + queue_insert_tail(to, what); } static queue *pop(queue *from) { - queue *q = QUEUE__HEAD(from); + queue *q = queue_head(from); PRE(q != NULL); - QUEUE__REMOVE(q); - QUEUE__INIT(q); + queue_remove(q); + queue_init(q); return q; } @@ -180,7 +180,7 @@ static queue *qos_pop(pool_impl_t *pi, queue *first, queue *second) static pool_work_t *q_to_w(const queue *q) { - return QUEUE__DATA(q, pool_work_t, link); + return QUEUE_DATA(q, pool_work_t, link); } static enum pool_work_type q_type(const queue *q) @@ -413,7 +413,7 @@ static void pool_threads_init(pool_t *pool) .idx = i, }; - QUEUE__INIT(&ts[i].inq); + queue_init(&ts[i].inq); if (uv_cond_init(&ts[i].cond)) { abort(); } @@ -462,7 +462,7 @@ void work_done(uv_async_t *handle) pool_impl_t *pi = CONTAINER_OF(handle, pool_impl_t, outq_async); uv_mutex_lock(&pi->outq_mutex); - QUEUE__MOVE(&pi->outq, &q); + queue_move(&pi->outq, &q); uv_mutex_unlock(&pi->outq_mutex); while (!empty(&q)) { @@ -513,9 +513,9 @@ int pool_init(pool_t *pool, .threads_nr = threads_nr, .ord_in_flight = 0, }; - QUEUE__INIT(&pi->outq); - QUEUE__INIT(&pi->ordered); - QUEUE__INIT(&pi->unordered); + queue_init(&pi->outq); + queue_init(&pi->ordered); + queue_init(&pi->unordered); rc = uv_mutex_init(&pi->outq_mutex); if (rc != 0) { diff --git a/src/raft.h b/src/raft.h index 7f8496c58..c9c7dad9c 100644 --- a/src/raft.h +++ b/src/raft.h @@ -9,6 +9,8 @@ #include +#include "lib/queue.h" + #ifndef RAFT_API #define RAFT_API __attribute__((visibility("default"))) #endif @@ -802,7 +804,7 @@ struct raft raft_index round_index; /* Target of the current round. */ raft_time round_start; /* Start of current round. */ - void *requests[2]; /* Outstanding client requests. */ + queue requests; /* Outstanding client requests. */ uint32_t voter_contacts; /* Current number of voting nodes we are in contact with */ @@ -1029,7 +1031,7 @@ RAFT_API int raft_voter_contacts(struct raft *r); void *data; \ int type; \ raft_index index; \ - void *queue[2]; \ + queue queue; \ uint8_t req_id[16]; \ uint8_t client_id[16]; \ uint8_t unique_id[16]; \ diff --git a/src/raft/client.c b/src/raft/client.c index cd88f1d2b..3a7173049 100644 --- a/src/raft/client.c +++ b/src/raft/client.c @@ -7,7 +7,7 @@ #include "log.h" #include "membership.h" #include "progress.h" -#include "queue.h" +#include "../lib/queue.h" #include "replication.h" #include "request.h" @@ -57,7 +57,7 @@ int raft_apply(struct raft *r, err_after_log_append: logDiscard(r->log, index); - QUEUE_REMOVE(&req->queue); + queue_remove(&req->queue); err: assert(rv != 0); return rv; @@ -106,7 +106,7 @@ int raft_barrier(struct raft *r, struct raft_barrier *req, raft_barrier_cb cb) err_after_log_append: logDiscard(r->log, index); - QUEUE_REMOVE(&req->queue); + queue_remove(&req->queue); err_after_buf_alloc: raft_free(buf.base); err: diff --git a/src/raft/convert.c b/src/raft/convert.c index 1c4d52d25..e29adc5ee 100644 --- a/src/raft/convert.c +++ b/src/raft/convert.c @@ -9,7 +9,7 @@ #include "log.h" #include "membership.h" #include "progress.h" -#include "queue.h" +#include "../lib/queue.h" #include "replication.h" #include "request.h" @@ -93,11 +93,11 @@ static void convertClearLeader(struct raft *r) } /* Fail all outstanding requests */ - while (!QUEUE_IS_EMPTY(&r->leader_state.requests)) { + while (!queue_empty(&r->leader_state.requests)) { struct request *req; queue *head; - head = QUEUE_HEAD(&r->leader_state.requests); - QUEUE_REMOVE(head); + head = queue_head(&r->leader_state.requests); + queue_remove(head); req = QUEUE_DATA(head, struct request, queue); assert(req->type == RAFT_COMMAND || req->type == RAFT_BARRIER); switch (req->type) { @@ -209,7 +209,7 @@ int convertToLeader(struct raft *r) r->election_timer_start = r->io->time(r->io); /* Reset apply requests queue */ - QUEUE_INIT(&r->leader_state.requests); + queue_init(&r->leader_state.requests); /* Allocate and initialize the progress array. */ rv = progressBuildArray(r); diff --git a/src/raft/fixture.c b/src/raft/fixture.c index cef8cfc41..9678196e0 100644 --- a/src/raft/fixture.c +++ b/src/raft/fixture.c @@ -10,7 +10,7 @@ #include "convert.h" #include "entry.h" #include "log.h" -#include "queue.h" +#include "../lib/queue.h" #include "snapshot.h" /* Defaults */ @@ -365,7 +365,7 @@ static void ioFlushSend(struct io *io, struct send *send) src = &send->message; dst = &transmit->message; - QUEUE_PUSH(&io->requests, &transmit->queue); + queue_insert_tail(&io->requests, &transmit->queue); *dst = *src; switch (dst->type) { @@ -416,12 +416,12 @@ static void ioDestroyTransmit(struct transmit *transmit) /* Flush all requests in the queue. */ static void ioFlushAll(struct io *io) { - while (!QUEUE_IS_EMPTY(&io->requests)) { + while (!queue_empty(&io->requests)) { queue *head; struct ioRequest *r; - head = QUEUE_HEAD(&io->requests); - QUEUE_REMOVE(head); + head = queue_head(&io->requests); + queue_remove(head); r = QUEUE_DATA(head, struct ioRequest, queue); switch (r->type) { @@ -592,7 +592,7 @@ static int ioMethodAppend(struct raft_io *raft_io, req->cb = cb; - QUEUE_PUSH(&io->requests, &r->queue); + queue_insert_tail(&io->requests, &r->queue); return 0; } @@ -660,7 +660,7 @@ static int ioMethodSnapshotPut(struct raft_io *raft_io, r->completion_time = *io->time + io->disk_latency; r->trailing = trailing; - QUEUE_PUSH(&io->requests, &r->queue); + queue_insert_tail(&io->requests, &r->queue); return 0; } @@ -680,7 +680,7 @@ static int ioMethodAsyncWork(struct raft_io *raft_io, r->req->cb = cb; r->completion_time = *io->time + io->work_duration; - QUEUE_PUSH(&io->requests, &r->queue); + queue_insert_tail(&io->requests, &r->queue); return 0; } @@ -699,7 +699,7 @@ static int ioMethodSnapshotGet(struct raft_io *raft_io, r->req->cb = cb; r->completion_time = *io->time + io->disk_latency; - QUEUE_PUSH(&io->requests, &r->queue); + queue_insert_tail(&io->requests, &r->queue); return 0; } @@ -749,7 +749,7 @@ static int ioMethodSend(struct raft_io *raft_io, peer = ioGetPeer(io, message->server_id); r->completion_time = *io->time + peer->send_latency; - QUEUE_PUSH(&io->requests, &r->queue); + queue_insert_tail(&io->requests, &r->queue); return 0; } @@ -881,7 +881,7 @@ static int ioInit(struct raft_io *raft_io, unsigned index, raft_time *time) io->snapshot = NULL; io->entries = NULL; io->n = 0; - QUEUE_INIT(&io->requests); + queue_init(&io->requests); io->n_peers = 0; io->randomized_election_timeout = ELECTION_TIMEOUT + index * 100; io->network_latency = NETWORK_LATENCY; @@ -1400,7 +1400,7 @@ static void completeRequest(struct raft_fixture *f, unsigned i, raft_time t) } } assert(found); - QUEUE_REMOVE(head); + queue_remove(head); switch (r->type) { case APPEND: ioFlushAppend(io, (struct append *)r); diff --git a/src/raft/lifecycle.c b/src/raft/lifecycle.c index bd6d618c7..49d214ef7 100644 --- a/src/raft/lifecycle.c +++ b/src/raft/lifecycle.c @@ -1,6 +1,6 @@ #include "lifecycle.h" #include "../tracing.h" -#include "queue.h" +#include "../lib/queue.h" #include #include @@ -23,7 +23,7 @@ void lifecycleRequestStart(struct raft *r, struct request *req) if (reqIdIsSet(req)) { tracef("request start id:%" PRIu64, extractReqId(req)); } - QUEUE_PUSH(&r->leader_state.requests, &req->queue); + queue_insert_tail(&r->leader_state.requests, &req->queue); } void lifecycleRequestEnd(struct raft *r, struct request *req) @@ -32,5 +32,5 @@ void lifecycleRequestEnd(struct raft *r, struct request *req) if (reqIdIsSet(req)) { tracef("request end id:%" PRIu64, extractReqId(req)); } - QUEUE_REMOVE(&req->queue); + queue_remove(&req->queue); } diff --git a/src/raft/queue.h b/src/raft/queue.h deleted file mode 100644 index 1262cf554..000000000 --- a/src/raft/queue.h +++ /dev/null @@ -1,57 +0,0 @@ -#ifndef QUEUE_H_ -#define QUEUE_H_ - -#include - -typedef void *queue[2]; - -/* Private macros. */ -#define QUEUE_NEXT(q) (*(queue **)&((*(q))[0])) -#define QUEUE_PREV(q) (*(queue **)&((*(q))[1])) - -#define QUEUE_PREV_NEXT(q) (QUEUE_NEXT(QUEUE_PREV(q))) -#define QUEUE_NEXT_PREV(q) (QUEUE_PREV(QUEUE_NEXT(q))) - -/* Initialize an empty queue. */ -#define QUEUE_INIT(q) \ - { \ - QUEUE_NEXT(q) = (q); \ - QUEUE_PREV(q) = (q); \ - } - -/* Return true if the queue has no element. */ -#define QUEUE_IS_EMPTY(q) ((const queue *)(q) == (const queue *)QUEUE_NEXT(q)) - -/* Insert an element at the back of a queue. */ -#define QUEUE_PUSH(q, e) \ - { \ - QUEUE_NEXT(e) = (q); \ - QUEUE_PREV(e) = QUEUE_PREV(q); \ - QUEUE_PREV_NEXT(e) = (e); \ - QUEUE_PREV(q) = (e); \ - } - -/* Remove the given element from the queue. Any element can be removed at any * - * time. */ -#define QUEUE_REMOVE(e) \ - { \ - QUEUE_PREV_NEXT(e) = QUEUE_NEXT(e); \ - QUEUE_NEXT_PREV(e) = QUEUE_PREV(e); \ - } - -/* Return the element at the front of the queue. */ -#define QUEUE_HEAD(q) (QUEUE_NEXT(q)) - -/* Return the element at the back of the queue. */ -#define QUEUE_TAIL(q) (QUEUE_PREV(q)) - -/* Iterate over the element of a queue. * Mutating the queue while iterating - * results in undefined behavior. */ -#define QUEUE_FOREACH(q, e) \ - for ((q) = QUEUE_NEXT(e); (q) != (e); (q) = QUEUE_NEXT(q)) - -/* Return the structure holding the given element. */ -#define QUEUE_DATA(e, type, field) \ - ((type *)((void *)((char *)(e)-offsetof(type, field)))) - -#endif /* QUEUE_H_*/ diff --git a/src/raft/replication.c b/src/raft/replication.c index 8310feb8b..feecfd8f7 100644 --- a/src/raft/replication.c +++ b/src/raft/replication.c @@ -15,7 +15,7 @@ #include "log.h" #include "membership.h" #include "progress.h" -#include "queue.h" +#include "../lib/queue.h" #include "replication.h" #include "request.h" #include "snapshot.h" diff --git a/src/raft/request.h b/src/raft/request.h index 08ad4a36b..86dc21e3d 100644 --- a/src/raft/request.h +++ b/src/raft/request.h @@ -10,7 +10,7 @@ struct request void *data; int type; raft_index index; - void *queue[2]; + queue queue; uint8_t req_id[16]; uint8_t client_id[16]; uint8_t unique_id[16]; diff --git a/src/raft/state.c b/src/raft/state.c index af46d76d9..ace3955c2 100644 --- a/src/raft/state.c +++ b/src/raft/state.c @@ -2,7 +2,7 @@ #include "configuration.h" #include "election.h" #include "log.h" -#include "queue.h" +#include "../lib/queue.h" int raft_state(struct raft *r) { diff --git a/src/raft/uv.c b/src/raft/uv.c index c0602c5d3..f1450e742 100644 --- a/src/raft/uv.c +++ b/src/raft/uv.c @@ -172,10 +172,10 @@ void uvMaybeFireCloseCb(struct uv *uv) if (uv->timer.data != NULL) { return; } - if (!QUEUE_IS_EMPTY(&uv->append_segments)) { + if (!queue_empty(&uv->append_segments)) { return; } - if (!QUEUE_IS_EMPTY(&uv->finalize_reqs)) { + if (!queue_empty(&uv->finalize_reqs)) { return; } if (uv->finalize_work.data != NULL) { @@ -190,13 +190,13 @@ void uvMaybeFireCloseCb(struct uv *uv) if (uv->snapshot_put_work.data != NULL) { return; } - if (!QUEUE_IS_EMPTY(&uv->snapshot_get_reqs)) { + if (!queue_empty(&uv->snapshot_get_reqs)) { return; } - if (!QUEUE_IS_EMPTY(&uv->async_work_reqs)) { + if (!queue_empty(&uv->async_work_reqs)) { return; } - if (!QUEUE_IS_EMPTY(&uv->aborting)) { + if (!queue_empty(&uv->aborting)) { return; } @@ -698,28 +698,28 @@ int raft_uv_init(struct raft_io *io, #endif uv->segment_size = UV__MAX_SEGMENT_SIZE; uv->block_size = 0; - QUEUE_INIT(&uv->clients); - QUEUE_INIT(&uv->servers); + queue_init(&uv->clients); + queue_init(&uv->servers); uv->connect_retry_delay = CONNECT_RETRY_DELAY; uv->prepare_inflight = NULL; - QUEUE_INIT(&uv->prepare_reqs); - QUEUE_INIT(&uv->prepare_pool); + queue_init(&uv->prepare_reqs); + queue_init(&uv->prepare_pool); uv->prepare_next_counter = 1; uv->append_next_index = 1; - QUEUE_INIT(&uv->append_segments); - QUEUE_INIT(&uv->append_pending_reqs); - QUEUE_INIT(&uv->append_writing_reqs); + queue_init(&uv->append_segments); + queue_init(&uv->append_pending_reqs); + queue_init(&uv->append_writing_reqs); uv->barrier = NULL; - QUEUE_INIT(&uv->finalize_reqs); + queue_init(&uv->finalize_reqs); uv->finalize_work.data = NULL; uv->truncate_work.data = NULL; - QUEUE_INIT(&uv->snapshot_get_reqs); - QUEUE_INIT(&uv->async_work_reqs); + queue_init(&uv->snapshot_get_reqs); + queue_init(&uv->async_work_reqs); uv->snapshot_put_work.data = NULL; uv->timer.data = NULL; uv->tick_cb = NULL; /* Set by raft_io->start() */ uv->recv_cb = NULL; /* Set by raft_io->start() */ - QUEUE_INIT(&uv->aborting); + queue_init(&uv->aborting); uv->closing = false; uv->close_cb = NULL; uv->auto_recovery = true; diff --git a/src/raft/uv.h b/src/raft/uv.h index db4009c64..3d0fd342b 100644 --- a/src/raft/uv.h +++ b/src/raft/uv.h @@ -6,7 +6,7 @@ #include "../raft.h" #include "../tracing.h" #include "err.h" -#include "queue.h" +#include "../lib/queue.h" #include "uv_fs.h" #include "uv_os.h" diff --git a/src/raft/uv_append.c b/src/raft/uv_append.c index 9feb66ca5..42756c451 100644 --- a/src/raft/uv_append.c +++ b/src/raft/uv_append.c @@ -1,7 +1,7 @@ #include "assert.h" #include "byte.h" #include "heap.h" -#include "queue.h" +#include "../lib/queue.h" #include "uv.h" #include "uv_encoding.h" #include "uv_writer.h" @@ -92,7 +92,7 @@ static void uvAliveSegmentFinalize(struct uvAliveSegment *s) * close the file handle and release the segment memory. */ } - QUEUE_REMOVE(&s->queue); + queue_remove(&s->queue); UvWriterClose(&s->writer, uvAliveSegmentWriterCloseCb); } @@ -102,10 +102,10 @@ static void uvAppendFinishRequestsInQueue(struct uv *uv, queue *q, int status) { queue queue_copy; struct uvAppend *append; - QUEUE_INIT(&queue_copy); - while (!QUEUE_IS_EMPTY(q)) { + queue_init(&queue_copy); + while (!queue_empty(q)) { queue *head; - head = QUEUE_HEAD(q); + head = queue_head(q); append = QUEUE_DATA(head, struct uvAppend, queue); /* Rollback the append next index if the result was * unsuccessful. */ @@ -116,15 +116,15 @@ static void uvAppendFinishRequestsInQueue(struct uv *uv, queue *q, int status) tracef("rollback uv->append_next_index now:%llu", uv->append_next_index); } - QUEUE_REMOVE(head); - QUEUE_PUSH(&queue_copy, head); + queue_remove(head); + queue_insert_tail(&queue_copy, head); } - while (!QUEUE_IS_EMPTY(&queue_copy)) { + while (!queue_empty(&queue_copy)) { queue *head; struct raft_io_append *req; - head = QUEUE_HEAD(&queue_copy); + head = queue_head(&queue_copy); append = QUEUE_DATA(head, struct uvAppend, queue); - QUEUE_REMOVE(head); + queue_remove(head); req = append->req; RaftHeapFree(append); req->cb(req, status); @@ -150,10 +150,10 @@ static void uvAppendFinishPendingRequests(struct uv *uv, int status) static struct uvAliveSegment *uvGetCurrentAliveSegment(struct uv *uv) { queue *head; - if (QUEUE_IS_EMPTY(&uv->append_segments)) { + if (queue_empty(&uv->append_segments)) { return NULL; } - head = QUEUE_HEAD(&uv->append_segments); + head = queue_head(&uv->append_segments); return QUEUE_DATA(head, struct uvAliveSegment, queue); } @@ -282,14 +282,14 @@ static void uvAliveSegmentWriteCb(struct UvWriterReq *write, const int status) /* During the closing sequence we should have already canceled all * pending request. */ if (uv->closing) { - assert(QUEUE_IS_EMPTY(&uv->append_pending_reqs)); + assert(queue_empty(&uv->append_pending_reqs)); assert(s->finalize); uvAliveSegmentFinalize(s); return; } /* Possibly process waiting requests. */ - if (!QUEUE_IS_EMPTY(&uv->append_pending_reqs)) { + if (!queue_empty(&uv->append_pending_reqs)) { rv = uvAppendMaybeStart(uv); if (rv != 0) { uv->errored = true; @@ -338,10 +338,10 @@ static int uvAppendMaybeStart(struct uv *uv) int rv; assert(!uv->closing); - assert(!QUEUE_IS_EMPTY(&uv->append_pending_reqs)); + assert(!queue_empty(&uv->append_pending_reqs)); /* If we are already writing, let's wait. */ - if (!QUEUE_IS_EMPTY(&uv->append_writing_reqs)) { + if (!queue_empty(&uv->append_writing_reqs)) { return 0; } @@ -372,18 +372,18 @@ static int uvAppendMaybeStart(struct uv *uv) /* Let's add to the segment's write buffer all pending requests targeted * to this segment. */ - QUEUE_INIT(&q); + queue_init(&q); n_reqs = 0; - while (!QUEUE_IS_EMPTY(&uv->append_pending_reqs)) { - head = QUEUE_HEAD(&uv->append_pending_reqs); + while (!queue_empty(&uv->append_pending_reqs)) { + head = queue_head(&uv->append_pending_reqs); append = QUEUE_DATA(head, struct uvAppend, queue); assert(append->segment != NULL); if (append->segment != segment) { break; /* Not targeted to this segment */ } - QUEUE_REMOVE(head); - QUEUE_PUSH(&q, head); + queue_remove(head); + queue_insert_tail(&q, head); n_reqs++; rv = uvAliveSegmentEncodeEntriesToWriteBuf(segment, append); if (rv != 0) { @@ -397,21 +397,21 @@ static int uvAppendMaybeStart(struct uv *uv) * request, in that case we need to wait for it). Otherwise it must mean * we have exhausted the queue of pending append requests. */ if (n_reqs == 0) { - assert(QUEUE_IS_EMPTY(&uv->append_writing_reqs)); + assert(queue_empty(&uv->append_writing_reqs)); if (segment->finalize) { uvAliveSegmentFinalize(segment); - if (!QUEUE_IS_EMPTY(&uv->append_pending_reqs)) { + if (!queue_empty(&uv->append_pending_reqs)) { goto start; } } - assert(QUEUE_IS_EMPTY(&uv->append_pending_reqs)); + assert(queue_empty(&uv->append_pending_reqs)); return 0; } - while (!QUEUE_IS_EMPTY(&q)) { - head = QUEUE_HEAD(&q); - QUEUE_REMOVE(head); - QUEUE_PUSH(&uv->append_writing_reqs, head); + while (!queue_empty(&q)) { + head = queue_head(&q); + queue_remove(head); + queue_insert_tail(&uv->append_writing_reqs, head); } rv = uvAliveSegmentWrite(segment); @@ -457,7 +457,7 @@ static void uvAliveSegmentPrepareCb(struct uvPrepare *req, int status) /* If we have been closed, let's discard the segment. */ if (uv->closing) { - QUEUE_REMOVE(&segment->queue); + queue_remove(&segment->queue); assert(status == RAFT_CANCELED); /* UvPrepare cancels pending reqs */ uvSegmentBufferClose(&segment->pending); @@ -476,7 +476,7 @@ static void uvAliveSegmentPrepareCb(struct uvPrepare *req, int status) /* There must be pending appends that were waiting for this prepare * requests. */ - assert(!QUEUE_IS_EMPTY(&uv->append_pending_reqs)); + assert(!queue_empty(&uv->append_pending_reqs)); rv = uvAliveSegmentReady(uv, req->fd, req->counter, segment); if (rv != 0) { @@ -493,7 +493,7 @@ static void uvAliveSegmentPrepareCb(struct uvPrepare *req, int status) return; err: - QUEUE_REMOVE(&segment->queue); + queue_remove(&segment->queue); RaftHeapFree(segment); uv->errored = true; uvAppendFinishPendingRequests(uv, rv); @@ -535,7 +535,7 @@ static int uvAppendPushAliveSegment(struct uv *uv) } uvAliveSegmentInit(segment, uv); - QUEUE_PUSH(&uv->append_segments, &segment->queue); + queue_insert_tail(&uv->append_segments, &segment->queue); rv = UvPrepare(uv, &fd, &counter, &segment->prepare, uvAliveSegmentPrepareCb); @@ -557,7 +557,7 @@ static int uvAppendPushAliveSegment(struct uv *uv) UvOsClose(fd); UvFinalize(uv, counter, 0, 0, 0); err_after_alloc: - QUEUE_REMOVE(&segment->queue); + queue_remove(&segment->queue); RaftHeapFree(segment); err: assert(rv != 0); @@ -568,10 +568,10 @@ static int uvAppendPushAliveSegment(struct uv *uv) static struct uvAliveSegment *uvGetLastAliveSegment(struct uv *uv) { queue *tail; - if (QUEUE_IS_EMPTY(&uv->append_segments)) { + if (queue_empty(&uv->append_segments)) { return NULL; } - tail = QUEUE_TAIL(&uv->append_segments); + tail = queue_tail(&uv->append_segments); return QUEUE_DATA(tail, struct uvAliveSegment, queue); } @@ -647,7 +647,7 @@ static int uvAppendEnqueueRequest(struct uv *uv, struct uvAppend *append) uvAliveSegmentReserveSegmentCapacity(segment, size); append->segment = segment; - QUEUE_PUSH(&uv->append_pending_reqs, &append->queue); + queue_insert_tail(&uv->append_pending_reqs, &append->queue); uv->append_next_index += append->n; tracef("set uv->append_next_index %llu", uv->append_next_index); @@ -711,7 +711,7 @@ int UvAppend(struct raft_io *io, } assert(append->segment != NULL); - assert(!QUEUE_IS_EMPTY(&uv->append_pending_reqs)); + assert(!queue_empty(&uv->append_pending_reqs)); /* Try to write immediately. */ rv = uvAppendMaybeStart(uv); @@ -753,7 +753,7 @@ static void uvFinalizeCurrentAliveSegmentOnceIdle(struct uv *uv) break; } } - has_writing_reqs = !QUEUE_IS_EMPTY(&uv->append_writing_reqs); + has_writing_reqs = !queue_empty(&uv->append_writing_reqs); /* If there is no pending append request or inflight write against the * current segment, we can submit a request for it to be closed @@ -792,11 +792,11 @@ bool UvBarrierMaybeTrigger(struct UvBarrier *barrier) return false; } - if (!QUEUE_IS_EMPTY(&barrier->reqs)) { + if (!queue_empty(&barrier->reqs)) { queue *head; struct UvBarrierReq *r; - head = QUEUE_HEAD(&barrier->reqs); - QUEUE_REMOVE(head); + head = queue_head(&barrier->reqs); + queue_remove(head); r = QUEUE_DATA(head, struct UvBarrierReq, queue); r->cb(r); return true; @@ -821,7 +821,7 @@ static struct UvBarrier *uvBarrierCreate(void) return NULL; } barrier->blocking = false; - QUEUE_INIT(&barrier->reqs); + queue_init(&barrier->reqs); return barrier; } @@ -902,8 +902,8 @@ int UvBarrier(struct uv *uv, raft_index next_index, struct UvBarrierReq *req) * the callback immediately. * * TODO: find a way to avoid invoking this synchronously. */ - if (QUEUE_IS_EMPTY(&uv->append_segments) && - QUEUE_IS_EMPTY(&uv->finalize_reqs) && + if (queue_empty(&uv->append_segments) && + queue_empty(&uv->finalize_reqs) && uv->finalize_work.data == NULL) { /* Not interested in return value. */ UvBarrierMaybeTrigger(barrier); @@ -930,7 +930,7 @@ void UvUnblock(struct uv *uv) uvMaybeFireCloseCb(uv); return; } - if (!QUEUE_IS_EMPTY(&uv->append_pending_reqs)) { + if (!queue_empty(&uv->append_pending_reqs)) { int rv; rv = uvAppendMaybeStart(uv); if (rv != 0) { @@ -945,7 +945,7 @@ void UvBarrierAddReq(struct UvBarrier *barrier, struct UvBarrierReq *req) assert(req != NULL); /* Once there's a blocking req, this barrier becomes blocking. */ barrier->blocking |= req->blocking; - QUEUE_PUSH(&barrier->reqs, &req->queue); + queue_insert_tail(&barrier->reqs, &req->queue); } /* Fire all pending barrier requests, the barrier callback will notice that @@ -1022,7 +1022,7 @@ void uvAppendClose(struct uv *uv) /* Also finalize the segments that we didn't write at all and are just * sitting in the append_segments queue waiting for writes against the * current segment to complete. */ - while (!QUEUE_IS_EMPTY(&uv->append_segments)) { + while (!queue_empty(&uv->append_segments)) { segment = uvGetLastAliveSegment(uv); assert(segment != NULL); if (segment == uvGetCurrentAliveSegment(uv)) { diff --git a/src/raft/uv_finalize.c b/src/raft/uv_finalize.c index 638b551ad..beb9d4c00 100644 --- a/src/raft/uv_finalize.c +++ b/src/raft/uv_finalize.c @@ -1,6 +1,6 @@ #include "assert.h" #include "heap.h" -#include "queue.h" +#include "../lib/queue.h" #include "uv.h" #include "uv_os.h" @@ -88,7 +88,7 @@ static void uvFinalizeAfterWorkCb(uv_work_t *work, int status) /* If we have no more dismissed segments to close, check if there's a * barrier to unblock or if we are done closing. */ - if (QUEUE_IS_EMPTY(&uv->finalize_reqs)) { + if (queue_empty(&uv->finalize_reqs)) { tracef("unblock barrier or close"); if (uv->barrier != NULL && UvBarrierReady(uv)) { UvBarrierMaybeTrigger(uv->barrier); @@ -98,9 +98,9 @@ static void uvFinalizeAfterWorkCb(uv_work_t *work, int status) } /* Grab a new dismissed segment to close. */ - head = QUEUE_HEAD(&uv->finalize_reqs); + head = queue_head(&uv->finalize_reqs); segment = QUEUE_DATA(head, struct uvDyingSegment, queue); - QUEUE_REMOVE(&segment->queue); + queue_remove(&segment->queue); rv = uvFinalizeStart(segment); if (rv != 0) { @@ -160,7 +160,7 @@ int UvFinalize(struct uv *uv, /* If we're already processing a segment, let's put the request in the * queue and wait. */ if (uv->finalize_work.data != NULL) { - QUEUE_PUSH(&uv->finalize_reqs, &segment->queue); + queue_insert_tail(&uv->finalize_reqs, &segment->queue); return 0; } diff --git a/src/raft/uv_prepare.c b/src/raft/uv_prepare.c index 00355480f..199da6f29 100644 --- a/src/raft/uv_prepare.c +++ b/src/raft/uv_prepare.c @@ -77,12 +77,12 @@ static void uvPrepareWorkCb(uv_work_t *work) * status. */ static void uvPrepareFinishAllRequests(struct uv *uv, int status) { - while (!QUEUE_IS_EMPTY(&uv->prepare_reqs)) { + while (!queue_empty(&uv->prepare_reqs)) { queue *head; struct uvPrepare *req; - head = QUEUE_HEAD(&uv->prepare_reqs); + head = queue_head(&uv->prepare_reqs); req = QUEUE_DATA(head, struct uvPrepare, queue); - QUEUE_REMOVE(&req->queue); + queue_remove(&req->queue); req->cb(req, status); } } @@ -94,10 +94,10 @@ static void uvPrepareConsume(struct uv *uv, uv_file *fd, uvCounter *counter) queue *head; struct uvIdleSegment *segment; /* Pop a segment from the pool. */ - head = QUEUE_HEAD(&uv->prepare_pool); + head = queue_head(&uv->prepare_pool); segment = QUEUE_DATA(head, struct uvIdleSegment, queue); assert(segment->fd >= 0); - QUEUE_REMOVE(&segment->queue); + queue_remove(&segment->queue); *fd = segment->fd; *counter = segment->counter; RaftHeapFree(segment); @@ -111,13 +111,13 @@ static void uvPrepareFinishOldestRequest(struct uv *uv) struct uvPrepare *req; assert(!uv->closing); - assert(!QUEUE_IS_EMPTY(&uv->prepare_reqs)); - assert(!QUEUE_IS_EMPTY(&uv->prepare_pool)); + assert(!queue_empty(&uv->prepare_reqs)); + assert(!queue_empty(&uv->prepare_pool)); /* Pop the head of the prepare requests queue. */ - head = QUEUE_HEAD(&uv->prepare_reqs); + head = queue_head(&uv->prepare_reqs); req = QUEUE_DATA(head, struct uvPrepare, queue); - QUEUE_REMOVE(&req->queue); + queue_remove(&req->queue); /* Finish the request */ uvPrepareConsume(uv, &req->fd, &req->counter); @@ -199,8 +199,8 @@ static void uvPrepareAfterWorkCb(uv_work_t *work, int status) /* If we are closing, let's discard the segment. All pending requests * have already being fired with RAFT_CANCELED. */ if (uv->closing) { - assert(QUEUE_IS_EMPTY(&uv->prepare_pool)); - assert(QUEUE_IS_EMPTY(&uv->prepare_reqs)); + assert(queue_empty(&uv->prepare_pool)); + assert(queue_empty(&uv->prepare_reqs)); if (segment->status == 0) { char errmsg[RAFT_ERRMSG_BUF_SIZE]; UvOsClose(segment->fd); @@ -218,7 +218,7 @@ static void uvPrepareAfterWorkCb(uv_work_t *work, int status) * Note that if there's no pending request, we don't set the error * message, to avoid overwriting previous errors. */ if (segment->status != 0) { - if (!QUEUE_IS_EMPTY(&uv->prepare_reqs)) { + if (!queue_empty(&uv->prepare_reqs)) { ErrMsgTransferf(segment->errmsg, uv->io->errmsg, "create segment %s", segment->filename); uvPrepareFinishAllRequests(uv, segment->status); @@ -231,10 +231,10 @@ static void uvPrepareAfterWorkCb(uv_work_t *work, int status) assert(segment->fd >= 0); tracef("completed creation of %s", segment->filename); - QUEUE_PUSH(&uv->prepare_pool, &segment->queue); + queue_insert_tail(&uv->prepare_pool, &segment->queue); /* Let's process any pending request. */ - if (!QUEUE_IS_EMPTY(&uv->prepare_reqs)) { + if (!queue_empty(&uv->prepare_reqs)) { uvPrepareFinishOldestRequest(uv); } @@ -249,7 +249,7 @@ static void uvPrepareAfterWorkCb(uv_work_t *work, int status) * above, thus reducing the pool size and making it smaller than the * target size. */ if (uvPrepareCount(uv) >= UV__TARGET_POOL_SIZE) { - assert(QUEUE_IS_EMPTY(&uv->prepare_reqs)); + assert(queue_empty(&uv->prepare_reqs)); return; } @@ -284,7 +284,7 @@ int UvPrepare(struct uv *uv, assert(!uv->closing); - if (!QUEUE_IS_EMPTY(&uv->prepare_pool)) { + if (!queue_empty(&uv->prepare_pool)) { uvPrepareConsume(uv, fd, counter); goto maybe_start; } @@ -292,7 +292,7 @@ int UvPrepare(struct uv *uv, *fd = -1; *counter = 0; req->cb = cb; - QUEUE_PUSH(&uv->prepare_reqs, &req->queue); + queue_insert_tail(&uv->prepare_reqs, &req->queue); maybe_start: /* If we are already creating a segment, let's just wait. */ @@ -311,7 +311,7 @@ int UvPrepare(struct uv *uv, if (*fd != -1) { uvPrepareDiscard(uv, *fd, *counter); } else { - QUEUE_REMOVE(&req->queue); + queue_remove(&req->queue); } assert(rv != 0); return rv; @@ -325,12 +325,12 @@ void UvPrepareClose(struct uv *uv) uvPrepareFinishAllRequests(uv, RAFT_CANCELED); /* Remove any unused prepared segment. */ - while (!QUEUE_IS_EMPTY(&uv->prepare_pool)) { + while (!queue_empty(&uv->prepare_pool)) { queue *head; struct uvIdleSegment *segment; - head = QUEUE_HEAD(&uv->prepare_pool); + head = queue_head(&uv->prepare_pool); segment = QUEUE_DATA(head, struct uvIdleSegment, queue); - QUEUE_REMOVE(&segment->queue); + queue_remove(&segment->queue); uvPrepareDiscard(uv, segment->fd, segment->counter); RaftHeapFree(segment); } diff --git a/src/raft/uv_recv.c b/src/raft/uv_recv.c index 72d6f5ec6..fe93a21ef 100644 --- a/src/raft/uv_recv.c +++ b/src/raft/uv_recv.c @@ -78,13 +78,13 @@ static int uvServerInit(struct uvServer *s, s->message.type = 0; s->payload.base = NULL; s->payload.len = 0; - QUEUE_PUSH(&uv->servers, &s->queue); + queue_insert_tail(&uv->servers, &s->queue); return 0; } static void uvServerDestroy(struct uvServer *s) { - QUEUE_REMOVE(&s->queue); + queue_remove(&s->queue); if (s->header.base != NULL) { /* This means we were interrupted while reading the header. */ @@ -179,8 +179,8 @@ static void uvServerStreamCloseCb(uv_handle_t *handle) static void uvServerAbort(struct uvServer *s) { struct uv *uv = s->uv; - QUEUE_REMOVE(&s->queue); - QUEUE_PUSH(&uv->aborting, &s->queue); + queue_remove(&s->queue); + queue_insert_tail(&uv->aborting, &s->queue); uv_close((struct uv_handle_s *)s->stream, uvServerStreamCloseCb); } @@ -411,10 +411,10 @@ int UvRecvStart(struct uv *uv) void UvRecvClose(struct uv *uv) { - while (!QUEUE_IS_EMPTY(&uv->servers)) { + while (!queue_empty(&uv->servers)) { queue *head; struct uvServer *server; - head = QUEUE_HEAD(&uv->servers); + head = queue_head(&uv->servers); server = QUEUE_DATA(head, struct uvServer, queue); uvServerAbort(server); } diff --git a/src/raft/uv_send.c b/src/raft/uv_send.c index 86133542c..b4a4aad78 100644 --- a/src/raft/uv_send.c +++ b/src/raft/uv_send.c @@ -95,9 +95,9 @@ static int uvClientInit(struct uvClient *c, rv = uv_timer_init(c->uv->loop, &c->timer); assert(rv == 0); strcpy(c->address, address); - QUEUE_INIT(&c->pending); + queue_init(&c->pending); c->closing = false; - QUEUE_PUSH(&uv->clients, &c->queue); + queue_insert_tail(&uv->clients, &c->queue); return 0; } @@ -119,13 +119,13 @@ static void uvClientMaybeDestroy(struct uvClient *c) return; } - while (!QUEUE_IS_EMPTY(&c->pending)) { + while (!queue_empty(&c->pending)) { queue *head; struct uvSend *send; struct raft_io_send *req; - head = QUEUE_HEAD(&c->pending); + head = queue_head(&c->pending); send = QUEUE_DATA(head, struct uvSend, queue); - QUEUE_REMOVE(head); + queue_remove(head); req = send->req; uvSendDestroy(send); if (req->cb != NULL) { @@ -133,7 +133,7 @@ static void uvClientMaybeDestroy(struct uvClient *c) } } - QUEUE_REMOVE(&c->queue); + queue_remove(&c->queue); assert(c->address != NULL); RaftHeapFree(c->address); @@ -210,7 +210,7 @@ static int uvClientSend(struct uvClient *c, struct uvSend *send) /* If there's no connection available, let's queue the request. */ if (c->stream == NULL) { tracef("no connection available -> enqueue message"); - QUEUE_PUSH(&c->pending, &send->queue); + queue_insert_tail(&c->pending, &send->queue); return 0; } @@ -234,12 +234,12 @@ static void uvClientSendPending(struct uvClient *c) int rv; assert(c->stream != NULL); tracef("send pending messages"); - while (!QUEUE_IS_EMPTY(&c->pending)) { + while (!queue_empty(&c->pending)) { queue *head; struct uvSend *send; - head = QUEUE_HEAD(&c->pending); + head = queue_head(&c->pending); send = QUEUE_DATA(head, struct uvSend, queue); - QUEUE_REMOVE(head); + queue_remove(head); rv = uvClientSend(c, send); if (rv != 0) { if (send->req->cb != NULL) { @@ -322,9 +322,9 @@ static void uvClientConnectCb(struct raft_uv_connect *req, queue *head; struct uvSend *old_send; struct raft_io_send *old_req; - head = QUEUE_HEAD(&c->pending); + head = queue_head(&c->pending); old_send = QUEUE_DATA(head, struct uvSend, queue); - QUEUE_REMOVE(head); + queue_remove(head); old_req = old_send->req; uvSendDestroy(old_send); if (old_req->cb != NULL) { @@ -384,8 +384,8 @@ static void uvClientAbort(struct uvClient *c) uv_is_active((struct uv_handle_s *)&c->timer) || c->connect.data != NULL); - QUEUE_REMOVE(&c->queue); - QUEUE_PUSH(&uv->aborting, &c->queue); + queue_remove(&c->queue); + queue_insert_tail(&uv->aborting, &c->queue); rv = uv_timer_stop(&c->timer); assert(rv == 0); @@ -507,10 +507,10 @@ int UvSend(struct raft_io *io, void UvSendClose(struct uv *uv) { assert(uv->closing); - while (!QUEUE_IS_EMPTY(&uv->clients)) { + while (!queue_empty(&uv->clients)) { queue *head; struct uvClient *client; - head = QUEUE_HEAD(&uv->clients); + head = queue_head(&uv->clients); client = QUEUE_DATA(head, struct uvClient, queue); uvClientAbort(client); } diff --git a/src/raft/uv_snapshot.c b/src/raft/uv_snapshot.c index d4b8910d1..343d1ce07 100644 --- a/src/raft/uv_snapshot.c +++ b/src/raft/uv_snapshot.c @@ -751,7 +751,7 @@ static void uvSnapshotGetAfterWorkCb(uv_work_t *work, int status) int req_status = get->status; struct uv *uv = get->uv; assert(status == 0); - QUEUE_REMOVE(&get->queue); + queue_remove(&get->queue); RaftHeapFree(get); req->cb(req, snapshot, req_status); uvMaybeFireCloseCb(uv); @@ -784,11 +784,11 @@ int UvSnapshotGet(struct raft_io *io, } get->work.data = get; - QUEUE_PUSH(&uv->snapshot_get_reqs, &get->queue); + queue_insert_tail(&uv->snapshot_get_reqs, &get->queue); rv = uv_queue_work(uv->loop, &get->work, uvSnapshotGetWorkCb, uvSnapshotGetAfterWorkCb); if (rv != 0) { - QUEUE_REMOVE(&get->queue); + queue_remove(&get->queue); tracef("get last snapshot: %s", uv_strerror(rv)); rv = RAFT_IOERR; goto err_after_snapshot_alloc; diff --git a/src/raft/uv_tcp.c b/src/raft/uv_tcp.c index 4196b9f56..1455958e0 100644 --- a/src/raft/uv_tcp.c +++ b/src/raft/uv_tcp.c @@ -40,9 +40,9 @@ void UvTcpMaybeFireCloseCb(struct UvTcp *t) return; } - assert(QUEUE_IS_EMPTY(&t->accepting)); - assert(QUEUE_IS_EMPTY(&t->connecting)); - if (!QUEUE_IS_EMPTY(&t->aborting)) { + assert(queue_empty(&t->accepting)); + assert(queue_empty(&t->connecting)); + if (!queue_empty(&t->aborting)) { return; } @@ -82,9 +82,9 @@ int raft_uv_tcp_init(struct raft_uv_transport *transport, t->listeners = NULL; t->n_listeners = 0; t->accept_cb = NULL; - QUEUE_INIT(&t->accepting); - QUEUE_INIT(&t->connecting); - QUEUE_INIT(&t->aborting); + queue_init(&t->accepting); + queue_init(&t->connecting); + queue_init(&t->aborting); t->closing = false; t->close_cb = NULL; diff --git a/src/raft/uv_tcp.h b/src/raft/uv_tcp.h index 924f4f5b3..ed475f89a 100644 --- a/src/raft/uv_tcp.h +++ b/src/raft/uv_tcp.h @@ -2,7 +2,7 @@ #define UV_TCP_H_ #include "../raft.h" -#include "queue.h" +#include "../lib/queue.h" /* Protocol version. */ #define UV__TCP_HANDSHAKE_PROTOCOL 1 diff --git a/src/raft/uv_tcp_connect.c b/src/raft/uv_tcp_connect.c index e493d14a8..57ec3c0ef 100644 --- a/src/raft/uv_tcp_connect.c +++ b/src/raft/uv_tcp_connect.c @@ -77,7 +77,7 @@ static void uvTcpConnectFinish(struct uvTcpConnect *connect) struct uv_stream_s *stream = (struct uv_stream_s *)connect->tcp; struct raft_uv_connect *req = connect->req; int status = connect->status; - QUEUE_REMOVE(&connect->queue); + queue_remove(&connect->queue); RaftHeapFree(connect->handshake.base); uv_freeaddrinfo(connect->getaddrinfo.addrinfo); raft_free(connect); @@ -101,8 +101,8 @@ static void uvTcpConnectUvCloseCb(struct uv_handle_s *handle) /* Abort a connection request. */ static void uvTcpConnectAbort(struct uvTcpConnect *connect) { - QUEUE_REMOVE(&connect->queue); - QUEUE_PUSH(&connect->t->aborting, &connect->queue); + queue_remove(&connect->queue); + queue_insert_tail(&connect->t->aborting, &connect->queue); uv_cancel((struct uv_req_s *)&connect->getaddrinfo); /* Call uv_close on the tcp handle, if there is no getaddrinfo request * in flight and the handle is not currently closed due to next IP @@ -353,7 +353,7 @@ int UvTcpConnect(struct raft_uv_transport *transport, req->cb = cb; /* Keep track of the pending request */ - QUEUE_PUSH(&t->connecting, &r->queue); + queue_insert_tail(&t->connecting, &r->queue); /* Start connecting */ rv = uvTcpConnectStart(r, address); @@ -364,7 +364,7 @@ int UvTcpConnect(struct raft_uv_transport *transport, return 0; err_after_alloc: - QUEUE_REMOVE(&r->queue); + queue_remove(&r->queue); RaftHeapFree(r); err: return rv; @@ -372,10 +372,10 @@ int UvTcpConnect(struct raft_uv_transport *transport, void UvTcpConnectClose(struct UvTcp *t) { - while (!QUEUE_IS_EMPTY(&t->connecting)) { + while (!queue_empty(&t->connecting)) { struct uvTcpConnect *connect; queue *head; - head = QUEUE_HEAD(&t->connecting); + head = queue_head(&t->connecting); connect = QUEUE_DATA(head, struct uvTcpConnect, queue); uvTcpConnectAbort(connect); } diff --git a/src/raft/uv_tcp_listen.c b/src/raft/uv_tcp_listen.c index 41b6ca1ad..c7683d431 100644 --- a/src/raft/uv_tcp_listen.c +++ b/src/raft/uv_tcp_listen.c @@ -69,7 +69,7 @@ static void uvTcpIncomingCloseCb(struct uv_handle_s *handle) { struct uvTcpIncoming *incoming = handle->data; struct UvTcp *t = incoming->t; - QUEUE_REMOVE(&incoming->queue); + queue_remove(&incoming->queue); if (incoming->handshake.address.base != NULL) { RaftHeapFree(incoming->handshake.address.base); } @@ -84,8 +84,8 @@ static void uvTcpIncomingAbort(struct uvTcpIncoming *incoming) struct UvTcp *t = incoming->t; /* After uv_close() returns we are guaranteed that no more alloc_cb or * read_cb will be called. */ - QUEUE_REMOVE(&incoming->queue); - QUEUE_PUSH(&t->aborting, &incoming->queue); + queue_remove(&incoming->queue); + queue_insert_tail(&t->aborting, &incoming->queue); uv_close((struct uv_handle_s *)incoming->tcp, uvTcpIncomingCloseCb); } @@ -143,7 +143,7 @@ static void uvTcpIncomingReadCbAddress(uv_stream_t *stream, assert(rv == 0); id = byteFlip64(incoming->handshake.preamble[1]); address = incoming->handshake.address.base; - QUEUE_REMOVE(&incoming->queue); + queue_remove(&incoming->queue); incoming->t->accept_cb(incoming->t->transport, id, address, (struct uv_stream_s *)incoming->tcp); RaftHeapFree(incoming->handshake.address.base); @@ -274,7 +274,7 @@ static void uvTcpListenCb(struct uv_stream_s *stream, int status) incoming->listener = (struct uv_tcp_s *)stream; incoming->tcp = NULL; - QUEUE_PUSH(&t->accepting, &incoming->queue); + queue_insert_tail(&t->accepting, &incoming->queue); rv = uvTcpIncomingStart(incoming); if (rv != 0) { @@ -284,7 +284,7 @@ static void uvTcpListenCb(struct uv_stream_s *stream, int status) return; err_after_accept_alloc: - QUEUE_REMOVE(&incoming->queue); + queue_remove(&incoming->queue); RaftHeapFree(incoming); err: assert(rv != 0); @@ -411,9 +411,9 @@ void UvTcpListenClose(struct UvTcp *t) queue *head; assert(t->closing); - while (!QUEUE_IS_EMPTY(&t->accepting)) { + while (!queue_empty(&t->accepting)) { struct uvTcpIncoming *incoming; - head = QUEUE_HEAD(&t->accepting); + head = queue_head(&t->accepting); incoming = QUEUE_DATA(head, struct uvTcpIncoming, queue); uvTcpIncomingAbort(incoming); } diff --git a/src/raft/uv_truncate.c b/src/raft/uv_truncate.c index 51bd84fcb..4365a1b96 100644 --- a/src/raft/uv_truncate.c +++ b/src/raft/uv_truncate.c @@ -137,8 +137,8 @@ static void uvTruncateBarrierCb(struct UvBarrierReq *barrier) return; } - assert(QUEUE_IS_EMPTY(&uv->append_writing_reqs)); - assert(QUEUE_IS_EMPTY(&uv->finalize_reqs)); + assert(queue_empty(&uv->append_writing_reqs)); + assert(queue_empty(&uv->finalize_reqs)); assert(uv->finalize_work.data == NULL); assert(uv->truncate_work.data == NULL); diff --git a/src/raft/uv_work.c b/src/raft/uv_work.c index 5b6431b97..42777462f 100644 --- a/src/raft/uv_work.c +++ b/src/raft/uv_work.c @@ -28,7 +28,7 @@ static void uvAsyncAfterWorkCb(uv_work_t *work, int status) struct uv *uv = w->uv; assert(status == 0); - QUEUE_REMOVE(&w->queue); + queue_remove(&w->queue); RaftHeapFree(w); req->cb(req, req_status); uvMaybeFireCloseCb(uv); @@ -56,11 +56,11 @@ int UvAsyncWork(struct raft_io *io, async_work->work.data = async_work; req->cb = cb; - QUEUE_PUSH(&uv->async_work_reqs, &async_work->queue); + queue_insert_tail(&uv->async_work_reqs, &async_work->queue); rv = uv_queue_work(uv->loop, &async_work->work, uvAsyncWorkCb, uvAsyncAfterWorkCb); if (rv != 0) { - QUEUE_REMOVE(&async_work->queue); + queue_remove(&async_work->queue); tracef("async work: %s", uv_strerror(rv)); rv = RAFT_IOERR; goto err_after_req_alloc; diff --git a/src/raft/uv_writer.c b/src/raft/uv_writer.c index b489765b6..840d9a9e9 100644 --- a/src/raft/uv_writer.c +++ b/src/raft/uv_writer.c @@ -33,7 +33,7 @@ static void uvWriterReqSetStatus(struct UvWriterReq *req, int result) * callback if set. */ static void uvWriterReqFinish(struct UvWriterReq *req) { - QUEUE_REMOVE(&req->queue); + queue_remove(&req->queue); if (req->status != 0) { uvWriterReqTransferErrMsg(req); } @@ -219,10 +219,10 @@ static void uvWriterPollCb(uv_poll_t *poller, int status, int events) return; fail_requests: - while (!QUEUE_IS_EMPTY(&w->poll_queue)) { + while (!queue_empty(&w->poll_queue)) { queue *head; struct UvWriterReq *req; - head = QUEUE_HEAD(&w->poll_queue); + head = queue_head(&w->poll_queue); req = QUEUE_DATA(head, struct UvWriterReq, queue); uvWriterReqSetStatus(req, status); uvWriterReqFinish(req); @@ -251,8 +251,8 @@ int UvWriterInit(struct UvWriter *w, w->event_poller.data = NULL; w->check.data = NULL; w->close_cb = NULL; - QUEUE_INIT(&w->poll_queue); - QUEUE_INIT(&w->work_queue); + queue_init(&w->poll_queue); + queue_init(&w->work_queue); w->closing = false; w->errmsg = errmsg; @@ -352,10 +352,10 @@ static void uvWriterPollerCloseCb(struct uv_handle_s *handle) w->event_poller.data = NULL; /* Cancel all pending requests. */ - while (!QUEUE_IS_EMPTY(&w->poll_queue)) { + while (!queue_empty(&w->poll_queue)) { queue *head; struct UvWriterReq *req; - head = QUEUE_HEAD(&w->poll_queue); + head = queue_head(&w->poll_queue); req = QUEUE_DATA(head, struct UvWriterReq, queue); assert(req->work.data == NULL); req->status = RAFT_CANCELED; @@ -382,7 +382,7 @@ static void uvWriterCheckCloseCb(struct uv_handle_s *handle) static void uvWriterCheckCb(struct uv_check_s *check) { struct UvWriter *w = check->data; - if (!QUEUE_IS_EMPTY(&w->work_queue)) { + if (!queue_empty(&w->work_queue)) { return; } uv_close((struct uv_handle_s *)&w->check, uvWriterCheckCloseCb); @@ -407,7 +407,7 @@ void UvWriterClose(struct UvWriter *w, UvWriterCloseCb cb) /* If we have requests executing in the threadpool, we need to wait for * them. That's done in the check callback. */ - if (!QUEUE_IS_EMPTY(&w->work_queue)) { + if (!queue_empty(&w->work_queue)) { uv_check_start(&w->check, uvWriterCheckCb); } else { uv_close((struct uv_handle_s *)&w->check, uvWriterCheckCloseCb); @@ -440,8 +440,8 @@ int UvWriterSubmit(struct UvWriter *w, * writes, so ensure that we're getting write requests * sequentially. */ if (w->n_events == 1) { - assert(QUEUE_IS_EMPTY(&w->poll_queue)); - assert(QUEUE_IS_EMPTY(&w->work_queue)); + assert(queue_empty(&w->poll_queue)); + assert(queue_empty(&w->work_queue)); } assert(w->fd >= 0); @@ -490,7 +490,7 @@ int UvWriterSubmit(struct UvWriter *w, /* Try to submit the write request asynchronously */ if (w->async) { - QUEUE_PUSH(&w->poll_queue, &req->queue); + queue_insert_tail(&w->poll_queue, &req->queue); rv = UvOsIoSubmit(w->ctx, 1, &iocbs); /* If no error occurred, we're done, the write request was @@ -499,7 +499,7 @@ int UvWriterSubmit(struct UvWriter *w, goto done; } - QUEUE_REMOVE(&req->queue); + queue_remove(&req->queue); /* Check the reason of the error. */ switch (rv) { @@ -521,7 +521,7 @@ int UvWriterSubmit(struct UvWriter *w, /* If we got here it means we need to run io_submit in the threadpool. */ - QUEUE_PUSH(&w->work_queue, &req->queue); + queue_insert_tail(&w->work_queue, &req->queue); req->work.data = req; rv = uv_queue_work(w->loop, &req->work, uvWriterWorkCb, uvWriterAfterWorkCb); @@ -529,7 +529,7 @@ int UvWriterSubmit(struct UvWriter *w, /* UNTESTED: with the current libuv implementation this can't * fail. */ req->work.data = NULL; - QUEUE_REMOVE(&req->queue); + queue_remove(&req->queue); UvOsErrMsg(w->errmsg, "uv_queue_work", rv); rv = RAFT_IOERR; goto err; diff --git a/src/raft/uv_writer.h b/src/raft/uv_writer.h index db8f5c293..b4cb7fcde 100644 --- a/src/raft/uv_writer.h +++ b/src/raft/uv_writer.h @@ -6,7 +6,7 @@ #include #include "err.h" -#include "queue.h" +#include "../lib/queue.h" #include "uv_os.h" /* Perform asynchronous writes to a single file. */ diff --git a/test/raft/unit/test_queue.c b/test/raft/unit/test_queue.c index aee0f0a4d..7fc9a263b 100644 --- a/test/raft/unit/test_queue.c +++ b/test/raft/unit/test_queue.c @@ -1,4 +1,4 @@ -#include "../../../src/raft/queue.h" +#include "../../../src/lib/queue.h" #include "../lib/runner.h" /****************************************************************************** @@ -23,7 +23,7 @@ static void *setUp(MUNIT_UNUSED const MunitParameter params[], MUNIT_UNUSED void *user_data) { struct fixture *f = munit_malloc(sizeof *f); - QUEUE_INIT(&f->queue); + queue_init(&f->queue); return f; } @@ -47,12 +47,12 @@ static void tearDown(void *data) for (i_ = 0; i_ < N; i_++) { \ struct item *item_ = &f->items[i_]; \ item_->value = i_ + 1; \ - QUEUE_PUSH(&f->queue, &item_->queue); \ + queue_insert_tail(&f->queue, &item_->queue); \ } \ } /* Remove the i'th fixture item from the fixture queue. */ -#define REMOVE(I) QUEUE_REMOVE(&f->items[I].queue) +#define REMOVE(I) queue_remove(&f->items[I].queue) /****************************************************************************** * @@ -64,7 +64,7 @@ static void tearDown(void *data) * value. */ #define ASSERT_HEAD(VALUE) \ { \ - queue *head_ = QUEUE_HEAD(&f->queue); \ + queue *head_ = queue_head(&f->queue); \ struct item *item_; \ item_ = QUEUE_DATA(head_, struct item, queue); \ munit_assert_int(item_->value, ==, VALUE); \ @@ -73,34 +73,34 @@ static void tearDown(void *data) /* Assert that the item at the tail of the queue has the given value. */ #define ASSERT_TAIL(VALUE) \ { \ - queue *tail_ = QUEUE_TAIL(&f->queue); \ + queue *tail_ = queue_tail(&f->queue); \ struct item *item_; \ item_ = QUEUE_DATA(tail_, struct item, queue); \ munit_assert_int(item_->value, ==, VALUE); \ } /* Assert that the fixture's queue is empty. */ -#define ASSERT_EMPTY munit_assert_true(QUEUE_IS_EMPTY(&f->queue)) +#define ASSERT_EMPTY munit_assert_true(queue_empty(&f->queue)) /* Assert that the fixture's queue is not empty. */ -#define ASSERT_NOT_EMPTY munit_assert_false(QUEUE_IS_EMPTY(&f->queue)) +#define ASSERT_NOT_EMPTY munit_assert_false(queue_empty(&f->queue)) /****************************************************************************** * - * QUEUE_IS_EMPTY + * queue_empty * *****************************************************************************/ -SUITE(QUEUE_IS_EMPTY) +SUITE(queue_empty) -TEST(QUEUE_IS_EMPTY, yes, setUp, tearDown, 0, NULL) +TEST(queue_empty, yes, setUp, tearDown, 0, NULL) { struct fixture *f = data; ASSERT_EMPTY; return MUNIT_OK; } -TEST(QUEUE_IS_EMPTY, no, setUp, tearDown, 0, NULL) +TEST(queue_empty, no, setUp, tearDown, 0, NULL) { struct fixture *f = data; PUSH(1); @@ -110,13 +110,13 @@ TEST(QUEUE_IS_EMPTY, no, setUp, tearDown, 0, NULL) /****************************************************************************** * - * QUEUE_PUSH + * queue_insert_tail * *****************************************************************************/ -SUITE(QUEUE_PUSH) +SUITE(queue_insert_tail) -TEST(QUEUE_PUSH, one, setUp, tearDown, 0, NULL) +TEST(queue_insert_tail, one, setUp, tearDown, 0, NULL) { struct fixture *f = data; PUSH(1); @@ -124,7 +124,7 @@ TEST(QUEUE_PUSH, one, setUp, tearDown, 0, NULL) return MUNIT_OK; } -TEST(QUEUE_PUSH, two, setUp, tearDown, 0, NULL) +TEST(queue_insert_tail, two, setUp, tearDown, 0, NULL) { struct fixture *f = data; int i; @@ -139,13 +139,13 @@ TEST(QUEUE_PUSH, two, setUp, tearDown, 0, NULL) /****************************************************************************** * - * QUEUE_REMOVE + * queue_remove * *****************************************************************************/ -SUITE(QUEUE_REMOVE) +SUITE(queue_remove) -TEST(QUEUE_REMOVE, first, setUp, tearDown, 0, NULL) +TEST(queue_remove, first, setUp, tearDown, 0, NULL) { struct fixture *f = data; PUSH(3); @@ -154,7 +154,7 @@ TEST(QUEUE_REMOVE, first, setUp, tearDown, 0, NULL) return MUNIT_OK; } -TEST(QUEUE_REMOVE, second, setUp, tearDown, 0, NULL) +TEST(queue_remove, second, setUp, tearDown, 0, NULL) { struct fixture *f = data; PUSH(3); @@ -163,7 +163,7 @@ TEST(QUEUE_REMOVE, second, setUp, tearDown, 0, NULL) return MUNIT_OK; } -TEST(QUEUE_REMOVE, success, setUp, tearDown, 0, NULL) +TEST(queue_remove, success, setUp, tearDown, 0, NULL) { struct fixture *f = data; PUSH(3); @@ -174,13 +174,13 @@ TEST(QUEUE_REMOVE, success, setUp, tearDown, 0, NULL) /****************************************************************************** * - * QUEUE_TAIL + * queue_tail * *****************************************************************************/ -SUITE(QUEUE_TAIL) +SUITE(queue_tail) -TEST(QUEUE_TAIL, one, setUp, tearDown, 0, NULL) +TEST(queue_tail, one, setUp, tearDown, 0, NULL) { struct fixture *f = data; PUSH(1); @@ -188,7 +188,7 @@ TEST(QUEUE_TAIL, one, setUp, tearDown, 0, NULL) return MUNIT_OK; } -TEST(QUEUE_TAIL, two, setUp, tearDown, 0, NULL) +TEST(queue_tail, two, setUp, tearDown, 0, NULL) { struct fixture *f = data; PUSH(2); @@ -196,7 +196,7 @@ TEST(QUEUE_TAIL, two, setUp, tearDown, 0, NULL) return MUNIT_OK; } -TEST(QUEUE_TAIL, three, setUp, tearDown, 0, NULL) +TEST(queue_tail, three, setUp, tearDown, 0, NULL) { struct fixture *f = data; PUSH(3); diff --git a/test/unit/test_conn.c b/test/unit/test_conn.c index a15708c5b..06bcfd7bc 100644 --- a/test/unit/test_conn.c +++ b/test/unit/test_conn.c @@ -25,7 +25,7 @@ TEST_MODULE(conn); static void connCloseCb(struct conn *conn) { - bool *closed = (bool *)conn->queue.next; + uint64_t *closed = (uint64_t *)conn->queue.next; *closed = true; } @@ -37,7 +37,7 @@ static void connCloseCb(struct conn *conn) FIXTURE_RAFT; \ FIXTURE_CLIENT; \ struct conn conn; \ - bool closed; + uint64_t closed; #define SETUP \ struct uv_stream_s *stream; \