Skip to content

Commit

Permalink
daemon/defer: minor changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Lukáš Ondráček committed Nov 28, 2024
1 parent 9fd3fac commit f80a872
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 9 deletions.
13 changes: 8 additions & 5 deletions daemon/defer.c
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,6 @@ static inline int classify(const union kr_sockaddr *addr, bool stream)
}



/// Push query to a queue according to its priority and activate idle.
static inline void push_query(struct protolayer_iter_ctx *ctx, int priority, bool to_head_end)
{
Expand Down Expand Up @@ -341,6 +340,8 @@ static inline void process_single_deferred(void)

if (age_ns >= REQ_TIMEOUT) {
VERBOSE_LOG(" BREAK (timeout)\n");
kr_log_warning(DEFER, "Data from %s too long in queue, dropping.\n",
kr_straddr(ctx->comm->src_addr)); // TODO make it notice as it's intended behavior of defer?
break_query(ctx, ETIME);
return;
}
Expand Down Expand Up @@ -417,16 +418,16 @@ static enum protolayer_iter_cb_result pl_defer_unwrap(
return protolayer_continue(ctx);

defer_sample_addr((const union kr_sockaddr *)ctx->comm->src_addr, ctx->session->stream);
struct pl_defer_iter_data *data = iter_data;
struct pl_defer_iter_data *idata = iter_data;
struct pl_defer_sess_data *sdata = sess_data;
data->req_stamp = defer_sample_state.stamp;
idata->req_stamp = defer_sample_state.stamp;

VERBOSE_LOG(" %s UNWRAP\n",
kr_straddr(ctx->comm->src_addr));

if (queue_len(sdata->queue) > 0) { // stream with preceding packet already deferred
queue_push(sdata->queue, ctx);
waiting_requests_size += data->size = protolayer_iter_size_est(ctx, false);
waiting_requests_size += idata->size = protolayer_iter_size_est(ctx, false);
// payload counted in session wire buffer
VERBOSE_LOG(" PUSH as follow-up\n");
return protolayer_async();
Expand All @@ -446,14 +447,16 @@ static enum protolayer_iter_cb_result pl_defer_unwrap(
waiting_requests_size += sdata->size = protolayer_sess_size_est(ctx->session);
}
push_query(ctx, priority, false);
waiting_requests_size += data->size = protolayer_iter_size_est(ctx, !ctx->session->stream);
waiting_requests_size += idata->size = protolayer_iter_size_est(ctx, !ctx->session->stream);
// for stream, payload is counted in session wire buffer

if (waiting_requests_size > MAX_WAITING_REQS_SIZE) {
defer_sample_state_t prev_sample_state;
defer_sample_start(&prev_sample_state);
do {
process_single_deferred(); // possibly defers again without decreasing waiting_requests_size
// If the unwrapped query is to be processed here,
// it is the last iteration and the query is processed after returning.
defer_sample_restart();
} while (waiting_requests_size > MAX_WAITING_REQS_SIZE);
defer_sample_stop(&prev_sample_state, true);
Expand Down
10 changes: 8 additions & 2 deletions daemon/defer.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,14 @@ static inline void defer_sample_addr(const union kr_sockaddr *addr, bool stream)

if (defer_sample_state.addr.ip.sa_family != AF_UNSPEC) {
// TODO: this costs performance, so only in some debug mode?
kr_assert(kr_sockaddr_cmp(&addr->ip, &defer_sample_state.addr.ip) == kr_ok());
return;
if (kr_fails_assert(kr_sockaddr_cmp(&addr->ip, &defer_sample_state.addr.ip) == kr_ok())) {
kr_log_error(DEFER, "%s != %s\n",
kr_straddr(&addr->ip),
kr_straddr(&defer_sample_state.addr.ip));
abort(); // TODO change this to warning or remove before releasing
return;
}

}

switch (addr->ip.sa_family) {
Expand Down
2 changes: 1 addition & 1 deletion daemon/io.c
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)

static void tcp_accept_internal(uv_stream_t *master, int status, enum kr_proto grp)
{
if (status != 0) {
if (status != 0) {
return;
}

Expand Down
2 changes: 1 addition & 1 deletion daemon/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,7 @@ int main(int argc, char **argv)
}

if (!defer_initialized) {
kr_log_warning(SYSTEM, "Prioritization not initialized from Lua, using hardcoded default.");
kr_log_warning(SYSTEM, "Prioritization not initialized from Lua, using hardcoded default.\n");
ret = defer_init("defer", 1);
if (ret) {
ret = EXIT_FAILURE;
Expand Down

0 comments on commit f80a872

Please sign in to comment.