From 2c809b96196fbd33ca950456e847233ffc419503 Mon Sep 17 00:00:00 2001 From: lqx Date: Tue, 17 Dec 2024 20:23:59 +0800 Subject: [PATCH 1/3] refactor:Command objects use memory pools --- include/acl.h | 4 +- include/pika_acl.h | 3 + include/pika_admin.h | 102 ++++++++++++++++++++++++++ include/pika_bit.h | 15 ++++ include/pika_client_conn.h | 8 ++- include/pika_cmd_table_manager.h | 5 ++ include/pika_command.h | 2 + include/pika_consensus.h | 6 +- include/pika_dispatch_thread.h | 2 +- include/pika_geo.h | 18 +++++ include/pika_hash.h | 54 ++++++++++++++ include/pika_hyperloglog.h | 9 +++ include/pika_kv.h | 105 +++++++++++++++++++++++++++ include/pika_list.h | 48 +++++++++++++ include/pika_pubsub.h | 18 +++++ include/pika_rm.h | 2 +- include/pika_set.h | 45 ++++++++++++ include/pika_slot_command.h | 42 +++++++++++ include/pika_stream.h | 24 +++++++ include/pika_transaction.h | 15 ++++ include/pika_zset.h | 69 ++++++++++++++++++ src/acl.cc | 6 +- src/net/include/net_conn.h | 5 +- src/net/include/server_thread.h | 2 +- src/net/src/dispatch_thread.cc | 10 +-- src/net/src/dispatch_thread.h | 2 +- src/net/src/memoey_pool.h | 118 +++++++++++++++++++++++++++++++ src/net/src/worker_thread.cc | 11 +-- src/net/src/worker_thread.h | 6 +- src/pika_acl.cc | 2 +- src/pika_admin.cc | 2 +- src/pika_client_conn.cc | 47 ++++++++---- src/pika_cmd_table_manager.cc | 6 ++ src/pika_command.cc | 2 +- src/pika_consensus.cc | 10 +-- src/pika_dispatch_thread.cc | 5 +- src/pika_migrate_thread.cc | 2 +- src/pika_rm.cc | 2 +- src/pika_server.cc | 3 +- 39 files changed, 786 insertions(+), 51 deletions(-) create mode 100644 src/net/src/memoey_pool.h diff --git a/include/acl.h b/include/acl.h index 77bd5ba8a3..ca6baa95cf 100644 --- a/include/acl.h +++ b/include/acl.h @@ -149,7 +149,7 @@ class AclSelector { void ACLDescribeSelector(std::vector& vector); - AclDeniedCmd CheckCanExecCmd(std::shared_ptr& cmd, int8_t subCmdIndex, const std::vector& keys, + AclDeniedCmd CheckCanExecCmd(Cmd* cmd, int8_t subCmdIndex, const std::vector& keys, std::string* errKey); bool SetSelectorCommandBitsForCategory(const std::string& categoryName, bool allow); @@ -281,7 +281,7 @@ class User { std::vector AllChannelKey(); // check the user can exec the cmd - AclDeniedCmd CheckUserPermission(std::shared_ptr& cmd, const PikaCmdArgsType& argv, int8_t& subCmdIndex, + AclDeniedCmd CheckUserPermission(Cmd* cmd, const PikaCmdArgsType& argv, int8_t& subCmdIndex, std::string* errKey); private: diff --git a/include/pika_acl.h b/include/pika_acl.h index 8d830581f8..11f024111f 100644 --- a/include/pika_acl.h +++ b/include/pika_acl.h @@ -23,6 +23,9 @@ class PikaAclCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new PikaAclCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: void DoInitial() override; diff --git a/include/pika_admin.h b/include/pika_admin.h index 1b1aa1bad3..4e9e476014 100644 --- a/include/pika_admin.h +++ b/include/pika_admin.h @@ -31,6 +31,9 @@ class SlaveofCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new SlaveofCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string master_ip_; @@ -52,6 +55,9 @@ class DbSlaveofCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new DbSlaveofCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string db_name_; @@ -76,6 +82,9 @@ class AuthCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new AuthCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: void DoInitial() override; @@ -89,6 +98,9 @@ class BgsaveCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new BgsaveCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: void DoInitial() override; @@ -104,6 +116,9 @@ class CompactCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new CompactCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: void DoInitial() override; @@ -121,6 +136,9 @@ class CompactRangeCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new CompactRangeCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: void DoInitial() override; @@ -142,6 +160,9 @@ class PurgelogstoCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new PurgelogstoCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: uint32_t num_ = 0; @@ -156,6 +177,9 @@ class PingCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new PingCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: void DoInitial() override; @@ -168,6 +192,9 @@ class SelectCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new SelectCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: void DoInitial() override; @@ -184,6 +211,9 @@ class FlushallCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new FlushallCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } bool FlushAllWithoutLock(); void DoBinlog() override; void DoBinlogByDB(const std::shared_ptr& sync_db); @@ -210,6 +240,9 @@ class FlushdbCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new FlushdbCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } std::string GetFlushDBname() { return db_name_; } void DoBinlog() override; bool DoWithoutLock(); @@ -236,6 +269,9 @@ class ClientCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new ClientCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: const static std::string KILLTYPE_NORMAL; @@ -270,6 +306,9 @@ class InfoCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new InfoCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } void Execute() override; private: @@ -323,6 +362,9 @@ class ShutdownCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new ShutdownCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: void DoInitial() override; @@ -338,6 +380,9 @@ class ConfigCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new ConfigCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } void Execute() override; private: @@ -358,6 +403,9 @@ class MonitorCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new MonitorCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: void DoInitial() override; @@ -371,6 +419,9 @@ class DbsizeCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new DbsizeCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: void DoInitial() override; @@ -383,6 +434,9 @@ class TimeCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new TimeCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: void DoInitial() override; @@ -395,6 +449,9 @@ class LastsaveCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new LastsaveCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: void DoInitial() override; @@ -408,6 +465,9 @@ class DelbackupCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new DelbackupCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: void DoInitial() override; @@ -420,6 +480,9 @@ class EchoCmd : public Cmd { void Do() override; void Split(const HintKeys& hint_keys) override {}; Cmd* Clone() override { return new EchoCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string body_; @@ -433,6 +496,9 @@ class ScandbCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new ScandbCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: storage::DataType type_ = storage::DataType::kAll; @@ -449,6 +515,9 @@ class SlowlogCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new SlowlogCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: int64_t number_ = 10; @@ -468,6 +537,9 @@ class PaddingCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new PaddingCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: void DoInitial() override; @@ -484,6 +556,9 @@ class PKPatternMatchDelCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new PKPatternMatchDelCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } void DoBinlog() override; private: @@ -503,6 +578,9 @@ class DummyCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new DummyCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: void DoInitial() override; @@ -515,6 +593,9 @@ class QuitCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new QuitCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: void DoInitial() override; @@ -527,6 +608,9 @@ class HelloCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new HelloCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: void DoInitial() override; @@ -540,6 +624,9 @@ class DiskRecoveryCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new DiskRecoveryCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: void DoInitial() override; @@ -554,6 +641,9 @@ class ClearReplicationIDCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new ClearReplicationIDCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: void DoInitial() override; @@ -566,6 +656,9 @@ class DisableWalCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new DisableWalCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: void DoInitial() override; @@ -579,6 +672,9 @@ class CacheCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new CacheCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: CacheCondition condition_; @@ -597,6 +693,9 @@ class ClearCacheCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new ClearCacheCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: void DoInitial() override; @@ -610,6 +709,9 @@ class CommandCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new CommandCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } class CommandFieldCompare { public: diff --git a/include/pika_bit.h b/include/pika_bit.h index 94e7767b16..c79aa7473d 100644 --- a/include/pika_bit.h +++ b/include/pika_bit.h @@ -31,6 +31,9 @@ class BitGetCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new BitGetCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -58,6 +61,9 @@ class BitSetCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new BitSetCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -88,6 +94,9 @@ class BitCountCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new BitCountCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -120,6 +129,9 @@ class BitPosCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new BitPosCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -162,6 +174,9 @@ class BitOpCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new BitOpCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } void DoBinlog() override; private: diff --git a/include/pika_client_conn.h b/include/pika_client_conn.h index 3124d2036c..3224d8a253 100644 --- a/include/pika_client_conn.h +++ b/include/pika_client_conn.h @@ -8,10 +8,12 @@ #include #include +#include #include "acl.h" #include "include/pika_command.h" #include "include/pika_define.h" +#include "net/src/memoey_pool.h" // TODO: stat time costing in write out data to connfd struct TimeStat { @@ -114,6 +116,8 @@ class PikaClientConn : public net::RedisConn { std::shared_ptr time_stat_; + void SetMemoryPool(net::MemoryPool *memory_pool) override{ memory_pool_ = memory_pool; } + private: net::ServerThread* const server_thread_; std::string current_db_; @@ -127,7 +131,9 @@ class PikaClientConn : public net::RedisConn { bool authenticated_ = false; std::shared_ptr user_; - std::shared_ptr DoCmd(const PikaCmdArgsType& argv, const std::string& opt, + net::MemoryPool *memory_pool_; + + std::variant, Cmd*> DoCmd(const PikaCmdArgsType& argv, const std::string& opt, const std::shared_ptr& resp_ptr, bool cache_miss_in_rtc); void ProcessSlowlog(const PikaCmdArgsType& argv, uint64_t do_duration); diff --git a/include/pika_cmd_table_manager.h b/include/pika_cmd_table_manager.h index 8177fa63b9..bfe5f425c8 100644 --- a/include/pika_cmd_table_manager.h +++ b/include/pika_cmd_table_manager.h @@ -32,6 +32,7 @@ class PikaCmdTableManager { void InitCmdTable(void); void RenameCommand(const std::string before, const std::string after); std::shared_ptr GetCmd(const std::string& opt); + Cmd* GetRawCmd(const std::string& opt); bool CmdExist(const std::string& cmd) const; CmdTable* GetCmdTable(); uint32_t GetMaxCmdId(); @@ -43,6 +44,8 @@ class PikaCmdTableManager { */ std::unordered_map* GetCommandStatMap(); + size_t GetMaxCmdSize() const { return maxCmdSize; } + private: std::shared_ptr NewCommand(const std::string& opt); @@ -60,5 +63,7 @@ class PikaCmdTableManager { * Info Commandstats used */ std::unordered_map cmdstat_map_; + + size_t maxCmdSize = 0; }; #endif diff --git a/include/pika_command.h b/include/pika_command.h index c132eae9c5..b9720d65cd 100644 --- a/include/pika_command.h +++ b/include/pika_command.h @@ -19,6 +19,7 @@ #include "pstd/include/pstd_string.h" #include "net/src/dispatch_thread.h" +#include "net/src/memoey_pool.h" class SyncMasterDB; class SyncSlaveDB; @@ -529,6 +530,7 @@ class Cmd : public std::enable_shared_from_this { virtual void DoUpdateCache() {} virtual void ReadCache() {} virtual Cmd* Clone() = 0; + virtual Cmd* Clone(net::MemoryPool* pool) = 0; // used for execute multikey command into different slots virtual void Split(const HintKeys& hint_keys) = 0; virtual void Merge() = 0; diff --git a/include/pika_consensus.h b/include/pika_consensus.h index bb774b5e3b..cdf3993993 100644 --- a/include/pika_consensus.h +++ b/include/pika_consensus.h @@ -109,7 +109,7 @@ class ConsensusCoordinator { // invoked by dbsync process pstd::Status Reset(const LogOffset& offset); - pstd::Status ProposeLog(const std::shared_ptr& cmd_ptr); + pstd::Status ProposeLog(Cmd* cmd_ptr); pstd::Status UpdateSlave(const std::string& ip, int port, const LogOffset& start, const LogOffset& end); pstd::Status AddSlaveNode(const std::string& ip, int port, int session_id); pstd::Status RemoveSlaveNode(const std::string& ip, int port); @@ -167,8 +167,8 @@ class ConsensusCoordinator { private: pstd::Status TruncateTo(const LogOffset& offset); - pstd::Status InternalAppendLog(const std::shared_ptr& cmd_ptr); - pstd::Status InternalAppendBinlog(const std::shared_ptr& cmd_ptr); + pstd::Status InternalAppendLog(Cmd* cmd_ptr); + pstd::Status InternalAppendBinlog(Cmd* cmd_ptr); void InternalApply(const MemLog::LogItem& log); void InternalApplyFollower(const std::shared_ptr& cmd_ptr); diff --git a/include/pika_dispatch_thread.h b/include/pika_dispatch_thread.h index 01a6fe96b0..f4523e1117 100644 --- a/include/pika_dispatch_thread.h +++ b/include/pika_dispatch_thread.h @@ -11,7 +11,7 @@ class PikaDispatchThread { public: PikaDispatchThread(std::set& ips, int port, int work_num, int cron_interval, int queue_limit, - int max_conn_rbuf_size); + int max_conn_rbuf_size, size_t memory_pool_page_size); ~PikaDispatchThread(); int StartThread(); void StopThread(); diff --git a/include/pika_geo.h b/include/pika_geo.h index 70b287da03..7a1df8391b 100644 --- a/include/pika_geo.h +++ b/include/pika_geo.h @@ -63,6 +63,9 @@ class GeoAddCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new GeoAddCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -83,6 +86,9 @@ class GeoPosCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new GeoPosCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -103,6 +109,9 @@ class GeoDistCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new GeoDistCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_, first_pos_, second_pos_, unit_; @@ -122,6 +131,9 @@ class GeoHashCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override{}; Cmd* Clone() override { return new GeoHashCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -137,6 +149,9 @@ class GeoRadiusCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new GeoRadiusCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -163,6 +178,9 @@ class GeoRadiusByMemberCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new GeoRadiusByMemberCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; diff --git a/include/pika_hash.h b/include/pika_hash.h index 1362040682..1c0b6dcdc0 100644 --- a/include/pika_hash.h +++ b/include/pika_hash.h @@ -30,6 +30,9 @@ class HDelCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new HDelCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -56,6 +59,9 @@ class HGetCmd : public Cmd { void Merge() override {}; bool IsTooLargeKey(const int &max_sz) override { return key_.size() > static_cast(max_sz); } Cmd* Clone() override { return new HGetCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_, field_; @@ -79,6 +85,9 @@ class HGetallCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new HGetallCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -101,6 +110,9 @@ class HSetCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new HSetCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_, field_, value_; @@ -124,6 +136,9 @@ class HExistsCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new HExistsCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_, field_; @@ -146,6 +161,9 @@ class HIncrbyCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new HIncrbyCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_, field_; @@ -169,6 +187,9 @@ class HIncrbyfloatCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new HIncrbyfloatCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_, field_, by_; @@ -192,6 +213,9 @@ class HKeysCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new HKeysCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -215,6 +239,9 @@ class HLenCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new HLenCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -238,6 +265,9 @@ class HMgetCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new HMgetCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -261,6 +291,9 @@ class HMsetCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new HMsetCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -284,6 +317,9 @@ class HSetnxCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new HSetnxCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_, field_, value_; @@ -307,6 +343,9 @@ class HStrlenCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new HStrlenCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_, field_; @@ -330,6 +369,9 @@ class HValsCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new HValsCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_, field_; @@ -350,6 +392,9 @@ class HScanCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new HScanCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -376,6 +421,9 @@ class HScanxCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new HScanxCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -402,6 +450,9 @@ class PKHScanRangeCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new PKHScanRangeCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -429,6 +480,9 @@ class PKHRScanRangeCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new PKHRScanRangeCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; diff --git a/include/pika_hyperloglog.h b/include/pika_hyperloglog.h index 77c374642f..c0fbab68b4 100644 --- a/include/pika_hyperloglog.h +++ b/include/pika_hyperloglog.h @@ -23,6 +23,9 @@ class PfAddCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new PfAddCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -38,6 +41,9 @@ class PfCountCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new PfCountCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::vector keys_; @@ -61,6 +67,9 @@ class PfMergeCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new PfMergeCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } void DoBinlog() override; private: diff --git a/include/pika_kv.h b/include/pika_kv.h index 7da694705b..9ad25ce20f 100644 --- a/include/pika_kv.h +++ b/include/pika_kv.h @@ -31,6 +31,9 @@ class SetCmd : public Cmd { void Merge() override{}; bool IsTooLargeKey(const int& max_sz) override { return key_.size() > static_cast(max_sz); } Cmd* Clone() override { return new SetCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -67,6 +70,9 @@ class GetCmd : public Cmd { void Merge() override{}; bool IsTooLargeKey(const int &max_sz) override { return key_.size() > static_cast(max_sz); } Cmd* Clone() override { return new GetCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -87,6 +93,9 @@ class DelCmd : public Cmd { void Split(const HintKeys& hint_keys) override; void Merge() override; Cmd* Clone() override { return new DelCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } void DoBinlog() override; private: @@ -111,6 +120,9 @@ class IncrCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new IncrCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -136,6 +148,9 @@ class IncrbyCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new IncrbyCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -161,6 +176,9 @@ class IncrbyfloatCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new IncrbyfloatCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_, value_, new_value_; @@ -186,6 +204,9 @@ class DecrCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new DecrCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -209,6 +230,9 @@ class DecrbyCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new DecrbyCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -232,6 +256,9 @@ class GetsetCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new GetsetCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -255,6 +282,9 @@ class AppendCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new AppendCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -278,6 +308,9 @@ class MgetCmd : public Cmd { void Split(const HintKeys& hint_keys) override; void Merge() override; Cmd* Clone() override { return new MgetCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: void DoInitial() override; @@ -303,6 +336,9 @@ class KeysCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new KeysCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string pattern_; @@ -325,6 +361,9 @@ class SetnxCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new SetnxCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -350,6 +389,9 @@ class SetexCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new SetexCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -375,6 +417,9 @@ class PsetexCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new PsetexCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -398,6 +443,9 @@ class DelvxCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new DelvxCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -430,6 +478,9 @@ class MsetCmd : public Cmd { void Split(const HintKeys& hint_keys) override; void Merge() override; Cmd* Clone() override { return new MsetCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } void DoBinlog() override; private: @@ -461,6 +512,9 @@ class MsetnxCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new MsetnxCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } void DoBinlog() override; private: @@ -487,6 +541,9 @@ class GetrangeCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new GetrangeCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -513,6 +570,9 @@ class SetrangeCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new SetrangeCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -538,6 +598,9 @@ class StrlenCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new StrlenCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -558,6 +621,9 @@ class ExistsCmd : public Cmd { void Split(const HintKeys& hint_keys) override; void Merge() override; Cmd* Clone() override { return new ExistsCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::vector keys_; @@ -580,6 +646,9 @@ class ExpireCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new ExpireCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -604,6 +673,9 @@ class PexpireCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new PexpireCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -628,6 +700,9 @@ class ExpireatCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new ExpireatCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -651,6 +726,9 @@ class PexpireatCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new PexpireatCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -674,6 +752,9 @@ class TtlCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new TtlCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -696,6 +777,9 @@ class PttlCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new PttlCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -718,6 +802,9 @@ class PersistCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new PersistCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -740,6 +827,9 @@ class TypeCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new TypeCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -755,6 +845,9 @@ class ScanCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new ScanCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: int64_t cursor_ = 0; @@ -778,6 +871,9 @@ class ScanxCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new ScanxCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: storage::DataType type_; @@ -807,6 +903,9 @@ class PKSetexAtCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new PKSetexAtCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -830,6 +929,9 @@ class PKScanRangeCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new PKScanRangeCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: storage::DataType type_; @@ -860,6 +962,9 @@ class PKRScanRangeCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new PKRScanRangeCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: storage::DataType type_ = storage::DataType::kAll; diff --git a/include/pika_list.h b/include/pika_list.h index 1591e76c32..8055a40f56 100644 --- a/include/pika_list.h +++ b/include/pika_list.h @@ -29,6 +29,9 @@ class LIndexCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new LIndexCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -53,6 +56,9 @@ class LInsertCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new LInsertCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -79,6 +85,9 @@ class LLenCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new LLenCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -119,6 +128,9 @@ class BLPopCmd final : public BlockingBaseCmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new BLPopCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } void DoInitial() override; void DoBinlog() override; @@ -145,6 +157,9 @@ class LPopCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new LPopCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -167,6 +182,9 @@ class LPushCmd : public BlockingBaseCmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new LPushCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -191,6 +209,9 @@ class LPushxCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new LPushxCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -215,6 +236,9 @@ class LRangeCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new LRangeCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -239,6 +263,9 @@ class LRemCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new LRemCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -263,6 +290,9 @@ class LSetCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new LSetCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -287,6 +317,9 @@ class LTrimCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new LTrimCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -305,6 +338,9 @@ class BRPopCmd final : public BlockingBaseCmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new BRPopCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } void DoInitial() override; void DoBinlog() override; @@ -330,6 +366,9 @@ class RPopCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new RPopCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -365,6 +404,9 @@ class RPopLPushCmd : public BlockingBaseCmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new RPopLPushCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } void DoBinlog() override; private: @@ -394,6 +436,9 @@ class RPushCmd : public BlockingBaseCmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new RPushCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -418,6 +463,9 @@ class RPushxCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new RPushxCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; diff --git a/include/pika_pubsub.h b/include/pika_pubsub.h index f9f7d85a30..e78ea006ae 100644 --- a/include/pika_pubsub.h +++ b/include/pika_pubsub.h @@ -20,6 +20,9 @@ class PublishCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new PublishCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } std::vector current_key() const override { return {channel_}; } private: @@ -36,6 +39,9 @@ class SubscribeCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new SubscribeCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } std::vector current_key() const override { return channels_; } private: @@ -51,6 +57,9 @@ class UnSubscribeCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new UnSubscribeCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } std::vector current_key() const override { return channels_; } private: @@ -66,6 +75,9 @@ class PUnSubscribeCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new PUnSubscribeCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } std::vector current_key() const override { return {channels_}; } private: @@ -81,6 +93,9 @@ class PSubscribeCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new PSubscribeCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } std::vector current_key() const override { return {channels_}; } std::vector channels_; @@ -96,6 +111,9 @@ class PubSubCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new PubSubCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string subcommand_; diff --git a/include/pika_rm.h b/include/pika_rm.h index ec80c1ff58..03fc3d297d 100644 --- a/include/pika_rm.h +++ b/include/pika_rm.h @@ -68,7 +68,7 @@ class SyncMasterDB : public SyncDB { // consensus use pstd::Status ConsensusUpdateSlave(const std::string& ip, int port, const LogOffset& start, const LogOffset& end); - pstd::Status ConsensusProposeLog(const std::shared_ptr& cmd_ptr); + pstd::Status ConsensusProposeLog(Cmd* cmd_ptr); pstd::Status ConsensusProcessLeaderLog(const std::shared_ptr& cmd_ptr, const BinlogItem& attribute); LogOffset ConsensusCommittedIndex(); LogOffset ConsensusLastIndex(); diff --git a/include/pika_set.h b/include/pika_set.h index c4b8eb2031..be5003fb1e 100644 --- a/include/pika_set.h +++ b/include/pika_set.h @@ -28,6 +28,9 @@ class SAddCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new SAddCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -51,6 +54,9 @@ class SRemCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new SRemCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: void DoInitial() override; @@ -83,6 +89,9 @@ class SPopCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new SPopCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } void DoBinlog() override; private: @@ -113,6 +122,9 @@ class SCardCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new SCardCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -136,6 +148,9 @@ class SMembersCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new SMembersCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -156,6 +171,9 @@ class SScanCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new SScanCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_, pattern_ = "*"; @@ -176,6 +194,9 @@ class SUnionCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new SUnionCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::vector keys_; @@ -217,6 +238,9 @@ class SUnionstoreCmd : public SetOperationCmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new SUnionstoreCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: void DoInitial() override; @@ -231,6 +255,9 @@ class SInterCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new SInterCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::vector keys_; @@ -246,6 +273,9 @@ class SInterstoreCmd : public SetOperationCmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new SInterstoreCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: void DoInitial() override; @@ -268,6 +298,9 @@ class SIsmemberCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new SIsmemberCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -284,6 +317,9 @@ class SDiffCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new SDiffCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::vector keys_; @@ -299,6 +335,9 @@ class SDiffstoreCmd : public SetOperationCmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new SDiffstoreCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: rocksdb::Status s_; @@ -328,6 +367,9 @@ class SMoveCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new SMoveCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } void DoBinlog() override; private: @@ -355,6 +397,9 @@ class SRandmemberCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new SRandmemberCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; diff --git a/include/pika_slot_command.h b/include/pika_slot_command.h index 53937d6172..4b87236067 100644 --- a/include/pika_slot_command.h +++ b/include/pika_slot_command.h @@ -68,6 +68,9 @@ class SlotsMgrtTagSlotCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new SlotsMgrtTagSlotCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string dest_ip_; @@ -85,6 +88,9 @@ class SlotsMgrtTagSlotAsyncCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new SlotsMgrtTagSlotAsyncCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string dest_ip_; @@ -104,6 +110,9 @@ class SlotsMgrtTagOneCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new SlotsMgrtTagOneCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string dest_ip_; @@ -123,6 +132,9 @@ class SlotsMgrtAsyncStatusCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new SlotsMgrtAsyncStatusCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: void DoInitial() override; @@ -135,6 +147,9 @@ class SlotsInfoCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new SlotsInfoCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: void DoInitial() override; @@ -150,6 +165,9 @@ class SlotsMgrtAsyncCancelCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new SlotsMgrtAsyncCancelCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: void DoInitial() override; @@ -162,6 +180,9 @@ class SlotsDelCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new SlotsDelCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::vector slots_; @@ -175,6 +196,9 @@ class SlotsHashKeyCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new SlotsHashKeyCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::vector keys_; @@ -188,6 +212,9 @@ class SlotsScanCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new SlotsScanCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -213,6 +240,9 @@ class SlotsMgrtExecWrapperCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new SlotsMgrtExecWrapperCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -228,6 +258,9 @@ class SlotsReloadCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new SlotsReloadCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: void DoInitial() override; @@ -240,6 +273,9 @@ class SlotsReloadOffCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new SlotsReloadOffCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: void DoInitial() override; @@ -252,6 +288,9 @@ class SlotsCleanupCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new SlotsCleanupCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } std::vector cleanup_slots_; private: @@ -265,6 +304,9 @@ class SlotsCleanupOffCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new SlotsCleanupOffCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: void DoInitial() override; diff --git a/include/pika_stream.h b/include/pika_stream.h index bf61a96c6b..1c15e3b1b7 100644 --- a/include/pika_stream.h +++ b/include/pika_stream.h @@ -35,6 +35,9 @@ class XAddCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new XAddCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -53,6 +56,9 @@ class XDelCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new XDelCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -70,6 +76,9 @@ class XReadCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new XReadCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: storage::StreamReadGroupReadArgs args_; @@ -89,6 +98,9 @@ class XRangeCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new XRangeCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } protected: std::string key_; @@ -104,6 +116,9 @@ class XRevrangeCmd : public XRangeCmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new XRevrangeCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } }; class XLenCmd : public Cmd { @@ -114,6 +129,9 @@ class XLenCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new XLenCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -129,6 +147,9 @@ class XTrimCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new XTrimCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -145,6 +166,9 @@ class XInfoCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new XInfoCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; diff --git a/include/pika_transaction.h b/include/pika_transaction.h index f772ef4e90..a6d4905066 100644 --- a/include/pika_transaction.h +++ b/include/pika_transaction.h @@ -18,6 +18,9 @@ class MultiCmd : public Cmd { : Cmd(name, arity, flag, static_cast(AclCategory::TRANSACTION)) {} void Do() override; Cmd* Clone() override { return new MultiCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } void Split(const HintKeys& hint_keys) override {} void Merge() override {} @@ -31,6 +34,9 @@ class ExecCmd : public Cmd { : Cmd(name, arity, flag, static_cast(AclCategory::TRANSACTION)) {} void Do() override; Cmd* Clone() override { return new ExecCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } void Split(const HintKeys& hint_keys) override {} void Merge() override {} std::vector current_key() const override { return {}; } @@ -65,6 +71,9 @@ class DiscardCmd : public Cmd { : Cmd(name, arity, flag, static_cast(AclCategory::TRANSACTION)) {} void Do() override; Cmd* Clone() override { return new DiscardCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } void Split(const HintKeys& hint_keys) override {} void Merge() override {} @@ -80,6 +89,9 @@ class WatchCmd : public Cmd { void Do() override; void Split(const HintKeys& hint_keys) override {} Cmd* Clone() override { return new WatchCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } void Merge() override {} std::vector current_key() const override { return keys_; } void Execute() override; @@ -97,6 +109,9 @@ class UnwatchCmd : public Cmd { void Do() override; Cmd* Clone() override { return new UnwatchCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } void Split(const HintKeys& hint_keys) override {} void Merge() override {} diff --git a/include/pika_zset.h b/include/pika_zset.h index b4e5726233..1aefdd1d42 100644 --- a/include/pika_zset.h +++ b/include/pika_zset.h @@ -29,6 +29,9 @@ class ZAddCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new ZAddCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -53,6 +56,9 @@ class ZCardCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new ZCardCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -72,6 +78,9 @@ class ZScanCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new ZScanCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_, pattern_ = "*"; @@ -98,6 +107,9 @@ class ZIncrbyCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new ZIncrbyCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } double Score() { return score_; } private: @@ -136,6 +148,9 @@ class ZRangeCmd : public ZsetRangeParentCmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new ZRangeCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: rocksdb::Status s_; @@ -157,6 +172,9 @@ class ZRevrangeCmd : public ZsetRangeParentCmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new ZRevrangeCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: rocksdb::Status s_; @@ -205,6 +223,9 @@ class ZRangebyscoreCmd : public ZsetRangebyscoreParentCmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new ZRangebyscoreCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: rocksdb::Status s_; @@ -227,6 +248,9 @@ class ZRevrangebyscoreCmd : public ZsetRangebyscoreParentCmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new ZRevrangebyscoreCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: rocksdb::Status s_; @@ -249,6 +273,9 @@ class ZCountCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new ZCountCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } double MinScore() { return min_score_; } double MaxScore() { return max_score_; } bool LeftClose() { return left_close_; } @@ -282,6 +309,9 @@ class ZRemCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new ZRemCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_; @@ -330,6 +360,9 @@ class ZUnionstoreCmd : public ZsetUIstoreParentCmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new ZUnionstoreCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: void DoInitial() override; @@ -348,6 +381,9 @@ class ZInterstoreCmd : public ZsetUIstoreParentCmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new ZInterstoreCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } void DoBinlog() override; private: @@ -382,6 +418,9 @@ class ZRankCmd : public ZsetRankParentCmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new ZRankCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: rocksdb::Status s_; @@ -403,6 +442,9 @@ class ZRevrankCmd : public ZsetRankParentCmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new ZRevrankCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: rocksdb::Status s_; @@ -424,6 +466,9 @@ class ZScoreCmd : public ZsetRankParentCmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new ZScoreCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_, member_; @@ -464,6 +509,9 @@ class ZRangebylexCmd : public ZsetRangebylexParentCmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new ZRangebylexCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: rocksdb::Status s_; @@ -485,6 +533,9 @@ class ZRevrangebylexCmd : public ZsetRangebylexParentCmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new ZRevrangebylexCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: void DoInitial() override; @@ -507,6 +558,9 @@ class ZLexcountCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new ZLexcountCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_, min_member_, max_member_; @@ -532,6 +586,9 @@ class ZRemrangebyrankCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new ZRemrangebyrankCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_, min_, max_; @@ -556,6 +613,9 @@ class ZRemrangebyscoreCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new ZRemrangebyscoreCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_, min_, max_; @@ -581,6 +641,9 @@ class ZRemrangebylexCmd : public Cmd { void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new ZRemrangebylexCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: std::string key_, min_, max_; @@ -606,6 +669,9 @@ class ZPopmaxCmd : public Cmd { void DoThroughDB() override; void DoUpdateCache() override; Cmd* Clone() override { return new ZPopmaxCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: void DoInitial() override; @@ -628,6 +694,9 @@ class ZPopminCmd : public Cmd { void DoThroughDB() override; void DoUpdateCache() override; Cmd* Clone() override { return new ZPopminCmd(*this); } + Cmd* Clone(net::MemoryPool* pool) override { + return pool->Allocate::type>(*this); + } private: void DoInitial() override; diff --git a/src/acl.cc b/src/acl.cc index 04226d811c..6c44d31057 100644 --- a/src/acl.cc +++ b/src/acl.cc @@ -257,7 +257,7 @@ void User::GetUserDescribe(CmdRes* res) { } } -AclDeniedCmd User::CheckUserPermission(std::shared_ptr& cmd, const PikaCmdArgsType& argv, int8_t& subCmdIndex, +AclDeniedCmd User::CheckUserPermission(Cmd* cmd, const PikaCmdArgsType& argv, int8_t& subCmdIndex, std::string* errKey) { std::shared_lock l(mutex_); @@ -1222,8 +1222,8 @@ void AclSelector::ACLDescribeSelector(std::vector& vector) { } } -AclDeniedCmd AclSelector::CheckCanExecCmd(std::shared_ptr& cmd, int8_t subCmdIndex, - const std::vector& keys, std::string* errKey) { +AclDeniedCmd AclSelector::CheckCanExecCmd(Cmd* cmd, int8_t subCmdIndex, const std::vector& keys, + std::string* errKey) { if (!HasFlags(static_cast(AclSelectorFlag::ALL_COMMANDS)) && !(cmd->flag() & kCmdFlagsNoAuth)) { if (subCmdIndex < 0) { if (!allowedCommands_.test(cmd->GetCmdId())) { diff --git a/src/net/include/net_conn.h b/src/net/include/net_conn.h index fab23f71b2..e032a5ac46 100644 --- a/src/net/include/net_conn.h +++ b/src/net/include/net_conn.h @@ -17,9 +17,10 @@ #include "net/include/net_define.h" #include "net/include/server_thread.h" +#include "net/src/memoey_pool.h" #include "net/src/net_multiplexer.h" -#include "pstd/include/testutil.h" #include "pstd/include/noncopyable.h" +#include "pstd/include/testutil.h" namespace net { @@ -89,6 +90,8 @@ class NetConn : public std::enable_shared_from_this, public pstd::nonco return ss.str(); } + virtual void SetMemoryPool (MemoryPool* memory_pool){} + #ifdef __ENABLE_SSL SSL* ssl() { return ssl_; } diff --git a/src/net/include/server_thread.h b/src/net/include/server_thread.h index b8defbf2a6..334f274354 100644 --- a/src/net/include/server_thread.h +++ b/src/net/include/server_thread.h @@ -236,7 +236,7 @@ extern ServerThread* NewDispatchThread(const std::string& ip, int port, int work const ServerHandle* handle = nullptr); extern ServerThread* NewDispatchThread(const std::set& ips, int port, int work_num, ConnFactory* conn_factory, int cron_interval = 0, int queue_limit = 1000, - const ServerHandle* handle = nullptr); + const ServerHandle* handle = nullptr, size_t memory_pool_page_size = 0); } // namespace net #endif // NET_INCLUDE_SERVER_THREAD_H_ diff --git a/src/net/src/dispatch_thread.cc b/src/net/src/dispatch_thread.cc index 6fbe97373e..ff8d3d254c 100644 --- a/src/net/src/dispatch_thread.cc +++ b/src/net/src/dispatch_thread.cc @@ -35,13 +35,14 @@ DispatchThread::DispatchThread(const std::string& ip, int port, int work_num, Co } DispatchThread::DispatchThread(const std::set& ips, int port, int work_num, ConnFactory* conn_factory, - int cron_interval, int queue_limit, const ServerHandle* handle) + int cron_interval, int queue_limit, const ServerHandle* handle, + size_t memory_pool_page_size) : ServerThread::ServerThread(ips, port, cron_interval, handle), last_thread_(0), work_num_(work_num), queue_limit_(queue_limit) { for (int i = 0; i < work_num_; i++) { - worker_thread_.emplace_back(std::make_unique(conn_factory, this, queue_limit, cron_interval)); + worker_thread_.emplace_back(std::make_unique(conn_factory, this, queue_limit, cron_interval,memory_pool_page_size)); } } @@ -342,8 +343,9 @@ extern ServerThread* NewDispatchThread(const std::string& ip, int port, int work } extern ServerThread* NewDispatchThread(const std::set& ips, int port, int work_num, ConnFactory* conn_factory, int cron_interval, int queue_limit, - const ServerHandle* handle) { - return new DispatchThread(ips, port, work_num, conn_factory, cron_interval, queue_limit, handle); + const ServerHandle* handle, size_t memory_pool_page_size) { + return new DispatchThread(ips, port, work_num, conn_factory, cron_interval, queue_limit, handle, + memory_pool_page_size); } }; // namespace net diff --git a/src/net/src/dispatch_thread.h b/src/net/src/dispatch_thread.h index 6d6543d3a9..c43276b248 100644 --- a/src/net/src/dispatch_thread.h +++ b/src/net/src/dispatch_thread.h @@ -63,7 +63,7 @@ class DispatchThread : public ServerThread { DispatchThread(const std::string& ip, int port, int work_num, ConnFactory* conn_factory, int cron_interval, int queue_limit, const ServerHandle* handle); DispatchThread(const std::set& ips, int port, int work_num, ConnFactory* conn_factory, int cron_interval, - int queue_limit, const ServerHandle* handle); + int queue_limit, const ServerHandle* handle, size_t memory_pool_page_size); ~DispatchThread() override; diff --git a/src/net/src/memoey_pool.h b/src/net/src/memoey_pool.h new file mode 100644 index 0000000000..19fc94d4a2 --- /dev/null +++ b/src/net/src/memoey_pool.h @@ -0,0 +1,118 @@ +// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#ifndef PIKA_SRC_NET_SRC_MEMOEY_POOL_H +#define PIKA_SRC_NET_SRC_MEMOEY_POOL_H + +#include +#include +#include +#include + +/** + * page size = pageSize_ + pageOffset_ + * page layout + * |-----------|------------------| + * | page head | Available memory | + * | ----------|------------------| + * | 8 bit | pageSize_ bit | + * |-----------|------------------| + * The page head is used to distinguish the current position in the page table. + * If the page is extra, the page head is 0xFF + */ +namespace net { + +class MemoryPool { + public: + explicit MemoryPool() : MemoryPool(512) {} + explicit MemoryPool(int64_t pageSize) : pageSize_(pageSize) { + for (int i = 0; i < 64; i++) { + pages_[i] = nullptr; + } + } + + MemoryPool(const MemoryPool &) = delete; + MemoryPool &operator=(const MemoryPool &) = delete; + MemoryPool(MemoryPool &&) = delete; + MemoryPool &operator=(MemoryPool &&) = delete; + + ~MemoryPool() { + for (int i = 0; i < 64; i++) { + if (pages_[i] != nullptr) { + auto ptr = reinterpret_cast(pages_[i]); + std::free(--ptr); + } + } + }; + + template + T *Allocate(Args &&...args) { + if (sizeof(T) > pageSize_) { + return AllocateExtend(std::forward(args)...); + } + // If you are in a particularly high multithreaded race scenario + // you can put bits_.load() into the for loop + auto bit = bits_.load(); + for (int i = 0; i < 64;) { + uint64_t _bit = 1ull << i; + if (!(bit & _bit)) { // If the i-th bit is 0, it means that the i-th page is not used + if (bits_.compare_exchange_strong(bit, bit | _bit)) { // Set the i-th bit to 1 + if (!pages_[i]) { // If the i-th page is not allocated, allocate the i-th page + pages_[i] = std::malloc(pageSize_ + pageOffset_); + auto page = reinterpret_cast(pages_[i]); + *page = uint8_t(i); // Set the page head to i + pages_[i] = ++page; + } + return new (pages_[i]) T(std::forward(args)...); + } else { + bit = bits_.load(); + if (!(bit & _bit)) { + continue; // If the i-th bit is 0, it means that the i-th page is not used + } + } + } + ++i; + } + + // If we reach here, it means that all pages are full + // We need to allocate a new page + return AllocateExtend(std::forward(args)...); + } + + template + void Deallocate(T *ptr) { + // Get the page head + auto page = reinterpret_cast(ptr) - 1; + ptr->~T(); + + if (*page == extendFlag_) { // If the page head is 0xFF, it means that the page is extra + std::free(page); + return; + } + + auto index = *page; + bits_ &= ~(1ull << index); + } + + private: + template + T *AllocateExtend(Args &&...args) { + auto newPage = std::malloc(sizeof(T) + pageOffset_); + auto page = reinterpret_cast(newPage); + *page = extendFlag_; // Set the page head to 0xFF + return new (++page) T(std::forward(args)...); + } + + private: + const int64_t pageSize_; // The size of each page + const int8_t pageOffset_ = sizeof(uint8_t); // The size of the page head + const uint8_t extendFlag_ = 0xFF; + std::atomic bits_ = 0; // The bits_ is used to record the status of each page + std::array pages_{}; // The pages_ is used to store the address of each page +}; + +} // namespace net + +#endif // PIKA_SRC_NET_SRC_MEMOEY_POOL_H diff --git a/src/net/src/worker_thread.cc b/src/net/src/worker_thread.cc index c4735f46b4..15a1b7a5dc 100644 --- a/src/net/src/worker_thread.cc +++ b/src/net/src/worker_thread.cc @@ -16,12 +16,13 @@ namespace net { -WorkerThread::WorkerThread(ConnFactory* conn_factory, ServerThread* server_thread, int queue_limit, int cron_interval) - : - server_thread_(server_thread), +WorkerThread::WorkerThread(ConnFactory* conn_factory, ServerThread* server_thread, size_t memory_pool_page_size, + int queue_limit, int cron_interval) + : server_thread_(server_thread), conn_factory_(conn_factory), cron_interval_(cron_interval), - keepalive_timeout_(kDefaultKeepAliveTime) { + keepalive_timeout_(kDefaultKeepAliveTime), + memory_pool_(memory_pool_page_size) { /* * install the protobuf handler here */ @@ -124,6 +125,8 @@ void* WorkerThread::ThreadMain() { continue; } + tc->SetMemoryPool(&memory_pool_); + #ifdef __ENABLE_SSL // Create SSL failed if (server_thread_->security() && !tc->CreateSSL(server_thread_->ssl_ctx())) { diff --git a/src/net/src/worker_thread.h b/src/net/src/worker_thread.h index 47bab0091a..0f54ad4308 100644 --- a/src/net/src/worker_thread.h +++ b/src/net/src/worker_thread.h @@ -18,6 +18,7 @@ #include "net/include/net_define.h" #include "net/include/net_thread.h" #include "net/include/server_thread.h" +#include "net/src/memoey_pool.h" #include "net/src/net_multiplexer.h" #include "net/src/dispatch_thread.h" namespace net { @@ -29,7 +30,8 @@ class ConnFactory; class WorkerThread : public Thread { public: - explicit WorkerThread(ConnFactory* conn_factory, ServerThread* server_thread, int queue_limit, int cron_interval = 0); + explicit WorkerThread(ConnFactory* conn_factory, ServerThread* server_thread, size_t memory_pool_page_size, + int queue_limit, int cron_interval = 0); ~WorkerThread() override; @@ -65,6 +67,8 @@ class WorkerThread : public Thread { ConnFactory* conn_factory_ = nullptr; int cron_interval_ = 0; + MemoryPool memory_pool_; + /* * The epoll handler */ diff --git a/src/pika_acl.cc b/src/pika_acl.cc index b6fe3375b7..566a7db9fb 100644 --- a/src/pika_acl.cc +++ b/src/pika_acl.cc @@ -143,7 +143,7 @@ void PikaAclCmd::DryRun() { } int8_t subCmdIndex = -1; - AclDeniedCmd checkRes = user->CheckUserPermission(cmd, args, subCmdIndex, nullptr); + AclDeniedCmd checkRes = user->CheckUserPermission(cmd.get(), args, subCmdIndex, nullptr); switch (checkRes) { case AclDeniedCmd::OK: diff --git a/src/pika_admin.cc b/src/pika_admin.cc index 583e0dfa68..0470de297a 100644 --- a/src/pika_admin.cc +++ b/src/pika_admin.cc @@ -588,7 +588,7 @@ void FlushallCmd::DoBinlogByDB(const std::shared_ptr& sync_db) { return; } - Status s = sync_db->ConsensusProposeLog(shared_from_this()); + Status s = sync_db->ConsensusProposeLog(this); if (!s.ok()) { LOG(WARNING) << sync_db->SyncDBInfo().ToString() << " Writing binlog failed, maybe no space left on device " << s.ToString(); diff --git a/src/pika_client_conn.cc b/src/pika_client_conn.cc index 768cb6d5ad..64982a706e 100644 --- a/src/pika_client_conn.cc +++ b/src/pika_client_conn.cc @@ -6,6 +6,7 @@ #include #include #include +#include #include #include "include/pika_admin.h" @@ -34,11 +35,12 @@ PikaClientConn::PikaClientConn(int fd, const std::string& ip_port, net::Thread* time_stat_.reset(new TimeStat()); } -std::shared_ptr PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const std::string& opt, - const std::shared_ptr& resp_ptr, bool cache_miss_in_rtc) { +std::variant, Cmd*> PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const std::string& opt, + const std::shared_ptr& resp_ptr, + bool cache_miss_in_rtc) { // Get command info - std::shared_ptr c_ptr = g_pika_cmd_table_manager->GetCmd(opt); - if (!c_ptr) { + auto cmd = g_pika_cmd_table_manager->GetRawCmd(opt); + if (!cmd) { std::shared_ptr tmp_ptr = std::make_shared(DummyCmd()); tmp_ptr->res().SetRes(CmdRes::kErrOther, "unknown command \"" + opt + "\""); if (IsInTxn()) { @@ -46,6 +48,7 @@ std::shared_ptr PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const st } return tmp_ptr; } + auto c_ptr = cmd->Clone(memory_pool_); c_ptr->SetCacheMissedInRtc(cache_miss_in_rtc); c_ptr->SetConn(shared_from_this()); c_ptr->SetResp(resp_ptr); @@ -115,14 +118,16 @@ std::shared_ptr PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const st } if (IsInTxn() && opt != kCmdNameExec && opt != kCmdNameWatch && opt != kCmdNameDiscard && opt != kCmdNameMulti) { - if (c_ptr->is_write() && g_pika_server->readonly(current_db_)) { + auto tmp_cmd = std::shared_ptr(c_ptr->Clone()); + memory_pool_->Deallocate(c_ptr); + if (tmp_cmd->is_write() && g_pika_server->readonly(current_db_)) { SetTxnInitFailState(true); - c_ptr->res().SetRes(CmdRes::kErrOther, "READONLY You can't write against a read only replica."); - return c_ptr; + tmp_cmd->res().SetRes(CmdRes::kErrOther, "READONLY You can't write against a read only replica."); + return tmp_cmd; } - PushCmdToQue(c_ptr); - c_ptr->res().SetRes(CmdRes::kTxnQueued); - return c_ptr; + PushCmdToQue(tmp_cmd); + tmp_cmd->res().SetRes(CmdRes::kTxnQueued); + return tmp_cmd; } bool is_monitoring = g_pika_server->HasMonitorClients(); @@ -189,7 +194,7 @@ std::shared_ptr PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const st if (c_ptr->res().ok() && c_ptr->is_write() && name() != kCmdNameExec) { if (c_ptr->name() == kCmdNameFlushdb) { - auto flushdb = std::dynamic_pointer_cast(c_ptr); + auto flushdb = dynamic_cast(c_ptr); SetTxnFailedIfKeyExists(flushdb->GetFlushDBname()); } else if (c_ptr->name() == kCmdNameFlushall) { SetTxnFailedIfKeyExists(); @@ -337,13 +342,15 @@ void PikaClientConn::BatchExecRedisCmd(const std::vector& bool PikaClientConn::ReadCmdInCache(const net::RedisCmdArgsType& argv, const std::string& opt) { resp_num.store(1); - std::shared_ptr c_ptr = g_pika_cmd_table_manager->GetCmd(opt); - if (!c_ptr) { + auto cmd = g_pika_cmd_table_manager->GetRawCmd(opt); + if (!cmd) { return false; } + auto c_ptr = cmd->Clone(memory_pool_); // Check authed if (AuthRequired()) { // the user is not authed, need to do auth if (!(c_ptr->flag() & kCmdFlagsNoAuth)) { + memory_pool_->Deallocate(c_ptr); return false; } } @@ -353,6 +360,7 @@ bool PikaClientConn::ReadCmdInCache(const net::RedisCmdArgsType& argv, const std // the cmd with large key should be non-exist in cache, except for pre-stored if (c_ptr->IsTooLargeKey(g_pika_conf->max_key_size_in_cache())) { resp_num--; + memory_pool_->Deallocate(c_ptr); return false; } // acl check @@ -363,6 +371,7 @@ bool PikaClientConn::ReadCmdInCache(const net::RedisCmdArgsType& argv, const std if (checkRes == AclDeniedCmd::CMD || checkRes == AclDeniedCmd::KEY || checkRes == AclDeniedCmd::CHANNEL || checkRes == AclDeniedCmd::NO_SUB_CMD || checkRes == AclDeniedCmd::NO_AUTH) { // acl check failed + memory_pool_->Deallocate(c_ptr); return false; } // only read command(Get, HGet) will reach here, no need of record lock @@ -376,6 +385,7 @@ bool PikaClientConn::ReadCmdInCache(const net::RedisCmdArgsType& argv, const std resp_array.emplace_back(std::make_shared(std::move(c_ptr->res().message()))); TryWriteResp(); } + memory_pool_->Deallocate(c_ptr); return read_status; } @@ -520,8 +530,15 @@ void PikaClientConn::ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr cmd_ptr = DoCmd(argv, opt, resp_ptr, cache_miss_in_rtc); - *resp_ptr = std::move(cmd_ptr->res().message()); + auto cmd_ptr = DoCmd(argv, opt, resp_ptr, cache_miss_in_rtc); + if (std::holds_alternative>(cmd_ptr)) { + auto cmd = std::get>(cmd_ptr); + *resp_ptr = std::move(cmd->res().message()); + } else if (std::holds_alternative(cmd_ptr)) { + auto cmd = std::get(cmd_ptr); + *resp_ptr = std::move(cmd->res().message()); + memory_pool_->Deallocate(cmd); + } resp_num--; } diff --git a/src/pika_cmd_table_manager.cc b/src/pika_cmd_table_manager.cc index 974fceb0ee..d9dc2f9b28 100644 --- a/src/pika_cmd_table_manager.cc +++ b/src/pika_cmd_table_manager.cc @@ -22,6 +22,10 @@ PikaCmdTableManager::PikaCmdTableManager() { void PikaCmdTableManager::InitCmdTable(void) { ::InitCmdTable(cmds_.get()); for (const auto& cmd : *cmds_) { + auto size = sizeof(*cmd.second); + if (size > maxCmdSize) { + maxCmdSize = size; + } if (cmd.second->flag() & kCmdFlagsWrite) { cmd.second->AddAclCategory(static_cast(AclCategory::WRITE)); } @@ -72,6 +76,8 @@ std::shared_ptr PikaCmdTableManager::GetCmd(const std::string& opt) { return NewCommand(internal_opt); } +Cmd* PikaCmdTableManager::GetRawCmd(const std::string& opt) { return GetCmdFromDB(opt, *cmds_); } + std::shared_ptr PikaCmdTableManager::NewCommand(const std::string& opt) { Cmd* cmd = GetCmdFromDB(opt, *cmds_); if (cmd) { diff --git a/src/pika_command.cc b/src/pika_command.cc index 0046b17109..b186b12f00 100644 --- a/src/pika_command.cc +++ b/src/pika_command.cc @@ -958,7 +958,7 @@ void Cmd::DoBinlog() { return; } - Status s = sync_db_->ConsensusProposeLog(shared_from_this()); + Status s = sync_db_->ConsensusProposeLog(this); if (!s.ok()) { LOG(WARNING) << sync_db_->SyncDBInfo().ToString() << " Writing binlog failed, maybe no space left on device " << s.ToString(); diff --git a/src/pika_consensus.cc b/src/pika_consensus.cc index 89f10e0317..7580dfd2f2 100644 --- a/src/pika_consensus.cc +++ b/src/pika_consensus.cc @@ -315,7 +315,7 @@ Status ConsensusCoordinator::Reset(const LogOffset& offset) { return Status::OK(); } -Status ConsensusCoordinator::ProposeLog(const std::shared_ptr& cmd_ptr) { +Status ConsensusCoordinator::ProposeLog(Cmd* cmd_ptr) { std::vector keys = cmd_ptr->current_key(); // slotkey shouldn't add binlog if (cmd_ptr->name() == kCmdNameSAdd && !keys.empty() && @@ -333,7 +333,7 @@ Status ConsensusCoordinator::ProposeLog(const std::shared_ptr& cmd_ptr) { return Status::OK(); } -Status ConsensusCoordinator::InternalAppendLog(const std::shared_ptr& cmd_ptr) { +Status ConsensusCoordinator::InternalAppendLog(Cmd* cmd_ptr) { return InternalAppendBinlog(cmd_ptr); } @@ -349,7 +349,7 @@ Status ConsensusCoordinator::ProcessLeaderLog(const std::shared_ptr& cmd_pt auto opt = cmd_ptr->argv()[0]; if (pstd::StringToLower(opt) != kCmdNameFlushdb) { // apply binlog in sync way - Status s = InternalAppendLog(cmd_ptr); + Status s = InternalAppendLog(cmd_ptr.get()); // apply db in async way InternalApplyFollower(cmd_ptr); } else { @@ -362,7 +362,7 @@ Status ConsensusCoordinator::ProcessLeaderLog(const std::shared_ptr& cmd_pt wait_ms = wait_ms < 3000 ? wait_ms : 3000; } // apply flushdb-binlog in sync way - Status s = InternalAppendLog(cmd_ptr); + Status s = InternalAppendLog(cmd_ptr.get()); // applyDB in sync way PikaReplBgWorker::WriteDBInSyncWay(cmd_ptr); } @@ -380,7 +380,7 @@ Status ConsensusCoordinator::UpdateSlave(const std::string& ip, int port, const return Status::OK(); } -Status ConsensusCoordinator::InternalAppendBinlog(const std::shared_ptr& cmd_ptr) { +Status ConsensusCoordinator::InternalAppendBinlog(Cmd* cmd_ptr) { std::string content = cmd_ptr->ToRedisProtocol(); Status s = stable_logger_->Logger()->Put(content); if (!s.ok()) { diff --git a/src/pika_dispatch_thread.cc b/src/pika_dispatch_thread.cc index bc892e23e4..1958323fa0 100644 --- a/src/pika_dispatch_thread.cc +++ b/src/pika_dispatch_thread.cc @@ -15,9 +15,10 @@ extern PikaServer* g_pika_server; PikaDispatchThread::PikaDispatchThread(std::set& ips, int port, int work_num, int cron_interval, - int queue_limit, int max_conn_rbuf_size) + int queue_limit, int max_conn_rbuf_size, size_t memory_pool_page_size) : conn_factory_(max_conn_rbuf_size), handles_(this) { - thread_rep_ = net::NewDispatchThread(ips, port, work_num, &conn_factory_, cron_interval, queue_limit, &handles_); + thread_rep_ = net::NewDispatchThread(ips, port, work_num, &conn_factory_, cron_interval, queue_limit, &handles_, + memory_pool_page_size); thread_rep_->set_thread_name("Dispatcher"); } diff --git a/src/pika_migrate_thread.cc b/src/pika_migrate_thread.cc index fd221f0b8e..918ea79db8 100644 --- a/src/pika_migrate_thread.cc +++ b/src/pika_migrate_thread.cc @@ -471,7 +471,7 @@ void WriteDelKeyToBinlog(const std::string& key, const std::shared_ptr& db) std::shared_ptr sync_db = g_pika_rm->GetSyncMasterDBByName(DBInfo(db->GetDBName())); - pstd::Status s = sync_db->ConsensusProposeLog(cmd_ptr); + pstd::Status s = sync_db->ConsensusProposeLog(cmd_ptr.get()); if (!s.ok()) { LOG(ERROR) << "write delete key to binlog failed, key: " << key; } diff --git a/src/pika_rm.cc b/src/pika_rm.cc index 9df7b82101..be238c12fd 100644 --- a/src/pika_rm.cc +++ b/src/pika_rm.cc @@ -380,7 +380,7 @@ bool SyncMasterDB::CheckSessionId(const std::string& ip, int port, const std::st return true; } -Status SyncMasterDB::ConsensusProposeLog(const std::shared_ptr& cmd_ptr) { +Status SyncMasterDB::ConsensusProposeLog(Cmd* cmd_ptr) { return coordinator_.ProposeLog(cmd_ptr); } diff --git a/src/pika_server.cc b/src/pika_server.cc index 72b16d82f7..788b58aed5 100644 --- a/src/pika_server.cc +++ b/src/pika_server.cc @@ -71,7 +71,8 @@ PikaServer::PikaServer() LOG(INFO) << "Worker queue limit is " << worker_queue_limit; for_each(ips.begin(), ips.end(), [](auto& ip) { LOG(WARNING) << ip; }); pika_dispatch_thread_ = std::make_unique(ips, port_, worker_num_, 3000, worker_queue_limit, - g_pika_conf->max_conn_rbuf_size()); + g_pika_conf->max_conn_rbuf_size(), + g_pika_cmd_table_manager->GetMaxCmdSize()); pika_rsync_service_ = std::make_unique(g_pika_conf->db_sync_path(), g_pika_conf->port() + kPortShiftRSync); // TODO: remove pika_rsync_service_,reuse pika_rsync_service_ port From 0aed015776cefac3bf529893fd73d571320a312f Mon Sep 17 00:00:00 2001 From: lqx Date: Tue, 17 Dec 2024 20:37:37 +0800 Subject: [PATCH 2/3] fix file name error --- include/pika_client_conn.h | 2 +- include/pika_command.h | 2 +- src/net/include/net_conn.h | 2 +- src/net/src/{memoey_pool.h => memory_pool.h} | 6 +++--- src/net/src/worker_thread.h | 8 ++++---- 5 files changed, 10 insertions(+), 10 deletions(-) rename src/net/src/{memoey_pool.h => memory_pool.h} (96%) diff --git a/include/pika_client_conn.h b/include/pika_client_conn.h index 3224d8a253..e5b207a709 100644 --- a/include/pika_client_conn.h +++ b/include/pika_client_conn.h @@ -13,7 +13,7 @@ #include "acl.h" #include "include/pika_command.h" #include "include/pika_define.h" -#include "net/src/memoey_pool.h" +#include "net/src/memory_pool.h" // TODO: stat time costing in write out data to connfd struct TimeStat { diff --git a/include/pika_command.h b/include/pika_command.h index b9720d65cd..254ed8c972 100644 --- a/include/pika_command.h +++ b/include/pika_command.h @@ -19,7 +19,7 @@ #include "pstd/include/pstd_string.h" #include "net/src/dispatch_thread.h" -#include "net/src/memoey_pool.h" +#include "net/src/memory_pool.h" class SyncMasterDB; class SyncSlaveDB; diff --git a/src/net/include/net_conn.h b/src/net/include/net_conn.h index e032a5ac46..042fcbd14c 100644 --- a/src/net/include/net_conn.h +++ b/src/net/include/net_conn.h @@ -17,7 +17,7 @@ #include "net/include/net_define.h" #include "net/include/server_thread.h" -#include "net/src/memoey_pool.h" +#include "net/src/memory_pool.h" #include "net/src/net_multiplexer.h" #include "pstd/include/noncopyable.h" #include "pstd/include/testutil.h" diff --git a/src/net/src/memoey_pool.h b/src/net/src/memory_pool.h similarity index 96% rename from src/net/src/memoey_pool.h rename to src/net/src/memory_pool.h index 19fc94d4a2..229b3776d1 100644 --- a/src/net/src/memoey_pool.h +++ b/src/net/src/memory_pool.h @@ -3,8 +3,8 @@ // LICENSE file in the root directory of this source tree. An additional grant // of patent rights can be found in the PATENTS file in the same directory. -#ifndef PIKA_SRC_NET_SRC_MEMOEY_POOL_H -#define PIKA_SRC_NET_SRC_MEMOEY_POOL_H +#ifndef PIKA_SRC_NET_SRC_MEMORY_POOL_H +#define PIKA_SRC_NET_SRC_MEMORY_POOL_H #include #include @@ -115,4 +115,4 @@ class MemoryPool { } // namespace net -#endif // PIKA_SRC_NET_SRC_MEMOEY_POOL_H +#endif // PIKA_SRC_NET_SRC_MEMORY_POOL_H diff --git a/src/net/src/worker_thread.h b/src/net/src/worker_thread.h index 0f54ad4308..ea91d2d118 100644 --- a/src/net/src/worker_thread.h +++ b/src/net/src/worker_thread.h @@ -13,14 +13,14 @@ #include #include -#include "pstd/include/pstd_mutex.h" -#include "pstd/include/xdebug.h" #include "net/include/net_define.h" #include "net/include/net_thread.h" #include "net/include/server_thread.h" -#include "net/src/memoey_pool.h" -#include "net/src/net_multiplexer.h" #include "net/src/dispatch_thread.h" +#include "net/src/memory_pool.h" +#include "net/src/net_multiplexer.h" +#include "pstd/include/pstd_mutex.h" +#include "pstd/include/xdebug.h" namespace net { class NetItem; From 7b864f917ee5ad77a23d77863fb6b7eb2c0b6a19 Mon Sep 17 00:00:00 2001 From: lqx Date: Tue, 17 Dec 2024 21:12:34 +0800 Subject: [PATCH 3/3] fix review --- include/pika_client_conn.h | 4 ++-- src/net/src/memory_pool.h | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/include/pika_client_conn.h b/include/pika_client_conn.h index e5b207a709..9a89983376 100644 --- a/include/pika_client_conn.h +++ b/include/pika_client_conn.h @@ -116,7 +116,7 @@ class PikaClientConn : public net::RedisConn { std::shared_ptr time_stat_; - void SetMemoryPool(net::MemoryPool *memory_pool) override{ memory_pool_ = memory_pool; } + void SetMemoryPool(net::MemoryPool* memory_pool) override { memory_pool_ = memory_pool; } private: net::ServerThread* const server_thread_; @@ -131,7 +131,7 @@ class PikaClientConn : public net::RedisConn { bool authenticated_ = false; std::shared_ptr user_; - net::MemoryPool *memory_pool_; + net::MemoryPool *memory_pool_= nullptr; std::variant, Cmd*> DoCmd(const PikaCmdArgsType& argv, const std::string& opt, const std::shared_ptr& resp_ptr, bool cache_miss_in_rtc); diff --git a/src/net/src/memory_pool.h b/src/net/src/memory_pool.h index 229b3776d1..499f9570c7 100644 --- a/src/net/src/memory_pool.h +++ b/src/net/src/memory_pool.h @@ -93,13 +93,13 @@ class MemoryPool { } auto index = *page; - bits_ &= ~(1ull << index); + bits_.fetch_and(~(1ull << index)); } private: template T *AllocateExtend(Args &&...args) { - auto newPage = std::malloc(sizeof(T) + pageOffset_); + auto newPage = std::aligned_alloc(alignof(T), sizeof(T) + pageOffset_); auto page = reinterpret_cast(newPage); *page = extendFlag_; // Set the page head to 0xFF return new (++page) T(std::forward(args)...);