diff --git a/src/cluster.c b/src/cluster.c index 2a9d33a57d2..4e54a220f91 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1402,20 +1402,11 @@ void addNodeReplyForClusterSlot(client *c, clusterNode *node, int start_slot, in serverAssert(nested_elements == 3); /* Original 3 elements */ } -void clusterCommandSlots(client * c) { - /* Format: 1) 1) start slot - * 2) end slot - * 3) 1) master IP - * 2) master port - * 3) node ID - * 4) 1) replica IP - * 2) replica port - * 3) node ID - * ... continued until done - */ +sds generateClusterSlotResponse(void) { + client *recording_client = createCachedResponseClient(); clusterNode *n = NULL; int num_masters = 0, start = -1; - void *slot_replylen = addReplyDeferredLen(c); + void *slot_replylen = addReplyDeferredLen(recording_client); for (int i = 0; i <= CLUSTER_SLOTS; i++) { /* Find start node and slot id. */ @@ -1429,14 +1420,48 @@ void clusterCommandSlots(client * c) { /* Add cluster slots info when occur different node with start * or end of slot. */ if (i == CLUSTER_SLOTS || n != getNodeBySlot(i)) { - addNodeReplyForClusterSlot(c, n, start, i-1); + addNodeReplyForClusterSlot(recording_client, n, start, i-1); num_masters++; if (i == CLUSTER_SLOTS) break; n = getNodeBySlot(i); start = i; } } - setDeferredArrayLen(c, slot_replylen, num_masters); + setDeferredArrayLen(recording_client, slot_replylen, num_masters); + return stopCaching(recording_client); +} + +int verifyCachedClusterSlotsResponse(sds cached_response) { + sds generated_response = generateClusterSlotResponse(); + int result = !sdscmp(generated_response, cached_response); + if (!result) serverLog(LL_NOTICE,"\ngenerated_response:\n%s\n\ncached_response:\n%s", generated_response, cached_response); + sdsfree(generated_response); + return result; +} + +void clusterCommandSlots(client * c) { + /* Format: 1) 1) start slot + * 2) end slot + * 3) 1) master IP + * 2) master port + * 3) node ID + * 4) 1) replica IP + * 2) replica port + * 3) node ID + * ... continued until done + */ + enum connTypeForCaching conn_type = connIsTLS(c->conn); + + /* Check if we have a response cached for cluster slots for early exit. */ + updateNodesHealth(); + if (isClusterSlotsResponseCached(conn_type)) { + debugServerAssertWithInfo(c, NULL, verifyCachedClusterSlotsResponse(getClusterSlotReply(conn_type)) == 1); + addReplyProto(c, getClusterSlotReply(conn_type), sdslen(getClusterSlotReply(conn_type))); + return; + } + + cacheSlotsResponse(generateClusterSlotResponse(), conn_type); + addReplyProto(c, getClusterSlotReply(conn_type), sdslen(getClusterSlotReply(conn_type))); } /* ----------------------------------------------------------------------------- diff --git a/src/cluster.h b/src/cluster.h index f21f1e9c16e..7dc8c626e0c 100644 --- a/src/cluster.h +++ b/src/cluster.h @@ -115,4 +115,10 @@ int isValidAuxString(char *s, unsigned int length); void migrateCommand(client *c); void clusterCommand(client *c); ConnectionType *connTypeOfCluster(void); + +int isClusterSlotsResponseCached(enum connTypeForCaching conn_type); +sds getClusterSlotReply(enum connTypeForCaching conn_type); +void clearCachedClusterSlotsResp(void); +void cacheSlotsResponse(sds response_to_cache, enum connTypeForCaching conn_type); +void updateNodesHealth(void); #endif /* __CLUSTER_H */ diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index 4e4f9425d66..ea64e9c69b5 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -110,6 +110,31 @@ static void clusterBuildMessageHdr(clusterMsg *hdr, int type, size_t msglen); void freeClusterLink(clusterLink *link); int verifyClusterNodeId(const char *name, int length); +int isClusterSlotsResponseCached(enum connTypeForCaching conn_type) { + if (server.cluster->cached_cluster_slot_info[conn_type] && + sdslen(server.cluster->cached_cluster_slot_info[conn_type])) { + return 1; + } + return 0; +} + +sds getClusterSlotReply(enum connTypeForCaching conn_type) { + return server.cluster->cached_cluster_slot_info[conn_type]; +} + +void clearCachedClusterSlotsResp(void) { + for (enum connTypeForCaching conn_type = CACHE_CONN_TCP; conn_type < CACHE_CONN_TYPE_MAX; conn_type++) { + if (server.cluster->cached_cluster_slot_info[conn_type]) { + sdsfree(server.cluster->cached_cluster_slot_info[conn_type]); + server.cluster->cached_cluster_slot_info[conn_type] = NULL; + } + } +} + +void cacheSlotsResponse(sds response_to_cache, enum connTypeForCaching conn_type) { + server.cluster->cached_cluster_slot_info[conn_type] = response_to_cache; +} + int getNodeDefaultClientPort(clusterNode *n) { return server.tls_cluster ? n->tls_port : n->tcp_port; } @@ -492,6 +517,7 @@ int clusterLoadConfig(char *filename) { } *p = '\0'; memcpy(n->ip,aux_argv[0],strlen(aux_argv[0])+1); + clearCachedClusterSlotsResp(); char *port = p+1; char *busp = strchr(port,'@'); if (busp) { @@ -887,6 +913,7 @@ void clusterUpdateMyselfIp(void) { } else { myself->ip[0] = '\0'; /* Force autodetection. */ } + clearCachedClusterSlotsResp(); } } @@ -904,6 +931,7 @@ static void updateAnnouncedHostname(clusterNode *node, char *new) { } else if (sdslen(node->hostname) != 0) { sdsclear(node->hostname); } + clearCachedClusterSlotsResp(); clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG); } @@ -1028,6 +1056,9 @@ void clusterInit(void) { server.cluster->mf_end = 0; server.cluster->mf_slave = NULL; + for (enum connTypeForCaching conn_type = CACHE_CONN_TCP; conn_type < CACHE_CONN_TYPE_MAX; conn_type++) { + server.cluster->cached_cluster_slot_info[conn_type] = NULL; + } resetManualFailover(); clusterUpdateMyselfFlags(); clusterUpdateMyselfIp(); @@ -1352,6 +1383,7 @@ clusterNode *createClusterNode(char *nodename, int flags) { node->repl_offset_time = 0; node->repl_offset = 0; listSetFreeMethod(node->fail_reports,zfree); + node->node_health = 0; return node; } @@ -1480,6 +1512,7 @@ int clusterNodeAddSlave(clusterNode *master, clusterNode *slave) { master->slaves[master->numslaves] = slave; master->numslaves++; master->flags |= CLUSTER_NODE_MIGRATE_TO; + clearCachedClusterSlotsResp(); return C_OK; } @@ -1526,6 +1559,7 @@ void clusterAddNode(clusterNode *node) { retval = dictAdd(server.cluster->nodes, sdsnewlen(node->name,CLUSTER_NAMELEN), node); serverAssert(retval == DICT_OK); + clearCachedClusterSlotsResp(); } /* Remove a node from the cluster. The function performs the high level @@ -1567,6 +1601,7 @@ void clusterDelNode(clusterNode *delnode) { /* 3) Remove the node from the owning shard */ clusterRemoveNodeFromShard(delnode); + clearCachedClusterSlotsResp(); /* 4) Free the node, unlinking it from the cluster. */ freeClusterNode(delnode); @@ -2184,6 +2219,7 @@ void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) { node->tls_port = msg_tls_port; node->cport = ntohs(g->cport); node->flags &= ~CLUSTER_NODE_NOADDR; + clearCachedClusterSlotsResp(); } } else if (!node) { /* If it's not in NOADDR state and we don't have it, we @@ -2279,6 +2315,7 @@ int nodeUpdateAddressIfNeeded(clusterNode *node, clusterLink *link, serverLog(LL_NOTICE,"Address updated for node %.40s (%s), now %s:%d", node->name, node->human_nodename, node->ip, getNodeDefaultClientPort(node)); + clearCachedClusterSlotsResp(); /* Check if this is our master and we have to change the * replication target as well. */ if (nodeIsSlave(myself) && myself->slaveof == node) @@ -2300,6 +2337,7 @@ void clusterSetNodeAsMaster(clusterNode *n) { n->flags |= CLUSTER_NODE_MASTER; n->slaveof = NULL; + clearCachedClusterSlotsResp(); /* Update config and state. */ clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| CLUSTER_TODO_UPDATE_STATE); @@ -2344,7 +2382,9 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc serverLog(LL_NOTICE,"Discarding UPDATE message about myself."); return; } - + + clearCachedClusterSlotsResp(); + for (j = 0; j < CLUSTER_SLOTS; j++) { if (bitmapTestBit(slots,j)) { sender_slots++; @@ -2851,6 +2891,7 @@ int clusterProcessPacket(clusterLink *link) { strcmp(ip,myself->ip)) { memcpy(myself->ip,ip,NET_IP_STR_LEN); + clearCachedClusterSlotsResp(); serverLog(LL_NOTICE,"IP address for this node updated to %s", myself->ip); clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG); @@ -2933,6 +2974,7 @@ int clusterProcessPacket(clusterLink *link) { link->node->cport = 0; freeClusterLink(link); clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG); + clearCachedClusterSlotsResp(); return 0; } } @@ -4177,6 +4219,7 @@ void clusterFailoverReplaceYourMaster(void) { /* 3) Update state and save config. */ clusterUpdateState(); clusterSaveConfigOrDie(1); + clearCachedClusterSlotsResp(); /* 4) Pong all the other nodes so that they can update the state * accordingly and detect that we switched to master role. */ @@ -4233,6 +4276,7 @@ void clusterHandleSlaveFailover(void) { server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE; return; } + clearCachedClusterSlotsResp(); /* Set data_age to the number of milliseconds we are disconnected from * the master. */ @@ -4961,6 +5005,7 @@ int clusterAddSlot(clusterNode *n, int slot) { if (server.cluster->slots[slot]) return C_ERR; clusterNodeSetSlotBit(n,slot); server.cluster->slots[slot] = n; + clearCachedClusterSlotsResp(); return C_OK; } @@ -4979,6 +5024,7 @@ int clusterDelSlot(int slot) { server.cluster->slots[slot] = NULL; /* Make owner_not_claiming_slot flag consistent with slot ownership information. */ bitmapClearBit(server.cluster->owner_not_claiming_slot, slot); + clearCachedClusterSlotsResp(); return C_OK; } @@ -5603,6 +5649,14 @@ void clusterUpdateSlots(client *c, unsigned char *slots, int del) { } } +long long getNodeOffSet(clusterNode *node) { + if (node->flags & CLUSTER_NODE_MYSELF) { + return nodeIsSlave(node) ? replicationGetSlaveOffset() : server.master_repl_offset; + } else { + return node->repl_offset; + } +} + /* Add detailed information of a node to the output buffer of the given client. */ void addNodeDetailsToShardReply(client *c, clusterNode *node) { int reply_count = 0; @@ -5637,12 +5691,7 @@ void addNodeDetailsToShardReply(client *c, clusterNode *node) { reply_count++; } - long long node_offset; - if (node->flags & CLUSTER_NODE_MYSELF) { - node_offset = nodeIsSlave(node) ? replicationGetSlaveOffset() : server.master_repl_offset; - } else { - node_offset = node->repl_offset; - } + long long node_offset = getNodeOffSet(node); addReplyBulkCString(c, "role"); addReplyBulkCString(c, nodeIsSlave(node) ? "replica" : "master"); @@ -6517,3 +6566,31 @@ int clusterAllowFailoverCmd(client *c) { void clusterPromoteSelfToMaster(void) { replicationUnsetMaster(); } + +void updateNodesHealth(void) { + dictIterator *di; + dictEntry *de; + clusterNode *node; + int overall_health_changed = 0; + di = dictGetSafeIterator(server.cluster->nodes); + while((de = dictNext(di)) != NULL) { + node = dictGetVal(de); + int present_node_health; + + long long node_offset = getNodeOffSet(node); + + if (nodeFailed(node) || (nodeIsSlave(node) && node_offset == 0)) { + present_node_health = 0; + } else { + present_node_health = 1; + } + + if (present_node_health != node->node_health) { + overall_health_changed = 1; + } + node->node_health = present_node_health; + } + dictReleaseIterator(di); + + if (overall_health_changed) clearCachedClusterSlotsResp(); +} diff --git a/src/cluster_legacy.h b/src/cluster_legacy.h index a857184ab3e..6728c5beea7 100644 --- a/src/cluster_legacy.h +++ b/src/cluster_legacy.h @@ -305,6 +305,7 @@ struct _clusterNode { clusterLink *link; /* TCP/IP link established toward this node */ clusterLink *inbound_link; /* TCP/IP link accepted from this node */ list *fail_reports; /* List of nodes signaling this as failing */ + int node_health; /* Last updated node health, can be stale. Update by calling updateNodesHealth() */ }; struct clusterState { @@ -353,6 +354,7 @@ struct clusterState { * stops claiming the slot. This prevents spreading incorrect information (that * source still owns the slot) using UPDATE messages. */ unsigned char owner_not_claiming_slot[CLUSTER_SLOTS / 8]; + sds cached_cluster_slot_info[CACHE_CONN_TYPE_MAX]; }; diff --git a/src/config.c b/src/config.c index 14d45b67212..5d62d995df9 100644 --- a/src/config.c +++ b/src/config.c @@ -2581,6 +2581,12 @@ static int updateOOMScoreAdj(const char **err) { return 1; } +int invalidateClusterSlotsResp(const char **err) { + UNUSED(err); + clearCachedClusterSlotsResp(); + return 1; +} + int updateRequirePass(const char **err) { UNUSED(err); /* The old "requirepass" directive just translates to setting @@ -3156,7 +3162,7 @@ standardConfig static_configs[] = { createEnumConfig("enable-protected-configs", NULL, IMMUTABLE_CONFIG, protected_action_enum, server.enable_protected_configs, PROTECTED_ACTION_ALLOWED_NO, NULL, NULL), createEnumConfig("enable-debug-command", NULL, IMMUTABLE_CONFIG, protected_action_enum, server.enable_debug_cmd, PROTECTED_ACTION_ALLOWED_NO, NULL, NULL), createEnumConfig("enable-module-command", NULL, IMMUTABLE_CONFIG, protected_action_enum, server.enable_module_cmd, PROTECTED_ACTION_ALLOWED_NO, NULL, NULL), - createEnumConfig("cluster-preferred-endpoint-type", NULL, MODIFIABLE_CONFIG, cluster_preferred_endpoint_type_enum, server.cluster_preferred_endpoint_type, CLUSTER_ENDPOINT_TYPE_IP, NULL, NULL), + createEnumConfig("cluster-preferred-endpoint-type", NULL, MODIFIABLE_CONFIG, cluster_preferred_endpoint_type_enum, server.cluster_preferred_endpoint_type, CLUSTER_ENDPOINT_TYPE_IP, NULL, invalidateClusterSlotsResp), createEnumConfig("propagation-error-behavior", NULL, MODIFIABLE_CONFIG, propagation_error_behavior_enum, server.propagation_error_behavior, PROPAGATION_ERR_BEHAVIOR_IGNORE, NULL, NULL), createEnumConfig("shutdown-on-sigint", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, shutdown_on_sig_enum, server.shutdown_on_sigint, 0, isValidShutdownOnSigFlags, NULL), createEnumConfig("shutdown-on-sigterm", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, shutdown_on_sig_enum, server.shutdown_on_sigterm, 0, isValidShutdownOnSigFlags, NULL), diff --git a/src/connection.h b/src/connection.h index d0340d18f5d..096f893f817 100644 --- a/src/connection.h +++ b/src/connection.h @@ -62,6 +62,12 @@ typedef enum { #define CONN_TYPE_TLS "tls" #define CONN_TYPE_MAX 8 /* 8 is enough to be extendable */ +enum connTypeForCaching { + CACHE_CONN_TCP, + CACHE_CONN_TLS, + CACHE_CONN_TYPE_MAX +}; + typedef void (*ConnectionCallbackFunc)(struct connection *conn); typedef struct ConnectionType { diff --git a/src/networking.c b/src/networking.c index 5d0167fc18d..2c4e377b345 100644 --- a/src/networking.c +++ b/src/networking.c @@ -317,6 +317,42 @@ int prepareClientToWrite(client *c) { return C_OK; } +/* Returns everything in the client reply linked list in a SDS format. */ +static sds getClientOutputBuffer(client *c) { + sds cmd_response = sdsempty(); + listIter li; + listNode *ln; + clientReplyBlock *val_block; + listRewind(c->reply,&li); + + serverAssert(c->bufpos == 0); + while ((ln = listNext(&li)) != NULL) { + val_block = (clientReplyBlock *)listNodeValue(ln); + cmd_response = sdscatlen(cmd_response, val_block->buf,val_block->used); + } + return cmd_response; +} + +/* This function creates and returns a fake client for recording the command response + * to initiate caching of any command response. + * + * It needs be paired with `stopCaching` function to stop caching. */ +client *createCachedResponseClient(void) { + struct client *recording_client = createClient(NULL); + recording_client->conn = zcalloc(sizeof(connection)); + return recording_client; +} + +/* This function is used to stop caching of any command response after `createCachedResponseClient` is called. + * It returns the command response as SDS from the recording_client's reply buffer. */ +sds stopCaching(client *recording_client) { + zfree(recording_client->conn); + recording_client->conn = NULL; + sds output_buff = getClientOutputBuffer(recording_client); + freeClient(recording_client); + return output_buff; +} + /* ----------------------------------------------------------------------------- * Low level functions to add more data to output buffers. * -------------------------------------------------------------------------- */ diff --git a/src/server.h b/src/server.h index 411edadec4e..5ca78720d70 100644 --- a/src/server.h +++ b/src/server.h @@ -2674,6 +2674,8 @@ void initThreadedIO(void); client *lookupClientByID(uint64_t id); int authRequired(client *c); void putClientInPendingWriteQueue(client *c); +client *createCachedResponseClient(void); +sds stopCaching(client *recording_client); /* logreqres.c - logging of requests and responses */ void reqresReset(client *c, int free_buf); diff --git a/tests/cluster/tests/02-failover.tcl b/tests/cluster/tests/02-failover.tcl index 6b2fd09af8b..ce6603eb503 100644 --- a/tests/cluster/tests/02-failover.tcl +++ b/tests/cluster/tests/02-failover.tcl @@ -62,4 +62,5 @@ test "Instance #0 gets converted into a slave" { } else { fail "Old master was not converted into slave" } + wait_for_cluster_propagation } diff --git a/tests/cluster/tests/04-resharding.tcl b/tests/cluster/tests/04-resharding.tcl index 3a08c708021..9d24d965f88 100644 --- a/tests/cluster/tests/04-resharding.tcl +++ b/tests/cluster/tests/04-resharding.tcl @@ -125,7 +125,7 @@ test "Cluster consistency during live resharding" { } else { fail "Resharding is not terminating after some time." } - + wait_for_cluster_propagation } test "Verify $numkeys keys for consistency with logical content" { diff --git a/tests/cluster/tests/10-manual-failover.tcl b/tests/cluster/tests/10-manual-failover.tcl index 5441b79f388..cd8ae77dff8 100644 --- a/tests/cluster/tests/10-manual-failover.tcl +++ b/tests/cluster/tests/10-manual-failover.tcl @@ -65,6 +65,7 @@ test "Wait for failover" { } else { fail "No failover detected" } + wait_for_cluster_propagation } test "Cluster should eventually be up again" { @@ -92,6 +93,7 @@ test "Instance #0 gets converted into a slave" { } else { fail "Old master was not converted into slave" } + wait_for_cluster_propagation } ## Check that manual failover does not happen if we can't talk with the master.