Skip to content

Commit

Permalink
Address feedbacks and move cached_cluster_slot_info to server
Browse files Browse the repository at this point in the history
Signed-off-by: Roshan Khatri <[email protected]>
  • Loading branch information
roshkhatri committed May 14, 2024
1 parent 0b14569 commit 26a7e53
Show file tree
Hide file tree
Showing 7 changed files with 34 additions and 50 deletions.
32 changes: 21 additions & 11 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -872,7 +872,7 @@ void clusterCommand(client *c) {
clusterCommandMyShardId(c);
} else if (!strcasecmp(c->argv[1]->ptr,"slots") && c->argc == 2) {
/* CLUSTER SLOTS */
clusterSlotsCommand(c);
clusterCommandSlots(c);
} else if (!strcasecmp(c->argv[1]->ptr,"shards") && c->argc == 2) {
/* CLUSTER SHARDS */
clusterCommandShards(c);
Expand Down Expand Up @@ -1403,6 +1403,15 @@ void addNodeReplyForClusterSlot(client *c, clusterNode *node, int start_slot, in
serverAssert(nested_elements == 3); /* Original 3 elements */
}

void clearCachedClusterSlotsResponse(void) {
for (connTypeForCaching conn_type = CACHE_CONN_TCP; conn_type < CACHE_CONN_TYPE_MAX; conn_type++) {
if (server.cached_cluster_slot_info[conn_type]) {
sdsfree(server.cached_cluster_slot_info[conn_type]);
server.cached_cluster_slot_info[conn_type] = NULL;
}
}
}

sds generateClusterSlotResponse(void) {
client *recording_client = createCachedResponseClient();
clusterNode *n = NULL;
Expand Down Expand Up @@ -1435,12 +1444,12 @@ sds generateClusterSlotResponse(void) {
int verifyCachedClusterSlotsResponse(sds cached_response) {
sds generated_response = generateClusterSlotResponse();
int result = !sdscmp(generated_response, cached_response);
if (!result) serverLog(LL_NOTICE,"\ngenerated_response:\n%s\n\ncached_response:\n%s", generated_response, cached_response);
if (!result) serverLog(LL_WARNING,"\ngenerated_response:\n%s\n\ncached_response:\n%s", generated_response, cached_response);
sdsfree(generated_response);
return result;
}

void clusterSlotsCommand(client *c) {
void clusterCommandSlots(client *c) {
/* Format: 1) 1) start slot
* 2) end slot
* 3) 1) master IP
Expand All @@ -1453,16 +1462,17 @@ void clusterSlotsCommand(client *c) {
*/
connTypeForCaching conn_type = connIsTLS(c->conn);

/* Check if we have a response cached for cluster slots for early exit. */
updateAllCachedNodesHealth();
if (isClusterSlotsResponseCached(conn_type)) {
debugServerAssertWithInfo(c, NULL, verifyCachedClusterSlotsResponse(getClusterSlotReply(conn_type)) == 1);
addReplyProto(c, getClusterSlotReply(conn_type), sdslen(getClusterSlotReply(conn_type)));
return;
if (detectAndUpdateCachedNodeHealth()) clearCachedClusterSlotsResponse();

sds cached_reply = server.cached_cluster_slot_info[conn_type];
if (!cached_reply) {
cached_reply = generateClusterSlotResponse();
server.cached_cluster_slot_info[conn_type] = cached_reply;
} else {
debugServerAssertWithInfo(c, NULL, verifyCachedClusterSlotsResponse(cached_reply) == 1);
}

cacheSlotsResponse(generateClusterSlotResponse(), conn_type);
addReplyProto(c, getClusterSlotReply(conn_type), sdslen(getClusterSlotReply(conn_type)));
addReplyProto(c, cached_reply, sdslen(cached_reply));
}

/* -----------------------------------------------------------------------------
Expand Down
8 changes: 2 additions & 6 deletions src/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ int clusterAllowFailoverCmd(client *c);
void clusterPromoteSelfToMaster(void);
int clusterManualFailoverTimeLimit(void);

void clusterSlotsCommand(client * c);
void clusterCommandSlots(client * c);
void clusterCommandMyId(client *c);
void clusterCommandMyShardId(client *c);
void clusterCommandShards(client *c);
Expand Down Expand Up @@ -117,10 +117,6 @@ void migrateCommand(client *c);
void clusterCommand(client *c);
ConnectionType *connTypeOfCluster(void);

int isClusterSlotsResponseCached(connTypeForCaching conn_type);
sds getClusterSlotReply(connTypeForCaching conn_type);
void clearCachedClusterSlotsResponse(void);
void cacheSlotsResponse(sds response_to_cache, connTypeForCaching conn_type);
void updateAllCachedNodesHealth(void);
int detectAndUpdateCachedNodeHealth(void);
int isNodeAvailable(clusterNode *node);
#endif /* __CLUSTER_H */
31 changes: 3 additions & 28 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -110,31 +110,6 @@ static void clusterBuildMessageHdr(clusterMsg *hdr, int type, size_t msglen);
void freeClusterLink(clusterLink *link);
int verifyClusterNodeId(const char *name, int length);

int isClusterSlotsResponseCached(connTypeForCaching conn_type) {
if (server.cluster->cached_cluster_slot_info[conn_type] &&
sdslen(server.cluster->cached_cluster_slot_info[conn_type])) {
return 1;
}
return 0;
}

sds getClusterSlotReply(connTypeForCaching conn_type) {
return server.cluster->cached_cluster_slot_info[conn_type];
}

void clearCachedClusterSlotsResponse(void) {
for (connTypeForCaching conn_type = CACHE_CONN_TCP; conn_type < CACHE_CONN_TYPE_MAX; conn_type++) {
if (server.cluster->cached_cluster_slot_info[conn_type]) {
sdsfree(server.cluster->cached_cluster_slot_info[conn_type]);
server.cluster->cached_cluster_slot_info[conn_type] = NULL;
}
}
}

void cacheSlotsResponse(sds response_to_cache, connTypeForCaching conn_type) {
server.cluster->cached_cluster_slot_info[conn_type] = response_to_cache;
}

int getNodeDefaultClientPort(clusterNode *n) {
return server.tls_cluster ? n->tls_port : n->tcp_port;
}
Expand Down Expand Up @@ -1068,7 +1043,7 @@ void clusterInit(void) {
server.cluster->mf_end = 0;
server.cluster->mf_slave = NULL;
for (connTypeForCaching conn_type = CACHE_CONN_TCP; conn_type < CACHE_CONN_TYPE_MAX; conn_type++) {
server.cluster->cached_cluster_slot_info[conn_type] = NULL;
server.cached_cluster_slot_info[conn_type] = NULL;
}
resetManualFailover();
clusterUpdateMyselfFlags();
Expand Down Expand Up @@ -6939,7 +6914,7 @@ void clusterPromoteSelfToMaster(void) {
replicationUnsetMaster();
}

void updateAllCachedNodesHealth(void) {
int detectAndUpdateCachedNodeHealth(void) {
dictIterator di;
dictInitSafeIterator(&di, server.cluster->nodes);
dictEntry *de;
Expand All @@ -6954,7 +6929,7 @@ void updateAllCachedNodesHealth(void) {
}
}

if (overall_health_changed) clearCachedClusterSlotsResponse();
return overall_health_changed;
}

/* Replicate migrating and importing slot states to all replicas */
Expand Down
3 changes: 1 addition & 2 deletions src/cluster_legacy.h
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ struct _clusterNode {
clusterLink *inbound_link; /* TCP/IP link accepted from this node */
list *fail_reports; /* List of nodes signaling this as failing */
int is_node_healthy; /* Boolean last updated node health used for validating
cached response, can be stale. Update by calling updateAllCachedNodesHealth() */
cached response, can be stale. Update by calling detectAndUpdateCachedNodeHealth() */
};

struct clusterState {
Expand Down Expand Up @@ -358,7 +358,6 @@ struct clusterState {
* stops claiming the slot. This prevents spreading incorrect information (that
* source still owns the slot) using UPDATE messages. */
unsigned char owner_not_claiming_slot[CLUSTER_SLOTS / 8];
sds cached_cluster_slot_info[CACHE_CONN_TYPE_MAX];
};


Expand Down
2 changes: 1 addition & 1 deletion src/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ typedef enum connTypeForCaching {
CACHE_CONN_TCP,
CACHE_CONN_TLS,
CACHE_CONN_TYPE_MAX
}connTypeForCaching;
} connTypeForCaching;

typedef void (*ConnectionCallbackFunc)(struct connection *conn);

Expand Down
6 changes: 4 additions & 2 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -319,14 +319,16 @@ int prepareClientToWrite(client *c) {
return C_OK;
}

/* Returns everything in the client reply linked list in a SDS format. */
/* Returns everything in the client reply linked list in a SDS format.
* This should only be used only with a caching client. */
static sds getClientOutputBuffer(client *c) {
sds cmd_response = sdsempty();
listIter li;
listNode *ln;
clientReplyBlock *val_block;
listRewind(c->reply,&li);


/* Here, c->buf is not used, thus we confirm c->bufpos remains 0. */
serverAssert(c->bufpos == 0);
while ((ln = listNext(&li)) != NULL) {
val_block = (clientReplyBlock *)listNodeValue(ln);
Expand Down
2 changes: 2 additions & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -2027,6 +2027,7 @@ struct valkeyServer {
unsigned long long cluster_link_msg_queue_limit_bytes; /* Memory usage limit on individual link msg queue */
int cluster_drop_packet_filter; /* Debug config that allows tactically
* dropping packets of a specific type */
sds cached_cluster_slot_info[CACHE_CONN_TYPE_MAX];
/* Scripting */
mstime_t busy_reply_threshold; /* Script / module timeout in milliseconds */
int pre_command_oom_state; /* OOM before command (script?) was started */
Expand Down Expand Up @@ -2682,6 +2683,7 @@ int authRequired(client *c);
void putClientInPendingWriteQueue(client *c);
client *createCachedResponseClient(void);
sds stopCaching(client *recording_client);
void clearCachedClusterSlotsResponse(void);

/* logreqres.c - logging of requests and responses */
void reqresReset(client *c, int free_buf);
Expand Down

0 comments on commit 26a7e53

Please sign in to comment.