Skip to content

Commit

Permalink
daemon/defer: add subpriorities by prefix length
Browse files Browse the repository at this point in the history
  • Loading branch information
Lukáš Ondráček authored and vcunat committed Dec 30, 2024
1 parent ae1a3b2 commit d1b9ed9
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 41 deletions.
109 changes: 68 additions & 41 deletions daemon/defer.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,30 @@

#define V4_PREFIXES (uint8_t[]) { 18, 20, 24, 32 }
#define V4_RATE_MULT (kru_price_t[]) { 768, 256, 32, 1 }
#define V4_SUBPRIO (uint8_t[]) { 0, 1, 3, 7 }

#define V6_PREFIXES (uint8_t[]) { 32, 48, 56, 64, 128 }
#define V6_RATE_MULT (kru_price_t[]) { 64, 4, 3, 2, 1 }
#define V6_SUBPRIO (uint8_t[]) { 2, 4, 5, 6, 7 }

#define SUBPRIO_CNT 8
#define V4_PREFIXES_CNT (sizeof(V4_PREFIXES) / sizeof(*V4_PREFIXES))
#define V6_PREFIXES_CNT (sizeof(V6_PREFIXES) / sizeof(*V6_PREFIXES))
#define MAX_PREFIXES_CNT ((V4_PREFIXES_CNT > V6_PREFIXES_CNT) ? V4_PREFIXES_CNT : V6_PREFIXES_CNT)

struct kru_conf {
uint8_t namespace;
size_t prefixes_cnt;
uint8_t *prefixes;
const kru_price_t *rate_mult;
const uint8_t *subprio;
} const
V4_CONF = {0, V4_PREFIXES_CNT, V4_PREFIXES, V4_RATE_MULT, V4_SUBPRIO},
V6_CONF = {1, V6_PREFIXES_CNT, V6_PREFIXES, V6_RATE_MULT, V6_SUBPRIO};

#define LOADS_THRESHOLDS (uint16_t[]) {1<<4, 1<<8, 1<<12, -1} // the last one should be UINT16_MAX
#define QUEUES_CNT (sizeof(LOADS_THRESHOLDS) / sizeof(*LOADS_THRESHOLDS) + 1) // +1 for unverified
#define QUEUES_CNT ((sizeof(LOADS_THRESHOLDS) / sizeof(*LOADS_THRESHOLDS) - 1) * SUBPRIO_CNT + 2)
// priority 0 has no subpriorities, +1 for unverified
#define PRIORITY_SYNC (-1) // no queue
#define PRIORITY_UDP (QUEUES_CNT - 1) // last queue

Expand Down Expand Up @@ -118,7 +132,8 @@ static bool using_avx2(void)
}

