Skip to content

Commit

Permalink
Add FIFO queue option
Browse files Browse the repository at this point in the history
  • Loading branch information
javalikescript committed Nov 20, 2024
1 parent 84436f3 commit 8a68607
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 13 deletions.
97 changes: 84 additions & 13 deletions src/async.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,18 @@
*/
#include "private.h"

typedef struct luv_async_send_s {
luv_thread_arg_t targ;
struct luv_async_send_s* next;
} luv_async_send_t;

typedef struct {
luv_thread_arg_t targ;
uv_mutex_t mutex;
int max; // FIFO queue in case of max > 1
int count;
luv_async_send_t* first;
luv_async_send_t* last;
} luv_async_arg_t;

#define luv_get_async_arg_from_handle(H) ((luv_async_arg_t *) ((luv_handle_t*) (H)->data)->extra)
Expand All @@ -29,29 +38,71 @@ static uv_async_t* luv_check_async(lua_State* L, int index) {
return handle;
}

#define luv_is_async_queue(AA) ((AA)->max > 1)

static luv_async_send_t* luv_async_pop(luv_async_arg_t* asarg) {
luv_async_send_t* sendarg = asarg->first;
if (sendarg != NULL) {
asarg->count--;
asarg->first = sendarg->next;
if (asarg->first == NULL) {
asarg->last = NULL;
}
}
return sendarg;
}

static luv_async_send_t* luv_async_push(luv_async_arg_t* asarg) {
luv_async_send_t* sendarg = (luv_async_send_t*)malloc(sizeof(luv_async_send_t));
memset(sendarg, 0, sizeof(luv_async_send_t));
asarg->count++;
if (asarg->last != NULL) {
asarg->last->next = sendarg;
}
asarg->last = sendarg;
if (asarg->first == NULL) {
asarg->first = sendarg;
}
return sendarg;
}

static void luv_async_cb(uv_async_t* handle) {
luv_handle_t* data = (luv_handle_t*)handle->data;
lua_State* L = data->ctx->L;
luv_async_arg_t* asarg = luv_get_async_arg_from_handle(handle);
uv_mutex_t *argmutex = &asarg->mutex;
luv_thread_arg_t targcpy;
luv_thread_arg_t targcpy; // work on a copy of the arguments
int n;
uv_mutex_lock(argmutex);
targcpy = asarg->targ; // work on a copy of the arguments
asarg->targ.argc = 0; // empty the original, nothing to clear
uv_mutex_unlock(argmutex);
n = luv_thread_arg_push(L, &targcpy, LUVF_THREAD_SIDE_MAIN);
if (n >= 0) {
luv_call_callback(L, data, LUV_ASYNC, n);
}
luv_thread_arg_clear(L, &targcpy, LUVF_THREAD_SIDE_MAIN); // clear the copy
int q = luv_is_async_queue(asarg);
do {
uv_mutex_lock(argmutex);
if (q) {
luv_async_send_t* sendarg = luv_async_pop(asarg);
if (sendarg == NULL) {
uv_mutex_unlock(argmutex);
return;
}
targcpy = sendarg->targ;
free(sendarg);
} else {
targcpy = asarg->targ;
asarg->targ.argc = 0; // empty the shared original, nothing to clear
}
uv_mutex_unlock(argmutex);
n = luv_thread_arg_push(L, &targcpy, LUVF_THREAD_SIDE_MAIN);
if (n >= 0) {
luv_call_callback(L, data, LUV_ASYNC, n);
}
luv_thread_arg_clear(L, &targcpy, LUVF_THREAD_SIDE_MAIN); // clear the copy
} while (q);
}

static int luv_new_async(lua_State* L) {
uv_async_t* handle;
luv_handle_t* data;
int ret;
luv_ctx_t* ctx = luv_context(L);
int max = luaL_optinteger(L, 2, 0);
luaL_checktype(L, 1, LUA_TFUNCTION);
handle = (uv_async_t*)luv_newuserdata(L, uv_handle_size(UV_ASYNC));
ret = uv_async_init(ctx->loop, handle, luv_async_cb);
Expand All @@ -62,6 +113,7 @@ static int luv_new_async(lua_State* L) {
data = luv_setup_handle(L, ctx);
luv_async_arg_t* asarg = (luv_async_arg_t*)malloc(sizeof(luv_async_arg_t));
memset(asarg, 0, sizeof(luv_async_arg_t));
asarg->max = max;
ret = uv_mutex_init(&asarg->mutex);
if (ret < 0) { // unlikely
abort();
Expand All @@ -83,7 +135,15 @@ static int luv_async_gc(lua_State* L) {
luv_async_arg_t* asarg = luv_get_async_arg_from_handle(handle);
uv_mutex_t *argmutex = &asarg->mutex;
uv_mutex_lock(argmutex);
luv_thread_arg_free(&asarg->targ); // in case of a pending send
if (luv_is_async_queue(asarg)) {
luv_async_send_t* sendarg;
while ((sendarg = luv_async_pop(asarg)) != NULL) {
luv_thread_arg_free(&sendarg->targ);
free(sendarg);
}
} else {
luv_thread_arg_free(&asarg->targ); // in case of a pending send
}
uv_mutex_unlock(argmutex);
uv_mutex_destroy(argmutex);
return luv_handle_gc(L);
Expand All @@ -94,10 +154,21 @@ static int luv_async_send(lua_State* L) {
uv_async_t* handle = luv_check_async(L, 1);
luv_async_arg_t* asarg = luv_get_async_arg_from_handle(handle);
uv_mutex_t *argmutex = &asarg->mutex;
luv_thread_arg_t* args;
int n;
uv_mutex_lock(argmutex);
luv_thread_arg_free(&asarg->targ); // in case of a pending send
n = luv_thread_arg_set(L, &asarg->targ, 2, lua_gettop(L), LUVF_THREAD_MODE_ASYNC|LUVF_THREAD_SIDE_CHILD);
if (luv_is_async_queue(asarg)) {
if (asarg->count >= asarg->max) {
uv_mutex_unlock(argmutex);
return luv_error(L, UV_ENOSPC);
}
luv_async_send_t* sendarg = luv_async_push(asarg);
args = &sendarg->targ;
} else {
luv_thread_arg_free(&asarg->targ); // in case of a pending send
args = &asarg->targ;
}
n = luv_thread_arg_set(L, args, 2, lua_gettop(L), LUVF_THREAD_MODE_ASYNC|LUVF_THREAD_SIDE_CHILD);
uv_mutex_unlock(argmutex);
if (n < 0) {
return luv_thread_arg_error(L);
Expand Down
21 changes: 21 additions & 0 deletions tests/test-async.lua
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,27 @@ return require('lib/tap')(function (test)
end, async):join()
end)

test("test async queue send", function(p, p, expect, uv)
local async
async = uv.new_async(expect(function (v)
p('in async notify callback')
if v == 'close' then
async:close()
else
assert(v=='ok')
end
end, 3), 3)
uv.new_thread(function(asy)
local uv = require('luv')
assert(type(asy)=='userdata')
assert(asy:send('ok')==0)
assert(asy:send('ok')==0)
assert(asy:send('close')==0)
assert(select(3, asy:send('not ok'))=='ENOSPC')
uv.sleep(10)
end, async):join()
end)

test("test async send from same thread", function(p, p, expect, uv)
local async
async = uv.new_async(expect(function (v)
Expand Down

0 comments on commit 8a68607

Please sign in to comment.