fix(duplication): deal with bulkload dup crash #2101

Merged: 6 commits, Jan 3, 2025

39 changes: 27 additions & 12 deletions src/server/pegasus_mutation_duplicator.cpp
@@ -35,6 +35,7 @@
 #include "common/common.h"
 #include "common/duplication_common.h"
 #include "duplication_internal_types.h"
+#include "gutil/map_util.h"
 #include "pegasus/client.h"
 #include "pegasus_key_schema.h"
 #include "rpc/rpc_message.h"
@@ -238,28 +239,42 @@ void pegasus_mutation_duplicator::duplicate(mutation_tuple_set muts, callback cb
     auto batch_request = std::make_unique<dsn::apps::duplicate_request>();
     uint batch_count = 0;
     uint batch_bytes = 0;
+    // The following RPC codes are ignored:
+    // - RPC_RRDB_RRDB_DUPLICATE: duplicating DUPLICATE mutations to the remote cluster is
+    //   not supported yet.
+    // - RPC_RRDB_RRDB_BULK_LOAD: the bulk-load control-flow RPC is not supported yet.
+    const static std::set<int> ingnored_rpc_code = {dsn::apps::RPC_RRDB_RRDB_DUPLICATE,
+                                                    dsn::apps::RPC_RRDB_RRDB_BULK_LOAD};
+
     for (auto mut : muts) {
         // mut: 0=timestamp, 1=rpc_code, 2=raw_message
         batch_count++;
         dsn::task_code rpc_code = std::get<1>(mut);
         dsn::blob raw_message = std::get<2>(mut);
         auto dreq = std::make_unique<dsn::apps::duplicate_request>();

-        if (rpc_code == dsn::apps::RPC_RRDB_RRDB_DUPLICATE) {
-            // ignore if it is a DUPLICATE
-            // Because DUPLICATE comes from other clusters should not be forwarded to any other
-            // destinations. A DUPLICATE is meant to be targeting only one cluster.
+        if (gutil::ContainsKey(ingnored_rpc_code, rpc_code)) {
+            // Mixing bulk load and normal writes in the same table is not recommended: it may
+            // cause inconsistency between the actual data and the expected data, and
+            // duplication does not replicate bulk-load data to the backup clusters. If you
+            // insist on combining them, you have to accept this risk and handle it on the
+            // clusters you maintain, e.g. by running the same bulk load on both the master
+            // cluster and the backup cluster (with duplication enabled) at the same time;
+            // however, this will inevitably lead to data inconsistency.
+            if (rpc_code == dsn::apps::RPC_RRDB_RRDB_BULK_LOAD) {
+                LOG_DEBUG_PREFIX("Ignore sending bulkload rpc when doing duplication");
+            }
             continue;
+        } else {
+            dsn::apps::duplicate_entry entry;
+            entry.__set_raw_message(raw_message);
+            entry.__set_task_code(rpc_code);
+            entry.__set_timestamp(std::get<0>(mut));
+            entry.__set_cluster_id(dsn::replication::get_current_dup_cluster_id());
+            batch_request->entries.emplace_back(std::move(entry));
+            batch_bytes += raw_message.length();
         }
-
-        dsn::apps::duplicate_entry entry;
-        entry.__set_raw_message(raw_message);
-        entry.__set_task_code(rpc_code);
-        entry.__set_timestamp(std::get<0>(mut));
-        entry.__set_cluster_id(dsn::replication::get_current_dup_cluster_id());
-        batch_request->entries.emplace_back(std::move(entry));
-        batch_bytes += raw_message.length();

         if (batch_count == muts.size() || batch_bytes >= FLAGS_duplicate_log_batch_bytes ||
             batch_bytes >= dsn::replication::FLAGS_dup_max_allowed_write_size) {
             // since all the plog's mutations of replica belong to same gpid though the hash of
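
Outside the diff context, the control flow of the rewritten loop can be summarized with a small self-contained sketch. Everything below (the Mutation and Entry structs, the integer task codes, build_batch, the byte limit) is a simplified stand-in for the real dsn/pegasus types rather than the project's API; it only mirrors the skip-ignored-codes-then-batch pattern shown above.

#include <cstdint>
#include <iostream>
#include <set>
#include <string>
#include <vector>

// Hypothetical task codes; the real ones are dsn::apps::RPC_RRDB_RRDB_DUPLICATE and
// dsn::apps::RPC_RRDB_RRDB_BULK_LOAD.
constexpr int kRpcDuplicate = 1;
constexpr int kRpcBulkLoad = 2;
constexpr int kRpcPut = 3;

struct Mutation {
    uint64_t timestamp;
    int rpc_code;
    std::string raw_message;
};

struct Entry {
    uint64_t timestamp;
    int task_code;
    std::string raw_message;
};

// Mirrors the loop in pegasus_mutation_duplicator::duplicate(): skip mutations whose
// code is in the ignored set (logging the bulk-load case), batch everything else.
std::vector<Entry> build_batch(const std::vector<Mutation> &muts, size_t max_batch_bytes)
{
    static const std::set<int> ignored_rpc_codes = {kRpcDuplicate, kRpcBulkLoad};

    std::vector<Entry> batch;
    size_t batch_bytes = 0;
    for (const auto &mut : muts) {
        if (ignored_rpc_codes.count(mut.rpc_code) > 0) {
            if (mut.rpc_code == kRpcBulkLoad) {
                std::cout << "ignore bulk-load rpc during duplication\n";
            }
            continue; // never forward DUPLICATE or BULK_LOAD mutations
        }
        batch.push_back(Entry{mut.timestamp, mut.rpc_code, mut.raw_message});
        batch_bytes += mut.raw_message.size();
        if (batch_bytes >= max_batch_bytes) {
            break; // the real code ships the batch here and starts a new one
        }
    }
    return batch;
}

int main()
{
    const std::vector<Mutation> muts = {
        {100, kRpcPut, "put-1"},
        {101, kRpcBulkLoad, "bulk-load-chunk"},
        {102, kRpcDuplicate, "forwarded-duplicate"},
        {103, kRpcPut, "put-2"},
    };
    const auto batch = build_batch(muts, 1024);
    std::cout << "batched " << batch.size() << " of " << muts.size() << " mutations\n";
    return 0;
}

Running this sketch batches two of the four mutations, which matches the behavior the change enforces for bulk-load and forwarded DUPLICATE writes.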