diff --git a/daemon/defer.c b/daemon/defer.c
index 94c1209d3..207926d49 100644
--- a/daemon/defer.c
+++ b/daemon/defer.c
@@ -13,16 +13,31 @@
 
 #define V4_PREFIXES (uint8_t[]) { 18, 20, 24, 32 }
 #define V4_RATE_MULT (kru_price_t[]) { 768, 256, 32, 1 }
+#define V4_SUBPRIO (uint8_t[]) { 0, 1, 3, 7 }
 
 #define V6_PREFIXES (uint8_t[]) { 32, 48, 56, 64, 128 }
 #define V6_RATE_MULT (kru_price_t[]) { 64, 4, 3, 2, 1 }
+#define V6_SUBPRIO (uint8_t[]) { 2, 4, 5, 6, 7 }
 
+#define SUBPRIO_CNT 8
 #define V4_PREFIXES_CNT (sizeof(V4_PREFIXES) / sizeof(*V4_PREFIXES))
 #define V6_PREFIXES_CNT (sizeof(V6_PREFIXES) / sizeof(*V6_PREFIXES))
 #define MAX_PREFIXES_CNT ((V4_PREFIXES_CNT > V6_PREFIXES_CNT) ? V4_PREFIXES_CNT : V6_PREFIXES_CNT)
 
+/// KRU parameters of one address family; each array holds prefixes_cnt entries, indexed like prefixes.
+struct kru_conf {
+	uint8_t namespace;
+	size_t prefixes_cnt;
+	uint8_t *prefixes;
+	kru_price_t *rate_mult;
+	uint8_t *subprio;
+} const
+V4_CONF = {0, V4_PREFIXES_CNT, V4_PREFIXES, V4_RATE_MULT, V4_SUBPRIO},
+V6_CONF = {1, V6_PREFIXES_CNT, V6_PREFIXES, V6_RATE_MULT, V6_SUBPRIO};
+
 #define LOADS_THRESHOLDS (uint16_t[]) {1<<4, 1<<8, 1<<12, -1} // the last one should be UINT16_MAX
-#define QUEUES_CNT (sizeof(LOADS_THRESHOLDS) / sizeof(*LOADS_THRESHOLDS) + 1) // +1 for unverified
+#define QUEUES_CNT ((sizeof(LOADS_THRESHOLDS) / sizeof(*LOADS_THRESHOLDS) - 1) * SUBPRIO_CNT + 2)
+	// priority 0 has no subpriorities, +1 for unverified
 #define PRIORITY_SYNC (-1) // no queue
 #define PRIORITY_UDP (QUEUES_CNT - 1) // last queue
 
@@ -118,7 +133,8 @@ static bool using_avx2(void)
 }
 
 /// Print configuration into desc array.
