Skip to content

Commit

Permalink
CCBC-1607: fix collection id encoding in mixed cluster
Browse files Browse the repository at this point in the history
Change-Id: I1893cfa1b67ca9457ad7dd0a14eda5695c5792c5
Reviewed-on: https://review.couchbase.org/c/libcouchbase/+/194901
Tested-by: Build Bot <[email protected]>
Reviewed-by: Brett Lawson <[email protected]>
  • Loading branch information
avsej committed Aug 12, 2023
1 parent 95e4542 commit ad04729
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 19 deletions.
2 changes: 1 addition & 1 deletion src/handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ template <typename T>
void invoke_callback(const mc_PACKET *pkt, lcb_INSTANCE *instance, T *resp, lcb_CALLBACK_TYPE cbtype)
{
if (instance != nullptr) {
std::string collection_path = instance->collcache->id_to_name(mcreq_get_cid(instance, pkt));
std::string collection_path = instance->collcache->id_to_name(mcreq_get_cid(instance, pkt, NULL));
if (!collection_path.empty()) {
size_t dot = collection_path.find('.');
if (dot != std::string::npos) {
Expand Down
91 changes: 90 additions & 1 deletion src/mc/mcreq.c
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ lcb_STATUS mcreq_reserve_key(mc_PIPELINE *pipeline, mc_PACKET *packet, uint8_t h
/* copy collection ID prefix */
if (ncid) {
memcpy(SPAN_BUFFER(&packet->kh_span) + hdrsize, cid, ncid);
packet->flags |= MCREQ_F_HASCID;
}
/**
* Copy the key into the packet starting at the extras end
Expand Down Expand Up @@ -243,10 +244,89 @@ void mcreq_reenqueue_packet(mc_PIPELINE *pipeline, mc_PACKET *packet)
sllist_insert_sorted(reqs, &packet->slnode, pkt_tmo_compar);
}

/**
 * Reconcile the collection id prefix of a packet's key with the collections
 * state recorded on the pipeline the packet is about to be queued on.
 *
 * - Packets flagged MCREQ_F_NOCID, and packets with an empty key, are left untouched.
 * - MCREQ_COLLECTIONS_SUPPORTED: if no prefix is present, the default collection (0)
 *   is prepended via mcreq_set_cid().
 * - MCREQ_COLLECTIONS_UNSUPPORTTED: a prefix that decodes to the default collection (0)
 *   is stripped in place (body length, key length and key bytes are rewritten).
 *   A non-default collection id cannot be expressed for such a node; a diagnostic is
 *   printed and the packet is left as it is.
 * - any other state: the packet is left as it is.
 *
 * @param pipeline the pipeline the packet is being enqueued to
 * @param packet the packet whose header/key span may be rewritten
 */
static void check_collection_id(mc_PIPELINE *pipeline, mc_PACKET *packet)
{
    if ((packet->flags & MCREQ_F_NOCID) != 0) {
        return;
    }

    // before adding the packet to the pipeline, see if the collection id prefix has to be added or removed
    char *header_and_key = SPAN_BUFFER(&packet->kh_span);
    protocol_binary_request_header *request = (protocol_binary_request_header *)header_and_key;

    uint16_t key_length;
    uint8_t flexible_extras_length = 0;

    if (request->request.magic == PROTOCOL_BINARY_AREQ) {
        // with this magic the 16-bit keylen field is split into two 8-bit values (same decoding as mcreq_get_cid)
        flexible_extras_length = request->request.keylen & 0xff;
        key_length = request->request.keylen >> 8;
    } else {
        key_length = ntohs(request->request.keylen);
    }
    if (key_length == 0) {
        return;
    }

    char *key = header_and_key + sizeof(*request) + request->request.extlen + flexible_extras_length;
    uint32_t collection_id = 0;

    uint16_t collection_id_length = 0;
    if ((packet->flags & MCREQ_F_HASCID) != 0) {
        collection_id_length = (uint16_t)leb128_decode((const uint8_t *)key, key_length, &collection_id);
    }

    switch (pipeline->collections) {
        case MCREQ_COLLECTIONS_SUPPORTED:
            // the pipeline has negotiated the collections feature with the kv engine, so the collection id
            // prefix has to be encoded
            if (collection_id_length == 0) {
                // but the prefix was not encoded: assume the default collection and prepend zero as
                // the collection identifier
                mcreq_set_cid(pipeline, packet, 0);
            }
            break;

        case MCREQ_COLLECTIONS_UNSUPPORTTED:
            // the pipeline has been told that the kv engine instance does not support collections
            if (collection_id_length != 0) {
                // but the packet has an encoded collection id
                if (collection_id == 0) {
                    // strip it if it is the default collection
                    request->request.bodylen = htonl(ntohl(request->request.bodylen) - collection_id_length);
                    uint16_t new_key_length = key_length - collection_id_length;
                    if (request->request.magic == PROTOCOL_BINARY_AREQ) {
                        request->request.keylen = (new_key_length << 8U) | (flexible_extras_length & 0xffU);
                    } else {
                        request->request.keylen = htons(new_key_length);
                    }

                    // shift the key content to the left (source and destination overlap, hence memmove)
                    memmove(key, key + collection_id_length, new_key_length);

                    // the prefix is gone: drop the marker, otherwise the next time this packet is enqueued
                    // the first bytes of the real key would be decoded as a collection id
                    packet->flags &= ~MCREQ_F_HASCID;

                    // NOTE(review): kh_span.size is not reduced here, although the key has become
                    // collection_id_length bytes shorter -- confirm that the trailing bytes of the span
                    // do not end up on the wire.
                } else {
                    // no logger is reachable from here, so stderr is the only channel for this error
                    fprintf(
                        stderr,
                        "custom collection id has been dispatched to the node, that does not support collections\n");
                    // TODO log error
                }
            }
            break;

        default:
            // the pipeline has not completed the handshake yet, so trust the global settings, and let the
            // operation be fixed when it is retried in case of misprediction. This is an expected state
            // for packets queued before the connection is ready, so nothing is reported here.
            break;
    }
}

void mcreq_enqueue_packet(mc_PIPELINE *pipeline, mc_PACKET *packet)
{
nb_SPAN *vspan = &packet->u_value.single;
sllist_append(&pipeline->requests, &packet->slnode);

check_collection_id(pipeline, packet);
netbuf_enqueue_span(&pipeline->nbmgr, &packet->kh_span, packet);
MC_INCR_METRIC(pipeline, bytes_queued, packet->kh_span.size);

Expand Down Expand Up @@ -593,9 +673,10 @@ void mcreq_set_cid(mc_PIPELINE *pipeline, mc_PACKET *packet, uint32_t cid)
netbuf_mblock_release(&pipeline->nbmgr, &packet->kh_span);
}
CREATE_STANDALONE_SPAN(&packet->kh_span, kdata, new_size);
packet->flags |= MCREQ_F_HASCID;
}

uint32_t mcreq_get_cid(lcb_INSTANCE *instance, const mc_PACKET *packet)
uint32_t mcreq_get_cid(lcb_INSTANCE *instance, const mc_PACKET *packet, int *cid_set)
{
uint8_t ffext = 0;
uint16_t nk = 0;
Expand All @@ -605,6 +686,10 @@ uint32_t mcreq_get_cid(lcb_INSTANCE *instance, const mc_PACKET *packet)
char *kh = SPAN_BUFFER(&packet->kh_span);
char *k = NULL;

if (cid_set != NULL) {
*cid_set = 0;
}

memcpy(&req, kh, sizeof(req));
if (req.request.magic == PROTOCOL_BINARY_AREQ) {
ffext = req.request.keylen & 0xff;
Expand All @@ -616,6 +701,9 @@ uint32_t mcreq_get_cid(lcb_INSTANCE *instance, const mc_PACKET *packet)
if ((packet->flags & MCREQ_F_NOCID) == 0 && instance && LCBT_SETTING(instance, use_collections)) {
ncid = leb128_decode((uint8_t *)k, nk, &cid);
if (ncid) {
if (cid_set != NULL) {
*cid_set = 1;
}
return cid;
}
}
Expand Down Expand Up @@ -702,6 +790,7 @@ int mcreq_pipeline_init(mc_PIPELINE *pipeline)
pipeline->index = 0;
memset(&pipeline->ctxqueued, 0, sizeof pipeline->ctxqueued);
pipeline->buf_done_callback = NULL;
pipeline->collections = MCREQ_COLLECTIONS_UNKNOWN;

netbuf_default_settings(&settings);

Expand Down
19 changes: 17 additions & 2 deletions src/mc/mcreq.h
Original file line number Diff line number Diff line change
Expand Up @@ -295,9 +295,20 @@ typedef enum {
* The request has "replace" store semantics.
* Utilized during error translation to map DOCUMENT_EXISTS to CAS_MISMATCH (see make_error() in handler.cc)
*/
MCREQ_F_REPLACE_SEMANTICS = 1u << 11u
MCREQ_F_REPLACE_SEMANTICS = 1u << 11u,

/**
* collection id is prepended to the key
*/
MCREQ_F_HASCID = 1u << 12u,
} mcreq_flags;

/**
 * Collections capability recorded per pipeline (see the `collections` member of mc_PIPELINE).
 */
typedef enum {
/** initial state assigned by mcreq_pipeline_init() */
MCREQ_COLLECTIONS_UNKNOWN = 0,
/** the collections feature is available on the connection */
MCREQ_COLLECTIONS_SUPPORTED = 1,
/** the collections feature is not available on the connection
 * (NOTE: the identifier is spelled with a double 'T'; callers use this exact spelling) */
MCREQ_COLLECTIONS_UNSUPPORTTED = 2,
} mcreq_collections_support;

/** @brief mask of flags indicating user-allocated buffers */
#define MCREQ_UBUF_FLAGS (MCREQ_F_KEY_NOCOPY | MCREQ_F_VALUE_NOCOPY)
/** @brief mask of flags indicating response state of the packet */
Expand Down Expand Up @@ -425,6 +436,8 @@ typedef struct mc_pipeline_st {
/** Allocator for packet structures */
nb_MGR reqpool;

mcreq_collections_support collections;

/** Optional metrics structure for server */
struct lcb_SERVERMETRICS_st *metrics;
} mc_PIPELINE;
Expand Down Expand Up @@ -690,7 +703,7 @@ uint32_t mcreq_get_bodysize(const mc_PACKET *packet);
*/
uint32_t mcreq_get_size(const mc_PACKET *packet);

uint32_t mcreq_get_cid(lcb_INSTANCE *instance, const mc_PACKET *packet);
uint32_t mcreq_get_cid(lcb_INSTANCE *instance, const mc_PACKET *packet, int *cid_set);

void mcreq_set_cid(mc_PIPELINE *pipeline, mc_PACKET *packet, uint32_t cid);

Expand All @@ -704,6 +717,8 @@ uint16_t mcreq_get_vbucket(const mc_PACKET *packet);
/** Initializes a single pipeline object */
int mcreq_pipeline_init(mc_PIPELINE *pipeline);

int mcreq_pipeline_support_collections(mc_PIPELINE *pipeline);

/** Cleans up any initialization from pipeline_init */
void mcreq_pipeline_cleanup(mc_PIPELINE *pipeline);

Expand Down
30 changes: 23 additions & 7 deletions src/mcserver/mcserver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,20 @@ bool Server::handle_unknown_collection(MemcachedResponse &resp, mc_PACKET *oldpk
return true;
}

uint32_t cid = mcreq_get_cid(instance, oldpkt);
int cid_set = 0;
uint32_t cid = mcreq_get_cid(instance, oldpkt, &cid_set);
if ((collections == MCREQ_COLLECTIONS_UNSUPPORTTED && cid_set) /* we need to strip collection and retry */
|| (collections == MCREQ_COLLECTIONS_SUPPORTED && !cid_set) /* we need to prepend collection and retry */) {
lcb_log(LOGARGS_T(WARN),
LOGFMT
"UNKNOWN_COLLECTION. Packet=%p (M=0x%x, S=%u, OP=0x%x), CID=%u, collections=%d (set=%d). Retrying",
LOGID_T(), (void *)oldpkt, (int)req.request.magic, oldpkt->opaque, (int)req.request.opcode,
(unsigned)cid, (int)collections, cid_set);
mc_PACKET *newpkt = mcreq_renew_packet(oldpkt);
newpkt->flags &= ~MCREQ_STATE_FLAGS;
instance->retryq->ucadd((mc_EXPACKET *)newpkt, LCB_ERR_TIMEOUT, orig_status);
return true;
}
std::string name = instance->collcache->id_to_name(cid);

packet_wrapper wrapper;
Expand Down Expand Up @@ -1082,14 +1095,17 @@ void Server::handle_connected(lcbio_SOCKET *sock, lcb_STATUS err, lcbio_OSERR sy
sessinfo->has_feature(PROTOCOL_BINARY_FEATURE_CLUSTERMAP_CHANGE_NOTIFICATION_BRIEF);
config_with_known_version =
sessinfo->has_feature(PROTOCOL_BINARY_FEATURE_GET_CLUSTER_CONFIG_WITH_KNOWN_VERSION);
collections = sessinfo->has_feature(PROTOCOL_BINARY_FEATURE_COLLECTIONS) ? MCREQ_COLLECTIONS_SUPPORTED
: MCREQ_COLLECTIONS_UNSUPPORTTED;
lcb_log(
LOGARGS_T(TRACE),
R"(<%s:%s> (SRV=%p) Got new KV connection (json=%s, snappy=%s, mt=%s, durability=%s, config_push=%s, config_ver=%s, bucket=%s "%s"%s%s))",
curhost->host, curhost->port, (void *)this, jsonsupport ? "yes" : "no", compsupport ? "yes" : "no",
mutation_tokens ? "yes" : "no", new_durability ? "yes" : "no",
clustermap_change_notification ? "yes" : "no", config_with_known_version ? "yes" : "no",
selected_bucket ? "yes" : "no", selected_bucket ? bucket.c_str() : "-",
try_to_select_bucket ? " selecting " : "", try_to_select_bucket ? settings->bucket : "");
R"(<%s:%s> (SRV=%p) Got new KV connection (collections=%s, json=%s, snappy=%s, mt=%s, durability=%s, config_push=%s, config_ver=%s, bucket=%s "%s"%s%s))",
curhost->host, curhost->port, (void *)this, collections == MCREQ_COLLECTIONS_SUPPORTED ? "yes" : "no",
jsonsupport ? "yes" : "no", compsupport ? "yes" : "no", mutation_tokens ? "yes" : "no",
new_durability ? "yes" : "no", clustermap_change_notification ? "yes" : "no",
config_with_known_version ? "yes" : "no", selected_bucket ? "yes" : "no",
selected_bucket ? bucket.c_str() : "-", try_to_select_bucket ? " selecting " : "",
try_to_select_bucket ? settings->bucket : "");
}

lcbio_CTXPROCS procs{};
Expand Down
18 changes: 10 additions & 8 deletions src/retryq.cc
Original file line number Diff line number Diff line change
Expand Up @@ -273,12 +273,13 @@ void RetryQueue::flush(bool throttle)
fail(op, LCB_ERR_NO_MATCHING_SERVER, now);
}
} else {
uint32_t cid = mcreq_get_cid(get_instance(), op->pkt);
int cid_set = 0;
uint32_t cid = mcreq_get_cid(get_instance(), op->pkt, &cid_set);
lcb_log(LOGARGS(this, TRACE),
"Flush PKT=%p to network. retries=%u, cid=%u, opaque=%u, IX=%d, spent=%" PRIu64
"Flush PKT=%p to network. retries=%u, cid=%u (%s), opaque=%u, IX=%d, spent=%" PRIu64
"us, deadline_in=%" PRIu64 "us",
(void *)op->pkt, op->pkt->retries, cid, op->pkt->opaque, srvix, LCB_NS2US(now - op->start),
LCB_NS2US(op->deadline - now));
(void *)op->pkt, op->pkt->retries, cid, cid_set ? "set" : "unset", op->pkt->opaque, srvix,
LCB_NS2US(now - op->start), LCB_NS2US(op->deadline - now));
mc_PIPELINE *newpl = cq->pipelines[srvix];
mcreq_enqueue_packet(newpl, op->pkt);
newpl->flush_start(newpl);
Expand Down Expand Up @@ -422,12 +423,13 @@ void RetryQueue::add(mc_EXPACKET *pkt, const lcb_STATUS err, protocol_binary_res
lcb_list_add_sorted(&schedops, static_cast<SchedNode *>(op), cmpfn_retry);
lcb_list_add_sorted(&tmoops, static_cast<TmoNode *>(op), cmpfn_tmo);

uint32_t cid = mcreq_get_cid(get_instance(), &pkt->base);
int cid_set = 0;
uint32_t cid = mcreq_get_cid(get_instance(), op->pkt, &cid_set);
lcb_log(LOGARGS(this, DEBUG),
"Adding PKT=%p to retry queue. retries=%u, cid=%u, opaque=%u, now=%" PRIu64 "ms, spent=%" PRIu64
"Adding PKT=%p to retry queue. retries=%u, cid=%u (%s), opaque=%u, now=%" PRIu64 "ms, spent=%" PRIu64
"us, deadline_in=%" PRIu64 "us, status=0x%02x, rc=%s",
(void *)pkt, pkt->base.retries, cid, pkt->base.opaque, LCB_NS2MS(now), LCB_NS2US(now - op->start),
LCB_NS2US(op->deadline - now), status, lcb_strerror_short(err));
(void *)pkt, pkt->base.retries, cid, cid_set ? "set" : "unset", pkt->base.opaque, LCB_NS2MS(now),
LCB_NS2US(now - op->start), LCB_NS2US(op->deadline - now), status, lcb_strerror_short(err));
schedule();

if (settings->metrics) {
Expand Down

0 comments on commit ad04729

Please sign in to comment.