Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
just-now committed Jan 17, 2024
1 parent 70dcb12 commit 86e32cd
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 52 deletions.
88 changes: 44 additions & 44 deletions src/lib/threadpool.c
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
#include "src/lib/threadpool.h"
#include "src/utils.h"
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>
#include <uv.h>

#include "src/utils.h"

static struct xx_loop_s *xx_loop(struct uv_loop_s *loop)
{
Expand All @@ -27,14 +27,15 @@ static struct xx_loop_s *xx_loop(struct uv_loop_s *loop)
enum {
MAX_THREADPOOL_SIZE = 1024,
XX_THREADPOOL_SIZE = 4,
NOT_AFFINED = ~0u,
};

struct thread_args {
uv_sem_t *sem;
unsigned int idx;
struct thread_args
{
uv_sem_t *sem;
unsigned int idx;
};


static uv_cond_t cond;
static uv_mutex_t mutex;
static unsigned int idle_threads;
Expand All @@ -53,7 +54,7 @@ static void xx__cancelled(struct xx__work *w UNUSED)

unsigned int xx__thread_id(void)
{
return *(unsigned int *) uv_key_get(&thread_key);
return *(unsigned int *)uv_key_get(&thread_key);
}

/* To avoid deadlock with uv_cancel() it's crucial that the worker
Expand All @@ -65,7 +66,6 @@ static void worker(void *arg)
queue *q;
struct thread_args *ta = arg;


uv_key_set(&thread_key, &ta->idx);
uv_sem_post(ta->sem);
uv_mutex_lock(&mutex);
Expand All @@ -74,20 +74,17 @@ static void worker(void *arg)
/* Keep waiting while either no work is present */
while (QUEUE__IS_EMPTY(&wq) &&
QUEUE__IS_EMPTY(&thread_queues[ta->idx])) {
//XXX printf("worker_e: idx=%u\n", ta->idx);
idle_threads += 1;
uv_cond_wait(&cond, &mutex);
idle_threads -= 1;
}

//XXX printf("worker: idx=%u\n", ta->idx);
/* Process work item affinity */
/* Process work item thread affinity */
if (!QUEUE__IS_EMPTY(&thread_queues[ta->idx])) {
q = QUEUE__HEAD(&thread_queues[ta->idx]);
//XXX printf("worker: q=%p idx=%u\n", q, ta->idx);
QUEUE__REMOVE(q);
QUEUE__INIT(q);
QUEUE__INSERT_HEAD(&wq, q);
q = QUEUE__HEAD(&thread_queues[ta->idx]);
QUEUE__REMOVE(q);
QUEUE__INIT(q);
QUEUE__INSERT_HEAD(&wq, q);
}

q = QUEUE__HEAD(&wq);
Expand Down Expand Up @@ -123,14 +120,13 @@ static void post(queue *q, unsigned int idx)
{
uv_mutex_lock(&mutex);

//XXX printf("post: q=%p idx=%u\n", q, idx);
/* Assing work item affinity */
QUEUE__INSERT_TAIL(idx == ~0u ? &wq : &thread_queues[idx], q);
/* Assign work item thread affinity */
QUEUE__INSERT_TAIL(idx == NOT_AFFINED ? &wq : &thread_queues[idx], q);

/* TODO: Worth thinking how not to broadcast and use
uv_cond_signal(&cond) instead. */
if (idle_threads > 0) {
//XXX uv_cond_signal(&cond);
//XXX Worth thinking how not to broadcast
uv_cond_broadcast(&cond);
uv_cond_broadcast(&cond);
}
uv_mutex_unlock(&mutex);
}
Expand All @@ -142,7 +138,7 @@ static void xx__threadpool_cleanup(void)
if (nthreads == 0)
return;

post(&exit_message, ~0u);
post(&exit_message, NOT_AFFINED);

for (i = 0; i < nthreads; i++)
if (uv_thread_join(threads + i))
Expand Down Expand Up @@ -173,11 +169,11 @@ static void init_threads(void)
nthreads = MAX_THREADPOOL_SIZE;

if (uv_key_create(&thread_key))
abort();
abort();

threads = malloc(nthreads * sizeof(threads[0]));
thread_args = malloc(nthreads * sizeof(thread_args[0]));
thread_queues = malloc(nthreads * sizeof(thread_queues[0]));
threads = calloc(nthreads, sizeof(threads[0]));
thread_args = calloc(nthreads, sizeof(thread_args[0]));
thread_queues = calloc(nthreads, sizeof(thread_queues[0]));
if (threads == NULL || thread_args == NULL || thread_queues == NULL)
abort();

Expand All @@ -196,13 +192,14 @@ static void init_threads(void)
config.stack_size = 8u << 20; /* 8 MB */

for (i = 0; i < nthreads; i++) {
QUEUE__INIT(&thread_queues[i]);
thread_args[i] = (struct thread_args) {
.sem = &sem,
.idx = i,
};
if (uv_thread_create_ex(threads + i, &config, worker, &thread_args[i]))
abort();
QUEUE__INIT(&thread_queues[i]);
thread_args[i] = (struct thread_args){
.sem = &sem,
.idx = i,
};
if (uv_thread_create_ex(threads + i, &config, worker,
&thread_args[i]))
abort();
}

for (i = 0; i < nthreads; i++)
Expand All @@ -211,13 +208,19 @@ static void init_threads(void)
uv_sem_destroy(&sem);
}

static bool threads__invariant(void)
{
return nthreads > 0 && threads != NULL && thread_args != NULL &&
thread_queues != NULL && !IS0(&wq) && !ARE0(threads, nthreads) &&
!ARE0(thread_args, nthreads) && !ARE0(thread_queues, nthreads);
}

void xx__work_submit(uv_loop_t *loop,
struct xx__work *w,
void (*work)(struct xx__work *w),
void (*done)(struct xx__work *w, int status))
{
// XXX: PRE(threads_are_inited());
// use calloc() for allocation, check thread_mem_is_0
PRE(threads__invariant());
w->loop = loop;
w->work = work;
w->done = done;
Expand All @@ -228,8 +231,7 @@ static int xx__work_cancel(uv_loop_t *loop, struct xx__work *w)
{
int cancelled;

// XXX: PRE(threads_are_inited());
// use calloc() for allocation, check thread_mem_is_0
PRE(threads__invariant());
uv_mutex_lock(&mutex);
uv_mutex_lock(&xx_loop(w->loop)->wq_mutex);

Expand Down Expand Up @@ -309,9 +311,7 @@ int xx_queue_work(uv_loop_t *loop,
req->loop = loop;
req->work_cb = work_cb;
req->after_work_cb = after_work_cb;
//XXX: This is NOT the right place to do this calculation
req->work_req.thread_idx = cookie;
//XXX printf("xx_queue_work: req=%p idx=%u\n", req, req->work_req.thread_idx);
req->work_req.thread_idx = cookie % nthreads;
xx__work_submit(loop, &req->work_req, xx__queue_work, xx__queue_done);
return 0;
}
Expand All @@ -329,12 +329,12 @@ int xx_loop_init(struct xx_loop_s *loop)

err = uv_mutex_init(&loop->wq_mutex);
if (err != 0)
return err;
return err;

err = uv_async_init(&loop->loop, &loop->wq_async, xx__work_done);
if (err != 0) {
uv_mutex_destroy(&loop->wq_mutex);
return err;
uv_mutex_destroy(&loop->wq_mutex);
return err;
}

init_threads();
Expand Down
20 changes: 20 additions & 0 deletions src/utils.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#ifndef DQLITE_UTILS_H_
#define DQLITE_UTILS_H_

#include <stdbool.h>
#include <stdint.h>

/* Various utility functions and macros */
Expand All @@ -18,4 +19,23 @@
#define container_of(ptr, type, member) \
((type *)((char *)(ptr)-offsetof(type, member)))

static inline bool _is0(const char *p, size_t s)
{
size_t i;

for (i = 0; i < s; ++i) {
if (p[i] != 0) {
return false;
}
}

return true;
}

#define ARE0(s, n) _is0((const char *)(s), (n))
#define IS0(s) _is0((const char *)(s), sizeof(*(s)))

#define PRE(cond) assert((cond))
#define POST(cond) assert((cond))

#endif /* DQLITE_UTILS_H_ */
13 changes: 5 additions & 8 deletions test/unit/ext/test_uv_pool.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,16 +65,15 @@ static void after_work_cb(xx_work_t *req, int status UNUSED)

for (i = 0; i <= WORK_ITEMS_NR; i++) {
work = malloc(sizeof(*work));
//XXX: parameterize i%4
rc = xx_queue_work(req->loop, work, i % 4, bottom_work_cb,
rc = xx_queue_work(req->loop, work, i, bottom_work_cb,
bottom_after_work_cb);
munit_assert_int(rc, ==, 0);
}
}

static void work_cb(xx_work_t *req)
{
munit_assert_int(req->work_req.thread_idx, ==, xx__thread_id());
munit_assert_int(req->work_req.thread_idx, ==, xx__thread_id());
}

static void threadpool_tear_down(void *data)
Expand All @@ -90,9 +89,7 @@ static void threadpool_tear_down(void *data)
static void *threadpool_setup(const MunitParameter params[] UNUSED,
void *user_data UNUSED)
{
struct fixture *f = munit_malloc(sizeof *f);
memset(f, 0x00, sizeof(*f));

struct fixture *f = calloc(1, sizeof *f);
loop_setup(f);
return f;
}
Expand All @@ -106,8 +103,8 @@ TEST_CASE(threadpool, sync, NULL)
struct fixture *f = data;
int rc;

rc = xx_queue_work(&f->xx_loop.loop, &f->work_req, 0,
work_cb, after_work_cb);
rc = xx_queue_work(&f->xx_loop.loop, &f->work_req, 0, work_cb,
after_work_cb);
munit_assert_int(rc, ==, 0);

rc = uv_run(&f->xx_loop.loop, UV_RUN_DEFAULT);
Expand Down

0 comments on commit 86e32cd

Please sign in to comment.