diff --git a/src/cluster_slot_stats.c b/src/cluster_slot_stats.c index 997f3c646e1..8ef6c143487 100644 --- a/src/cluster_slot_stats.c +++ b/src/cluster_slot_stats.c @@ -132,13 +132,9 @@ static void addReplySortedSlotStats(client *c, slotStatForSort slot_stats[], lon } } -static int canAddNetworkBytesOut(int slot) { - return clusterSlotStatsEnabled() && slot != -1; -} - /* Accumulates egress bytes for the slot. */ void clusterSlotStatsAddNetworkBytesOutForSlot(int slot, unsigned long long net_bytes_out) { - if (!canAddNetworkBytesOut(slot)) return; + if (!clusterSlotStatsEnabled(slot)) return; serverAssert(slot >= 0 && slot < CLUSTER_SLOTS); server.cluster->slot_stats[slot].network_bytes_out += net_bytes_out; @@ -152,7 +148,7 @@ void clusterSlotStatsAddNetworkBytesOutForUserClient(client *c) { /* Accumulates egress bytes upon sending replication stream. This only applies for primary nodes. */ static void clusterSlotStatsUpdateNetworkBytesOutForReplication(long long len) { client *c = server.current_client; - if (c == NULL || !canAddNetworkBytesOut(c->slot)) return; + if (c == NULL || !clusterSlotStatsEnabled(c->slot)) return; serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS); serverAssert(nodeIsPrimary(server.cluster->myself)); @@ -179,7 +175,7 @@ void clusterSlotStatsDecrNetworkBytesOutForReplication(long long len) { * This type is not aggregated, to stay consistent with server.stat_net_output_bytes aggregation. * This function covers the internal propagation component. */ void clusterSlotStatsAddNetworkBytesOutForShardedPubSubInternalPropagation(client *c, int slot) { - if (!canAddNetworkBytesOut(slot)) return; + if (!clusterSlotStatsEnabled(slot)) return; serverAssert(slot >= 0 && slot < CLUSTER_SLOTS); server.cluster->slot_stats[slot].network_bytes_out += c->net_output_bytes_curr_cmd; @@ -214,8 +210,7 @@ void clusterSlotStatResetAll(void) { * would equate to repeating the same calculation twice. */ static int canAddCpuDuration(client *c) { - return clusterSlotStatsEnabled() && - c->slot != -1 && /* Command should be slot specific. */ + return clusterSlotStatsEnabled(c->slot) && (!server.execution_nesting || /* Either; */ (server.execution_nesting && /* 1) Command should not be nested, or */ c->realcmd->flags & CMD_BLOCKING)); /* 2) If command is nested, it must be due to unblocking. */ @@ -242,8 +237,7 @@ static int canAddNetworkBytesIn(client *c) { * Third, blocked client is not aggregated, to avoid duplicate aggregation upon unblocking. * Fourth, the server is not under a MULTI/EXEC transaction, to avoid duplicate aggregation of * EXEC's 14 bytes RESP upon nested call()'s afterCommand(). */ - return clusterSlotStatsEnabled() && c->slot != -1 && !(c->flag.blocked) && - !server.in_exec; + return clusterSlotStatsEnabled(c->slot) && !(c->flag.blocked) && !server.in_exec; } /* Adds network ingress bytes of the current command in execution, @@ -338,7 +332,6 @@ void clusterSlotStatsCommand(client *c) { } } -int clusterSlotStatsEnabled(void) { - return server.cluster_slot_stats_enabled && /* Config should be enabled. */ - server.cluster_enabled; /* Cluster mode should be enabled. */ +int clusterSlotStatsEnabled(int slot) { + return server.cluster_slot_stats_enabled && server.cluster_enabled && slot != -1; } diff --git a/src/cluster_slot_stats.h b/src/cluster_slot_stats.h index 3a78fa309f9..f5c103e9ed5 100644 --- a/src/cluster_slot_stats.h +++ b/src/cluster_slot_stats.h @@ -6,7 +6,7 @@ /* General use-cases. 
*/ void clusterSlotStatReset(int slot); void clusterSlotStatResetAll(void); -int clusterSlotStatsEnabled(void); +int clusterSlotStatsEnabled(int slot); /* cpu-usec metric. */ void clusterSlotStatsAddCpuDuration(client *c, ustime_t duration); diff --git a/src/config.c b/src/config.c index a2b33734bf4..fdfa732793f 100644 --- a/src/config.c +++ b/src/config.c @@ -3192,7 +3192,6 @@ standardConfig static_configs[] = { createBoolConfig("cluster-slot-stats-enabled", NULL, MODIFIABLE_CONFIG, server.cluster_slot_stats_enabled, 0, NULL, NULL), createBoolConfig("hide-user-data-from-log", NULL, MODIFIABLE_CONFIG, server.hide_user_data_from_log, 1, NULL, NULL), createBoolConfig("import-mode", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, server.import_mode, 0, NULL, NULL), - createBoolConfig("reply-offload", NULL, MODIFIABLE_CONFIG, server.reply_offload_enabled, 0, NULL, NULL), /* String Configs */ createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.acl_filename, "", NULL, NULL), @@ -3256,6 +3255,9 @@ standardConfig static_configs[] = { createIntConfig("port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.port, 6379, INTEGER_CONFIG, NULL, updatePort), /* TCP port. */ createIntConfig("io-threads", NULL, DEBUG_CONFIG | IMMUTABLE_CONFIG, 1, IO_THREADS_MAX_NUM, server.io_threads_num, 1, INTEGER_CONFIG, NULL, NULL), /* Single threaded by default */ createIntConfig("events-per-io-thread", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, INT_MAX, server.events_per_io_thread, 2, INTEGER_CONFIG, NULL, NULL), + createIntConfig("min-io-threads-copy-avoid-on", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, INT_MAX, server.min_io_threads_for_copy_avoid, 7, INTEGER_CONFIG, NULL, NULL), + createIntConfig("min-bulk-string-size-copy-avoid-on", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, INT_MAX, server.min_bulk_string_size_for_copy_avoid, 65536, INTEGER_CONFIG, NULL, NULL), + createIntConfig("min-io-threads-value-prefetch-off", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, INT_MAX, server.min_io_threads_value_prefetch_off, 10, INTEGER_CONFIG, NULL, NULL), createIntConfig("prefetch-batch-max-size", NULL, MODIFIABLE_CONFIG, 0, 128, server.prefetch_batch_max_size, 16, INTEGER_CONFIG, NULL, NULL), createIntConfig("auto-aof-rewrite-percentage", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.aof_rewrite_perc, 100, INTEGER_CONFIG, NULL, NULL), createIntConfig("cluster-replica-validity-factor", "cluster-slave-validity-factor", MODIFIABLE_CONFIG, 0, INT_MAX, server.cluster_replica_validity_factor, 10, INTEGER_CONFIG, NULL, NULL), /* replica max data age factor. 
*/ diff --git a/src/io_threads.c b/src/io_threads.c index 3b14e7a177f..ffcbd270545 100644 --- a/src/io_threads.c +++ b/src/io_threads.c @@ -618,3 +618,13 @@ int trySendAcceptToIOThreads(connection *conn) { return C_OK; } + +int isCopyAvoidIndicatedByIOThreads(void) { + /* Starting from min_io_threads_for_copy_avoid I/O threads, copy avoidance should be beneficial for any string size */ + return server.min_io_threads_for_copy_avoid && server.io_threads_num >= server.min_io_threads_for_copy_avoid; +} + +int isValuePrefetchIndicatedByIOThreads(void) { + /* Starting from min_io_threads_value_prefetch_off I/O threads, copy avoidance should be more efficient without value prefetch */ + return server.io_threads_num < server.min_io_threads_value_prefetch_off; +} diff --git a/src/io_threads.h b/src/io_threads.h index a3ff582a770..abadf096cd5 100644 --- a/src/io_threads.h +++ b/src/io_threads.h @@ -14,5 +14,7 @@ void adjustIOThreadsByEventLoad(int numevents, int increase_only); void drainIOThreadsQueue(void); void trySendPollJobToIOThreads(void); int trySendAcceptToIOThreads(connection *conn); +int isCopyAvoidIndicatedByIOThreads(void); +int isValuePrefetchIndicatedByIOThreads(void); #endif /* IO_THREADS_H */ diff --git a/src/memory_prefetch.c b/src/memory_prefetch.c index 7726749ad0c..5910226c771 100644 --- a/src/memory_prefetch.c +++ b/src/memory_prefetch.c @@ -9,6 +9,7 @@ #include "memory_prefetch.h" #include "server.h" +#include "io_threads.h" typedef enum { PREFETCH_ENTRY, /* Initial state, prefetch entries associated with the given key's hash */ @@ -119,9 +120,8 @@ static void prefetchEntry(KeyPrefetchInfo *info) { if (hashtableIncrementalFindStep(&info->hashtab_state) == 1) { /* Not done yet */ moveToNextKey(); - /* If reply offload enabled no need to prefetch value because main thread will not access it */ - } else if (server.reply_offload_enabled) { - markKeyAsdone(info); + } else if (!isValuePrefetchIndicatedByIOThreads()) { + markKeyAsdone(info); } else { info->state = PREFETCH_VALUE; } diff --git a/src/networking.c b/src/networking.c index e8c2a332353..8f498db9dd7 100644 --- a/src/networking.c +++ b/src/networking.c @@ -42,7 +42,6 @@ #include #include #include -#include #include /* This struct is used to encapsulate filtering criteria for operations on clients @@ -68,19 +67,25 @@ typedef struct { } clientFilter; typedef enum { - CLIENT_REPLY_PAYLOAD_DATA = 0, - CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD, -} clientReplyPayloadType; + CLIENT_REPLY_PLAIN = 0, + CLIENT_REPLY_BULK_COPY_AVOID, +} clientReplyType; /* Reply payload header */ typedef struct __attribute__((__packed__)) payloadHeader { - size_t len; /* payload length in a reply buffer */ - size_t actual_len; /* actual reply length after offload expanding */ - uint8_t type; /* one of clientReplyPayloadType */ - int16_t slot; /* to report network-bytes-out for offloads */ - + size_t len; /* payload length in a reply buffer */ + size_t actual_len; /* actual reply length for bulk offloads */ + uint8_t type; /* one of clientReplyType */ + int16_t slot; /* to report network-bytes-out for offloads */ } payloadHeader; +/* To avoid copying the whole string object into the reply buffer + * we store pointers to the object and the string itself */ +typedef struct __attribute__((__packed__)) bulkCopyAvoid { + robj *obj; /* pointer to object used for reference count management */ + void *str; /* pointer to string to optimize memory access by I/O thread */ +} bulkCopyAvoid; + static void setProtocolError(const char *errstr, client *c); static void
pauseClientsByClient(mstime_t end, int isPauseClientAll); int postponeClientRead(client *c); @@ -164,22 +169,29 @@ static inline int isReplicaReadyForReplData(client *replica) { !(replica->flag.close_asap); } -/* - * Reply offload can be allowed only for regular Valkey clients - * that use _writeToClient handler to write replies to client connection - */ -static bool isReplyOffloadAllowable(client *c) { - if (c->flag.fake) { - return false; - } +static int isCopyAvoidIndicatedBySize(robj *obj) { + if (!obj) return 0; + + if (!server.min_bulk_string_size_for_copy_avoid) return 0; + + return sdslen(obj->ptr) >= (size_t)server.min_bulk_string_size_for_copy_avoid; +} + + +static int isCopyAvoidAllowed(client *c, robj *obj) { + if (c->flag.fake) return 0; - switch (getClientType(c)) { - case CLIENT_TYPE_NORMAL: - case CLIENT_TYPE_PUBSUB: - return true; - default: - return false; + /* Copy avoidance can be allowed only for regular Valkey clients + * that use _writeToClient handler to write replies to client connection */ + int type = getClientType(c); + if (type != CLIENT_TYPE_NORMAL && type != CLIENT_TYPE_PUBSUB) return 0; + + if (obj) { + if (obj->encoding != OBJ_ENCODING_RAW) return 0; + if (obj->refcount == OBJ_STATIC_REFCOUNT) return 0; } + + return isCopyAvoidIndicatedByIOThreads() || isCopyAvoidIndicatedBySize(obj); } client *createClient(connection *conn) { @@ -316,18 +328,6 @@ void putClientInPendingWriteQueue(client *c) { } } -/* - * Activate/deactivate reply offload for the client - * according to server config - */ -static void updateReplyOffloadFlag(client *c) { - if (server.reply_offload_enabled && !c->flag.reply_offload && isReplyOffloadAllowable(c)) { - c->flag.reply_offload = 1; - } else if (!server.reply_offload_enabled && c->flag.reply_offload) { - c->flag.reply_offload = 0; - } -} - /* This function is called every time we are going to transmit new data * to the client. The behavior is the following: * @@ -374,11 +374,7 @@ int prepareClientToWrite(client *c) { /* Schedule the client to write the output buffers to the socket, unless * it should already be setup to do so (it has already pending data). */ - if (!clientHasPendingReplies(c)) { - /* We can change reply offload mode for the client only when its reply buffers are empty. */ - updateReplyOffloadFlag(c); - putClientInPendingWriteQueue(c); - } + if (!clientHasPendingReplies(c)) putClientInPendingWriteQueue(c); /* Authorize the caller to queue in the output buffer of this client. */ return C_OK; @@ -430,14 +426,13 @@ void deleteCachedResponseClient(client *recording_client) { /* ----------------------------------------------------------------------------- * Low level functions to add more data to output buffers. 
* -------------------------------------------------------------------------- */ -static inline void insertPayloadHeader(char *buf, size_t *bufpos, uint8_t type, size_t len, int slot, payloadHeader **last_header) { - /* Save the latest header */ - *last_header = (payloadHeader *)(buf + *bufpos); +static inline void insertPayloadHeader(char *buf, size_t *bufpos, uint8_t type, size_t len, int slot, payloadHeader **new_header) { + *new_header = (payloadHeader *)(buf + *bufpos); - (*last_header)->type = type; - (*last_header)->len = len; - (*last_header)->slot = slot; - (*last_header)->actual_len = 0; + (*new_header)->type = type; + (*new_header)->len = len; + (*new_header)->slot = slot; + (*new_header)->actual_len = 0; *bufpos += sizeof(payloadHeader); } @@ -450,27 +445,29 @@ static inline int updatePayloadHeader(payloadHeader *last_header, uint8_t type, return C_ERR; } +/* Update an existing header, if possible; otherwise insert a new one + * Returns the length of data that can be added to the reply buffer (i.e. min(available, requested)) */ static size_t upsertPayloadHeader(char *buf, size_t *bufpos, payloadHeader **last_header, uint8_t type, size_t len, int slot, size_t available) { /* Enforce min len for offloads as whole pointers must be written to the buffer */ - size_t min_len = (type == CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD ? len : 1); + size_t min_len = (type == CLIENT_REPLY_BULK_COPY_AVOID ? len : 1); if (min_len > available) return 0; size_t reply_len = min(available, len); // If cluster slots stats disabled set slot to -1 to prevent excessive per slot headers - if (!clusterSlotStatsEnabled()) slot = -1; + if (!clusterSlotStatsEnabled(slot)) slot = -1; /* Try to add payload to last chunk if possible */ if (*last_header != NULL) { if (updatePayloadHeader(*last_header, type, reply_len, slot) == C_OK) return reply_len; } - /* Recheck min len condition and recalcuate allowed len with a new header to be added */ + /* Recheck min len condition and recalculate allowed len with a new header to be added */ if (sizeof(payloadHeader) + min_len > available) return 0; available -= sizeof(payloadHeader); if (len > available) reply_len = available; /* Start a new payload chunk */ - insertPayloadHeader(buf, bufpos, type, reply_len, slot, last_header); + insertPayloadHeader(buf, bufpos, type, reply_len, slot, last_header); return reply_len; } @@ -488,7 +485,7 @@ static size_t _addReplyPayloadToBuffer(client *c, const void *payload, size_t le size_t available = c->buf_usable_size - c->bufpos; size_t reply_len = min(available, len); - if (c->flag.reply_offload) { + if (c->flag.buf_encoded) { reply_len = upsertPayloadHeader(c->buf, &c->bufpos, &c->last_header, payload_type, len, c->slot, available); } if (!reply_len) return 0; @@ -500,12 +497,21 @@ static size_t _addReplyPayloadToBuffer(client *c, const void *payload, size_t le return reply_len; } -size_t _addReplyToBuffer(client *c, const char *s, size_t len) { - return _addReplyPayloadToBuffer(c, s, len, CLIENT_REPLY_PAYLOAD_DATA); +static size_t _addReplyToBuffer(client *c, const char *s, size_t len) { + if (!len) return 0; + if (!c->bufpos) { + c->flag.buf_encoded = isCopyAvoidAllowed(c, NULL); + } + return _addReplyPayloadToBuffer(c, s, len, CLIENT_REPLY_PLAIN); } -size_t _addBulkOffloadToBuffer(client *c, robj *obj) { - return _addReplyPayloadToBuffer(c, &obj, sizeof(void*), CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD); +static size_t _addBulkOffloadToBuffer(client *c, const void *payload, size_t len) { + if (!c->flag.buf_encoded) { + /* If buffer is plain and 
not empty then can't add bulk offload to it */ + if (c->bufpos) return 0; + c->flag.buf_encoded = 1; + } + return _addReplyPayloadToBuffer(c, payload, len, CLIENT_REPLY_BULK_COPY_AVOID); } /* Adds the reply to the reply linked list. @@ -513,6 +519,8 @@ size_t _addBulkOffloadToBuffer(client *c, robj *obj) { static void _addReplyPayloadToList(client *c, list *reply_list, const char *payload, size_t len, uint8_t payload_type) { listNode *ln = listLast(reply_list); clientReplyBlock *tail = ln ? listNodeValue(ln) : NULL; + /* Determine if buffer content should be encoded with header */ + int encoded = payload_type == CLIENT_REPLY_BULK_COPY_AVOID || isCopyAvoidAllowed(c, NULL); /* Note that 'tail' may be NULL even if we have a tail node, because when * addReplyDeferredLen() is used, it sets a dummy node to NULL just @@ -524,9 +532,13 @@ static void _addReplyPayloadToList(client *c, list *reply_list, const char *payl * new node */ size_t avail = tail->size - tail->used; size_t copy = avail >= len ? len : avail; - if (c->flag.reply_offload) { + + if (tail->flag.buf_encoded) { copy = upsertPayloadHeader(tail->buf, &tail->used, &tail->last_header, payload_type, len, c->slot, avail); + } else if (encoded) { + copy = 0; } + if (copy) { memcpy(tail->buf + tail->used, payload, copy); tail->used += copy; @@ -538,14 +550,15 @@ static void _addReplyPayloadToList(client *c, list *reply_list, const char *payl /* Create a new node, make sure it is allocated to at * least PROTO_REPLY_CHUNK_BYTES */ size_t usable_size; - size_t required_size = c->flag.reply_offload ? len + sizeof(payloadHeader) : len; + size_t required_size = encoded ? len + sizeof(payloadHeader) : len; size_t size = required_size < PROTO_REPLY_CHUNK_BYTES ? PROTO_REPLY_CHUNK_BYTES : required_size; tail = zmalloc_usable(size + sizeof(clientReplyBlock), &usable_size); /* take over the allocation's internal fragmentation */ tail->size = usable_size - sizeof(clientReplyBlock); tail->used = 0; + tail->flag.buf_encoded = encoded; tail->last_header = NULL; - if (c->flag.reply_offload) { + if (tail->flag.buf_encoded) { upsertPayloadHeader(tail->buf, &tail->used, &tail->last_header, payload_type, len, c->slot, tail->size); } memcpy(tail->buf + tail->used, payload, len); @@ -558,11 +571,8 @@ static void _addReplyPayloadToList(client *c, list *reply_list, const char *payl } void _addReplyProtoToList(client *c, list *reply_list, const char *s, size_t len) { - _addReplyPayloadToList(c, reply_list, s, len, CLIENT_REPLY_PAYLOAD_DATA); -} - -void _addBulkOffloadToList(client *c, robj *obj) { - _addReplyPayloadToList(c, c->reply, (char*) &obj, sizeof(void*), CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD); + if (!len) return; + _addReplyPayloadToList(c, reply_list, s, len, CLIENT_REPLY_PLAIN); } /* The subscribe / unsubscribe command family has a push as a reply, @@ -613,11 +623,12 @@ void _addReplyToBufferOrList(client *c, const char *s, size_t len) { void _addBulkOffloadToBufferOrList(client *c, robj *obj) { if (c->flag.close_after_reply) return; - /* Refcount will be decremented in post write handler (i.e. 
in _postWriteToClient) */ + /* Refcount will be decremented in write completion handler by the main thread */ incrRefCount(obj); - if (!_addBulkOffloadToBuffer(c, obj)) { - _addBulkOffloadToList(c, obj); + bulkCopyAvoid copyAvoid = {.obj = obj, .str = obj->ptr}; + if (!_addBulkOffloadToBuffer(c, (void *)&copyAvoid, sizeof(copyAvoid))) { + _addReplyPayloadToList(c, c->reply, (char *)&copyAvoid, sizeof(copyAvoid), CLIENT_REPLY_BULK_COPY_AVOID); } } @@ -913,7 +924,7 @@ void trimReplyUnusedTailSpace(client *c) { * Also, to avoid large memmove which happens as part of realloc, we only do * that if the used part is small. */ if (tail->size - tail->used > tail->size / 4 && tail->used < PROTO_REPLY_CHUNK_BYTES && - c->io_write_state != CLIENT_PENDING_IO && !c->flag.reply_offload) { + c->io_write_state != CLIENT_PENDING_IO && !tail->flag.buf_encoded) { size_t usable_size; size_t old_size = tail->size; tail = zrealloc_usable(tail, tail->used + sizeof(clientReplyBlock), &usable_size); @@ -974,8 +985,8 @@ void setDeferredReply(client *c, void *node, const char *s, size_t length) { * - It has enough room already allocated * - And not too large (avoid large memmove) * - And the client is not in a pending I/O state */ - if (ln->prev != NULL && (prev = listNodeValue(ln->prev)) && prev->size - prev->used > 0 && - c->io_write_state != CLIENT_PENDING_IO && !c->flag.reply_offload) { + if (ln->prev != NULL && (prev = listNodeValue(ln->prev)) && prev->size > prev->used && + c->io_write_state != CLIENT_PENDING_IO && !prev->flag.buf_encoded) { size_t len_to_copy = prev->size - prev->used; if (len_to_copy > length) len_to_copy = length; memcpy(prev->buf + prev->used, s, len_to_copy); @@ -989,7 +1000,7 @@ void setDeferredReply(client *c, void *node, const char *s, size_t length) { } if (ln->next != NULL && (next = listNodeValue(ln->next)) && next->size - next->used >= length && - next->used < PROTO_REPLY_CHUNK_BYTES * 4 && c->io_write_state != CLIENT_PENDING_IO && !c->flag.reply_offload) { + next->used < PROTO_REPLY_CHUNK_BYTES * 4 && c->io_write_state != CLIENT_PENDING_IO && !next->flag.buf_encoded) { memmove(next->buf + length, next->buf, next->used); memcpy(next->buf, s, length); next->used += length; } else { /* Create a new node */ size_t usable_size; - size_t required_size = c->flag.reply_offload ?
length + sizeof(payloadHeader) : length; - clientReplyBlock *buf = zmalloc_usable(required_size + sizeof(clientReplyBlock), &usable_size); + clientReplyBlock *buf = zmalloc_usable(length + sizeof(clientReplyBlock), &usable_size); /* Take over the allocation's internal fragmentation */ buf->size = usable_size - sizeof(clientReplyBlock); - buf->used = 0; - buf->last_header = 0; - if (c->flag.reply_offload) { - upsertPayloadHeader(buf->buf, &buf->used, &buf->last_header, CLIENT_REPLY_PAYLOAD_DATA, length, c->slot, buf->size); - } - memcpy(buf->buf + buf->used, s, length); - buf->used += length; - + buf->used = length; + buf->flag.buf_encoded = 0; + memcpy(buf->buf, s, length); listNodeValue(ln) = buf; c->reply_bytes += buf->size; @@ -1272,9 +1277,8 @@ void addReplyBulkLen(client *c, robj *obj) { } int tryOffloadBulkReply(client *c, robj *obj) { - if (!c->flag.reply_offload) return C_ERR; - if (obj->encoding != OBJ_ENCODING_RAW) return C_ERR; - if (obj->refcount == OBJ_STATIC_REFCOUNT) return C_ERR; + + if (!isCopyAvoidAllowed(c, obj)) return C_ERR; if (prepareClientToWrite(c) != C_OK) return C_ERR; _addBulkOffloadToBufferOrList(c, obj); @@ -1455,6 +1459,7 @@ void AddReplyFromClient(client *dst, client *src) { } /* First add the static buffer (either into the static buffer or reply list) */ + serverAssert(src->flag.buf_encoded == 0); addReplyProto(dst, src->buf, src->bufpos); /* We need to check with prepareClientToWrite again (after addReplyProto) @@ -1885,7 +1890,7 @@ void freeClient(client *c) { freeClientPubSubData(c); /* Free data structures. */ - if (c->flag.reply_offload) releaseReplyOffloads(c); + releaseReplyOffloads(c); listRelease(c->reply); c->reply = NULL; zfree_with_size(c->buf, c->buf_usable_size); @@ -2131,16 +2136,15 @@ typedef struct replyIOV { int cnt; int max; struct iovec *iov; - ssize_t iov_len_total; /* Total length of data pointed by iov array */ - size_t last_written_len; /* Length of data in the last written buffer - * partially written in previous writevToClient invocation */ - int limit_reached; /* Non zero if either max iov count or NET_MAX_WRITES_PER_EVENT limit - * reached during iov prepearation */ - int offload_active; - int prfxcnt; /* prfxcnt, prefixes and clrf are auxiliary fields - * for expanding reply offloads */ - char (*prefixes)[LONG_STR_SIZE + 3]; - char *crlf; + ssize_t iov_len_total; /* Total length of data pointed by iov array */ + size_t last_written_len; /* Length of data in the last written buffer + * partially written in previous writevToClient invocation */ + int limit_reached; /* Non zero if either max iov count or NET_MAX_WRITES_PER_EVENT limit + * reached during iov preparation */ + int prfxcnt; /* prfxcnt, prefixes and crlf are auxiliary fields + * for expanding bulk offloads */ + char (*prefixes)[LONG_STR_SIZE + 3]; /* bulk string prefixes */ + char *crlf; /* bulk string suffix */ } replyIOV; /* @@ -2150,7 +2154,7 @@ typedef struct replyIOV { typedef struct bufWriteMetadata { char *buf; size_t bufpos; - uint64_t data_len; /* Actual bytes out, differ from bufpos in case of reply offload */ + uint64_t data_len; /* Actual bytes out, differs from bufpos in case of copy avoidance */ int complete; /* Was the buffer completely scattered to iov or process stopped due encountered limit */ } bufWriteMetadata; @@ -2162,26 +2166,24 @@ static void initReplyIOV(client *c, int iovmax, struct iovec *iov_arr, char (*pr reply->iov = iov_arr; reply->iov_len_total = 0; reply->last_written_len = c->io_last_written_data_len; - reply->offload_active =
c->flag.reply_offload; - if (reply->offload_active) { - reply->prfxcnt = 0; - reply->prefixes = prefixes; - reply->crlf = crlf; - } + reply->prfxcnt = 0; + reply->prefixes = prefixes; + reply->crlf = crlf; } static void addPlainBufferToReplyIOV(char *buf, size_t buf_len, replyIOV *reply, bufWriteMetadata *metadata) { if (reply->limit_reached) return; - if (reply->cnt == reply->max || reply->iov_len_total > NET_MAX_WRITES_PER_EVENT) { + if (reply->cnt == reply->max) { reply->limit_reached = 1; return; } - /* Aggregate data len from the beginning of the buffer even though - * part of the data should be skipped in this round due to last_written_len */ + /* Aggregate data length from the beginning of the buffer even though + * part of the data can be skipped in this writevToClient invocation due to last_written_len */ metadata->data_len += buf_len; + /* Skip data written in the previous writevToClient invocation(s) */ if (reply->last_written_len >= buf_len) { reply->last_written_len -= buf_len; return; @@ -2194,13 +2196,13 @@ static void addPlainBufferToReplyIOV(char *buf, size_t buf_len, replyIOV *reply, reply->iov_len_total += reply->iov[reply->cnt++].iov_len; } -static void addOffloadedBulkToReplyIOV(char *buf, size_t buf_len, replyIOV *reply, bufWriteMetadata *metadata) { +static void addBulkStringToReplyIOV(char *buf, size_t buf_len, replyIOV *reply, bufWriteMetadata *metadata) { + bulkCopyAvoid *copyAvoid = (bulkCopyAvoid *)buf; while (buf_len > 0 && !reply->limit_reached) { - robj **obj = (robj **)buf; - char *str = (*obj)->ptr; - size_t str_len = stringObjectLen(*obj); + size_t str_len = sdslen(copyAvoid->str); - char* prefix = reply->prefixes[reply->prfxcnt]; + /* RESP encodes bulk strings as $\r\n\r\n */ + char *prefix = reply->prefixes[reply->prfxcnt]; prefix[0] = '$'; size_t num_len = ll2string(prefix + 1, sizeof(reply->prefixes[0]) - 3, str_len); prefix[num_len + 1] = '\r'; @@ -2208,50 +2210,65 @@ static void addOffloadedBulkToReplyIOV(char *buf, size_t buf_len, replyIOV *repl int cnt = reply->cnt; addPlainBufferToReplyIOV(reply->prefixes[reply->prfxcnt], num_len + 3, reply, metadata); - /* Increment prfxcnt only if prefix was added to reply in this round */ + /* Increment prfxcnt only if prefix was added to reply in this writevToClient invocation */ if (reply->cnt > cnt) reply->prfxcnt++; - addPlainBufferToReplyIOV(str, str_len, reply, metadata); + addPlainBufferToReplyIOV(copyAvoid->str, str_len, reply, metadata); addPlainBufferToReplyIOV(reply->crlf, 2, reply, metadata); - buf += sizeof(void*); - buf_len -= sizeof(void*); + copyAvoid++; + buf_len -= sizeof(bulkCopyAvoid); } } -static void addCompoundBufferToReplyIOV(char *buf, size_t bufpos, replyIOV *reply, bufWriteMetadata *metadata) { +static void addEncodedBufferToReplyIOV(char *buf, size_t bufpos, replyIOV *reply, bufWriteMetadata *metadata) { char *ptr = buf; while (ptr < buf + bufpos && !reply->limit_reached) { - payloadHeader *header = (payloadHeader*)ptr; + payloadHeader *header = (payloadHeader *)ptr; ptr += sizeof(payloadHeader); - if (header->type == CLIENT_REPLY_PAYLOAD_DATA) { + if (header->type == CLIENT_REPLY_PLAIN) { addPlainBufferToReplyIOV(ptr, header->len, reply, metadata); } else { - serverAssert(header->type == CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD); uint64_t data_len = metadata->data_len; - addOffloadedBulkToReplyIOV(ptr, header->len, reply, metadata); + addBulkStringToReplyIOV(ptr, header->len, reply, metadata); /* Store actual reply len for cluster slot stats */ header->actual_len = metadata->data_len - 
data_len; } ptr += header->len; } - serverAssert(ptr <= buf + bufpos); } -static void addBufferToReplyIOV(char *buf, size_t bufpos, replyIOV *reply, bufWriteMetadata *metadata) { +static void addBufferToReplyIOV(int encoded, char *buf, size_t bufpos, replyIOV *reply, bufWriteMetadata *metadata) { metadata->data_len = 0; - if (reply->offload_active) { - addCompoundBufferToReplyIOV(buf, bufpos, reply, metadata); + if (encoded) { + addEncodedBufferToReplyIOV(buf, bufpos, reply, metadata); metadata->complete = !reply->limit_reached; } else { addPlainBufferToReplyIOV(buf, bufpos, reply, metadata); metadata->complete = 1; } + if (reply->iov_len_total > NET_MAX_WRITES_PER_EVENT) { + reply->limit_reached = 1; + } + metadata->buf = buf; metadata->bufpos = bufpos; } +/* + * This function calculates and stores the following on the client: + * io_last_written_buf - Last buffer that has been written to the client connection + * io_last_written_bufpos - The buffer has been written until this position + * io_last_written_data_len - The actual length of the data written from this buffer + * This length differs from written bufpos in case of copy avoidance + * + * The io_last_written_buf and io_last_written_bufpos are used by _postWriteToClient + * to detect the last client reply buffer that can be released + * + * The io_last_written_data_len is used by writevToClient for resuming write from the point + * where the previous writevToClient invocation stopped + **/ static void saveLastWrittenBuf(client *c, bufWriteMetadata *metadata, int bufcnt, size_t totlen, size_t totwritten) { int last = bufcnt - 1; if (totwritten == totlen) { @@ -2286,49 +2303,25 @@ void proceedToUnwritten(replyIOV *reply, int nwritten) { } } +/* Bulk string prefix max size */ +#define BULK_STRING_LEN_PREFIX_MAX_SIZE (LONG_STR_SIZE + 3) +/* Bulk string offload requires 3 iov entries - + * length prefix ($<length>\r\n), string (<string>) and suffix (\r\n) */ +#define NUM_OF_IOV_PER_BULK_OFFLOAD 3 + /* This function should be called from _writeToClient when the reply list is not empty, * it gathers the scattered buffers from reply list and sends them away with connWritev. * If we write successfully, it returns C_OK, otherwise, C_ERR is returned. * Sets the c->nwritten to the number of bytes the server wrote to the client. - * Can be called from the main thread or an I/O thread - * - * INTERNALS - * The writevToClient strives to write all client reply buffers to the client connection. - * However, it may encounter NET_MAX_WRITES_PER_EVENT or IOV_MAX or socket limit. In such case, - * some client reply buffers will be written completely and some partially. - * In next invocation writevToClient should resume from the exact position where it stopped. - * Also writevToClient should communicate to _postWriteToClient which buffers written completely - * and can be released. It is intricate in case of reply offloading as length of reply buffer does not match - * to network bytes out. - * - * For this purpose, writevToClient uses 3 data members on the client struct as input/output paramaters: - * io_last_written_buf - Last buffer that has been written to the client connection - * io_last_written_bufpos - The buffer has been written until this position - * io_last_written_data_len - The actual length of the data written from this buffer - * This length differs from written bufpos in case of reply offload - * - * The writevToClient uses addBufferToReplyIOV, addCompoundBufferToReplyIOV, addOffloadedBulkToReplyIOV, addPlainBufferToReplyIOV - * to build reply iovec array.
These functions know to skip io_last_written_data_len, specifically addPlainBufferToReplyIOV - * - * In the end of execution writevToClient calls saveLastWrittenBuf for calculating "last written" buf/pos/data_len - * and storing on the client. While building reply iov, writevToClient gathers auxiliary bufWriteMetadata that - * helps in this calculation. In some cases, It may take several (> 2) invocations for writevToClient to write reply - * from a single buffer but saveLastWrittenBuf knows to calculate "last written" buf/pos/data_len properly - * - * The _postWriteToClient uses io_last_written_buf and io_last_written_bufpos in order to detect completely written buffers - * and release them - * - * */ + * Can be called from the main thread or an I/O thread */ static int writevToClient(client *c) { int iovmax = min(IOV_MAX, c->conn->iovcnt); struct iovec iov_arr[iovmax]; - char prefixes[iovmax / 3 + 1][LONG_STR_SIZE + 3]; + /* iov_arr can accommodate iovmax / NUM_OF_IOV_PER_BULK_OFFLOAD full bulk offloads + * and one partial bulk offload */ + char prefixes[iovmax / NUM_OF_IOV_PER_BULK_OFFLOAD + 1][BULK_STRING_LEN_PREFIX_MAX_SIZE]; char crlf[2] = {'\r', '\n'}; int bufcnt = 0; - bufWriteMetadata metadata[listLength(c->reply) + 1]; - - replyIOV reply; - initReplyIOV(c, iovmax, iov_arr, prefixes, crlf, &reply); size_t bufpos = 0; listNode *lastblock; @@ -2340,40 +2333,46 @@ static int writevToClient(client *c) { bufpos = lastblock ? (size_t)c->bufpos : c->io_last_bufpos; } + int reply_blocks = (lastblock ? listLength(c->reply) : 0); + /* +1 is for c->buf */ + bufWriteMetadata metadata[reply_blocks + 1]; + + replyIOV reply; + initReplyIOV(c, iovmax, iov_arr, prefixes, crlf, &reply); + /* If the static reply buffer is not empty, * add it to the iov array for writev() as well. */ if (bufpos > 0) { - addBufferToReplyIOV(c->buf, bufpos, &reply, &metadata[bufcnt++]); + addBufferToReplyIOV(c->flag.buf_encoded, c->buf, bufpos, &reply, &metadata[bufcnt++]); } - listIter iter; - listNode *next; - listRewind(c->reply, &iter); - while ((next = listNext(&iter)) && !reply.limit_reached) { - clientReplyBlock *o = listNodeValue(next); + if (lastblock) { + listIter iter; + listNode *next; + listRewind(c->reply, &iter); + while ((next = listNext(&iter)) && !reply.limit_reached) { + clientReplyBlock *o = listNodeValue(next); - size_t used = o->used; - /* Use c->io_last_bufpos as the currently used portion of the block. - * We use io_last_bufpos instead of o->used to ensure that we only access data guaranteed to be visible to the - * current thread. Using o->used, which may have been updated by the main thread, could lead to accessing data - * that may not yet be visible to the current thread*/ - if (!inMainThread() && next == lastblock) used = c->io_last_bufpos; + size_t used = o->used; + /* Use c->io_last_bufpos as the currently used portion of the block. + * We use io_last_bufpos instead of o->used to ensure that we only access data guaranteed to be visible to the + * current thread. Using o->used, which may have been updated by the main thread, could lead to accessing data + * that may not yet be visible to the current thread*/ + if (!inMainThread() && next == lastblock) used = c->io_last_bufpos; - if (used == 0) { /* empty node, skip over it. */ - if (next == lastblock) break; - continue; - } + if (used == 0) { /* empty node, skip over it. 
*/ + if (next == lastblock) break; + continue; + } - addBufferToReplyIOV(o->buf, used, &reply, &metadata[bufcnt]); - if (!metadata[bufcnt].data_len) break; - bufcnt++; + addBufferToReplyIOV(o->flag.buf_encoded, o->buf, used, &reply, &metadata[bufcnt]); + if (!metadata[bufcnt].data_len) break; + bufcnt++; - if (next == lastblock) break; + if (next == lastblock) break; + } } - serverAssert(reply.last_written_len == 0); - serverAssert(reply.cnt != 0); - ssize_t totwritten = 0; while (1) { int nwritten = connWritev(c->conn, reply.iov, reply.cnt); @@ -2429,7 +2428,7 @@ int _writeToClient(client *c) { } /* If the reply list is not empty, use writev to save system calls and TCP packets */ - if (lastblock || c->flag.reply_offload) return writevToClient(c); + if (lastblock || c->flag.buf_encoded) return writevToClient(c); serverAssert(c->io_last_written_data_len == 0 || c->io_last_written_buf == c->buf); ssize_t bytes_to_write = bufpos - c->io_last_written_data_len; @@ -2466,24 +2465,27 @@ static void releaseBufOffloads(char *buf, size_t bufpos) { payloadHeader *header = (payloadHeader *)ptr; ptr += sizeof(payloadHeader); - if (header->type == CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD) { + if (header->type == CLIENT_REPLY_BULK_COPY_AVOID) { clusterSlotStatsAddNetworkBytesOutForSlot(header->slot, header->actual_len); - robj** obj_ptr = (robj**)ptr; + bulkCopyAvoid *copyAvoid = (bulkCopyAvoid *)ptr; size_t len = header->len; while (len > 0) { - decrRefCount(*obj_ptr); - obj_ptr++; - len -= sizeof(obj_ptr); + decrRefCount(copyAvoid->obj); + copyAvoid++; + len -= sizeof(bulkCopyAvoid); } + } else { + serverAssert(header->type == CLIENT_REPLY_PLAIN); } ptr += header->len; } + serverAssert(ptr == buf + bufpos); } void releaseReplyOffloads(client *c) { - if (c->bufpos > 0) { + if (c->bufpos > 0 && c->flag.buf_encoded) { releaseBufOffloads(c->buf, c->bufpos); } @@ -2492,14 +2494,12 @@ void releaseReplyOffloads(client *c) { listRewind(c->reply, &iter); while ((next = listNext(&iter))) { clientReplyBlock *o = (clientReplyBlock *)listNodeValue(next); - releaseBufOffloads(o->buf, o->used); + if (o->flag.buf_encoded) { + releaseBufOffloads(o->buf, o->used); + } } } -/* - * See INTERNALS note on writevToClient for explanation about - * io_last_written_buf and io_last_written_bufpos - */ static void _postWriteToClient(client *c) { if (c->nwritten <= 0) return; server.stat_net_output_bytes += c->nwritten; @@ -2508,8 +2508,9 @@ static void _postWriteToClient(client *c) { if (c->bufpos > 0) { last_written = (c->buf == c->io_last_written_buf); if (!last_written || c->bufpos == c->io_last_written_bufpos) { - if (c->flag.reply_offload) releaseBufOffloads(c->buf, c->bufpos); + if (c->flag.buf_encoded) releaseBufOffloads(c->buf, c->bufpos); c->bufpos = 0; + c->flag.buf_encoded = 0; c->last_header = 0; if (last_written) resetLastWrittenBuf(c); } @@ -2524,7 +2525,7 @@ static void _postWriteToClient(client *c) { last_written = (o->buf == c->io_last_written_buf); if (!last_written || o->used == c->io_last_written_bufpos) { c->reply_bytes -= o->size; - if (c->flag.reply_offload) releaseBufOffloads(o->buf, o->used); + if (o->flag.buf_encoded) releaseBufOffloads(o->buf, o->used); listDelNode(c->reply, next); if (last_written) resetLastWrittenBuf(c); } diff --git a/src/replication.c b/src/replication.c index 1e418acaf61..7f4794c2f10 100644 --- a/src/replication.c +++ b/src/replication.c @@ -889,7 +889,8 @@ int primaryTryPartialResynchronization(client *c, long long psync_offset) { * 4) Send the backlog data (from the offset to the end) 
to the replica. */ waitForClientIO(c); c->flag.replica = 1; - c->flag.reply_offload = 0; + serverAssert(c->bufpos == 0); + c->flag.buf_encoded = 0; if (c->repl_data->associated_rdb_client_id && lookupRdbClientByID(c->repl_data->associated_rdb_client_id)) { c->repl_data->repl_state = REPLICA_STATE_BG_RDB_LOAD; removeReplicaFromPsyncWait(c); @@ -1156,7 +1157,8 @@ void syncCommand(client *c) { if (server.repl_disable_tcp_nodelay) connDisableTcpNoDelay(c->conn); /* Non critical if it fails. */ c->repl_data->repldbfd = -1; c->flag.replica = 1; - c->flag.reply_offload = 0; + serverAssert(c->bufpos == 0); + c->flag.buf_encoded = 0; listAddNodeTail(server.replicas, c); /* Create the replication backlog if needed. */ @@ -4222,10 +4224,8 @@ void replicationCachePrimary(client *c) { server.primary->repl_data->repl_applied = 0; server.primary->repl_data->read_reploff = server.primary->repl_data->reploff; if (c->flag.multi) discardTransaction(c); - if (c->flag.reply_offload) { - releaseReplyOffloads(c); - resetLastWrittenBuf(c); - } + releaseReplyOffloads(c); + resetLastWrittenBuf(c); listEmpty(c->reply); c->reply_bytes = 0; c->bufpos = 0; diff --git a/src/server.h b/src/server.h index c5ff29a1d64..b72fb9e0354 100644 --- a/src/server.h +++ b/src/server.h @@ -784,11 +784,20 @@ struct evictionPoolEntry; /* Defined in evict.c */ typedef struct payloadHeader payloadHeader; /* Defined in networking.c */ +typedef struct ClientReplyBlockFlags { + uint8_t buf_encoded : 1; /* True if reply block buf content is encoded (e.g. for copy avoidance) */ + uint8_t reserved : 7; +} ClientReplyBlockFlags; + /* This structure is used in order to represent the output buffer of a client, * which is actually a linked list of blocks like that, that is: client->reply. */ typedef struct clientReplyBlock { size_t size, used; - payloadHeader* last_header; + union { + uint8_t raw_flag; + ClientReplyBlockFlags flag; + }; + payloadHeader *last_header; char buf[]; } clientReplyBlock; @@ -1016,7 +1025,7 @@ typedef struct { /* General */ int saved; /* 1 if we already saved the offset (first time we call addReply*) */ /* Offset within the static reply buffer */ - int bufpos; + size_t bufpos; /* Offset within the reply block list */ struct { int index; @@ -1057,7 +1066,7 @@ typedef struct ClientFlags { uint64_t prevent_prop : 1; /* Don't propagate to AOF or replicas. */ uint64_t pending_write : 1; /* Client has output to send but a write handler is yet not installed. */ uint64_t pending_read : 1; /* Client has output to send but a write handler is yet not installed. */ - uint64_t reply_offload : 1; /* Client is in reply offload mode */ + uint64_t buf_encoded : 1; /* True if c->buf content is encoded (e.g. for copy avoidance) */ uint64_t reply_off : 1; /* Don't send replies to client. */ uint64_t reply_skip_next : 1; /* Set CLIENT_REPLY_SKIP for next cmd */ uint64_t reply_skip : 1; /* Don't send just this reply. */ @@ -1211,16 +1220,16 @@ typedef struct client { list *reply; /* List of reply objects to send to the client. 
*/ listNode *io_last_reply_block; /* Last client reply block when sent to IO thread */ size_t io_last_bufpos; /* The client's bufpos at the time it was sent to the IO thread */ - char* io_last_written_buf; /* Last buffer that has been written to the client connection */ + char *io_last_written_buf; /* Last buffer that has been written to the client connection */ size_t io_last_written_bufpos; /* The buffer has been written until this position */ size_t io_last_written_data_len; /* The actual length of the data written from this buffer This length differs from written bufpos in case of reply offload */ unsigned long long reply_bytes; /* Tot bytes of objects in reply list. */ listNode clients_pending_write_node; /* list node in clients_pending_write or in clients_pending_io_write list */ size_t bufpos; - payloadHeader* last_header; /* Pointer to the last header in a buffer in reply offload mode */ - int original_argc; /* Num of arguments of original command if arguments were rewritten. */ - robj **original_argv; /* Arguments of original command if arguments were rewritten. */ + payloadHeader *last_header; /* Pointer to the last header in a buffer in reply offload mode */ + int original_argc; /* Num of arguments of original command if arguments were rewritten. */ + robj **original_argv; /* Arguments of original command if arguments were rewritten. */ /* Client flags and state indicators */ union { uint64_t raw_flag; @@ -1666,9 +1675,11 @@ struct valkeyServer { int io_threads_num; /* Number of IO threads to use. */ int active_io_threads_num; /* Current number of active IO threads, includes main thread. */ int events_per_io_thread; /* Number of events on the event loop to trigger IO threads activation. */ + int min_io_threads_for_copy_avoid; /* Minimum number of IO threads for copy avoidance */ + int min_bulk_string_size_for_copy_avoid; /* Minimum bulk string size for copy avoidance */ + int min_io_threads_value_prefetch_off; /* Minimum number of IO threads for disabling value prefetch */ int prefetch_batch_max_size; /* Maximum number of keys to prefetch in a single batch */ long long events_processed_while_blocked; /* processEventsWhileBlocked() */ - int reply_offload_enabled; /* Reply offload enabled or not */ int enable_protected_configs; /* Enable the modification of protected configs, see PROTECTED_ACTION_ALLOWED_* */ int enable_debug_cmd; /* Enable DEBUG commands, see PROTECTED_ACTION_ALLOWED_* */ int enable_module_cmd; /* Enable MODULE commands, see PROTECTED_ACTION_ALLOWED_* */ diff --git a/src/unit/test_networking.c b/src/unit/test_networking.c index 6eeb20302a1..ef7553db258 100644 --- a/src/unit/test_networking.c +++ b/src/unit/test_networking.c @@ -130,15 +130,15 @@ int test_rewriteClientCommandArgument(int argc, char **argv, int flags) { return 0; } -static client* createTestClient(void) { +static client *createTestClient(void) { client *c = zcalloc(sizeof(client)); c->buf = zmalloc_usable(PROTO_REPLY_CHUNK_BYTES, &c->buf_usable_size); c->reply = listCreate(); listSetFreeMethod(c->reply, freeClientReplyValue); listSetDupMethod(c->reply, dupClientReplyValue); - c->flag.reply_offload = 1; - c->flag.fake = 1; + /* dummy connection to bypass assert in closeClientOnOutputBufferLimitReached */ + c->conn = (connection *)c; return c; } @@ -149,23 +149,26 @@ static void freeReplyOffloadClient(client *c) { zfree(c); } +/* Each bulk offload puts 2 pointers to a reply buffer */ +#define PTRS_LEN (sizeof(void *) * 2) + int test_addRepliesWithOffloadsToBuffer(int argc, char **argv, int flags) { 
UNUSED(argc); UNUSED(argv); UNUSED(flags); - client * c = createTestClient(); + client *c = createTestClient(); /* Test 1: Add bulk offloads to the buffer */ robj *obj = createObject(OBJ_STRING, sdscatfmt(sdsempty(), "test")); _addBulkOffloadToBufferOrList(c, obj); TEST_ASSERT(obj->refcount == 2); - TEST_ASSERT(c->bufpos == sizeof(payloadHeader) + sizeof(void*)); + TEST_ASSERT(c->bufpos == sizeof(payloadHeader) + PTRS_LEN); payloadHeader *header1 = c->last_header; - TEST_ASSERT(header1->type == CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD); - TEST_ASSERT(header1->len == sizeof(void*)); + TEST_ASSERT(header1->type == CLIENT_REPLY_BULK_COPY_AVOID); + TEST_ASSERT(header1->len == PTRS_LEN); robj **ptr = (robj **)(c->buf + sizeof(payloadHeader)); TEST_ASSERT(obj == *ptr); @@ -173,44 +176,44 @@ int test_addRepliesWithOffloadsToBuffer(int argc, char **argv, int flags) { robj *obj2 = createObject(OBJ_STRING, sdscatfmt(sdsempty(), "test2")); _addBulkOffloadToBufferOrList(c, obj2); - TEST_ASSERT(c->bufpos == sizeof(payloadHeader) + 2 * sizeof(void*)); - TEST_ASSERT(header1->type == CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD); - TEST_ASSERT(header1->len == 2 * sizeof(void*)); + /* 2 offloads expected in c->buf */ + TEST_ASSERT(c->bufpos == sizeof(payloadHeader) + 2 * PTRS_LEN); + TEST_ASSERT(header1->type == CLIENT_REPLY_BULK_COPY_AVOID); + TEST_ASSERT(header1->len == 2 * PTRS_LEN); - ptr = (robj **)(c->buf + sizeof(payloadHeader) + sizeof(void*)); + ptr = (robj **)(c->buf + sizeof(payloadHeader) + PTRS_LEN); TEST_ASSERT(obj2 == *ptr); /* Test 2: Add plain reply to the buffer */ - const char* plain = "+OK\r\n"; + const char *plain = "+OK\r\n"; size_t plain_len = strlen(plain); _addReplyToBufferOrList(c, plain, plain_len); - TEST_ASSERT(c->bufpos == 2 * sizeof(payloadHeader) + 2 * sizeof(void*) + plain_len); - TEST_ASSERT(header1->type == CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD); - TEST_ASSERT(header1->len == 2 * sizeof(void*)); + /* 2 offloads and plain reply expected in c->buf. So 2 headers expected as well */ + TEST_ASSERT(c->bufpos == 2 * sizeof(payloadHeader) + 2 * PTRS_LEN + plain_len); + TEST_ASSERT(header1->type == CLIENT_REPLY_BULK_COPY_AVOID); + TEST_ASSERT(header1->len == 2 * PTRS_LEN); payloadHeader *header2 = c->last_header; - TEST_ASSERT(header2->type == CLIENT_REPLY_PAYLOAD_DATA); + TEST_ASSERT(header2->type == CLIENT_REPLY_PLAIN); TEST_ASSERT(header2->len == plain_len); + /* Add more plain replies. 
Check same plain reply header updated properly */ for (int i = 0; i < 9; ++i) _addReplyToBufferOrList(c, plain, plain_len); - TEST_ASSERT(c->bufpos == 2 * sizeof(payloadHeader) + 2 * sizeof(void*) + 10 * plain_len); - TEST_ASSERT(header2->type == CLIENT_REPLY_PAYLOAD_DATA); + TEST_ASSERT(c->bufpos == 2 * sizeof(payloadHeader) + 2 * PTRS_LEN + 10 * plain_len); + TEST_ASSERT(header2->type == CLIENT_REPLY_PLAIN); TEST_ASSERT(header2->len == plain_len * 10); /* Test 3: Add one more bulk offload to the buffer */ _addBulkOffloadToBufferOrList(c, obj); TEST_ASSERT(obj->refcount == 3); - TEST_ASSERT(c->bufpos == 3 * sizeof(payloadHeader) + 3 * sizeof(void*) + 10 * plain_len); + TEST_ASSERT(c->bufpos == 3 * sizeof(payloadHeader) + 3 * PTRS_LEN + 10 * plain_len); payloadHeader *header3 = c->last_header; - TEST_ASSERT(header3->type == CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD); - ptr = (robj **)((char*)c->last_header + sizeof(payloadHeader)); + TEST_ASSERT(header3->type == CLIENT_REPLY_BULK_COPY_AVOID); + ptr = (robj **)((char *)c->last_header + sizeof(payloadHeader)); TEST_ASSERT(obj == *ptr); + releaseReplyOffloads(c); decrRefCount(obj); - decrRefCount(obj); - decrRefCount(obj); - - decrRefCount(obj2); decrRefCount(obj2); freeReplyOffloadClient(c); @@ -223,35 +226,46 @@ int test_addRepliesWithOffloadsToList(int argc, char **argv, int flags) { UNUSED(argv); UNUSED(flags); + /* Required for isCopyAvoidAllowed / isCopyAvoidIndicatedByIOThreads */ + int io_threads_num = server.io_threads_num; + int min_io_threads_for_copy_avoid = server.min_io_threads_for_copy_avoid; + server.io_threads_num = 1; + server.min_io_threads_for_copy_avoid = 1; + client *c = createTestClient(); /* Test 1: Add bulk offloads to the reply list */ - /* Reply len to fill the buffer almost completely */ + /* Select reply length so that there is place for 2 headers and 4 bytes only + * 4 bytes is not enough for object pointer(s) + * This will force bulk offload to be added to reply list + */ size_t reply_len = c->buf_usable_size - 2 * sizeof(payloadHeader) - 4; - char *reply = zmalloc(reply_len); memset(reply, 'a', reply_len); _addReplyToBufferOrList(c, reply, reply_len); + TEST_ASSERT(c->flag.buf_encoded); TEST_ASSERT(c->bufpos == sizeof(payloadHeader) + reply_len); TEST_ASSERT(listLength(c->reply) == 0); + /* As bulk offload header+pointer can't be accommodated in c->buf + * then one block is expected in c->reply */ robj *obj = createObject(OBJ_STRING, sdscatfmt(sdsempty(), "test")); _addBulkOffloadToBufferOrList(c, obj); - TEST_ASSERT(obj->refcount == 2); TEST_ASSERT(c->bufpos == sizeof(payloadHeader) + reply_len); TEST_ASSERT(listLength(c->reply) == 1); + /* Check bulk offload header+pointer inside c->reply */ listIter iter; listRewind(c->reply, &iter); listNode *next = listNext(&iter); clientReplyBlock *blk = listNodeValue(next); - TEST_ASSERT(blk->used == sizeof(payloadHeader) + sizeof(void*)); + TEST_ASSERT(blk->used == sizeof(payloadHeader) + PTRS_LEN); payloadHeader *header1 = blk->last_header; - TEST_ASSERT(header1->type == CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD); - TEST_ASSERT(header1->len == sizeof(void*)); + TEST_ASSERT(header1->type == CLIENT_REPLY_BULK_COPY_AVOID); + TEST_ASSERT(header1->len == PTRS_LEN); robj **ptr = (robj **)(blk->buf + sizeof(payloadHeader)); TEST_ASSERT(obj == *ptr); @@ -260,9 +274,9 @@ int test_addRepliesWithOffloadsToList(int argc, char **argv, int flags) { _addBulkOffloadToBufferOrList(c, obj); TEST_ASSERT(obj->refcount == 3); TEST_ASSERT(listLength(c->reply) == 1); - TEST_ASSERT(blk->used == 
sizeof(payloadHeader) + 2 * sizeof(void*)); - TEST_ASSERT(header1->type == CLIENT_REPLY_PAYLOAD_BULK_OFFLOAD); - TEST_ASSERT(header1->len == 2 * sizeof(void*)); + TEST_ASSERT(blk->used == sizeof(payloadHeader) + 2 * PTRS_LEN); + TEST_ASSERT(header1->type == CLIENT_REPLY_BULK_COPY_AVOID); + TEST_ASSERT(header1->len == 2 * PTRS_LEN); /* Test 3: Add plain replies to cause reply list grow */ while (reply_len < blk->size - blk->used) _addReplyToBufferOrList(c, reply, reply_len); @@ -277,15 +291,20 @@ int test_addRepliesWithOffloadsToList(int argc, char **argv, int flags) { clientReplyBlock *blk2 = listNodeValue(next); /* last header in 2nd block */ payloadHeader *header3 = blk2->last_header; - TEST_ASSERT(header2->type == CLIENT_REPLY_PAYLOAD_DATA && header3->type == CLIENT_REPLY_PAYLOAD_DATA); + TEST_ASSERT(header2->type == CLIENT_REPLY_PLAIN && header3->type == CLIENT_REPLY_PLAIN); TEST_ASSERT((header2->len + header3->len) % reply_len == 0); + releaseReplyOffloads(c); decrRefCount(obj); - decrRefCount(obj); - decrRefCount(obj); + + zfree(reply); freeReplyOffloadClient(c); + /* Restore modified values */ + server.io_threads_num = io_threads_num; + server.min_io_threads_for_copy_avoid = min_io_threads_for_copy_avoid; + return 0; } @@ -294,7 +313,7 @@ int test_addBufferToReplyIOV(int argc, char **argv, int flags) { UNUSED(argv); UNUSED(flags); - const char* expected_reply = "$5\r\nhello\r\n"; + const char *expected_reply = "$5\r\nhello\r\n"; ssize_t total_len = strlen(expected_reply); const int iovmax = 16; char crlf[2] = {'\r', '\n'}; @@ -310,11 +329,11 @@ int test_addBufferToReplyIOV(int argc, char **argv, int flags) { replyIOV reply; initReplyIOV(c, iovmax, iov_arr, prefixes, crlf, &reply); - addBufferToReplyIOV(c->buf, c->bufpos, &reply, &metadata[0]); + addBufferToReplyIOV(c->flag.buf_encoded, c->buf, c->bufpos, &reply, &metadata[0]); TEST_ASSERT(reply.iov_len_total == total_len); TEST_ASSERT(reply.cnt == 3); - const char* ptr = expected_reply; + const char *ptr = expected_reply; for (int i = 0; i < reply.cnt; ++i) { TEST_ASSERT(memcmp(ptr, reply.iov[i].iov_base, reply.iov[i].iov_len) == 0); ptr += reply.iov[i].iov_len; @@ -333,14 +352,14 @@ int test_addBufferToReplyIOV(int argc, char **argv, int flags) { replyIOV reply2; initReplyIOV(c, iovmax, iov_arr2, prefixes2, crlf, &reply2); - addBufferToReplyIOV(c->buf, c->bufpos, &reply2, &metadata2[0]); + addBufferToReplyIOV(c->flag.buf_encoded, c->buf, c->bufpos, &reply2, &metadata2[0]); TEST_ASSERT(reply2.iov_len_total == total_len - 1); - TEST_ASSERT((*(char*)reply2.iov[0].iov_base) == '5'); + TEST_ASSERT((*(char *)reply2.iov[0].iov_base) == '5'); /* Test 4: Last written buf/pos/data_len after 2nd invocation */ saveLastWrittenBuf(c, metadata2, 1, reply2.iov_len_total, 4); /* 4 more bytes has been written */ TEST_ASSERT(c->io_last_written_buf == c->buf); - TEST_ASSERT(c->io_last_written_bufpos == 0); /* incomplete write */ + TEST_ASSERT(c->io_last_written_bufpos == 0); /* incomplete write */ TEST_ASSERT(c->io_last_written_data_len == 5); /* 1 + 4 */ /* Test 5: 3rd writevToclient invocation */ @@ -350,9 +369,9 @@ int test_addBufferToReplyIOV(int argc, char **argv, int flags) { replyIOV reply3; initReplyIOV(c, iovmax, iov_arr3, prefixes3, crlf, &reply3); - addBufferToReplyIOV(c->buf, c->bufpos, &reply3, &metadata3[0]); + addBufferToReplyIOV(c->flag.buf_encoded, c->buf, c->bufpos, &reply3, &metadata3[0]); TEST_ASSERT(reply3.iov_len_total == total_len - 5); - TEST_ASSERT((*(char*)reply3.iov[0].iov_base) == 'e'); + TEST_ASSERT((*(char 
*)reply3.iov[0].iov_base) == 'e'); /* Test 6: Last written buf/pos/data_len after 3rd invocation */ saveLastWrittenBuf(c, metadata3, 1, reply3.iov_len_total, reply3.iov_len_total); /* everything has been written */ @@ -366,4 +385,4 @@ int test_addBufferToReplyIOV(int argc, char **argv, int flags) { freeReplyOffloadClient(c); return 0; -} \ No newline at end of file +} diff --git a/tests/instances.tcl b/tests/instances.tcl index 5cc96b0edb3..3d578806757 100644 --- a/tests/instances.tcl +++ b/tests/instances.tcl @@ -111,6 +111,7 @@ proc spawn_instance {type base_port count {conf {}} {base_conf_file ""}} { if {$::io_threads} { puts $cfg "io-threads 2" puts $cfg "events-per-io-thread 0" + puts $cfg "min-io-threads-copy-avoid-on 2" } if {$::log_req_res} { diff --git a/tests/support/server.tcl b/tests/support/server.tcl index 6b1b9afc0af..fe08833998a 100644 --- a/tests/support/server.tcl +++ b/tests/support/server.tcl @@ -256,11 +256,6 @@ proc tags_acceptable {tags err_return} { return 0 } - if {$::reply_offload && [lsearch $tags "reply-offload:skip"] >= 0} { - set err "Not supported in reply-offload mode" - return 0 - } - if {$::tcl_version < 8.6 && [lsearch $tags "ipv6"] >= 0} { set err "TCL version is too low and does not support this" return 0 @@ -539,10 +534,7 @@ proc start_server {options {code undefined}} { if {$::io_threads} { dict set config "io-threads" 2 dict set config "events-per-io-thread" 0 - } - - if {$::reply_offload} { - dict set config "reply-offload" "yes" + dict set config "min-io-threads-copy-avoid-on" 2 } foreach line $data { diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index 9d278d07c5a..662449134ab 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -55,7 +55,6 @@ set ::valgrind 0 set ::durable 0 set ::tls 0 set ::io_threads 0 -set ::reply_offload 0 set ::tls_module 0 set ::stack_logging 0 set ::verbose 0 @@ -611,7 +610,6 @@ proc print_help_screen {} { " debugger)." "--dump-logs Dump server log on test failure." "--io-threads Run tests with IO threads." - "--reply-offload Run tests with reply offload enabled." "--tls Run tests in TLS mode." "--tls-module Run tests in TLS mode with Valkey module." "--host Run tests against an external host." 
@@ -676,8 +674,6 @@ for {set j 0} {$j < [llength $argv]} {incr j} {
         set ::quiet 1
     } elseif {$opt eq {--io-threads}} {
         set ::io_threads 1
-    } elseif {$opt eq {--reply-offload}} {
-        set ::reply_offload 1
     } elseif {$opt eq {--tls} || $opt eq {--tls-module}} {
         package require tls 1.6
         set ::tls 1
diff --git a/tests/unit/client-eviction.tcl b/tests/unit/client-eviction.tcl
index 4140b6d05eb..efeece0b399 100644
--- a/tests/unit/client-eviction.tcl
+++ b/tests/unit/client-eviction.tcl
@@ -1,4 +1,4 @@
-tags {"external:skip logreqres:skip reply-offload:skip"} {
+tags {"external:skip logreqres:skip"} {
 
 # Get info about a server client connection:
 # name - name of client we want to query
@@ -52,6 +52,9 @@ proc kb {v} {
 start_server {} {
     set maxmemory_clients 3000000
     r config set maxmemory-clients $maxmemory_clients
+    # Disable copy avoidance
+    r config set min-io-threads-copy-avoid-on 0
+    r config set min-bulk-string-size-copy-avoid-on 0
 
     test "client evicted due to large argv" {
         r flushdb
@@ -332,6 +335,9 @@ start_server {} {
     set obuf_limit [mb 3]
     r config set maxmemory-clients $maxmemory_clients
     r config set client-output-buffer-limit "normal $obuf_limit 0 0"
+    # Disable copy avoidance
+    r config set min-io-threads-copy-avoid-on 0
+    r config set min-bulk-string-size-copy-avoid-on 0
 
     test "avoid client eviction when client is freed by output buffer limit" {
         r flushdb
@@ -385,13 +391,16 @@ start_server {} {
 }
 
 start_server {} {
+    # Disable copy avoidance
+    r config set min-io-threads-copy-avoid-on 0
+    r config set min-bulk-string-size-copy-avoid-on 0
+
     test "decrease maxmemory-clients causes client eviction" {
         set maxmemory_clients [mb 4]
         set client_count 10
         set qbsize [expr ($maxmemory_clients - [mb 1]) / $client_count]
         r config set maxmemory-clients $maxmemory_clients
-
         # Make multiple clients consume together roughly 1mb less than maxmemory_clients
         set rrs {}
         for {set j 0} {$j < $client_count} {incr j} {
@@ -426,6 +435,10 @@ start_server {} {
 }
 
 start_server {} {
+    # Disable copy avoidance
+    r config set min-io-threads-copy-avoid-on 0
+    r config set min-bulk-string-size-copy-avoid-on 0
+
     test "evict clients only until below limit" {
         set client_count 10
         set client_mem [mb 1]
@@ -434,6 +447,7 @@ start_server {} {
         r client setname control
         r client no-evict on
+
         # Make multiple clients consume together roughly 1mb less than maxmemory_clients
         set total_client_mem 0
         set max_client_mem 0
@@ -488,6 +502,10 @@ start_server {} {
 }
 
 start_server {} {
+    # Disable copy avoidance
+    r config set min-io-threads-copy-avoid-on 0
+    r config set min-bulk-string-size-copy-avoid-on 0
+
     test "evict clients in right order (large to small)" {
         # Note that each size step needs to be at least x2 larger than previous step
         # because of how the client-eviction size bucketing works
@@ -555,6 +573,10 @@ start_server {} {
 }
 
 start_server {} {
+    # Disable copy avoidance
+    r config set min-io-threads-copy-avoid-on 0
+    r config set min-bulk-string-size-copy-avoid-on 0
+
     foreach type {"client no-evict" "maxmemory-clients disabled"} {
         r flushall
         r client no-evict on
diff --git a/tests/unit/commandlog.tcl b/tests/unit/commandlog.tcl
index 39038d79509..1edc71b9e18 100644
--- a/tests/unit/commandlog.tcl
+++ b/tests/unit/commandlog.tcl
@@ -27,12 +27,18 @@ start_server {tags {"commandlog"} overrides {commandlog-execution-slower-than 10
         assert_equal [r commandlog len large-request] 1
 
         # for large-reply
+        set copy_avoid [lindex [r config get min-io-threads-copy-avoid-on] 1]
+        r config set min-io-threads-copy-avoid-on 0
+
         r config set commandlog-reply-larger-than 1024
         r ping
         assert_equal [r commandlog len large-reply] 0
         r get testkey
         assert_equal [r commandlog len large-reply] 1
-    } {} {needs:debug}
+
+        # Restore min-io-threads-copy-avoid-on value
+        r config set min-io-threads-copy-avoid-on $copy_avoid
+    } {OK} {needs:debug}
 
     test {COMMANDLOG - zero max length is correctly handled} {
         r commandlog reset slow
@@ -120,6 +126,9 @@ start_server {tags {"commandlog"} overrides {commandlog-execution-slower-than 10
         assert_equal {foobar} [lindex $e 5]
 
         # for large-reply
+        set copy_avoid [lindex [r config get min-io-threads-copy-avoid-on] 1]
+        r config set min-io-threads-copy-avoid-on 0
+
         r get testkey
         set e [lindex [r commandlog get -1 large-reply] 0]
         assert_equal [llength $e] 6
@@ -129,7 +138,10 @@ start_server {tags {"commandlog"} overrides {commandlog-execution-slower-than 10
         assert_equal [expr {[lindex $e 2] > 1024}] 1
         assert_equal [lindex $e 3] {get testkey}
         assert_equal {foobar} [lindex $e 5]
-    } {} {needs:debug}
+
+        # Restore min-io-threads-copy-avoid-on value
+        r config set min-io-threads-copy-avoid-on $copy_avoid
+    } {OK} {needs:debug}
 
     test {COMMANDLOG slow - Certain commands are omitted that contain sensitive information} {
         r config set commandlog-slow-execution-max-len 100
diff --git a/tests/unit/info.tcl b/tests/unit/info.tcl
index 94a56bb4a5c..8f872f38a7d 100644
--- a/tests/unit/info.tcl
+++ b/tests/unit/info.tcl
@@ -384,6 +384,12 @@ start_server {tags {"info" "external:skip" "debug_defrag:skip"}} {
     }
 
     test {stats: client input and output buffer limit disconnections} {
+        # Disable copy avoidance
+        set min_threads [lindex [r config get min-io-threads-copy-avoid-on] 1]
+        set min_size [lindex [r config get min-bulk-string-size-copy-avoid-on] 1]
+        r config set min-io-threads-copy-avoid-on 0
+        r config set min-bulk-string-size-copy-avoid-on 0
+
         r config resetstat
         set info [r info stats]
         assert_equal [getInfoProperty $info client_query_buffer_limit_disconnections] {0}
@@ -407,9 +413,14 @@ start_server {tags {"info" "external:skip" "debug_defrag:skip"}} {
         r set key [string repeat a 100000] ;# to trigger output buffer limit check this needs to be big
         catch {r get key}
         r config set client-output-buffer-limit $org_outbuf_limit
+
+        # Restore copy avoidance configs
+        r config set min-io-threads-copy-avoid-on $min_threads
+        r config set min-bulk-string-size-copy-avoid-on $min_size
+
         set info [r info stats]
         assert_equal [getInfoProperty $info client_output_buffer_limit_disconnections] {1}
-    } {} {logreqres:skip reply-offload:skip} ;# same as obuf-limits.tcl, skip logreqres
+    } {} {logreqres:skip} ;# same as obuf-limits.tcl, skip logreqres
 
     test {clients: pubsub clients} {
         set info [r info clients]
diff --git a/tests/unit/maxmemory.tcl b/tests/unit/maxmemory.tcl
index 504b787bdae..111086c12b4 100644
--- a/tests/unit/maxmemory.tcl
+++ b/tests/unit/maxmemory.tcl
@@ -1,7 +1,9 @@
-start_server {tags {"maxmemory external:skip reply-offload:skip"}} {
+start_server {tags {"maxmemory external:skip"}} {
     r config set maxmemory 11mb
     r config set maxmemory-policy allkeys-lru
     set server_pid [s process_id]
+    # Disable copy avoidance
+    r config set min-io-threads-copy-avoid-on 0
 
     proc init_test {client_eviction} {
         r flushdb sync
@@ -112,6 +114,12 @@ start_server {tags {"maxmemory external:skip reply-offload:skip"}} {
     set clients {}
 
     test "eviction due to output buffers of pubsub, client eviction: $client_eviction" {
+        # Disable copy avoidance
+        set min_threads [lindex [r config get min-io-threads-copy-avoid-on] 1]
+        set min_size [lindex [r config get min-bulk-string-size-copy-avoid-on] 1]
+        r config set min-io-threads-copy-avoid-on 0
+        r config set min-bulk-string-size-copy-avoid-on 0
+
         init_test $client_eviction
 
         for {set j 0} {$j < 20} {incr j} {
@@ -135,6 +143,10 @@ start_server {tags {"maxmemory external:skip reply-offload:skip"}} {
             }
         }
 
+        # Restore copy avoidance configs
+        r config set min-io-threads-copy-avoid-on $min_threads
+        r config set min-bulk-string-size-copy-avoid-on $min_size
+
         verify_eviction_test $client_eviction
     }
     foreach rr $clients {
diff --git a/tests/unit/obuf-limits.tcl b/tests/unit/obuf-limits.tcl
index 1f391dfd73c..11d403b046a 100644
--- a/tests/unit/obuf-limits.tcl
+++ b/tests/unit/obuf-limits.tcl
@@ -1,4 +1,7 @@
-start_server {tags {"obuf-limits external:skip logreqres:skip reply-offload:skip"}} {
+start_server {tags {"obuf-limits external:skip logreqres:skip"}} {
+    # Disable copy avoidance
+    r config set min-io-threads-copy-avoid-on 0
+
     test {CONFIG SET client-output-buffer-limit} {
         set oldval [lindex [r config get client-output-buffer-limit] 1]
diff --git a/tests/unit/replybufsize.tcl b/tests/unit/replybufsize.tcl
index 4929fa832fc..60cca7d0549 100644
--- a/tests/unit/replybufsize.tcl
+++ b/tests/unit/replybufsize.tcl
@@ -8,8 +8,10 @@ proc get_reply_buffer_size {cname} {
     return $rbufsize
 }
 
-start_server {tags {"replybufsize reply-offload:skip"}} {
-
+start_server {tags {"replybufsize"}} {
+    # Disable copy avoidance
+    r config set min-io-threads-copy-avoid-on 0
+
     test {verify reply buffer limits} {
         # In order to reduce test time we can set the peak reset time very low
         r debug replybuffer peak-reset-time 100
diff --git a/valkey.conf b/valkey.conf
index 9617f6b12b6..99041fb4092 100644
--- a/valkey.conf
+++ b/valkey.conf
@@ -1448,12 +1448,6 @@ lazyfree-lazy-user-flush yes
 #
 # prefetch-batch-max-size 16
 #
-# For use cases where command replies include Bulk strings (e.g. GET, MGET)
-# reply offload can be enabled to eliminate espensive memory access
-# and redundant data copy performed by main thread
-#
-# reply-offload yes
-#
 # NOTE:
 # 1. The 'io-threads-do-reads' config is deprecated and has no effect. Please
 #    avoid using this config if possible.
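
The Tcl suites touched above all repeat the same save/disable/restore dance around the two thresholds via CONFIG GET/SET. The sketch below is only an illustration of how that pattern could be factored into a shared helper; the proc name and its placement are hypothetical and not part of this patch.

# Hypothetical helper (not part of this patch): save the copy-avoidance
# thresholds, disable copy avoidance, evaluate the body in the caller's
# scope, then restore the saved values.
proc with_copy_avoid_disabled {body} {
    set min_threads [lindex [r config get min-io-threads-copy-avoid-on] 1]
    set min_size [lindex [r config get min-bulk-string-size-copy-avoid-on] 1]
    r config set min-io-threads-copy-avoid-on 0
    r config set min-bulk-string-size-copy-avoid-on 0
    uplevel 1 $body
    r config set min-io-threads-copy-avoid-on $min_threads
    r config set min-bulk-string-size-copy-avoid-on $min_size
}

A test would then wrap its body in with_copy_avoid_disabled { ... } instead of inlining the four config calls; a production version would likely also guard the body with catch so the thresholds are restored on error.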