From fc573b08988969257d8774c9dba707737f4879ec Mon Sep 17 00:00:00 2001 From: Anatoliy Bilenko Date: Wed, 21 Feb 2024 10:03:26 +0000 Subject: [PATCH 1/2] Added thread-pool library Signed-off-by: Anatoliy Bilenko --- Makefile.am | 2 + src/lib/queue.h | 32 +++ src/lib/threadpool.c | 529 +++++++++++++++++++++++++++++++++++ src/lib/threadpool.h | 83 ++++++ test/unit/ext/test_uv_pool.c | 112 ++++++++ 5 files changed, 758 insertions(+) create mode 100644 src/lib/threadpool.c create mode 100644 src/lib/threadpool.h create mode 100644 test/unit/ext/test_uv_pool.c diff --git a/Makefile.am b/Makefile.am index 091cd15bb..53b5abd46 100644 --- a/Makefile.am +++ b/Makefile.am @@ -44,6 +44,7 @@ basic_dqlite_sources = \ src/lib/buffer.c \ src/lib/fs.c \ src/lib/sm.c \ + src/lib/threadpool.c \ src/lib/transport.c \ src/logger.c \ src/message.c \ @@ -149,6 +150,7 @@ unit_test_SOURCES += \ test/test_error.c \ test/test_integration.c \ test/unit/ext/test_uv.c \ + test/unit/ext/test_uv_pool.c \ test/unit/lib/test_addr.c \ test/unit/lib/test_buffer.c \ test/unit/lib/test_byte.c \ diff --git a/src/lib/queue.h b/src/lib/queue.h index 7b903dbdf..f538f18ff 100644 --- a/src/lib/queue.h +++ b/src/lib/queue.h @@ -37,6 +37,19 @@ typedef void *queue[2]; QUEUE__PREV(q) = (e); \ } +#define QUEUE__INSERT_TAIL(q, e) QUEUE__PUSH(q, e) + +/** + * Insert an element at the front of a queue. + */ +#define QUEUE__INSERT_HEAD(h, q) \ + { \ + QUEUE__NEXT(q) = QUEUE__NEXT(h); \ + QUEUE__PREV(q) = (h); \ + QUEUE__NEXT_PREV(q) = (q); \ + QUEUE__NEXT(h) = (q); \ + } + /** * Remove the given element from the queue. Any element can be removed at any * time. @@ -47,6 +60,25 @@ typedef void *queue[2]; QUEUE__NEXT_PREV(e) = QUEUE__PREV(e); \ } +/** + * Moves elements from queue @h to queue @n + * Note: Removed QUEUE__SPLIT() and merged it into QUEUE__MOVE(). + */ +#define QUEUE__MOVE(h, n) \ + { \ + if (QUEUE__IS_EMPTY(h)) { \ + QUEUE__INIT(n); \ + } else { \ + queue *__q = QUEUE__HEAD(h); \ + QUEUE__PREV(n) = QUEUE__PREV(h); \ + QUEUE__PREV_NEXT(n) = (n); \ + QUEUE__NEXT(n) = (__q); \ + QUEUE__PREV(h) = QUEUE__PREV(__q); \ + QUEUE__PREV_NEXT(h) = (h); \ + QUEUE__PREV(__q) = (n); \ + } \ + } + /** * Return the element at the front of the queue. */ diff --git a/src/lib/threadpool.c b/src/lib/threadpool.c new file mode 100644 index 000000000..23c9bf58a --- /dev/null +++ b/src/lib/threadpool.c @@ -0,0 +1,529 @@ +#include "src/lib/threadpool.h" +#include "src/lib/sm.h" +#include +#include +#include +#include +#include +#include "src/lib/queue.h" +#include "src/utils.h" + + +/** + * Planner thread state machine. + * + * signal() && + * empty(o) && signal() && exiting + * empty(u) && +-----> NOTHING ----------------> EXITED + * !exiting +------- ^ | + * | | + * empty(o) && | | signal() + * empty(u) | | !empty(o) || !empty(u) + * | | + * | | + * | V + * !empty(o) && +-----> DRAINING + * !empty(u) && +------- ^ | + * type(head(o)) != BAR | | + * | | type(head(o)) == BAR + * ord_in_flight == 0 | | + * | V + * BARRIER --------+ signal() + * ^ | <-------+ ord_in_flight == 0 + * | | + * empty(u) | | !empty(u) + * | V + * DRAINING_UNORD + */ + +enum planner_states { + PS_NOTHING, + PS_DRAINING, + PS_BARRIER, + PS_DRAINING_UNORD, + PS_EXITED, +}; + +static const struct sm_conf planner_states[SM_STATES_MAX] = { + [PS_NOTHING] = { + .flags = SM_INITIAL, + .name = "nothing", + .allowed = BITS(PS_DRAINING) + | BITS(PS_EXITED), + }, + [PS_DRAINING] = { + .name = "draining", + .allowed = BITS(PS_DRAINING) + | BITS(PS_NOTHING) + | BITS(PS_BARRIER), + }, + [PS_BARRIER] = { + .name = "barrier", + .allowed = BITS(PS_DRAINING_UNORD) + | BITS(PS_DRAINING) + | BITS(PS_BARRIER), + }, + [PS_DRAINING_UNORD] = { + .name = "draining-unord", + .allowed = BITS(PS_BARRIER) + }, + [PS_EXITED] = { + .flags = SM_FINAL, + .name = "exited", + .allowed = 0, + }, +}; + +enum { + THREADPOOL_SIZE_MAX = 1024, +}; + +typedef struct pool_thread pool_thread_t; +typedef struct pool_impl pool_impl_t; + +struct targs { + pool_impl_t *pi; + uv_sem_t *sem; + uint32_t idx; /* Thread's index */ +}; + +/* Worker thread of the pool */ +struct pool_thread { + queue inq; /* Thread's input queue */ + uv_cond_t cond; /* Signalled when work item appears in @inq */ + uv_thread_t thread; /* Pool's worker thread */ + struct targs arg; +}; + +struct pool_impl { + uv_mutex_t mutex; /* Input queue, planner_sm, + worker and planner threads lock */ + uint32_t threads_nr; + pool_thread_t *threads; + + queue outq; /* Output queue used by libuv part */ + uv_mutex_t outq_mutex; /* Output queue lock */ + uv_async_t outq_async; /* Signalled when output queue is not + empty and libuv loop has to process + items from it */ + uint64_t active_ws; /* Number of all work items in flight, + accessed from the main thread only */ + + queue ordered; /* Queue of WT_ORD{N} items */ + queue unordered; /* Queue of WT_UNORD items */ + struct sm planner_sm; /* State machine of the scheduler */ + uv_cond_t planner_cond; + uv_thread_t planner_thread; /* Scheduler's thread */ + + uint32_t ord_in_flight; /* Number of WT_ORD{N} in flight */ + bool exiting; /* True when the pool is being stopped */ + enum pool_work_type /* Type of the previous work item, */ + ord_prev; /* used in WT_ORD{N} ivariants */ + uint32_t qos; /* QoS token */ + uint32_t qos_prio; /* QoS prio */ +}; + +static inline bool has_active_ws(pool_t *pool) +{ + return pool->pi->active_ws > 0; +} + +static inline void w_register(pool_t *pool, pool_work_t *w) +{ + if (w->type != WT_BAR) + pool->pi->active_ws++; +} + +static inline void w_unregister(pool_t *pool, pool_work_t *) +{ + PRE(has_active_ws(pool)); + pool->pi->active_ws--; +} + +static bool empty(const queue *q) +{ + return QUEUE__IS_EMPTY(q); +} + +static queue *head(const queue *q) +{ + return QUEUE__HEAD(q); +} + +static void push(queue *to, queue *what) +{ + QUEUE__INSERT_TAIL(to, what); +} + +static queue *pop(queue *from) +{ + queue *q = QUEUE__HEAD(from); + PRE(q != NULL); + QUEUE__REMOVE(q); + QUEUE__INIT(q); + return q; +} + +static queue *qos_pop(pool_impl_t *pi, queue *first, queue *second) +{ + PRE(!empty(first) || !empty(second)); + + if (empty(first)) + return pop(second); + else if (empty(second)) + return pop(first); + + return pop(pi->qos++ % pi->qos_prio ? first : second); +} + +static pool_work_t *q_to_w(const queue *q) +{ + return QUEUE__DATA(q, pool_work_t, link); +} + +static enum pool_work_type q_type(const queue *q) +{ + return q_to_w(q)->type; +} + +static uint32_t q_tid(const queue *q) +{ + return q_to_w(q)->thread_id; +} + +static bool planner_invariant(const struct sm *m, int prev_state) +{ + pool_impl_t *pi = container_of(m, pool_impl_t, planner_sm); + queue *o = &pi->ordered; + queue *u = &pi->unordered; + + return ERGO(sm_state(m) == PS_NOTHING, empty(o) && empty(u)) && + ERGO(sm_state(m) == PS_DRAINING, + ERGO(prev_state == PS_BARRIER, + pi->ord_in_flight == 0 && empty(u)) && + ERGO(prev_state == PS_NOTHING, + !empty(u) || !empty(o))) && + ERGO(sm_state(m) == PS_EXITED, + pi->exiting && empty(o) && empty(u)) && + ERGO(sm_state(m) == PS_BARRIER, + ERGO(prev_state == PS_DRAINING, + q_type(head(o)) == WT_BAR) && + ERGO(prev_state == PS_DRAINING_UNORD, empty(u))) && + ERGO(sm_state(m) == PS_DRAINING_UNORD, !empty(u)); +} + +static void planner(void *arg) +{ + struct targs *ta = arg; + uv_sem_t *sem = ta->sem; + pool_impl_t *pi = ta->pi; + uv_mutex_t *mutex = &pi->mutex; + pool_thread_t *ts = pi->threads; + struct sm *planner_sm = &pi->planner_sm; + queue *o = &pi->ordered; + queue *u = &pi->unordered; + queue *q; + + sm_init(planner_sm, planner_invariant, NULL, planner_states, PS_NOTHING); + uv_sem_post(sem); + uv_mutex_lock(mutex); + for (;;) { + switch(sm_state(planner_sm)) { + case PS_NOTHING: + while (empty(o) && empty(u) && !pi->exiting) + uv_cond_wait(&pi->planner_cond, mutex); + sm_move(planner_sm, + pi->exiting ? PS_EXITED : PS_DRAINING); + break; + case PS_DRAINING: + while (!(empty(o) && empty(u))) { + sm_move(planner_sm, PS_DRAINING); + if (!empty(o) && q_type(head(o)) == WT_BAR) { + sm_move(planner_sm, PS_BARRIER); + goto ps_barrier; + } + q = qos_pop(pi, o, u); + push(&ts[q_tid(q)].inq, q); + uv_cond_signal(&ts[q_tid(q)].cond); + if (q_type(q) >= WT_ORD1) + pi->ord_in_flight++; + } + sm_move(planner_sm, PS_NOTHING); + ps_barrier: + break; + case PS_BARRIER: + if (!empty(u)) { + sm_move(planner_sm, PS_DRAINING_UNORD); + break; + } + if (pi->ord_in_flight == 0) { + q = pop(o); + PRE(q_to_w(q)->type == WT_BAR); + free(q_to_w(q)); + sm_move(planner_sm, PS_DRAINING); + break; + } + uv_cond_wait(&pi->planner_cond, mutex); + sm_move(planner_sm, PS_BARRIER); + break; + case PS_DRAINING_UNORD: + while (!empty(u)) { + q = pop(u); + push(&ts[q_tid(q)].inq, q); + uv_cond_signal(&ts[q_tid(q)].cond); + } + sm_move(planner_sm, PS_BARRIER); + break; + case PS_EXITED: + sm_fini(planner_sm); + uv_mutex_unlock(mutex); + return; + default: + POST(false && "Impossible!"); + } + } +} + +static void queue_work(pool_work_t *w) +{ + w->work_cb(w); +} + +static void queue_done(pool_work_t *w) +{ + w_unregister(w->pool, w); + if (w->after_work_cb != NULL) + w->after_work_cb(w); +} + +static void worker(void *arg) +{ + struct targs *ta = arg; + pool_impl_t *pi = ta->pi; + uv_mutex_t *mutex = &pi->mutex; + pool_thread_t *ts = pi->threads; + enum pool_work_type wtype; + pool_work_t *w; + queue *q; + + uv_sem_post(ta->sem); + uv_mutex_lock(mutex); + for (;;) { + while (empty(&ts[ta->idx].inq)) { + if (pi->exiting) { + uv_mutex_unlock(mutex); + return; + } + uv_cond_wait(&ts[ta->idx].cond, mutex); + } + + q = pop(&ts[ta->idx].inq); + uv_mutex_unlock(mutex); + + w = q_to_w(q); + wtype = w->type; + queue_work(w); + + uv_mutex_lock(&pi->outq_mutex); + push(&pi->outq, &w->link); + uv_async_send(&pi->outq_async); + uv_mutex_unlock(&pi->outq_mutex); + + uv_mutex_lock(mutex); + if (wtype > WT_BAR) { + assert(pi->ord_in_flight > 0); + if (--pi->ord_in_flight == 0) + uv_cond_signal(&pi->planner_cond); + } + } +} + +static void pool_cleanup(pool_t *pool) +{ + pool_impl_t *pi = pool->pi; + pool_thread_t *ts = pi->threads; + uint32_t i; + + if (pi->threads_nr == 0) + return; + + pi->exiting = true; + uv_cond_signal(&pi->planner_cond); + + if (uv_thread_join(&pi->planner_thread)) + abort(); + uv_cond_destroy(&pi->planner_cond); + POST(empty(&pi->ordered) && empty(&pi->unordered)); + + for (i = 0; i < pi->threads_nr; i++) { + uv_cond_signal(&ts[i].cond); + if (uv_thread_join(&ts[i].thread)) + abort(); + POST(empty(&ts[i].inq)); + uv_cond_destroy(&ts[i].cond); + } + + free(pi->threads); + uv_mutex_destroy(&pi->mutex); + pi->threads_nr = 0; +} + +static void pool_threads_init(pool_t *pool) +{ + uint32_t i; + uv_sem_t sem; + pool_impl_t *pi = pool->pi; + pool_thread_t *ts; + struct targs pa = { + .sem = &sem, + .pi = pi, + }; + uv_thread_options_t config = { + .flags = UV_THREAD_HAS_STACK_SIZE, + .stack_size = 8u << 20, + }; + + if (uv_mutex_init(&pi->mutex)) + abort(); + if (uv_sem_init(&sem, 0)) + abort(); + + pi->threads = calloc(pi->threads_nr, sizeof(pi->threads[0])); + if (pi->threads == NULL) + abort(); + + for (i = 0, ts = pi->threads; i < pi->threads_nr; i++) { + ts[i].arg = (struct targs) { + .pi = pi, + .sem = &sem, + .idx = i, + }; + + QUEUE__INIT(&ts[i].inq); + if (uv_cond_init(&ts[i].cond)) + abort(); + if (uv_thread_create_ex(&ts[i].thread, &config, worker, + &ts[i].arg)) + abort(); + } + + if (uv_cond_init(&pi->planner_cond)) + abort(); + if (uv_thread_create_ex(&pi->planner_thread, &config, planner, &pa)) + abort(); + for (i = 0; i < pi->threads_nr + 1 /* +planner */; i++) + uv_sem_wait(&sem); + + uv_sem_destroy(&sem); +} + +static void pool_work_submit(pool_t *pool, pool_work_t *w) +{ + pool_impl_t *pi = pool->pi; + queue *o = &pi->ordered; + queue *u = &pi->unordered; + + if (w->type > WT_UNORD) { + /* Make sure that elements in the ordered queue come in order. */ + PRE(ERGO(pi->ord_prev != WT_BAR && w->type != WT_BAR, + pi->ord_prev == w->type)); + pi->ord_prev = w->type; + } + + uv_mutex_lock(&pi->mutex); + push(w->type == WT_UNORD ? u : o, &w->link); + uv_cond_signal(&pi->planner_cond); + uv_mutex_unlock(&pi->mutex); +} + +void work_done(uv_async_t *handle) +{ + queue q = {}; + pool_impl_t *pi = container_of(handle, pool_impl_t, outq_async); + + uv_mutex_lock(&pi->outq_mutex); + QUEUE__MOVE(&pi->outq, &q); + uv_mutex_unlock(&pi->outq_mutex); + + while (!empty(&q)) + queue_done(q_to_w(pop(&q))); +} + +void pool_queue_work(pool_t *pool, + pool_work_t *w, + uint32_t cookie, + enum pool_work_type type, + void (*work_cb)(pool_work_t *w), + void (*after_work_cb)(pool_work_t *w)) +{ + PRE(work_cb != NULL && type < WT_NR); + + *w = (pool_work_t) { + .pool = pool, + .type = type, + .thread_id = cookie % pool->pi->threads_nr, + .work_cb = work_cb, + .after_work_cb = after_work_cb, + }; + w_register(pool, w); + pool_work_submit(pool, w); +} + +int pool_init(pool_t *pool, uv_loop_t *loop, uint32_t threads_nr) +{ + int rc; + pool_impl_t *pi = pool->pi; + + PRE(threads_nr <= THREADPOOL_SIZE_MAX); + + pi = pool->pi = calloc(1, sizeof(*pool->pi)); + if (pi == NULL) + return UV_ENOMEM; + + *pi = (pool_impl_t) { + .qos = 0, + .qos_prio = 2, + .exiting = false, + .ord_prev = WT_BAR, + .threads_nr = threads_nr, + .ord_in_flight = 0, + }; + QUEUE__INIT(&pi->outq); + QUEUE__INIT(&pi->ordered); + QUEUE__INIT(&pi->unordered); + + rc = uv_mutex_init(&pi->outq_mutex); + if (rc != 0) { + free(pi); + return rc; + } + + rc = uv_async_init(loop, &pi->outq_async, work_done); + if (rc != 0) { + free(pi); + uv_mutex_destroy(&pi->outq_mutex); + return rc; + } + + pool_threads_init(pool); + return 0; +} + +void pool_fini(pool_t *pool) +{ + pool_impl_t *pi = pool->pi; + + pool_cleanup(pool); + + uv_mutex_lock(&pi->outq_mutex); + POST(empty(&pi->outq) && !has_active_ws(pool)); + uv_mutex_unlock(&pi->outq_mutex); + + uv_mutex_destroy(&pi->outq_mutex); + free(pi); +} + +void pool_close(pool_t *pool) +{ + uv_close((uv_handle_t *)&pool->pi->outq_async, NULL); +} diff --git a/src/lib/threadpool.h b/src/lib/threadpool.h new file mode 100644 index 000000000..b4677230c --- /dev/null +++ b/src/lib/threadpool.h @@ -0,0 +1,83 @@ +#ifndef __THREAD_POOL__ +#define __THREAD_POOL__ + +#include +#include "queue.h" + +/** + Thread pool + + - Use-cases: + + - Move sqlite3-, IO- related blocking operations from libuv + loop's thread to pool's threads in order to unblock serving + incoming dqlite requests during sqlite3 IO. + Multiple sqlite3_step()-s can be in flight and executed + concurrently, while thread's loop is not IO blocked. + + - Introduced pool's work item thread affinity to serve sqlite3- + related items of each database in a "dedicated" thread which + allows not to make any assumption on sqlite3 threading model. + @see https://www.sqlite.org/threadsafe.html + + - The pool supports servicing of the following types of work items: + + - WT_UNORD - items, which can be processed by the pool in any + order, concurrency assumptions of this type of work are + guaranteed by other layers of the application. Read and write + transactions executed by sqlite3_step() are good examples for + such work item type. + + - WT_ORD_N - items, which can NOT be processed by the pool in + any order. The pool's logic shall guarantee that servicing + all WT_ORD_{N}s happens before WT_ORD_{N + 1}s. WT_ORD_{N}s + and WT_ORD_{N + 1}s operations can't be put into the pool + interleaved. Sqlite3 checkpoints is an example of WT_ORD_{N} + and InstallSnapshot(CP(), MV()) is an example of WT_ORD_{N + 1}. + + - WT_BAR - special purpose item, barrier. Delimits WT_ORD_{N}s + from WT_ORD_{N + 1}s. + + - The pool supports servicing of work items with a given quality + of service (QoS) considerations. For example, the priority of + serving read/write sqlite3 transactions (WT_UNORD) can be set + higher then snapshot installation (WT_ORD{N}). + */ + +struct pool_impl; +typedef struct pool_s pool_t; +typedef struct pool_work_s pool_work_t; + +enum pool_work_type { + WT_UNORD, + WT_BAR, + WT_ORD1, + WT_ORD2, + WT_NR, +}; + +struct pool_work_s { + queue link; /* Link into ordered, unordered and outq */ + uint32_t thread_id; /* Identifier of the thread the item is affined */ + pool_t *pool; /* The pool, item is being associated with */ + enum pool_work_type type; + + void (*work_cb)(pool_work_t *w); + void (*after_work_cb)(pool_work_t *w); +}; + +struct pool_s { + struct pool_impl *pi; +}; + +int pool_init(pool_t *pool, uv_loop_t *loop, uint32_t threads_nr); +void pool_fini(pool_t *pool); +void pool_close(pool_t *pool); +void pool_queue_work(pool_t *pool, + pool_work_t *w, + uint32_t cookie, + enum pool_work_type type, + void (*work_cb)(pool_work_t *w), + void (*after_work_cb)(pool_work_t *w)); + +#endif /* __THREAD_POOL__ */ diff --git a/test/unit/ext/test_uv_pool.c b/test/unit/ext/test_uv_pool.c new file mode 100644 index 000000000..1699da955 --- /dev/null +++ b/test/unit/ext/test_uv_pool.c @@ -0,0 +1,112 @@ +#include "../../../src/lib/threadpool.h" +#include "../../lib/runner.h" +#include "../../lib/uv.h" +#include "src/utils.h" + +TEST_MODULE(ext_uv_pool); + +/****************************************************************************** + * + * threadpool + * + ******************************************************************************/ + +enum { WORK_ITEMS_NR = 50000 }; + +struct fixture +{ + pool_work_t w; + uv_loop_t loop; + pool_t pool; +}; + +static void loop_setup(struct fixture *f) +{ + int rc; + + rc = uv_loop_init(&f->loop); + munit_assert_int(rc, ==, 0); + + rc = pool_init(&f->pool, &f->loop, 4); + munit_assert_int(rc, ==, 0); +} + +static void bottom_work_cb(pool_work_t *) +{ +} + +static void bottom_after_work_cb(pool_work_t *w) +{ + static int count = 0; + + if (count == WORK_ITEMS_NR) + pool_close(w->pool); + + count++; + assert(w->type != WT_BAR); + free(w); +} + +static void after_work_cb(pool_work_t *w) +{ + enum pool_work_type pwt; + pool_work_t *work; + unsigned int wt; + unsigned int i; + + for (i = 0; i <= WORK_ITEMS_NR + 1 /* +WT_BAR */; i++) { + work = malloc(sizeof(*work)); + + if (i < WORK_ITEMS_NR / 2) + wt = WT_ORD1; + else if (i == WORK_ITEMS_NR / 2) + wt = WT_BAR; + else + wt = WT_ORD2; + + pwt = i % 2 == 0 ? wt : WT_UNORD; + pool_queue_work(w->pool, + work, i, pwt, bottom_work_cb, + bottom_after_work_cb); + } +} + +static void work_cb(pool_work_t *) +{ +} + +static void threadpool_tear_down(void *data) +{ + int rc; + struct fixture *f = data; + + pool_fini(&f->pool); + rc = uv_loop_close(&f->loop); + munit_assert_int(rc, ==, 0); + free(f); +} + +static void *threadpool_setup(const MunitParameter params[] UNUSED, + void *user_data UNUSED) +{ + struct fixture *f = calloc(1, sizeof *f); + loop_setup(f); + return f; +} + +TEST_SUITE(threadpool); +TEST_SETUP(threadpool, threadpool_setup); +TEST_TEAR_DOWN(threadpool, threadpool_tear_down); +TEST_CASE(threadpool, sync, NULL) +{ + (void)params; + struct fixture *f = data; + int rc; + + pool_queue_work(&f->pool, &f->w, 0, WT_UNORD, work_cb, after_work_cb); + + rc = uv_run(&f->loop, UV_RUN_DEFAULT); + munit_assert_int(rc, ==, 0); + + return MUNIT_OK; +} From 616e1a35ea7cb12b924ea0107e8de624592a08c6 Mon Sep 17 00:00:00 2001 From: Anatoliy Bilenko Date: Wed, 21 Feb 2024 10:09:59 +0000 Subject: [PATCH 2/2] Thread pool's code reformatted Signed-off-by: Anatoliy Bilenko --- .clang-format | 3 +- src/lib/queue.h | 26 +-- src/lib/threadpool.c | 367 +++++++++++++++++++---------------- src/lib/threadpool.h | 43 ++-- test/unit/ext/test_uv_pool.c | 31 +-- 5 files changed, 253 insertions(+), 217 deletions(-) diff --git a/.clang-format b/.clang-format index 15538430b..5ad2cd768 100644 --- a/.clang-format +++ b/.clang-format @@ -2,7 +2,8 @@ BasedOnStyle: Chromium BreakBeforeBraces: Custom BraceWrapping: AfterFunction: true - AfterStruct: true + AfterStruct: false +Cpp11BracedListStyle: false IndentWidth: 8 UseTab: ForContinuationAndIndentation PointerAlignment: Right diff --git a/src/lib/queue.h b/src/lib/queue.h index f538f18ff..bbaffe802 100644 --- a/src/lib/queue.h +++ b/src/lib/queue.h @@ -64,19 +64,19 @@ typedef void *queue[2]; * Moves elements from queue @h to queue @n * Note: Removed QUEUE__SPLIT() and merged it into QUEUE__MOVE(). */ -#define QUEUE__MOVE(h, n) \ - { \ - if (QUEUE__IS_EMPTY(h)) { \ - QUEUE__INIT(n); \ - } else { \ - queue *__q = QUEUE__HEAD(h); \ - QUEUE__PREV(n) = QUEUE__PREV(h); \ - QUEUE__PREV_NEXT(n) = (n); \ - QUEUE__NEXT(n) = (__q); \ - QUEUE__PREV(h) = QUEUE__PREV(__q); \ - QUEUE__PREV_NEXT(h) = (h); \ - QUEUE__PREV(__q) = (n); \ - } \ +#define QUEUE__MOVE(h, n) \ + { \ + if (QUEUE__IS_EMPTY(h)) { \ + QUEUE__INIT(n); \ + } else { \ + queue *__q = QUEUE__HEAD(h); \ + QUEUE__PREV(n) = QUEUE__PREV(h); \ + QUEUE__PREV_NEXT(n) = (n); \ + QUEUE__NEXT(n) = (__q); \ + QUEUE__PREV(h) = QUEUE__PREV(__q); \ + QUEUE__PREV_NEXT(h) = (h); \ + QUEUE__PREV(__q) = (n); \ + } \ } /** diff --git a/src/lib/threadpool.c b/src/lib/threadpool.c index 23c9bf58a..9498a6985 100644 --- a/src/lib/threadpool.c +++ b/src/lib/threadpool.c @@ -1,13 +1,12 @@ -#include "src/lib/threadpool.h" -#include "src/lib/sm.h" +#include "threadpool.h" #include #include #include #include #include -#include "src/lib/queue.h" -#include "src/utils.h" - +#include "../../src/lib/queue.h" +#include "../../src/lib/sm.h" +#include "../../src/utils.h" /** * Planner thread state machine. @@ -42,40 +41,40 @@ enum planner_states { PS_BARRIER, PS_DRAINING_UNORD, PS_EXITED, + PS_NR, }; -static const struct sm_conf planner_states[SM_STATES_MAX] = { - [PS_NOTHING] = { - .flags = SM_INITIAL, - .name = "nothing", - .allowed = BITS(PS_DRAINING) - | BITS(PS_EXITED), - }, - [PS_DRAINING] = { - .name = "draining", - .allowed = BITS(PS_DRAINING) - | BITS(PS_NOTHING) - | BITS(PS_BARRIER), - }, - [PS_BARRIER] = { - .name = "barrier", - .allowed = BITS(PS_DRAINING_UNORD) - | BITS(PS_DRAINING) - | BITS(PS_BARRIER), - }, - [PS_DRAINING_UNORD] = { - .name = "draining-unord", - .allowed = BITS(PS_BARRIER) - }, - [PS_EXITED] = { - .flags = SM_FINAL, - .name = "exited", - .allowed = 0, - }, +static const struct sm_conf planner_states[PS_NR] = { + [PS_NOTHING] = { + .flags = SM_INITIAL, + .name = "nothing", + .allowed = BITS(PS_DRAINING) | BITS(PS_EXITED), + }, + [PS_DRAINING] = { + .name = "draining", + .allowed = BITS(PS_DRAINING) + | BITS(PS_NOTHING) + | BITS(PS_BARRIER), + }, + [PS_BARRIER] = { + .name = "barrier", + .allowed = BITS(PS_DRAINING_UNORD) + | BITS(PS_DRAINING) + | BITS(PS_BARRIER), + }, + [PS_DRAINING_UNORD] = { + .name = "draining-unord", + .allowed = BITS(PS_BARRIER) + }, + [PS_EXITED] = { + .flags = SM_FINAL, + .name = "exited", + .allowed = 0, + }, }; enum { - THREADPOOL_SIZE_MAX = 1024, + THREADPOOL_SIZE_MAX = 1024, }; typedef struct pool_thread pool_thread_t; @@ -83,44 +82,44 @@ typedef struct pool_impl pool_impl_t; struct targs { pool_impl_t *pi; - uv_sem_t *sem; - uint32_t idx; /* Thread's index */ + uv_sem_t *sem; + uint32_t idx; /* Thread's index */ }; /* Worker thread of the pool */ struct pool_thread { - queue inq; /* Thread's input queue */ - uv_cond_t cond; /* Signalled when work item appears in @inq */ - uv_thread_t thread; /* Pool's worker thread */ + queue inq; /* Thread's input queue */ + uv_cond_t cond; /* Signalled when work item appears in @inq */ + uv_thread_t thread; /* Pool's worker thread */ struct targs arg; }; struct pool_impl { - uv_mutex_t mutex; /* Input queue, planner_sm, - worker and planner threads lock */ - uint32_t threads_nr; + uv_mutex_t mutex; /* Input queue, planner_sm, + worker and planner threads lock */ + uint32_t threads_nr; pool_thread_t *threads; - queue outq; /* Output queue used by libuv part */ - uv_mutex_t outq_mutex; /* Output queue lock */ - uv_async_t outq_async; /* Signalled when output queue is not - empty and libuv loop has to process - items from it */ - uint64_t active_ws; /* Number of all work items in flight, - accessed from the main thread only */ - - queue ordered; /* Queue of WT_ORD{N} items */ - queue unordered; /* Queue of WT_UNORD items */ - struct sm planner_sm; /* State machine of the scheduler */ - uv_cond_t planner_cond; - uv_thread_t planner_thread; /* Scheduler's thread */ - - uint32_t ord_in_flight; /* Number of WT_ORD{N} in flight */ - bool exiting; /* True when the pool is being stopped */ - enum pool_work_type /* Type of the previous work item, */ - ord_prev; /* used in WT_ORD{N} ivariants */ - uint32_t qos; /* QoS token */ - uint32_t qos_prio; /* QoS prio */ + queue outq; /* Output queue used by libuv part */ + uv_mutex_t outq_mutex; /* Output queue lock */ + uv_async_t outq_async; /* Signalled when output queue is not + empty and libuv loop has to process + items from it */ + uint64_t active_ws; /* Number of all work items in flight, + accessed from the main thread only */ + + queue ordered; /* Queue of WT_ORD{N} items */ + queue unordered; /* Queue of WT_UNORD items */ + struct sm planner_sm; /* State machine of the scheduler */ + uv_cond_t planner_cond; + uv_thread_t planner_thread; /* Scheduler's thread */ + + uint32_t ord_in_flight; /* Number of WT_ORD{N} in flight */ + bool exiting; /* True when the pool is being stopped */ + enum pool_work_type /* Type of the previous work item, */ + ord_prev; /* used in WT_ORD{N} ivariants */ + uint32_t qos; /* QoS token */ + uint32_t qos_prio; /* QoS prio */ }; static inline bool has_active_ws(pool_t *pool) @@ -130,12 +129,14 @@ static inline bool has_active_ws(pool_t *pool) static inline void w_register(pool_t *pool, pool_work_t *w) { - if (w->type != WT_BAR) + if (w->type != WT_BAR) { pool->pi->active_ws++; + } } -static inline void w_unregister(pool_t *pool, pool_work_t *) +static inline void w_unregister(pool_t *pool, pool_work_t *w) { + (void)w; PRE(has_active_ws(pool)); pool->pi->active_ws--; } @@ -168,10 +169,11 @@ static queue *qos_pop(pool_impl_t *pi, queue *first, queue *second) { PRE(!empty(first) || !empty(second)); - if (empty(first)) + if (empty(first)) { return pop(second); - else if (empty(second)) + } else if (empty(second)) { return pop(first); + } return pop(pi->qos++ % pi->qos_prio ? first : second); } @@ -193,93 +195,97 @@ static uint32_t q_tid(const queue *q) static bool planner_invariant(const struct sm *m, int prev_state) { - pool_impl_t *pi = container_of(m, pool_impl_t, planner_sm); + pool_impl_t *pi = CONTAINER_OF(m, pool_impl_t, planner_sm); queue *o = &pi->ordered; queue *u = &pi->unordered; return ERGO(sm_state(m) == PS_NOTHING, empty(o) && empty(u)) && - ERGO(sm_state(m) == PS_DRAINING, - ERGO(prev_state == PS_BARRIER, - pi->ord_in_flight == 0 && empty(u)) && - ERGO(prev_state == PS_NOTHING, - !empty(u) || !empty(o))) && - ERGO(sm_state(m) == PS_EXITED, - pi->exiting && empty(o) && empty(u)) && - ERGO(sm_state(m) == PS_BARRIER, - ERGO(prev_state == PS_DRAINING, - q_type(head(o)) == WT_BAR) && - ERGO(prev_state == PS_DRAINING_UNORD, empty(u))) && - ERGO(sm_state(m) == PS_DRAINING_UNORD, !empty(u)); + ERGO(sm_state(m) == PS_DRAINING, + ERGO(prev_state == PS_BARRIER, + pi->ord_in_flight == 0 && empty(u)) && + ERGO(prev_state == PS_NOTHING, + !empty(u) || !empty(o))) && + ERGO(sm_state(m) == PS_EXITED, + pi->exiting && empty(o) && empty(u)) && + ERGO( + sm_state(m) == PS_BARRIER, + ERGO(prev_state == PS_DRAINING, q_type(head(o)) == WT_BAR) && + ERGO(prev_state == PS_DRAINING_UNORD, empty(u))) && + ERGO(sm_state(m) == PS_DRAINING_UNORD, !empty(u)); } static void planner(void *arg) { - struct targs *ta = arg; - uv_sem_t *sem = ta->sem; - pool_impl_t *pi = ta->pi; - uv_mutex_t *mutex = &pi->mutex; + struct targs *ta = arg; + uv_sem_t *sem = ta->sem; + pool_impl_t *pi = ta->pi; + uv_mutex_t *mutex = &pi->mutex; pool_thread_t *ts = pi->threads; - struct sm *planner_sm = &pi->planner_sm; - queue *o = &pi->ordered; - queue *u = &pi->unordered; - queue *q; + struct sm *planner_sm = &pi->planner_sm; + queue *o = &pi->ordered; + queue *u = &pi->unordered; + queue *q; - sm_init(planner_sm, planner_invariant, NULL, planner_states, PS_NOTHING); + sm_init(planner_sm, planner_invariant, NULL, planner_states, + PS_NOTHING); uv_sem_post(sem); uv_mutex_lock(mutex); for (;;) { - switch(sm_state(planner_sm)) { - case PS_NOTHING: - while (empty(o) && empty(u) && !pi->exiting) - uv_cond_wait(&pi->planner_cond, mutex); - sm_move(planner_sm, - pi->exiting ? PS_EXITED : PS_DRAINING); - break; - case PS_DRAINING: - while (!(empty(o) && empty(u))) { - sm_move(planner_sm, PS_DRAINING); - if (!empty(o) && q_type(head(o)) == WT_BAR) { - sm_move(planner_sm, PS_BARRIER); - goto ps_barrier; + switch (sm_state(planner_sm)) { + case PS_NOTHING: + while (empty(o) && empty(u) && !pi->exiting) { + uv_cond_wait(&pi->planner_cond, mutex); } - q = qos_pop(pi, o, u); - push(&ts[q_tid(q)].inq, q); - uv_cond_signal(&ts[q_tid(q)].cond); - if (q_type(q) >= WT_ORD1) - pi->ord_in_flight++; - } - sm_move(planner_sm, PS_NOTHING); - ps_barrier: - break; - case PS_BARRIER: - if (!empty(u)) { - sm_move(planner_sm, PS_DRAINING_UNORD); + sm_move(planner_sm, + pi->exiting ? PS_EXITED : PS_DRAINING); break; - } - if (pi->ord_in_flight == 0) { - q = pop(o); - PRE(q_to_w(q)->type == WT_BAR); - free(q_to_w(q)); - sm_move(planner_sm, PS_DRAINING); + case PS_DRAINING: + while (!(empty(o) && empty(u))) { + sm_move(planner_sm, PS_DRAINING); + if (!empty(o) && + q_type(head(o)) == WT_BAR) { + sm_move(planner_sm, PS_BARRIER); + goto ps_barrier; + } + q = qos_pop(pi, o, u); + push(&ts[q_tid(q)].inq, q); + uv_cond_signal(&ts[q_tid(q)].cond); + if (q_type(q) >= WT_ORD1) { + pi->ord_in_flight++; + } + } + sm_move(planner_sm, PS_NOTHING); + ps_barrier: break; - } - uv_cond_wait(&pi->planner_cond, mutex); - sm_move(planner_sm, PS_BARRIER); - break; - case PS_DRAINING_UNORD: - while (!empty(u)) { - q = pop(u); - push(&ts[q_tid(q)].inq, q); - uv_cond_signal(&ts[q_tid(q)].cond); - } - sm_move(planner_sm, PS_BARRIER); - break; - case PS_EXITED: - sm_fini(planner_sm); - uv_mutex_unlock(mutex); - return; - default: - POST(false && "Impossible!"); + case PS_BARRIER: + if (!empty(u)) { + sm_move(planner_sm, PS_DRAINING_UNORD); + break; + } + if (pi->ord_in_flight == 0) { + q = pop(o); + PRE(q_to_w(q)->type == WT_BAR); + free(q_to_w(q)); + sm_move(planner_sm, PS_DRAINING); + break; + } + uv_cond_wait(&pi->planner_cond, mutex); + sm_move(planner_sm, PS_BARRIER); + break; + case PS_DRAINING_UNORD: + while (!empty(u)) { + q = pop(u); + push(&ts[q_tid(q)].inq, q); + uv_cond_signal(&ts[q_tid(q)].cond); + } + sm_move(planner_sm, PS_BARRIER); + break; + case PS_EXITED: + sm_fini(planner_sm); + uv_mutex_unlock(mutex); + return; + default: + POST(false && "Impossible!"); } } } @@ -292,19 +298,20 @@ static void queue_work(pool_work_t *w) static void queue_done(pool_work_t *w) { w_unregister(w->pool, w); - if (w->after_work_cb != NULL) + if (w->after_work_cb != NULL) { w->after_work_cb(w); + } } static void worker(void *arg) { - struct targs *ta = arg; - pool_impl_t *pi = ta->pi; - uv_mutex_t *mutex = &pi->mutex; - pool_thread_t *ts = pi->threads; - enum pool_work_type wtype; - pool_work_t *w; - queue *q; + struct targs *ta = arg; + pool_impl_t *pi = ta->pi; + uv_mutex_t *mutex = &pi->mutex; + pool_thread_t *ts = pi->threads; + enum pool_work_type wtype; + pool_work_t *w; + queue *q; uv_sem_post(ta->sem); uv_mutex_lock(mutex); @@ -332,33 +339,37 @@ static void worker(void *arg) uv_mutex_lock(mutex); if (wtype > WT_BAR) { assert(pi->ord_in_flight > 0); - if (--pi->ord_in_flight == 0) + if (--pi->ord_in_flight == 0) { uv_cond_signal(&pi->planner_cond); + } } } } static void pool_cleanup(pool_t *pool) { - pool_impl_t *pi = pool->pi; + pool_impl_t *pi = pool->pi; pool_thread_t *ts = pi->threads; uint32_t i; - if (pi->threads_nr == 0) + if (pi->threads_nr == 0) { return; + } pi->exiting = true; uv_cond_signal(&pi->planner_cond); - if (uv_thread_join(&pi->planner_thread)) + if (uv_thread_join(&pi->planner_thread)) { abort(); + } uv_cond_destroy(&pi->planner_cond); POST(empty(&pi->ordered) && empty(&pi->unordered)); for (i = 0; i < pi->threads_nr; i++) { uv_cond_signal(&ts[i].cond); - if (uv_thread_join(&ts[i].thread)) + if (uv_thread_join(&ts[i].thread)) { abort(); + } POST(empty(&ts[i].inq)); uv_cond_destroy(&ts[i].cond); } @@ -370,11 +381,11 @@ static void pool_cleanup(pool_t *pool) static void pool_threads_init(pool_t *pool) { - uint32_t i; - uv_sem_t sem; - pool_impl_t *pi = pool->pi; + uint32_t i; + uv_sem_t sem; + pool_impl_t *pi = pool->pi; pool_thread_t *ts; - struct targs pa = { + struct targs pa = { .sem = &sem, .pi = pi, }; @@ -383,36 +394,44 @@ static void pool_threads_init(pool_t *pool) .stack_size = 8u << 20, }; - if (uv_mutex_init(&pi->mutex)) + if (uv_mutex_init(&pi->mutex)) { abort(); - if (uv_sem_init(&sem, 0)) + } + if (uv_sem_init(&sem, 0)) { abort(); + } pi->threads = calloc(pi->threads_nr, sizeof(pi->threads[0])); - if (pi->threads == NULL) + if (pi->threads == NULL) { abort(); + } for (i = 0, ts = pi->threads; i < pi->threads_nr; i++) { - ts[i].arg = (struct targs) { + ts[i].arg = (struct targs){ .pi = pi, .sem = &sem, .idx = i, }; QUEUE__INIT(&ts[i].inq); - if (uv_cond_init(&ts[i].cond)) + if (uv_cond_init(&ts[i].cond)) { abort(); + } if (uv_thread_create_ex(&ts[i].thread, &config, worker, - &ts[i].arg)) + &ts[i].arg)) { abort(); + } } - if (uv_cond_init(&pi->planner_cond)) + if (uv_cond_init(&pi->planner_cond)) { abort(); - if (uv_thread_create_ex(&pi->planner_thread, &config, planner, &pa)) + } + if (uv_thread_create_ex(&pi->planner_thread, &config, planner, &pa)) { abort(); - for (i = 0; i < pi->threads_nr + 1 /* +planner */; i++) + } + for (i = 0; i < pi->threads_nr + 1 /* +planner */; i++) { uv_sem_wait(&sem); + } uv_sem_destroy(&sem); } @@ -424,7 +443,8 @@ static void pool_work_submit(pool_t *pool, pool_work_t *w) queue *u = &pi->unordered; if (w->type > WT_UNORD) { - /* Make sure that elements in the ordered queue come in order. */ + /* Make sure that elements in the ordered queue come in order. + */ PRE(ERGO(pi->ord_prev != WT_BAR && w->type != WT_BAR, pi->ord_prev == w->type)); pi->ord_prev = w->type; @@ -439,14 +459,15 @@ static void pool_work_submit(pool_t *pool, pool_work_t *w) void work_done(uv_async_t *handle) { queue q = {}; - pool_impl_t *pi = container_of(handle, pool_impl_t, outq_async); + pool_impl_t *pi = CONTAINER_OF(handle, pool_impl_t, outq_async); uv_mutex_lock(&pi->outq_mutex); QUEUE__MOVE(&pi->outq, &q); uv_mutex_unlock(&pi->outq_mutex); - while (!empty(&q)) + while (!empty(&q)) { queue_done(q_to_w(pop(&q))); + } } void pool_queue_work(pool_t *pool, @@ -458,7 +479,7 @@ void pool_queue_work(pool_t *pool, { PRE(work_cb != NULL && type < WT_NR); - *w = (pool_work_t) { + *w = (pool_work_t){ .pool = pool, .type = type, .thread_id = cookie % pool->pi->threads_nr, @@ -469,7 +490,10 @@ void pool_queue_work(pool_t *pool, pool_work_submit(pool, w); } -int pool_init(pool_t *pool, uv_loop_t *loop, uint32_t threads_nr) +int pool_init(pool_t *pool, + uv_loop_t *loop, + uint32_t threads_nr, + uint32_t qos_prio) { int rc; pool_impl_t *pi = pool->pi; @@ -477,12 +501,13 @@ int pool_init(pool_t *pool, uv_loop_t *loop, uint32_t threads_nr) PRE(threads_nr <= THREADPOOL_SIZE_MAX); pi = pool->pi = calloc(1, sizeof(*pool->pi)); - if (pi == NULL) + if (pi == NULL) { return UV_ENOMEM; + } - *pi = (pool_impl_t) { + *pi = (pool_impl_t){ .qos = 0, - .qos_prio = 2, + .qos_prio = qos_prio, .exiting = false, .ord_prev = WT_BAR, .threads_nr = threads_nr, diff --git a/src/lib/threadpool.h b/src/lib/threadpool.h index b4677230c..96e8c1e10 100644 --- a/src/lib/threadpool.h +++ b/src/lib/threadpool.h @@ -23,20 +23,20 @@ - The pool supports servicing of the following types of work items: - WT_UNORD - items, which can be processed by the pool in any - order, concurrency assumptions of this type of work are - guaranteed by other layers of the application. Read and write - transactions executed by sqlite3_step() are good examples for - such work item type. + order, concurrency assumptions of this type of work are + guaranteed by other layers of the application. Read and write + transactions executed by sqlite3_step() are good examples for + such work item type. - WT_ORD_N - items, which can NOT be processed by the pool in - any order. The pool's logic shall guarantee that servicing - all WT_ORD_{N}s happens before WT_ORD_{N + 1}s. WT_ORD_{N}s - and WT_ORD_{N + 1}s operations can't be put into the pool - interleaved. Sqlite3 checkpoints is an example of WT_ORD_{N} - and InstallSnapshot(CP(), MV()) is an example of WT_ORD_{N + 1}. + any order. The pool's logic shall guarantee that servicing + all WT_ORD_{N}s happens before WT_ORD_{N + 1}s. WT_ORD_{N}s + and WT_ORD_{N + 1}s operations can't be put into the pool + interleaved. Sqlite3 checkpoints is an example of WT_ORD_{N} + and InstallSnapshot(CP(), MV()) is an example of WT_ORD_{N + 1}. - WT_BAR - special purpose item, barrier. Delimits WT_ORD_{N}s - from WT_ORD_{N + 1}s. + from WT_ORD_{N + 1}s. - The pool supports servicing of work items with a given quality of service (QoS) considerations. For example, the priority of @@ -56,21 +56,30 @@ enum pool_work_type { WT_NR, }; -struct pool_work_s { - queue link; /* Link into ordered, unordered and outq */ - uint32_t thread_id; /* Identifier of the thread the item is affined */ - pool_t *pool; /* The pool, item is being associated with */ +struct pool_work_s +{ + queue link; /* Link into ordered, unordered and outq */ + uint32_t thread_id; /* Identifier of the thread the item is affined */ + pool_t *pool; /* The pool, item is being associated with */ enum pool_work_type type; void (*work_cb)(pool_work_t *w); void (*after_work_cb)(pool_work_t *w); }; -struct pool_s { +struct pool_s +{ struct pool_impl *pi; }; -int pool_init(pool_t *pool, uv_loop_t *loop, uint32_t threads_nr); +enum { + POOL_QOS_PRIO_FAIR = 2, +}; + +int pool_init(pool_t *pool, + uv_loop_t *loop, + uint32_t threads_nr, + uint32_t qos_prio); void pool_fini(pool_t *pool); void pool_close(pool_t *pool); void pool_queue_work(pool_t *pool, @@ -80,4 +89,4 @@ void pool_queue_work(pool_t *pool, void (*work_cb)(pool_work_t *w), void (*after_work_cb)(pool_work_t *w)); -#endif /* __THREAD_POOL__ */ +#endif /* __THREAD_POOL__ */ diff --git a/test/unit/ext/test_uv_pool.c b/test/unit/ext/test_uv_pool.c index 1699da955..5f2f82a75 100644 --- a/test/unit/ext/test_uv_pool.c +++ b/test/unit/ext/test_uv_pool.c @@ -1,7 +1,7 @@ #include "../../../src/lib/threadpool.h" +#include "../../../src/utils.h" #include "../../lib/runner.h" #include "../../lib/uv.h" -#include "src/utils.h" TEST_MODULE(ext_uv_pool); @@ -13,11 +13,10 @@ TEST_MODULE(ext_uv_pool); enum { WORK_ITEMS_NR = 50000 }; -struct fixture -{ +struct fixture { pool_work_t w; - uv_loop_t loop; - pool_t pool; + uv_loop_t loop; + pool_t pool; }; static void loop_setup(struct fixture *f) @@ -27,12 +26,13 @@ static void loop_setup(struct fixture *f) rc = uv_loop_init(&f->loop); munit_assert_int(rc, ==, 0); - rc = pool_init(&f->pool, &f->loop, 4); + rc = pool_init(&f->pool, &f->loop, 4, POOL_QOS_PRIO_FAIR); munit_assert_int(rc, ==, 0); } -static void bottom_work_cb(pool_work_t *) +static void bottom_work_cb(pool_work_t *w) { + (void)w; } static void bottom_after_work_cb(pool_work_t *w) @@ -58,21 +58,21 @@ static void after_work_cb(pool_work_t *w) work = malloc(sizeof(*work)); if (i < WORK_ITEMS_NR / 2) - wt = WT_ORD1; + wt = WT_ORD1; else if (i == WORK_ITEMS_NR / 2) - wt = WT_BAR; + wt = WT_BAR; else - wt = WT_ORD2; + wt = WT_ORD2; pwt = i % 2 == 0 ? wt : WT_UNORD; - pool_queue_work(w->pool, - work, i, pwt, bottom_work_cb, + pool_queue_work(w->pool, work, i, pwt, bottom_work_cb, bottom_after_work_cb); } } -static void work_cb(pool_work_t *) +static void work_cb(pool_work_t *w) { + (void)w; } static void threadpool_tear_down(void *data) @@ -86,9 +86,10 @@ static void threadpool_tear_down(void *data) free(f); } -static void *threadpool_setup(const MunitParameter params[] UNUSED, - void *user_data UNUSED) +static void *threadpool_setup(const MunitParameter params[], void *user_data) { + (void)params; + (void)user_data; struct fixture *f = calloc(1, sizeof *f); loop_setup(f); return f;