/// Print configuration into desc array.
void defer_str_conf(char *desc, int desc_len) {
void defer_str_conf(char *desc, int desc_len)
{
int len = 0;
#define append(...) len += snprintf(desc + len, desc_len > len ? desc_len - len : 0, __VA_ARGS__)
#define append_time(prefix, ms, suffix) { \
Expand All @@ -127,12 +142,13 @@ void defer_str_conf(char *desc, int desc_len) {
else append(prefix "%7.1f s " suffix, ms / 1000); }
append( " Expected cpus/procs: %5d\n", defer->cpus);

const size_t thresholds = sizeof(LOADS_THRESHOLDS) / sizeof(*LOADS_THRESHOLDS);
append( " Max waiting requests:%7.1f MiB\n", MAX_WAITING_REQS_SIZE / 1024.0 / 1024.0);
append_time(" Request timeout: ", REQ_TIMEOUT / 1000000.0, "\n");
append_time(" Idle: ", IDLE_TIMEOUT / 1000000.0, "\n");
append_time(" UDP phase: ", PHASE_UDP_TIMEOUT / 1000000.0, "\n");
append_time(" Non-UDP phase: ", PHASE_NON_UDP_TIMEOUT / 1000000.0, "\n");
append( " Priority levels: %5ld + UDP\n", QUEUES_CNT - 1);
append( " Priority levels: %5ld (%ld main levels, %d sublevels) + UDP\n", QUEUES_CNT - 1, thresholds, SUBPRIO_CNT);

size_t capacity_log = 0;
for (size_t c = defer->capacity - 1; c > 0; c >>= 1) capacity_log++;
Expand All @@ -141,16 +157,16 @@ void defer_str_conf(char *desc, int desc_len) {
append( " KRU capacity: %7.1f k (%0.1f MiB)\n", (1 << capacity_log) / 1000.0, size / 1000000.0);

bool uniform_thresholds = true;
for (int i = 1; i < QUEUES_CNT - 2; i++)
uniform_thresholds &= (LOADS_THRESHOLDS[i] == LOADS_THRESHOLDS[i-1] * LOADS_THRESHOLDS[0]);
uniform_thresholds &= ((1<<16) == (int)LOADS_THRESHOLDS[QUEUES_CNT - 3] * LOADS_THRESHOLDS[0]);
for (int i = 1; i < thresholds - 1; i++)
uniform_thresholds &= (LOADS_THRESHOLDS[i] == LOADS_THRESHOLDS[i - 1] * LOADS_THRESHOLDS[0]);
uniform_thresholds &= ((1<<16) == (int)LOADS_THRESHOLDS[thresholds - 2] * LOADS_THRESHOLDS[0]);

append( " Decay: %7.3f %% per ms (32-bit max: %d)\n",
100.0 * MAX_DECAY / KRU_LIMIT, (kru_price_t)MAX_DECAY);
float half_life = -1.0 / log2f(1.0 - (float)MAX_DECAY / KRU_LIMIT);
append_time(" Half-life: ", half_life, "\n");
if (uniform_thresholds)
append_time(" Priority rise in: ", half_life * 16 / (QUEUES_CNT - 1), "\n");
append_time(" Priority rise in: ", half_life * 16 / thresholds, "\n");
append_time(" Counter reset in: ", half_life * 16, "\n");

append(" Rate limits for crossing priority levels as single CPU utilization:\n");
Expand All @@ -168,7 +184,7 @@ void defer_str_conf(char *desc, int desc_len) {
for (int v = 0; v < 2; v++) {
for (int i = prefixes_cnt[v] - 1; i >= 0; i--) {
append("%9sv%d/%-3d: ", "", version[v], prefixes[v][i]);
for (int j = 0; j < QUEUES_CNT - 1; j++) {
for (int j = 0; j < thresholds; j++) {
float needed_util = (float)MAX_DECAY / (1<<16) * LOADS_THRESHOLDS[j] / BASE_PRICE(1000000) * rate_mult[v][i];
append("%12.3f %%", needed_util * 100);
}
Expand All @@ -186,7 +202,7 @@ void defer_str_conf(char *desc, int desc_len) {
for (int v = 0; v < 2; v++) {
for (int i = prefixes_cnt[v] - 1; i >= 0; i--) {
append("%9sv%d/%-3d: ", "", version[v], prefixes[v][i]);
for (int j = 0; j < QUEUES_CNT - 1; j++) {
for (int j = 0; j < thresholds; j++) {
float needed_time = (float)KRU_LIMIT / (1<<16) * LOADS_THRESHOLDS[j] / BASE_PRICE(1000000) * rate_mult[v][i];
if (needed_time < 1) {
append("%11.1f us", needed_time * 1000);
Expand All @@ -206,6 +222,27 @@ void defer_str_conf(char *desc, int desc_len) {
}


/// Call KRU, return priority and as params load and prefix.
static inline int kru_charge_classify(const struct kru_conf *kru_conf, uint8_t *key, kru_price_t *prices,
uint16_t *out_load, uint8_t *out_prefix)
{
uint16_t loads[kru_conf->prefixes_cnt];
KRU.load_multi_prefix((struct kru *)defer->kru, kr_now(),
kru_conf->namespace, key, kru_conf->prefixes, prices, kru_conf->prefixes_cnt, loads);

int priority = 0;
int prefix_index = kru_conf->prefixes_cnt - 1;
for (int i = kru_conf->prefixes_cnt - 1, j = 0; i >= 0; i--) {
for (; LOADS_THRESHOLDS[j] < loads[i]; j++) {
prefix_index = i;
priority = 1 + j * SUBPRIO_CNT + kru_conf->subprio[i];
}
}
*out_load = loads[prefix_index];
*out_prefix = kru_conf->prefixes[prefix_index];
return priority;
}

/// Increment KRU counters by given time.
void defer_charge(uint64_t nsec, union kr_sockaddr *addr, bool stream)
{
Expand All @@ -216,38 +253,30 @@ void defer_charge(uint64_t nsec, union kr_sockaddr *addr, bool stream)
if (!stream) return; // UDP is not accounted in KRU

_Alignas(16) uint8_t key[16] = {0, };
uint16_t max_load = 0;
uint8_t prefix = 0;
uint64_t base_price = BASE_PRICE(nsec);

const struct kru_conf *kru_conf;
if (addr->ip.sa_family == AF_INET6) {
memcpy(key, &addr->ip6.sin6_addr, 16);

kru_price_t prices[V6_PREFIXES_CNT];
for (size_t i = 0; i < V6_PREFIXES_CNT; i++) {
uint64_t price = base_price / V6_RATE_MULT[i];
prices[i] = price > (kru_price_t)-1 ? -1 : price;
}

max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, kr_now(),
1, key, V6_PREFIXES, prices, V6_PREFIXES_CNT, &prefix);
kru_conf = &V6_CONF;
} else if (addr->ip.sa_family == AF_INET) {
memcpy(key, &addr->ip4.sin_addr, 4);

kru_price_t prices[V4_PREFIXES_CNT];
for (size_t i = 0; i < V4_PREFIXES_CNT; i++) {
uint64_t price = base_price / V4_RATE_MULT[i];
prices[i] = price > (kru_price_t)-1 ? -1 : price;
}

max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, kr_now(),
0, key, V4_PREFIXES, prices, V4_PREFIXES_CNT, &prefix);
kru_conf = &V4_CONF;
} else {
return;
}

uint64_t base_price = BASE_PRICE(nsec);
kru_price_t prices[kru_conf->prefixes_cnt];
for (size_t i = 0; i < kru_conf->prefixes_cnt; i++) {
uint64_t price = base_price / kru_conf->rate_mult[i];
prices[i] = price > (kru_price_t)-1 ? -1 : price;
}

uint16_t load;
uint8_t prefix;
kru_charge_classify(kru_conf, key, prices, &load, &prefix);

VERBOSE_LOG(" %s ADD %4.3f ms -> load: %d on /%d\n",
kr_straddr(&addr->ip), nsec / 1000000.0, max_load, prefix);
kr_straddr(&addr->ip), nsec / 1000000.0, load, prefix);
}

/// Determine priority of the request in [-1, QUEUES_CNT - 1].
Expand All @@ -266,22 +295,20 @@ static inline int classify(const union kr_sockaddr *addr, bool stream)
}

_Alignas(16) uint8_t key[16] = {0, };
uint16_t max_load = 0;
uint8_t prefix = 0;
const struct kru_conf *kru_conf;
if (addr->ip.sa_family == AF_INET6) {
memcpy(key, &addr->ip6.sin6_addr, 16);
max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, kr_now(),
1, key, V6_PREFIXES, NULL, V6_PREFIXES_CNT, &prefix);
kru_conf = &V6_CONF;
} else if (addr->ip.sa_family == AF_INET) {
memcpy(key, &addr->ip4.sin_addr, 4);
max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, kr_now(),
0, key, V4_PREFIXES, NULL, V4_PREFIXES_CNT, &prefix);
kru_conf = &V4_CONF;
}

int priority = 0;
for (; LOADS_THRESHOLDS[priority] < max_load; priority++);
uint16_t load;
uint8_t prefix;
int priority = kru_charge_classify(kru_conf, key, NULL, &load, &prefix);

VERBOSE_LOG(" load %d on /%d\n", max_load, prefix);
VERBOSE_LOG(" load %d on /%d\n", load, prefix);

if ((phase & PHASE_NON_UDP) && (priority == 0) && (queue_len(queues[0]) == 0)) {
phase_set(PHASE_NON_UDP);
Expand Down
8 changes: 8 additions & 0 deletions lib/kru.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,14 @@ struct kru_api {
/// The key of i-th query consists of prefixes[i] bits of key, prefixes[i], and namespace; as above.
uint16_t (*load_multi_prefix_max)(struct kru *kru, uint32_t time_now,
uint8_t namespace, uint8_t key[static 16], uint8_t *prefixes, kru_price_t *prices, size_t queries_cnt, uint8_t *prefix_out);


/// Multiple queries based on different prefixes of a single key.
/// Stores the final values of the involved counters normalized to the limit 2^16 to *loads_out (unless NULL).
/// Set prices to NULL to skip updating; otherwise, KRU is always updated, using maximal allowed value on overflow.
/// The key of i-th query consists of prefixes[i] bits of key, prefixes[i], and namespace; as above.
void (*load_multi_prefix)(struct kru *kru, uint32_t time_now,
uint8_t namespace, uint8_t key[static 16], uint8_t *prefixes, kru_price_t *prices, size_t queries_cnt, uint16_t *loads_out);
};

// The functions are stored this way to make it easier to switch
Expand Down
28 changes: 28 additions & 0 deletions lib/kru.inc.c
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,33 @@ static uint8_t kru_limited_multi_prefix_or(struct kru *kru, uint32_t time_now, u
return 0;
}

static void kru_load_multi_prefix(struct kru *kru, uint32_t time_now, uint8_t namespace,
uint8_t key[static 16], uint8_t *prefixes, kru_price_t *prices, size_t queries_cnt, uint16_t *loads_out)
{
struct query_ctx ctx[queries_cnt];

for (size_t i = 0; i < queries_cnt; i++) {
kru_limited_prefetch_prefix(kru, time_now, namespace, key, prefixes[i], (prices ? prices[i] : 0), ctx + i);
}

for (size_t i = 0; i < queries_cnt; i++) {
kru_limited_fetch(kru, ctx + i);
}

if (prices) {
for (int i = queries_cnt - 1; i >= 0; i--) {
kru_limited_update(kru, ctx + i, true);
}
}

if (loads_out) {
for (size_t i = 0; i < queries_cnt; i++) {
loads_out[i] = ctx[i].final_load_value;
}
}
}


static uint16_t kru_load_multi_prefix_max(struct kru *kru, uint32_t time_now, uint8_t namespace,
uint8_t key[static 16], uint8_t *prefixes, kru_price_t *prices, size_t queries_cnt, uint8_t *prefix_out)
{
Expand Down Expand Up @@ -612,5 +639,6 @@ static bool kru_limited(struct kru *kru, uint32_t time_now, uint8_t key[static 1
.limited_multi_or = kru_limited_multi_or, \
.limited_multi_or_nobreak = kru_limited_multi_or_nobreak, \
.limited_multi_prefix_or = kru_limited_multi_prefix_or, \
.load_multi_prefix = kru_load_multi_prefix, \
.load_multi_prefix_max = kru_load_multi_prefix_max, \
}

0 comments on commit d1b9ed9

Please sign in to comment.