-void defer_str_conf(char *desc, int desc_len) {
+void defer_str_conf(char *desc, int desc_len)
+{
 	int len = 0;
 #define append(...) len += snprintf(desc + len, desc_len > len ? desc_len - len : 0, __VA_ARGS__)
 #define append_time(prefix, ms, suffix) { \
@@ -127,12 +143,13 @@ void defer_str_conf(char *desc, int desc_len) {
 		else append(prefix "%7.1f s " suffix, ms / 1000); }
 
 	append( " Expected cpus/procs: %5d\n", defer->cpus);
+	const size_t thresholds = sizeof(LOADS_THRESHOLDS) / sizeof(*LOADS_THRESHOLDS);
 	append( " Max waiting requests:%7.1f MiB\n", MAX_WAITING_REQS_SIZE / 1024.0 / 1024.0);
 	append_time(" Request timeout: ", REQ_TIMEOUT / 1000000.0, "\n");
 	append_time(" Idle: ", IDLE_TIMEOUT / 1000000.0, "\n");
 	append_time(" UDP phase: ", PHASE_UDP_TIMEOUT / 1000000.0, "\n");
 	append_time(" Non-UDP phase: ", PHASE_NON_UDP_TIMEOUT / 1000000.0, "\n");
-	append( " Priority levels: %5ld + UDP\n", QUEUES_CNT - 1);
+	append( " Priority levels: %5zu (%zu main levels, %d sublevels) + UDP\n", QUEUES_CNT - 1, thresholds, SUBPRIO_CNT);
 
 	size_t capacity_log = 0;
 	for (size_t c = defer->capacity - 1; c > 0; c >>= 1) capacity_log++;
@@ -141,16 +158,16 @@ void defer_str_conf(char *desc, int desc_len) {
 	append( " KRU capacity: %7.1f k (%0.1f MiB)\n", (1 << capacity_log) / 1000.0, size / 1000000.0);
 
 	bool uniform_thresholds = true;
-	for (int i = 1; i < QUEUES_CNT - 2; i++)
-		uniform_thresholds &= (LOADS_THRESHOLDS[i] == LOADS_THRESHOLDS[i-1] * LOADS_THRESHOLDS[0]);
-	uniform_thresholds &= ((1<<16) == (int)LOADS_THRESHOLDS[QUEUES_CNT - 3] * LOADS_THRESHOLDS[0]);
+	for (size_t i = 1; i < thresholds - 1; i++)
+		uniform_thresholds &= (LOADS_THRESHOLDS[i] == LOADS_THRESHOLDS[i - 1] * LOADS_THRESHOLDS[0]);
+	uniform_thresholds &= ((1<<16) == (int)LOADS_THRESHOLDS[thresholds - 2] * LOADS_THRESHOLDS[0]);
 
 	append( " Decay: %7.3f %% per ms (32-bit max: %d)\n", 100.0 * MAX_DECAY / KRU_LIMIT, (kru_price_t)MAX_DECAY);
 
 	float half_life = -1.0 / log2f(1.0 - (float)MAX_DECAY / KRU_LIMIT);
 	append_time(" Half-life: ", half_life, "\n");
 	if (uniform_thresholds)
-		append_time(" Priority rise in: ", half_life * 16 / (QUEUES_CNT - 1), "\n");
+		append_time(" Priority rise in: ", half_life * 16 / thresholds, "\n");
 	append_time(" Counter reset in: ", half_life * 16, "\n");
 
 	append(" Rate limits for crossing priority levels as single CPU utilization:\n");
@@ -168,7 +185,7 @@ void defer_str_conf(char *desc, int desc_len) {
 	for (int v = 0; v < 2; v++) {
 		for (int i = prefixes_cnt[v] - 1; i >= 0; i--) {
 			append("%9sv%d/%-3d: ", "", version[v], prefixes[v][i]);
-			for (int j = 0; j < QUEUES_CNT - 1; j++) {
+			for (size_t j = 0; j < thresholds; j++) {
 				float needed_util = (float)MAX_DECAY / (1<<16) * LOADS_THRESHOLDS[j] / BASE_PRICE(1000000) * rate_mult[v][i];
 				append("%12.3f %%", needed_util * 100);
 			}
@@ -186,7 +203,7 @@ void defer_str_conf(char *desc, int desc_len) {
 	for (int v = 0; v < 2; v++) {
 		for (int i = prefixes_cnt[v] - 1; i >= 0; i--) {
 			append("%9sv%d/%-3d: ", "", version[v], prefixes[v][i]);
-			for (int j = 0; j < QUEUES_CNT - 1; j++) {
+			for (size_t j = 0; j < thresholds; j++) {
 				float needed_time = (float)KRU_LIMIT / (1<<16) * LOADS_THRESHOLDS[j] / BASE_PRICE(1000000) * rate_mult[v][i];
 				if (needed_time < 1) {
 					append("%11.1f us", needed_time * 1000);
@@ -206,6 +223,32 @@ void defer_str_conf(char *desc, int desc_len) {
 }
 
 
+/// Query KRU for every prefix of the key and derive the request priority from the loads.
+/// Counters are also charged by prices[i], unless prices is NULL.
+/// Returns the priority in [0, QUEUES_CNT - 2]; *out_load and *out_prefix receive the load and
+/// length of the prefix that determined it (those of the last, longest prefix if no threshold was crossed).
+static inline int kru_charge_classify(const struct kru_conf *kru_conf, uint8_t *key, kru_price_t *prices,
+		uint16_t *out_load, uint8_t *out_prefix)
+{
+	uint16_t loads[kru_conf->prefixes_cnt];
+	KRU.load_multi_prefix((struct kru *)defer->kru, kr_now(),
+			kru_conf->namespace, key, kru_conf->prefixes, prices, kru_conf->prefixes_cnt, loads);
+
+	int priority = 0;
+	int prefix_index = kru_conf->prefixes_cnt - 1;
+	// j is not reset between prefixes, so a prefix changes the result only by crossing a threshold
+	// that no previously visited prefix crossed; the UINT16_MAX sentinel threshold stops the inner loop.
+	for (int i = kru_conf->prefixes_cnt - 1, j = 0; i >= 0; i--) {
+		for (; LOADS_THRESHOLDS[j] < loads[i]; j++) {
+			prefix_index = i;
+			priority = 1 + j * SUBPRIO_CNT + kru_conf->subprio[i];
+		}
+	}
+	*out_load = loads[prefix_index];
+	*out_prefix = kru_conf->prefixes[prefix_index];
+	return priority;
+}
+
 /// Increment KRU counters by given time.
 void defer_charge(uint64_t nsec, union kr_sockaddr *addr, bool stream)
 {
@@ -216,38 +259,30 @@ void defer_charge(uint64_t nsec, union kr_sockaddr *addr, bool stream)
 	if (!stream) return; // UDP is not accounted in KRU
 
 	_Alignas(16) uint8_t key[16] = {0, };
-	uint16_t max_load = 0;
-	uint8_t prefix = 0;
-	uint64_t base_price = BASE_PRICE(nsec);
-
+	const struct kru_conf *kru_conf;
 	if (addr->ip.sa_family == AF_INET6) {
 		memcpy(key, &addr->ip6.sin6_addr, 16);
-
-		kru_price_t prices[V6_PREFIXES_CNT];
-		for (size_t i = 0; i < V6_PREFIXES_CNT; i++) {
-			uint64_t price = base_price / V6_RATE_MULT[i];
-			prices[i] = price > (kru_price_t)-1 ? -1 : price;
-		}
-
-		max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, kr_now(),
-				1, key, V6_PREFIXES, prices, V6_PREFIXES_CNT, &prefix);
+		kru_conf = &V6_CONF;
 	} else if (addr->ip.sa_family == AF_INET) {
 		memcpy(key, &addr->ip4.sin_addr, 4);
-
-		kru_price_t prices[V4_PREFIXES_CNT];
-		for (size_t i = 0; i < V4_PREFIXES_CNT; i++) {
-			uint64_t price = base_price / V4_RATE_MULT[i];
-			prices[i] = price > (kru_price_t)-1 ? -1 : price;
-		}
-
-		max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, kr_now(),
-				0, key, V4_PREFIXES, prices, V4_PREFIXES_CNT, &prefix);
+		kru_conf = &V4_CONF;
 	} else {
 		return;
 	}
 
+	uint64_t base_price = BASE_PRICE(nsec);
+	kru_price_t prices[kru_conf->prefixes_cnt];
+	for (size_t i = 0; i < kru_conf->prefixes_cnt; i++) {
+		uint64_t price = base_price / kru_conf->rate_mult[i];
+		prices[i] = price > (kru_price_t)-1 ? -1 : price;
+	}
+
+	uint16_t load;
+	uint8_t prefix;
+	kru_charge_classify(kru_conf, key, prices, &load, &prefix);
+
 	VERBOSE_LOG(" %s ADD %4.3f ms -> load: %d on /%d\n",
-			kr_straddr(&addr->ip), nsec / 1000000.0, max_load, prefix);
+			kr_straddr(&addr->ip), nsec / 1000000.0, load, prefix);
 }
 
 /// Determine priority of the request in [-1, QUEUES_CNT - 1].
@@ -266,22 +301,21 @@ static inline int classify(const union kr_sockaddr *addr, bool stream)
 	}
 
 	_Alignas(16) uint8_t key[16] = {0, };
-	uint16_t max_load = 0;
-	uint8_t prefix = 0;
+	const struct kru_conf *kru_conf = NULL;
 	if (addr->ip.sa_family == AF_INET6) {
 		memcpy(key, &addr->ip6.sin6_addr, 16);
-		max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, kr_now(),
-				1, key, V6_PREFIXES, NULL, V6_PREFIXES_CNT, &prefix);
+		kru_conf = &V6_CONF;
 	} else if (addr->ip.sa_family == AF_INET) {
 		memcpy(key, &addr->ip4.sin_addr, 4);
-		max_load = KRU.load_multi_prefix_max((struct kru *)defer->kru, kr_now(),
-				0, key, V4_PREFIXES, NULL, V4_PREFIXES_CNT, &prefix);
+		kru_conf = &V4_CONF;
 	}
 
-	int priority = 0;
-	for (; LOADS_THRESHOLDS[priority] < max_load; priority++);
+	// Other address families have no KRU configuration: keep load 0 and priority 0 for them, as before.
+	uint16_t load = 0;
+	uint8_t prefix = 0;
+	int priority = kru_conf ? kru_charge_classify(kru_conf, key, NULL, &load, &prefix) : 0;
 
-	VERBOSE_LOG(" load %d on /%d\n", max_load, prefix);
+	VERBOSE_LOG(" load %d on /%d\n", load, prefix);
 
 	if ((phase & PHASE_NON_UDP) && (priority == 0) && (queue_len(queues[0]) == 0)) {
 		phase_set(PHASE_NON_UDP);
diff --git a/lib/kru.h b/lib/kru.h
index ef177ef8d..b3690703d 100644
--- a/lib/kru.h
+++ b/lib/kru.h
@@ -90,6 +90,14 @@ struct kru_api {
 	/// The key of i-th query consists of prefixes[i] bits of key, prefixes[i], and namespace; as above.
 	uint16_t (*load_multi_prefix_max)(struct kru *kru, uint32_t time_now,
 			uint8_t namespace, uint8_t key[static 16], uint8_t *prefixes, kru_price_t *prices, size_t queries_cnt, uint8_t *prefix_out);
+
+
+	/// Multiple queries based on different prefixes of a single key.
+	/// Stores the final values of the involved counters normalized to the limit 2^16 to *loads_out (unless NULL).
+	/// Set prices to NULL to skip updating; otherwise, KRU is always updated, using maximal allowed value on overflow.
+	/// The key of i-th query consists of prefixes[i] bits of key, prefixes[i], and namespace; as above.
+	void (*load_multi_prefix)(struct kru *kru, uint32_t time_now,
+			uint8_t namespace, uint8_t key[static 16], uint8_t *prefixes, kru_price_t *prices, size_t queries_cnt, uint16_t *loads_out);
 };
 
 // The functions are stored this way to make it easier to switch
diff --git a/lib/kru.inc.c b/lib/kru.inc.c
index 2272adab6..166e10048 100644
--- a/lib/kru.inc.c
+++ b/lib/kru.inc.c
@@ -565,6 +565,33 @@ static uint8_t kru_limited_multi_prefix_or(struct kru *kru, uint32_t time_now, u
 	return 0;
 }
 
+static void kru_load_multi_prefix(struct kru *kru, uint32_t time_now, uint8_t namespace,
+		uint8_t key[static 16], uint8_t *prefixes, kru_price_t *prices, size_t queries_cnt, uint16_t *loads_out)
+{
+	struct query_ctx ctx[queries_cnt];
+
+	for (size_t i = 0; i < queries_cnt; i++) {
+		kru_limited_prefetch_prefix(kru, time_now, namespace, key, prefixes[i], (prices ? prices[i] : 0), ctx + i);
+	}
+
+	for (size_t i = 0; i < queries_cnt; i++) {
+		kru_limited_fetch(kru, ctx + i);
+	}
+
+	if (prices) {
+		for (int i = queries_cnt - 1; i >= 0; i--) {
+			kru_limited_update(kru, ctx + i, true);
+		}
+	}
+
+	if (loads_out) {
+		for (size_t i = 0; i < queries_cnt; i++) {
+			loads_out[i] = ctx[i].final_load_value;
+		}
+	}
+}
+
+
 static uint16_t kru_load_multi_prefix_max(struct kru *kru, uint32_t time_now, uint8_t namespace,
 		uint8_t key[static 16], uint8_t *prefixes, kru_price_t *prices, size_t queries_cnt, uint8_t *prefix_out)
 {
@@ -612,5 +639,6 @@ static bool kru_limited(struct kru *kru, uint32_t time_now, uint8_t key[static 1
 	.limited_multi_or = kru_limited_multi_or, \
 	.limited_multi_or_nobreak = kru_limited_multi_or_nobreak, \
 	.limited_multi_prefix_or = kru_limited_multi_prefix_or, \
+	.load_multi_prefix = kru_load_multi_prefix, \
 	.load_multi_prefix_max = kru_load_multi_prefix_max, \
 }