diff --git a/src/server/pegasus_mutation_duplicator.cpp b/src/server/pegasus_mutation_duplicator.cpp
index 091d1247a8..f311174359 100644
--- a/src/server/pegasus_mutation_duplicator.cpp
+++ b/src/server/pegasus_mutation_duplicator.cpp
@@ -35,6 +35,7 @@
 #include "common/common.h"
 #include "common/duplication_common.h"
 #include "duplication_internal_types.h"
+#include "gutil/map_util.h"
 #include "pegasus/client.h"
 #include "pegasus_key_schema.h"
 #include "rpc/rpc_message.h"
@@ -238,6 +239,13 @@ void pegasus_mutation_duplicator::duplicate(mutation_tuple_set muts, callback cb
     auto batch_request = std::make_unique<dsn::apps::duplicate_request>();
     uint batch_count = 0;
     uint batch_bytes = 0;
+    // The following RPC codes should be ignored:
+    // - RPC_RRDB_RRDB_DUPLICATE: duplicating DUPLICATE mutations to the remote cluster is not
+    //   supported, since a DUPLICATE is meant to target only one cluster.
+    // - RPC_RRDB_RRDB_BULK_LOAD: control flow RPCs are not supported.
+    const static std::set<dsn::task_code> ignored_rpc_code = {dsn::apps::RPC_RRDB_RRDB_DUPLICATE,
+                                                              dsn::apps::RPC_RRDB_RRDB_BULK_LOAD};
+
     for (auto mut : muts) {
         // mut: 0=timestamp, 1=rpc_code, 2=raw_message
         batch_count++;
@@ -245,21 +253,28 @@ void pegasus_mutation_duplicator::duplicate(mutation_tuple_set muts, callback cb
         dsn::blob raw_message = std::get<2>(mut);
         auto dreq = std::make_unique<dsn::apps::duplicate_request>();
 
-        if (rpc_code == dsn::apps::RPC_RRDB_RRDB_DUPLICATE) {
-            // ignore if it is a DUPLICATE
-            // Because DUPLICATE comes from other clusters should not be forwarded to any other
-            // destinations. A DUPLICATE is meant to be targeting only one cluster.
+        if (gutil::ContainsKey(ignored_rpc_code, rpc_code)) {
+            // It is not recommended to use bulk load and normal writes on the same table, since
+            // that may cause inconsistency between the actual data and the expected data.
+            // Duplication will not ship bulk load data to the backup clusters. If you insist on
+            // combining them, you have to accept this risk and deal with it on the clusters you
+            // maintain. For example, you can run the same bulk load on both the master cluster
+            // and the backup cluster (with duplication enabled) at the same time, but this will
+            // inevitably cause data inconsistency problems.
+            if (rpc_code == dsn::apps::RPC_RRDB_RRDB_BULK_LOAD) {
+                LOG_DEBUG_PREFIX("Ignore sending bulkload rpc when doing duplication");
+            }
             continue;
-        } else {
-            dsn::apps::duplicate_entry entry;
-            entry.__set_raw_message(raw_message);
-            entry.__set_task_code(rpc_code);
-            entry.__set_timestamp(std::get<0>(mut));
-            entry.__set_cluster_id(dsn::replication::get_current_dup_cluster_id());
-            batch_request->entries.emplace_back(std::move(entry));
-            batch_bytes += raw_message.length();
         }
+        dsn::apps::duplicate_entry entry;
+        entry.__set_raw_message(raw_message);
+        entry.__set_task_code(rpc_code);
+        entry.__set_timestamp(std::get<0>(mut));
+        entry.__set_cluster_id(dsn::replication::get_current_dup_cluster_id());
+        batch_request->entries.emplace_back(std::move(entry));
+        batch_bytes += raw_message.length();
+
         if (batch_count == muts.size() || batch_bytes >= FLAGS_duplicate_log_batch_bytes ||
             batch_bytes >= dsn::replication::FLAGS_dup_max_allowed_write_size) {
             // since all the plog's mutations of replica belong to same gpid though the hash of
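For reference, below is a minimal standalone sketch of the filtering rule this patch introduces: mutations whose RPC code is in an ignored set are skipped, and the rest are collected for shipping. It is illustrative only; `task_code`, `mutation`, and `filter_for_duplication` are simplified stand-ins for the Pegasus types, and `std::set::count` plays the role of `gutil::ContainsKey`.

```cpp
#include <cstdint>
#include <iostream>
#include <set>
#include <string>
#include <vector>

// Simplified stand-ins for dsn::task_code and the mutation tuple (assumed names,
// not the real Pegasus types).
using task_code = std::string;

struct mutation {
    uint64_t timestamp;
    task_code rpc_code;
    std::string raw_message;
};

// Mirrors the rule added in the patch: DUPLICATE and BULK_LOAD mutations are
// skipped; everything else would be batched into a duplicate_request.
std::vector<mutation> filter_for_duplication(const std::vector<mutation> &muts)
{
    static const std::set<task_code> ignored_rpc_code = {"RPC_RRDB_RRDB_DUPLICATE",
                                                         "RPC_RRDB_RRDB_BULK_LOAD"};
    std::vector<mutation> batch;
    for (const auto &mut : muts) {
        // std::set::count stands in for gutil::ContainsKey here.
        if (ignored_rpc_code.count(mut.rpc_code) > 0) {
            continue;
        }
        batch.push_back(mut);
    }
    return batch;
}

int main()
{
    const std::vector<mutation> muts = {
        {1, "RPC_RRDB_RRDB_PUT", "put k v"},
        {2, "RPC_RRDB_RRDB_BULK_LOAD", "bulk load control request"},
        {3, "RPC_RRDB_RRDB_DUPLICATE", "forwarded from another cluster"},
    };
    for (const auto &mut : filter_for_duplication(muts)) {
        std::cout << mut.rpc_code << " will be duplicated\n";
    }
    // Prints only: RPC_RRDB_RRDB_PUT will be duplicated
}
```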