diff --git a/src/commands/cmd_server.cc b/src/commands/cmd_server.cc index 445f20d4601..49e45aa1c26 100644 --- a/src/commands/cmd_server.cc +++ b/src/commands/cmd_server.cc @@ -404,9 +404,11 @@ class CommandSlowlog : public Commander { class CommandClient : public Commander { public: Status Parse(const std::vector &args) override { - subcommand_ = util::ToLower(args[1]); - // subcommand: getname id kill list info setname - if ((subcommand_ == "id" || subcommand_ == "getname" || subcommand_ == "list" || subcommand_ == "info") && + CommandParser parser(args, 1); + subcommand_ = util::ToLower(GET_OR_RET(parser.TakeStr())); + // subcommand: getname id kill list info setname unpause + if ((subcommand_ == "id" || subcommand_ == "getname" || subcommand_ == "list" || subcommand_ == "info" || + subcommand_ == "unpause") && args.size() == 2) { return Status::OK(); } @@ -477,7 +479,48 @@ class CommandClient : public Commander { } return Status::OK(); } - return {Status::RedisInvalidCmd, "Syntax error, try CLIENT LIST|INFO|KILL ip:port|GETNAME|SETNAME"}; + + // command format: client pause [write|all] + if ((subcommand_ == "pause")) { + if (args.size() != 3 && args.size() != 4) { + return {Status::RedisParseErr, errInvalidSyntax}; + } + + pause_timeout_ms_ = GET_OR_RET(parser.TakeInt()); + + if (parser.EatEqICase("all")) { + pause_type_ = kPauseAll; + } else if (parser.EatEqICase("write")) { + pause_type_ = kPauseWrite; + } else if (!parser.Good()) { // Default mode is to pause all commands + pause_type_ = kPauseAll; + } else { + return {Status::RedisParseErr, errInvalidSyntax}; + } + + return Status::OK(); + } + + if (subcommand_ == "reply") { + if (args.size() != 2 && args.size() != 3) { + return {Status::RedisParseErr, errInvalidSyntax}; + } + + if (parser.EatEqICase("on")) { + reply_type_ = 0; + } else if (parser.EatEqICase("off")) { + reply_type_ = Connection::Flag::kReplyModeOff; + } else if (parser.EatEqICase("skip")) { + reply_type_ = Connection::Flag::kReplyModeSkipNext; + } else { + return {Status::RedisParseErr, errInvalidSyntax}; + } + + return Status::OK(); + } + + return {Status::RedisInvalidCmd, + "Syntax error, try CLIENT LIST|INFO|KILL|PAUSE|UNPAUSE|REPLY ip:port|GETNAME|SETNAME|timeout"}; } Status Execute(Server *srv, Connection *conn, std::string *output) override { @@ -510,6 +553,20 @@ class CommandClient : public Commander { *output = redis::SimpleString("OK"); } return Status::OK(); + } else if (subcommand_ == "pause") { + srv->PauseCommands(pause_type_, pause_timeout_ms_); + return Status::OK(); + } else if (subcommand_ == "unpause") { + srv->UnpauseCommands(); + return Status::OK(); + } else if (subcommand_ == "reply") { + conn->DisableFlag(redis::Connection::Flag::kReplyModeOff); + conn->DisableFlag(redis::Connection::Flag::kReplyModeSkip); + conn->DisableFlag(redis::Connection::Flag::kReplyModeSkipNext); + if (reply_type_ != 0) { + conn->EnableFlag((redis::Connection::Flag)reply_type_); + } + return Status::OK(); } return {Status::RedisInvalidCmd, "Syntax error, try CLIENT LIST|INFO|KILL ip:port|GETNAME|SETNAME"}; @@ -521,6 +578,9 @@ class CommandClient : public Commander { std::string subcommand_; bool skipme_ = false; int64_t kill_type_ = 0; + int64_t pause_type_ = 0; + int64_t pause_timeout_ms_ = 0; + int64_t reply_type_ = 0; uint64_t id_ = 0; bool new_format_ = true; }; diff --git a/src/server/redis_connection.cc b/src/server/redis_connection.cc index f7276249e4c..5053ec3a088 100644 --- a/src/server/redis_connection.cc +++ b/src/server/redis_connection.cc @@ -130,6 +130,17 @@ void Connection::OnEvent(bufferevent *bev, int16_t events) { } void Connection::Reply(const std::string &msg) { + // Do not send replies for both SKIP and OFF modes + if (IsFlagEnabled(Flag::kReplyModeOff) || IsFlagEnabled(Flag::kReplyModeSkip)) { + return; + } + + // Skip starting from the next reply for SKIP mode + if (IsFlagEnabled(Flag::kReplyModeSkipNext)) { + DisableFlag(Flag::kReplyModeSkipNext); + EnableFlag(Flag::kReplyModeSkip); + } + owner_->srv->stats.IncrOutboundBytes(msg.size()); redis::Reply(bufferevent_get_output(bev_), msg); } @@ -368,6 +379,17 @@ static bool IsCmdForIndexing(const CommandAttributes *attr) { } void Connection::ExecuteCommands(std::deque *to_process_cmds) { + // Do not execute commands if we are in pause mode + if (srv_->GetCommandPauseType() == kPauseAll || + (srv_->GetCommandPauseType() == kPauseWrite && this->IsFlagEnabled(kPaused))) { + return; + } else { + // Unpause the client when the pause mode ends + if (this->IsFlagEnabled(kPaused)) { + this->DisableFlag(kPaused); + } + } + const Config *config = srv_->GetConfig(); std::string reply; std::string password = config->requirepass; @@ -396,6 +418,16 @@ void Connection::ExecuteCommands(std::deque *to_process_cmds) { auto cmd_name = attributes->name; auto cmd_flags = attributes->GenerateFlags(cmd_tokens); + // Start pausing the client when we first receive the write command + if (!(this->GetClientType() == kTypeSlave) && srv_->GetCommandPauseType() == kPauseWrite && + (cmd_flags & kCmdWrite)) { + this->EnableFlag(kPaused); // Pause the client + to_process_cmds->emplace_back(cmd_tokens); + + Reply(redis::SimpleString("OK")); // Should return OK ASAP + break; // Do not process remanining commands for now + } + if (GetNamespace().empty()) { if (!password.empty()) { if (cmd_name != "auth" && cmd_name != "hello") { diff --git a/src/server/redis_connection.h b/src/server/redis_connection.h index 83c57c013b4..91ab810fd66 100644 --- a/src/server/redis_connection.h +++ b/src/server/redis_connection.h @@ -48,6 +48,10 @@ class Connection : public EvbufCallbackBase { kMultiExec = 1 << 8, kReadOnly = 1 << 9, kAsking = 1 << 10, + kReplyModeOff = 1 << 11, + kReplyModeSkip = 1 << 12, + kReplyModeSkipNext = 1 << 13, + kPaused = 1 << 14 }; explicit Connection(bufferevent *bev, Worker *owner); diff --git a/src/server/server.cc b/src/server/server.cc index 3381ffbc7ed..0b98a8d18bd 100644 --- a/src/server/server.cc +++ b/src/server/server.cc @@ -1313,6 +1313,15 @@ void Server::GetInfo(const std::string &ns, const std::string §ion, std::str *info = string_stream.str(); } +int64_t Server::GetCommandPauseType() { + // Stop pausing command if the timeout has reached + if (pause_end_timestamp_ms_.load(std::memory_order_relaxed) <= util::GetTimeStampMS()) { + UnpauseCommands(); + } + + return (int64_t)pause_type_.load(std::memory_order_relaxed); +} + std::string Server::GetRocksDBStatsJson() const { jsoncons::json stats_json; @@ -1798,6 +1807,13 @@ Status Server::ExecPropagatedCommand(const std::vector &tokens) { return Status::OK(); } +void Server::PauseCommands(uint64_t type, uint64_t timeout_ms) { + pause_type_.store((int64_t)type, std::memory_order_relaxed); + pause_end_timestamp_ms_.store((int64_t)(util::GetTimeStampMS() + timeout_ms), std::memory_order_relaxed); +} + +void Server::UnpauseCommands() { pause_type_.store((int64_t)kPauseNone, std::memory_order_relaxed); } + // AdjustOpenFilesLimit only try best to raise the max open files according to // the max clients and RocksDB open file configuration. It also reserves a number // of file descriptors(128) for extra operations of persistence, listening sockets, diff --git a/src/server/server.h b/src/server/server.h index 1bb639ba6b0..b9974b987cb 100644 --- a/src/server/server.h +++ b/src/server/server.h @@ -140,6 +140,12 @@ enum ClientType { kTypeSlave = (1ULL << 3), // slave client }; +enum ClientCommandPauseType { + kPauseNone = (1ULL << 0), // pause no commands + kPauseWrite = (1ULL << 1), // pause write commands + kPauseAll = (1ULL << 2) // pause all commands +}; + enum ServerLogType { kServerLogNone, kReplIdLog }; enum class AuthResult { @@ -242,6 +248,7 @@ class Server { void GetCommandsStatsInfo(std::string *info); void GetClusterInfo(std::string *info); void GetInfo(const std::string &ns, const std::string §ion, std::string *info); + int64_t GetCommandPauseType(); std::string GetRocksDBStatsJson() const; ReplState GetReplicationState(); @@ -284,6 +291,8 @@ class Server { Status Propagate(const std::string &channel, const std::vector &tokens) const; Status ExecPropagatedCommand(const std::vector &tokens); Status ExecPropagateScriptCommand(const std::vector &tokens); + void PauseCommands(uint64_t type, uint64_t timeout_ms); + void UnpauseCommands(); void SetCurrentConnection(redis::Connection *conn) { curr_connection_ = conn; } redis::Connection *GetCurrentConnection() { return curr_connection_; } @@ -340,6 +349,8 @@ class Server { Config *config_ = nullptr; std::string last_random_key_cursor_; std::mutex last_random_key_cursor_mu_; + std::atomic pause_type_{kPauseNone}; + std::atomic pause_end_timestamp_ms_{0}; std::atomic lua_;