From 6fc8a3ee460c5f603fcc0dc591375b464fc1816d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Luk=C3=A1=C5=A1=20Ondr=C3=A1=C4=8Dek?=
Date: Thu, 28 Nov 2024 17:00:59 +0100
Subject: [PATCH] daemon/defer: fix charging time to UDP and non-UDP phases

---
 daemon/defer.c | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)

diff --git a/daemon/defer.c b/daemon/defer.c
index 177f008e1..32e30cb58 100644
--- a/daemon/defer.c
+++ b/daemon/defer.c
@@ -139,7 +139,6 @@ void defer_charge(uint64_t nsec, union kr_sockaddr *addr, bool stream)
 {
 	if (phase_accounting) {
 		phase_charge(nsec);
-		phase_accounting = false;
 	}
 
 	if (!stream) return;  // UDP is not accounted in KRU
@@ -320,7 +319,6 @@ static inline void process_single_deferred(void)
 	if (kr_fails_assert(ctx)) return;
 
 	defer_sample_addr((const union kr_sockaddr *)ctx->comm->src_addr, ctx->session->stream);
-	phase_accounting = true;  // TODO check there are no suspensions of sampling
 
 	struct pl_defer_iter_data *idata = protolayer_iter_data_get_current(ctx);
 	struct pl_defer_sess_data *sdata = protolayer_sess_data_get_current(ctx);
@@ -414,6 +412,7 @@ static enum protolayer_iter_cb_result pl_defer_unwrap(
 		void *sess_data, void *iter_data,
 		struct protolayer_iter_ctx *ctx)
 {
+	phase_accounting = false;
 	if (!defer || ctx->session->outgoing)
 		return protolayer_continue(ctx);
 
@@ -435,7 +434,7 @@ static enum protolayer_iter_cb_result pl_defer_unwrap(
 
 	int priority = classify((const union kr_sockaddr *)ctx->comm->src_addr,
 			ctx->session->stream);
-	if (priority == -1) {
+	if (priority == PRIORITY_SYNC) {
 		VERBOSE_LOG(" CONTINUE\n");
 		phase_accounting = true;
 		return protolayer_continue(ctx);
@@ -453,12 +452,14 @@ static enum protolayer_iter_cb_result pl_defer_unwrap(
 	if (waiting_requests_size > MAX_WAITING_REQS_SIZE) {
 		defer_sample_state_t prev_sample_state;
 		defer_sample_start(&prev_sample_state);
+		phase_accounting = true;
 		do {
 			process_single_deferred();  // possibly defers again without decreasing waiting_requests_size
 				// If the unwrapped query is to be processed here,
 				// it is the last iteration and the query is processed after returning.
 			defer_sample_restart();
 		} while (waiting_requests_size > MAX_WAITING_REQS_SIZE);
+		phase_accounting = false;
 		defer_sample_stop(&prev_sample_state, true);
 	}
 
@@ -470,6 +471,10 @@ static enum protolayer_event_cb_result pl_defer_event_unwrap(
 		enum protolayer_event_type event, void **baton,
 		struct session2 *session, void *sess_data)
 {
+	if ((event == PROTOLAYER_EVENT_EOF) || (event == PROTOLAYER_EVENT_GENERAL_TIMEOUT)) {
+		// disable accounting only for events that cannot occur during incoming data processing
+		phase_accounting = false;
+	}
 	if (!defer || session->outgoing)
 		return PROTOLAYER_EVENT_PROPAGATE;
 
@@ -499,10 +504,12 @@ static void defer_queues_idle(uv_idle_t *handle)
 	VERBOSE_LOG(" %d waiting\n", waiting_requests);
 	defer_sample_start(NULL);
 	uint64_t idle_stamp = defer_sample_state.stamp;
+	phase_accounting = true;
 	do {
 		process_single_deferred();
 		defer_sample_restart();
 	} while ((waiting_requests > 0) && (defer_sample_state.stamp < idle_stamp + IDLE_TIMEOUT));
+	phase_accounting = false;
 	defer_sample_stop(NULL, true);
 	cleanup_queues();
 	udp_queue_send_all();