diff --git a/src/handler.cc b/src/handler.cc index a37633ec2..d68eda23b 100644 --- a/src/handler.cc +++ b/src/handler.cc @@ -342,7 +342,7 @@ template void invoke_callback(const mc_PACKET *pkt, lcb_INSTANCE *instance, T *resp, lcb_CALLBACK_TYPE cbtype) { if (instance != nullptr) { - std::string collection_path = instance->collcache->id_to_name(mcreq_get_cid(instance, pkt)); + std::string collection_path = instance->collcache->id_to_name(mcreq_get_cid(instance, pkt, NULL)); if (!collection_path.empty()) { size_t dot = collection_path.find('.'); if (dot != std::string::npos) { diff --git a/src/mc/mcreq.c b/src/mc/mcreq.c index 5a3705f2f..cd03e4a81 100644 --- a/src/mc/mcreq.c +++ b/src/mc/mcreq.c @@ -109,6 +109,7 @@ lcb_STATUS mcreq_reserve_key(mc_PIPELINE *pipeline, mc_PACKET *packet, uint8_t h /* copy collection ID prefix */ if (ncid) { memcpy(SPAN_BUFFER(&packet->kh_span) + hdrsize, cid, ncid); + packet->flags |= MCREQ_F_HASCID; } /** * Copy the key into the packet starting at the extras end @@ -243,10 +244,89 @@ void mcreq_reenqueue_packet(mc_PIPELINE *pipeline, mc_PACKET *packet) sllist_insert_sorted(reqs, &packet->slnode, pkt_tmo_compar); } +static void check_collection_id(mc_PIPELINE *pipeline, mc_PACKET *packet) +{ + if ((packet->flags & MCREQ_F_NOCID) != 0) { + return; + } + + // before adding packet to pipeline lets see if we need add or remove collection id prefix + char *header_and_key = SPAN_BUFFER(&packet->kh_span); + protocol_binary_request_header *request = (protocol_binary_request_header *)header_and_key; + + uint16_t key_length; + uint8_t flexible_extras_length = 0; + + if (request->request.magic == PROTOCOL_BINARY_AREQ) { + flexible_extras_length = request->request.keylen & 0xff; + key_length = request->request.keylen >> 8; + } else { + key_length = ntohs(request->request.keylen); + } + if (key_length == 0) { + return; + } + + char *key = header_and_key + sizeof(*request) + request->request.extlen + flexible_extras_length; + uint32_t collection_id = 0; + + uint16_t collection_id_length = 0; + if ((packet->flags & MCREQ_F_HASCID) != 0) { + collection_id_length = (uint16_t)leb128_decode((const uint8_t *)key, key_length, &collection_id); + } + + switch (pipeline->collections) { + case MCREQ_COLLECTIONS_SUPPORTED: + // the pipeline had negotiated collections feature with kv engine, so we have to encode collection id + // prefix + if (collection_id_length == 0) { + // but collection id prefix was not encoded, we should assume default collection and prepend zero as + // a collection identifier + mcreq_set_cid(pipeline, packet, 0); + } + break; + + case MCREQ_COLLECTIONS_UNSUPPORTTED: + // the pipeline been told that the kv engine instance does not support collections + if (collection_id_length != 0) { + // but the packet has encoded collection id + if (collection_id == 0) { + // strip it if it is default collection + request->request.bodylen = htonl(ntohl(request->request.bodylen) - collection_id_length); + uint16_t new_key_length = key_length - collection_id_length; + if (request->request.magic == PROTOCOL_BINARY_AREQ) { + request->request.keylen = (new_key_length << 8U) | (flexible_extras_length & 0xffU); + } else { + request->request.keylen = htons(new_key_length); + } + + // shift the key content to the left + for (int i = 0; i < new_key_length; ++i) { + key[i] = key[i + collection_id_length]; + } + } else { + fprintf( + stderr, + "custom collection id has been dispatched to the node, that does not support collections\n"); + // TODO log error + } + } + break; + + default: + // the pipeline hadn't completed handshake yet, so trust global settings, and let operation be fixed + // when it will be retried in case of misprediction. + fprintf(stderr, "collections has not been negotiated for the pipeline yet\n"); + break; + } +} + void mcreq_enqueue_packet(mc_PIPELINE *pipeline, mc_PACKET *packet) { nb_SPAN *vspan = &packet->u_value.single; sllist_append(&pipeline->requests, &packet->slnode); + + check_collection_id(pipeline, packet); netbuf_enqueue_span(&pipeline->nbmgr, &packet->kh_span, packet); MC_INCR_METRIC(pipeline, bytes_queued, packet->kh_span.size); @@ -593,9 +673,10 @@ void mcreq_set_cid(mc_PIPELINE *pipeline, mc_PACKET *packet, uint32_t cid) netbuf_mblock_release(&pipeline->nbmgr, &packet->kh_span); } CREATE_STANDALONE_SPAN(&packet->kh_span, kdata, new_size); + packet->flags |= MCREQ_F_HASCID; } -uint32_t mcreq_get_cid(lcb_INSTANCE *instance, const mc_PACKET *packet) +uint32_t mcreq_get_cid(lcb_INSTANCE *instance, const mc_PACKET *packet, int *cid_set) { uint8_t ffext = 0; uint16_t nk = 0; @@ -605,6 +686,10 @@ uint32_t mcreq_get_cid(lcb_INSTANCE *instance, const mc_PACKET *packet) char *kh = SPAN_BUFFER(&packet->kh_span); char *k = NULL; + if (cid_set != NULL) { + *cid_set = 0; + } + memcpy(&req, kh, sizeof(req)); if (req.request.magic == PROTOCOL_BINARY_AREQ) { ffext = req.request.keylen & 0xff; @@ -616,6 +701,9 @@ uint32_t mcreq_get_cid(lcb_INSTANCE *instance, const mc_PACKET *packet) if ((packet->flags & MCREQ_F_NOCID) == 0 && instance && LCBT_SETTING(instance, use_collections)) { ncid = leb128_decode((uint8_t *)k, nk, &cid); if (ncid) { + if (cid_set != NULL) { + *cid_set = 1; + } return cid; } } @@ -702,6 +790,7 @@ int mcreq_pipeline_init(mc_PIPELINE *pipeline) pipeline->index = 0; memset(&pipeline->ctxqueued, 0, sizeof pipeline->ctxqueued); pipeline->buf_done_callback = NULL; + pipeline->collections = MCREQ_COLLECTIONS_UNKNOWN; netbuf_default_settings(&settings); diff --git a/src/mc/mcreq.h b/src/mc/mcreq.h index e3b1a42b0..74eb9c203 100644 --- a/src/mc/mcreq.h +++ b/src/mc/mcreq.h @@ -295,9 +295,20 @@ typedef enum { * The request has "replace" store semantics. * Utilized during error translation to map DOCUMENT_EXISTS to CAS_MISMATCH (see make_error() in handler.cc) */ - MCREQ_F_REPLACE_SEMANTICS = 1u << 11u + MCREQ_F_REPLACE_SEMANTICS = 1u << 11u, + + /** + * collection id is prepended to the key + */ + MCREQ_F_HASCID = 1u << 12u, } mcreq_flags; +typedef enum { + MCREQ_COLLECTIONS_UNKNOWN = 0, + MCREQ_COLLECTIONS_SUPPORTED = 1, + MCREQ_COLLECTIONS_UNSUPPORTTED = 2, +} mcreq_collections_support; + /** @brief mask of flags indicating user-allocated buffers */ #define MCREQ_UBUF_FLAGS (MCREQ_F_KEY_NOCOPY | MCREQ_F_VALUE_NOCOPY) /** @brief mask of flags indicating response state of the packet */ @@ -425,6 +436,8 @@ typedef struct mc_pipeline_st { /** Allocator for packet structures */ nb_MGR reqpool; + mcreq_collections_support collections; + /** Optional metrics structure for server */ struct lcb_SERVERMETRICS_st *metrics; } mc_PIPELINE; @@ -690,7 +703,7 @@ uint32_t mcreq_get_bodysize(const mc_PACKET *packet); */ uint32_t mcreq_get_size(const mc_PACKET *packet); -uint32_t mcreq_get_cid(lcb_INSTANCE *instance, const mc_PACKET *packet); +uint32_t mcreq_get_cid(lcb_INSTANCE *instance, const mc_PACKET *packet, int *cid_set); void mcreq_set_cid(mc_PIPELINE *pipeline, mc_PACKET *packet, uint32_t cid); @@ -704,6 +717,8 @@ uint16_t mcreq_get_vbucket(const mc_PACKET *packet); /** Initializes a single pipeline object */ int mcreq_pipeline_init(mc_PIPELINE *pipeline); +int mcreq_pipeline_support_collections(mc_PIPELINE *pipeline); + /** Cleans up any initialization from pipeline_init */ void mcreq_pipeline_cleanup(mc_PIPELINE *pipeline); diff --git a/src/mcserver/mcserver.cc b/src/mcserver/mcserver.cc index ed81d7bdb..540efc3fd 100644 --- a/src/mcserver/mcserver.cc +++ b/src/mcserver/mcserver.cc @@ -273,7 +273,20 @@ bool Server::handle_unknown_collection(MemcachedResponse &resp, mc_PACKET *oldpk return true; } - uint32_t cid = mcreq_get_cid(instance, oldpkt); + int cid_set = 0; + uint32_t cid = mcreq_get_cid(instance, oldpkt, &cid_set); + if ((collections == MCREQ_COLLECTIONS_UNSUPPORTTED && cid_set) /* we need to strip collection and retry */ + || (collections == MCREQ_COLLECTIONS_SUPPORTED && !cid_set) /* we need to prepend collection and retry */) { + lcb_log(LOGARGS_T(WARN), + LOGFMT + "UNKNOWN_COLLECTION. Packet=%p (M=0x%x, S=%u, OP=0x%x), CID=%u, collections=%d (set=%d). Retrying", + LOGID_T(), (void *)oldpkt, (int)req.request.magic, oldpkt->opaque, (int)req.request.opcode, + (unsigned)cid, (int)collections, cid_set); + mc_PACKET *newpkt = mcreq_renew_packet(oldpkt); + newpkt->flags &= ~MCREQ_STATE_FLAGS; + instance->retryq->ucadd((mc_EXPACKET *)newpkt, LCB_ERR_TIMEOUT, orig_status); + return true; + } std::string name = instance->collcache->id_to_name(cid); packet_wrapper wrapper; @@ -1082,14 +1095,17 @@ void Server::handle_connected(lcbio_SOCKET *sock, lcb_STATUS err, lcbio_OSERR sy sessinfo->has_feature(PROTOCOL_BINARY_FEATURE_CLUSTERMAP_CHANGE_NOTIFICATION_BRIEF); config_with_known_version = sessinfo->has_feature(PROTOCOL_BINARY_FEATURE_GET_CLUSTER_CONFIG_WITH_KNOWN_VERSION); + collections = sessinfo->has_feature(PROTOCOL_BINARY_FEATURE_COLLECTIONS) ? MCREQ_COLLECTIONS_SUPPORTED + : MCREQ_COLLECTIONS_UNSUPPORTTED; lcb_log( LOGARGS_T(TRACE), - R"(<%s:%s> (SRV=%p) Got new KV connection (json=%s, snappy=%s, mt=%s, durability=%s, config_push=%s, config_ver=%s, bucket=%s "%s"%s%s))", - curhost->host, curhost->port, (void *)this, jsonsupport ? "yes" : "no", compsupport ? "yes" : "no", - mutation_tokens ? "yes" : "no", new_durability ? "yes" : "no", - clustermap_change_notification ? "yes" : "no", config_with_known_version ? "yes" : "no", - selected_bucket ? "yes" : "no", selected_bucket ? bucket.c_str() : "-", - try_to_select_bucket ? " selecting " : "", try_to_select_bucket ? settings->bucket : ""); + R"(<%s:%s> (SRV=%p) Got new KV connection (collections=%s, json=%s, snappy=%s, mt=%s, durability=%s, config_push=%s, config_ver=%s, bucket=%s "%s"%s%s))", + curhost->host, curhost->port, (void *)this, collections == MCREQ_COLLECTIONS_SUPPORTED ? "yes" : "no", + jsonsupport ? "yes" : "no", compsupport ? "yes" : "no", mutation_tokens ? "yes" : "no", + new_durability ? "yes" : "no", clustermap_change_notification ? "yes" : "no", + config_with_known_version ? "yes" : "no", selected_bucket ? "yes" : "no", + selected_bucket ? bucket.c_str() : "-", try_to_select_bucket ? " selecting " : "", + try_to_select_bucket ? settings->bucket : ""); } lcbio_CTXPROCS procs{}; diff --git a/src/retryq.cc b/src/retryq.cc index 5aaa48be2..926b2176c 100644 --- a/src/retryq.cc +++ b/src/retryq.cc @@ -273,12 +273,13 @@ void RetryQueue::flush(bool throttle) fail(op, LCB_ERR_NO_MATCHING_SERVER, now); } } else { - uint32_t cid = mcreq_get_cid(get_instance(), op->pkt); + int cid_set = 0; + uint32_t cid = mcreq_get_cid(get_instance(), op->pkt, &cid_set); lcb_log(LOGARGS(this, TRACE), - "Flush PKT=%p to network. retries=%u, cid=%u, opaque=%u, IX=%d, spent=%" PRIu64 + "Flush PKT=%p to network. retries=%u, cid=%u (%s), opaque=%u, IX=%d, spent=%" PRIu64 "us, deadline_in=%" PRIu64 "us", - (void *)op->pkt, op->pkt->retries, cid, op->pkt->opaque, srvix, LCB_NS2US(now - op->start), - LCB_NS2US(op->deadline - now)); + (void *)op->pkt, op->pkt->retries, cid, cid_set ? "set" : "unset", op->pkt->opaque, srvix, + LCB_NS2US(now - op->start), LCB_NS2US(op->deadline - now)); mc_PIPELINE *newpl = cq->pipelines[srvix]; mcreq_enqueue_packet(newpl, op->pkt); newpl->flush_start(newpl); @@ -422,12 +423,13 @@ void RetryQueue::add(mc_EXPACKET *pkt, const lcb_STATUS err, protocol_binary_res lcb_list_add_sorted(&schedops, static_cast(op), cmpfn_retry); lcb_list_add_sorted(&tmoops, static_cast(op), cmpfn_tmo); - uint32_t cid = mcreq_get_cid(get_instance(), &pkt->base); + int cid_set = 0; + uint32_t cid = mcreq_get_cid(get_instance(), op->pkt, &cid_set); lcb_log(LOGARGS(this, DEBUG), - "Adding PKT=%p to retry queue. retries=%u, cid=%u, opaque=%u, now=%" PRIu64 "ms, spent=%" PRIu64 + "Adding PKT=%p to retry queue. retries=%u, cid=%u (%s), opaque=%u, now=%" PRIu64 "ms, spent=%" PRIu64 "us, deadline_in=%" PRIu64 "us, status=0x%02x, rc=%s", - (void *)pkt, pkt->base.retries, cid, pkt->base.opaque, LCB_NS2MS(now), LCB_NS2US(now - op->start), - LCB_NS2US(op->deadline - now), status, lcb_strerror_short(err)); + (void *)pkt, pkt->base.retries, cid, cid_set ? "set" : "unset", pkt->base.opaque, LCB_NS2MS(now), + LCB_NS2US(now - op->start), LCB_NS2US(op->deadline - now), status, lcb_strerror_short(err)); schedule(); if (settings->metrics) {