Skip to content

Commit

Permalink
daemon/defer: fix charging time to UDP and non-UDP phases
Browse files Browse the repository at this point in the history
  • Loading branch information
Lukáš Ondráček committed Nov 28, 2024
1 parent f80a872 commit 6fc8a3e
Showing 1 changed file with 10 additions and 3 deletions.
13 changes: 10 additions & 3 deletions daemon/defer.c
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ void defer_charge(uint64_t nsec, union kr_sockaddr *addr, bool stream)
{
if (phase_accounting) {
phase_charge(nsec);
phase_accounting = false;
}

if (!stream) return; // UDP is not accounted in KRU
Expand Down Expand Up @@ -320,7 +319,6 @@ static inline void process_single_deferred(void)
if (kr_fails_assert(ctx)) return;

defer_sample_addr((const union kr_sockaddr *)ctx->comm->src_addr, ctx->session->stream);
phase_accounting = true; // TODO check there are no suspensions of sampling

struct pl_defer_iter_data *idata = protolayer_iter_data_get_current(ctx);
struct pl_defer_sess_data *sdata = protolayer_sess_data_get_current(ctx);
Expand Down Expand Up @@ -414,6 +412,7 @@ static enum protolayer_iter_cb_result pl_defer_unwrap(
void *sess_data, void *iter_data,
struct protolayer_iter_ctx *ctx)
{
phase_accounting = false;
if (!defer || ctx->session->outgoing)
return protolayer_continue(ctx);

Expand All @@ -435,7 +434,7 @@ static enum protolayer_iter_cb_result pl_defer_unwrap(

int priority = classify((const union kr_sockaddr *)ctx->comm->src_addr, ctx->session->stream);

if (priority == -1) {
if (priority == PRIORITY_SYNC) {
VERBOSE_LOG(" CONTINUE\n");
phase_accounting = true;
return protolayer_continue(ctx);
Expand All @@ -453,12 +452,14 @@ static enum protolayer_iter_cb_result pl_defer_unwrap(
if (waiting_requests_size > MAX_WAITING_REQS_SIZE) {
defer_sample_state_t prev_sample_state;
defer_sample_start(&prev_sample_state);
phase_accounting = true;
do {
process_single_deferred(); // possibly defers again without decreasing waiting_requests_size
// If the unwrapped query is to be processed here,
// it is the last iteration and the query is processed after returning.
defer_sample_restart();
} while (waiting_requests_size > MAX_WAITING_REQS_SIZE);
phase_accounting = false;
defer_sample_stop(&prev_sample_state, true);
}

Expand All @@ -470,6 +471,10 @@ static enum protolayer_event_cb_result pl_defer_event_unwrap(
enum protolayer_event_type event, void **baton,
struct session2 *session, void *sess_data)
{
if ((event == PROTOLAYER_EVENT_EOF) || (event == PROTOLAYER_EVENT_GENERAL_TIMEOUT)) {
// disable accounting only for events that cannot occur during incoming data processing
phase_accounting = false;
}
if (!defer || session->outgoing)
return PROTOLAYER_EVENT_PROPAGATE;

Expand Down Expand Up @@ -499,10 +504,12 @@ static void defer_queues_idle(uv_idle_t *handle)
VERBOSE_LOG(" %d waiting\n", waiting_requests);
defer_sample_start(NULL);
uint64_t idle_stamp = defer_sample_state.stamp;
phase_accounting = true;
do {
process_single_deferred();
defer_sample_restart();
} while ((waiting_requests > 0) && (defer_sample_state.stamp < idle_stamp + IDLE_TIMEOUT));
phase_accounting = false;
defer_sample_stop(NULL, true);
cleanup_queues();
udp_queue_send_all();
Expand Down

0 comments on commit 6fc8a3e

Please sign in to comment.