diff --git a/config b/config index 4b32d38318..06eb99a168 100644 --- a/config +++ b/config @@ -296,6 +296,7 @@ HTTP_LUA_SRCS=" \ $ngx_addon_dir/src/ngx_http_lua_log_ringbuf.c \ $ngx_addon_dir/src/ngx_http_lua_input_filters.c \ $ngx_addon_dir/src/ngx_http_lua_pipe.c \ + $ngx_addon_dir/src/event/ngx_http_lua_kqueue.c \ " HTTP_LUA_DEPS=" \ @@ -355,6 +356,7 @@ HTTP_LUA_DEPS=" \ $ngx_addon_dir/src/ngx_http_lua_log_ringbuf.h \ $ngx_addon_dir/src/ngx_http_lua_input_filters.h \ $ngx_addon_dir/src/ngx_http_lua_pipe.h \ + $ngx_addon_dir/src/ngx_http_lua_event.h \ " # ---------------------------------------- diff --git a/src/event/ngx_http_lua_kqueue.c b/src/event/ngx_http_lua_kqueue.c new file mode 100644 index 0000000000..87eee1eeba --- /dev/null +++ b/src/event/ngx_http_lua_kqueue.c @@ -0,0 +1,87 @@ + +/* + * Copyright (C) Yichun Zhang (agentzh) + */ + + +#include +#include +#include + +int ngx_lua_kqueue = -1; +struct kevent change_list[1]; +struct kevent event_list[1]; + +ngx_int_t +ngx_http_lua_kqueue_init(ngx_conf_t *cf) +{ + if (ngx_lua_kqueue == -1) { + ngx_lua_kqueue = kqueue(); + + if (ngx_lua_kqueue == -1) { + ngx_conf_log_error(NGX_LOG_ALERT, cf, 0, "kqueue() failed"); + + return NGX_ERROR; + } + } + + return NGX_OK; +} + + +void +ngx_http_lua_kqueue_set_event(ngx_event_t *ev, ngx_int_t filter) +{ + struct kevent *kev; + ngx_connection_t *c; + + c = ev->data; + + kev = &change_list[0]; + + kev->ident = c->fd; + kev->filter = (short) filter; + kev->flags = EV_ADD|EV_ENABLE; + kev->udata = NGX_KQUEUE_UDATA_T ((uintptr_t) ev | ev->instance); +} + + +ngx_int_t +ngx_http_lua_kqueue_process_events(ngx_http_request_t *r, ngx_msec_t timer) +{ + int events; + struct timespec ts; + ngx_event_t *ev; + ngx_int_t instance; + ngx_err_t err; + + ts.tv_sec = timer / 1000; + ts.tv_nsec = (timer % 1000) * 1000000; + + events = kevent(ngx_lua_kqueue, change_list, 1, event_list, 1, &ts); + + err = (events == -1) ? ngx_errno : 0; + + if (err) { + ngx_log_error(NGX_LOG_ALERT, r->connection->log, err, + "kevent() failed"); + + return NGX_ERROR; + } + + if (events == 0) { + ngx_log_error(NGX_LOG_ALERT, r->connection->log, 0, + "kevent() returned no events without timeout"); + + return NGX_ERROR; + } + + ev = (ngx_event_t *) event_list[0].udata; + instance = (uintptr_t) ev & 1; + ev = (ngx_event_t *) ((uintptr_t) ev & (uintptr_t) ~1); + + ev->available = event_list[0].data; + ev->ready = 1; + + return NGX_OK; +} diff --git a/src/ngx_http_lua_event.h b/src/ngx_http_lua_event.h new file mode 100644 index 0000000000..9f3b660bef --- /dev/null +++ b/src/ngx_http_lua_event.h @@ -0,0 +1,24 @@ + +/* + * Copyright (C) Yichun Zhang (agentzh) + */ + + +#ifndef _NGX_HTTP_LUA_EVENT_H_INCLUDED_ +#define _NGX_HTTP_LUA_EVENT_H_INCLUDED_ + + +#include "ngx_http_lua_common.h" + + +ngx_int_t ngx_http_lua_kqueue_init(ngx_conf_t *cf); + +void ngx_http_lua_kqueue_set_event(ngx_event_t *ev, ngx_int_t filter); + +ngx_int_t ngx_http_lua_kqueue_process_events(ngx_http_request_t *r, + ngx_msec_t timer); + + +#endif /* _NGX_HTTP_LUA_EVENT_H_INCLUDED_ */ + +/* vi:set ft=c ts=4 sw=4 et fdm=marker: */ diff --git a/src/ngx_http_lua_module.c b/src/ngx_http_lua_module.c index 7358a95639..5487778ac5 100644 --- a/src/ngx_http_lua_module.c +++ b/src/ngx_http_lua_module.c @@ -31,6 +31,7 @@ #include "ngx_http_lua_ssl_session_fetchby.h" #include "ngx_http_lua_headers.h" #include "ngx_http_lua_pipe.h" +#include "ngx_http_lua_event.h" static void *ngx_http_lua_create_main_conf(ngx_conf_t *cf); @@ -786,6 +787,11 @@ ngx_http_lua_init(ngx_conf_t *cf) cln->handler = ngx_http_lua_ngx_raw_header_cleanup; #endif + rc = ngx_http_lua_kqueue_init(cf); + if (rc == NGX_ERROR) { + return rc; + } + if (lmcf->lua == NULL) { dd("initializing lua vm"); diff --git a/src/ngx_http_lua_socket_tcp.c b/src/ngx_http_lua_socket_tcp.c index e2c2cf2f01..875eb4a721 100644 --- a/src/ngx_http_lua_socket_tcp.c +++ b/src/ngx_http_lua_socket_tcp.c @@ -17,6 +17,7 @@ #include "ngx_http_lua_output.h" #include "ngx_http_lua_contentby.h" #include "ngx_http_lua_probe.h" +#include "ngx_http_lua_event.h" static int ngx_http_lua_socket_tcp(lua_State *L); @@ -159,6 +160,12 @@ static void ngx_http_lua_ssl_handshake_handler(ngx_connection_t *c); static int ngx_http_lua_ssl_free_session(lua_State *L); #endif static void ngx_http_lua_socket_tcp_close_connection(ngx_connection_t *c); +static ngx_int_t ngx_http_lua_socket_tcp_block_conn(ngx_http_request_t *r, + ngx_http_lua_socket_tcp_upstream_t *u); +static ngx_int_t ngx_http_lua_socket_tcp_block_write(ngx_http_request_t *r, + ngx_http_lua_socket_tcp_upstream_t *u); +static ngx_int_t ngx_http_lua_socket_tcp_block_read(ngx_http_request_t *r, + ngx_http_lua_socket_tcp_upstream_t *u); enum { @@ -446,7 +453,7 @@ ngx_http_lua_socket_tcp(lua_State *L) return luaL_error(L, "no ctx found"); } - ngx_http_lua_check_context(L, ctx, NGX_HTTP_LUA_CONTEXT_YIELDABLE); + /* ngx_http_lua_check_context(L, ctx, NGX_HTTP_LUA_CONTEXT_YIELDABLE); */ lua_createtable(L, 5 /* narr */, 1 /* nrec */); lua_pushlightuserdata(L, ngx_http_lua_lightudata_mask( @@ -888,7 +895,7 @@ ngx_http_lua_socket_tcp_connect(lua_State *L) return luaL_error(L, "no ctx found"); } - ngx_http_lua_check_context(L, ctx, NGX_HTTP_LUA_CONTEXT_YIELDABLE); + /* ngx_http_lua_check_context(L, ctx, NGX_HTTP_LUA_CONTEXT_YIELDABLE); */ luaL_checktype(L, 1, LUA_TTABLE); @@ -1477,11 +1484,16 @@ ngx_http_lua_socket_resolve_retval_handler(ngx_http_request_t *r, u->writer.last = &u->writer.out; #endif - ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module); + dd("setting data to %p", u); - coctx = ctx->cur_co_ctx; + ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module); - dd("setting data to %p", u); + if (rc == NGX_AGAIN && !(ctx->context & NGX_HTTP_LUA_CONTEXT_YIELDABLE)) { + rc = ngx_http_lua_socket_tcp_block_conn(r, u); + if (rc == NGX_ERROR) { + return ngx_http_lua_socket_conn_error_retval_handler(r, u, L); + } + } if (rc == NGX_OK) { ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, @@ -1517,6 +1529,8 @@ ngx_http_lua_socket_resolve_retval_handler(ngx_http_request_t *r, /* rc == NGX_AGAIN */ + coctx = ctx->cur_co_ctx; + ngx_http_lua_cleanup_pending_operation(coctx); coctx->cleanup = ngx_http_lua_coctx_cleanup; coctx->data = u; @@ -1780,6 +1794,10 @@ ngx_http_lua_socket_tcp_sslhandshake(lua_State *L) dd("ngx_ssl_handshake returned %d", (int) rc); + if (rc == NGX_AGAIN && !(ctx->context & NGX_HTTP_LUA_CONTEXT_YIELDABLE)) { + /* Do something */ + } + if (rc == NGX_AGAIN) { if (c->write->timer_set) { ngx_del_timer(c->write); @@ -2105,6 +2123,10 @@ ngx_http_lua_socket_tcp_receive_helper(ngx_http_request_t *r, rc = ngx_http_lua_socket_tcp_read(r, u); + if (rc == NGX_AGAIN && !(ctx->context & NGX_HTTP_LUA_CONTEXT_YIELDABLE)) { + rc = ngx_http_lua_socket_tcp_block_read(r, u); + } + if (rc == NGX_ERROR) { dd("read failed: %d", (int) u->ft_type); rc = ngx_http_lua_socket_tcp_receive_retval_handler(r, u, L); @@ -2917,6 +2939,10 @@ ngx_http_lua_socket_tcp_send(lua_State *L) dd("socket send returned %d", (int) rc); + if (rc == NGX_AGAIN && !(ctx->context & NGX_HTTP_LUA_CONTEXT_YIELDABLE)) { + rc = ngx_http_lua_socket_tcp_block_write(r, u); + } + if (rc == NGX_ERROR) { return ngx_http_lua_socket_write_error_retval_handler(r, u, L); } @@ -4499,6 +4525,10 @@ ngx_http_lua_socket_receiveuntil_iterator(lua_State *L) rc = ngx_http_lua_socket_tcp_read(r, u); + if (rc == NGX_AGAIN && !(ctx->context & NGX_HTTP_LUA_CONTEXT_YIELDABLE)) { + rc = ngx_http_lua_socket_tcp_block_read(r, u); + } + if (rc == NGX_ERROR) { dd("read failed: %d", (int) u->ft_type); rc = ngx_http_lua_socket_tcp_receive_retval_handler(r, u, L); @@ -6115,6 +6145,143 @@ ngx_http_lua_coctx_cleanup(void *data) } +static ngx_int_t +ngx_http_lua_socket_tcp_block_conn(ngx_http_request_t *r, + ngx_http_lua_socket_tcp_upstream_t *u) +{ + ngx_int_t rc; + ngx_msec_t delta; + ngx_connection_t *c = u->peer.connection; + ngx_msec_t timer = u->connect_timeout; + + ngx_del_event(c->write, NGX_WRITE_EVENT, NGX_CLOSE_EVENT); + ngx_http_lua_kqueue_set_event(c->write, NGX_WRITE_EVENT); + + delta = ngx_current_msec; + + rc = ngx_http_lua_kqueue_process_events(r, timer); + + if (rc == NGX_ERROR) { + ngx_http_lua_socket_handle_conn_error(r, u, + NGX_HTTP_LUA_SOCKET_FT_ERROR); + + return rc; + } + + ngx_time_update(); + + if (ngx_current_msec - delta >= timer) { + ngx_http_lua_socket_handle_conn_error(r, u, + NGX_HTTP_LUA_SOCKET_FT_TIMEOUT); + + return NGX_ERROR; + } + + return NGX_OK; +} + + +static ngx_int_t +ngx_http_lua_socket_tcp_block_write(ngx_http_request_t *r, + ngx_http_lua_socket_tcp_upstream_t *u) +{ + int rc; + ngx_msec_t delta; + ngx_connection_t *c = u->peer.connection; + ngx_msec_t timer = u->connect_timeout; + + ngx_del_event(c->write, NGX_WRITE_EVENT, NGX_CLOSE_EVENT); + ngx_http_lua_kqueue_set_event(c->write, NGX_WRITE_EVENT); + + delta = ngx_current_msec; + + rc = ngx_http_lua_kqueue_process_events(r, timer); + + if (rc == NGX_ERROR) { + ngx_http_lua_socket_handle_write_error(r, u, + NGX_HTTP_LUA_SOCKET_FT_ERROR); + + return rc; + } + + ngx_time_update(); + + if (ngx_current_msec - delta >= timer) { + ngx_http_lua_socket_handle_write_error(r, u, + NGX_HTTP_LUA_SOCKET_FT_TIMEOUT); + + return NGX_ERROR; + } + + return NGX_OK; +} + + +static ngx_int_t +ngx_http_lua_socket_tcp_block_read(ngx_http_request_t *r, + ngx_http_lua_socket_tcp_upstream_t *u) +{ + int rc; + ngx_msec_t delta; + ngx_event_t *rev; + ngx_connection_t *c = u->peer.connection; + ngx_msec_t timer = u->connect_timeout; + + ngx_http_lua_kqueue_set_event(c->read, NGX_READ_EVENT); + + delta = ngx_current_msec; + + for (;;) { + + ngx_del_event(c->read, NGX_READ_EVENT, NGX_CLOSE_EVENT); + + if (c->read->timer_set) { + ngx_del_timer(c->read); + } + + rc = ngx_http_lua_kqueue_process_events(r, timer); + if (rc == NGX_ERROR) { + ngx_http_lua_socket_handle_read_error(r, u, + NGX_HTTP_LUA_SOCKET_FT_ERROR); + + return rc; + } + + ngx_time_update(); + + /* timeout */ + if (ngx_current_msec - delta >= timer) { + ngx_http_lua_socket_handle_read_error(r, u, + NGX_HTTP_LUA_SOCKET_FT_TIMEOUT); + + return NGX_ERROR; + } + + timer -= ngx_current_msec - delta; + + if (u->buffer.start != NULL) { + rev = c->read; + + rev->ready = 1; +#if (NGX_HAVE_KQUEUE || NGX_HAVE_EPOLLRDHUP) + rev->available = 1; +#endif + + rc = ngx_http_lua_socket_tcp_read(r, u); + + if (rc == NGX_ERROR || rc == NGX_OK) { + return rc; + } + + /* NGX_AGAIN, continue in loop*/ + } + } + + /* unreachable */ + return NGX_ERROR; +} + + #if (NGX_HTTP_SSL) static int