diff --git a/kvrocks.conf b/kvrocks.conf index 16a980e5d51..9779f24aba9 100644 --- a/kvrocks.conf +++ b/kvrocks.conf @@ -373,6 +373,11 @@ json-storage-format json # Default: no txn-context-enabled no +# Whether to enable hash field expiration feature. +# NOTE: This option only affects newly hash object +# Default: no +hash-field-expiration no + ################################## TLS ################################### # By default, TLS/SSL is disabled, i.e. `tls-port` is set to 0. diff --git a/src/commands/cmd_hash.cc b/src/commands/cmd_hash.cc index 30d6eb28ab0..aeb8485a7e9 100644 --- a/src/commands/cmd_hash.cc +++ b/src/commands/cmd_hash.cc @@ -23,6 +23,7 @@ #include "error_constants.h" #include "scan_base.h" #include "server/server.h" +#include "time_util.h" #include "types/redis_hash.h" namespace redis { @@ -444,6 +445,238 @@ class CommandHRandField : public Commander { bool no_parameters_ = true; }; +class CommandFieldExpireBase : public Commander { + protected: + Status commonParse(const std::vector &args, int start_idx) { + CommandParser parser(args, start_idx); + std::string_view expire_flag, num_flag; + uint64_t fields_num = 0; + while (parser.Good()) { + if (parser.EatEqICaseFlag("FIELDS", num_flag)) { + fields_num = GET_OR_RET(parser.template TakeInt()); + break; + } else if (parser.EatEqICaseFlag("NX", expire_flag)) { + field_expire_type_ = HashFieldExpireType::NX; + } else if (parser.EatEqICaseFlag("XX", expire_flag)) { + field_expire_type_ = HashFieldExpireType::XX; + } else if (parser.EatEqICaseFlag("GT", expire_flag)) { + field_expire_type_ = HashFieldExpireType::GT; + } else if (parser.EatEqICaseFlag("LT", expire_flag)) { + field_expire_type_ = HashFieldExpireType::LT; + } else { + return parser.InvalidSyntax(); + } + } + + auto remains = parser.Remains(); + auto size = args.size(); + if (remains != fields_num) { + return {Status::RedisParseErr, errWrongNumOfArguments}; + } + + for (size_t i = size - remains; i < size; i++) { + fields_.emplace_back(args_[i]); + } + + return Status::OK(); + } + + Status expireFieldExecute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) { + if (!srv->storage->GetConfig()->hash_field_expiration) { + return {Status::RedisExecErr, "field expiration feature is disabled"}; + } + + std::vector ret; + redis::Hash hash_db(srv->storage, conn->GetNamespace()); + auto s = hash_db.ExpireFields(ctx, args_[1], expire_, fields_, field_expire_type_, &ret); + if (!s.ok()) { + return {Status::RedisExecErr, s.ToString()}; + } + + *output = redis::MultiLen(ret.size()); + for (const auto &i : ret) { + output->append(redis::Integer(i)); + } + + return Status::OK(); + } + + Status ttlExpireExecute(engine::Context &ctx, Server *srv, Connection *conn, std::vector &ret) { + redis::Hash hash_db(srv->storage, conn->GetNamespace()); + auto s = hash_db.TTLFields(ctx, args_[1], fields_, &ret); + if (!s.ok()) { + return {Status::RedisExecErr, s.ToString()}; + } + return Status::OK(); + } + + uint64_t expire_ = 0; + HashFieldExpireType field_expire_type_ = HashFieldExpireType::None; + std::vector fields_; +}; + +class CommandHExpire : public CommandFieldExpireBase { + public: + Status Parse(const std::vector &args) override { + auto parse_result = ParseInt(args[2], 10); + if (!parse_result) return {Status::RedisParseErr, errValueNotInteger}; + + expire_ = *parse_result * 1000 + util::GetTimeStampMS(); + return CommandFieldExpireBase::commonParse(args, 3); + } + + Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override { + return expireFieldExecute(ctx, srv, conn, output); + } +}; + +class CommandHExpireAt : public CommandFieldExpireBase { + public: + Status Parse(const std::vector &args) override { + auto parse_result = ParseInt(args[2], 10); + if (!parse_result) return {Status::RedisParseErr, errValueNotInteger}; + + expire_ = *parse_result * 1000; + return CommandFieldExpireBase::commonParse(args, 3); + } + + Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override { + return expireFieldExecute(ctx, srv, conn, output); + } +}; + +class CommandHPExpire : public CommandFieldExpireBase { + public: + Status Parse(const std::vector &args) override { + auto parse_result = ParseInt(args[2], 10); + if (!parse_result) return {Status::RedisParseErr, errValueNotInteger}; + + expire_ = *parse_result + util::GetTimeStampMS(); + return CommandFieldExpireBase::commonParse(args, 3); + } + + Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override { + return expireFieldExecute(ctx, srv, conn, output); + } +}; + +class CommandHPExpireAt : public CommandFieldExpireBase { + public: + Status Parse(const std::vector &args) override { + auto parse_result = ParseInt(args[2], 10); + if (!parse_result) return {Status::RedisParseErr, errValueNotInteger}; + + expire_ = *parse_result; + return CommandFieldExpireBase::commonParse(args, 3); + } + + Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override { + return expireFieldExecute(ctx, srv, conn, output); + } +}; + +class CommandHExpireTime : public CommandFieldExpireBase { + public: + Status Parse(const std::vector &args) override { return CommandFieldExpireBase::commonParse(args, 2); } + + Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override { + std::vector ret; + auto s = ttlExpireExecute(ctx, srv, conn, ret); + if (!s.IsOK()) { + return {Status::RedisExecErr, s.Msg()}; + } + auto now = util::GetTimeStampMS(); + *output = redis::MultiLen(ret.size()); + for (const auto &ttl : ret) { + if (ttl > 0) { + output->append(redis::Integer((now + ttl) / 1000)); + } else { + output->append(redis::Integer(ttl)); + } + } + return Status::OK(); + } +}; + +class CommandHPExpireTime : public CommandFieldExpireBase { + public: + Status Parse(const std::vector &args) override { return CommandFieldExpireBase::commonParse(args, 2); } + + Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override { + std::vector ret; + auto s = ttlExpireExecute(ctx, srv, conn, ret); + if (!s.IsOK()) { + return {Status::RedisExecErr, s.Msg()}; + } + auto now = util::GetTimeStampMS(); + *output = redis::MultiLen(ret.size()); + for (const auto &ttl : ret) { + if (ttl > 0) { + output->append(redis::Integer(now + ttl)); + } else { + output->append(redis::Integer(ttl)); + } + } + return Status::OK(); + } +}; + +class CommandHTTL : public CommandFieldExpireBase { + public: + Status Parse(const std::vector &args) override { return CommandFieldExpireBase::commonParse(args, 2); } + + Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override { + std::vector ret; + auto s = ttlExpireExecute(ctx, srv, conn, ret); + if (!s.IsOK()) { + return {Status::RedisExecErr, s.Msg()}; + } + *output = redis::MultiLen(ret.size()); + for (const auto &ttl : ret) { + output->append(redis::Integer(ttl > 0 ? ttl / 1000 : ttl)); + } + return Status::OK(); + } +}; + +class CommandHPTTL : public CommandFieldExpireBase { + public: + Status Parse(const std::vector &args) override { return CommandFieldExpireBase::commonParse(args, 2); } + + Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override { + std::vector ret; + auto s = ttlExpireExecute(ctx, srv, conn, ret); + if (!s.IsOK()) { + return {Status::RedisExecErr, s.Msg()}; + } + *output = redis::MultiLen(ret.size()); + for (const auto &ttl : ret) { + output->append(redis::Integer(ttl)); + } + return Status::OK(); + } +}; + +class CommandHPersist : public CommandFieldExpireBase { + public: + Status Parse(const std::vector &args) override { return CommandFieldExpireBase::commonParse(args, 2); } + + Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override { + std::vector ret; + redis::Hash hash_db(srv->storage, conn->GetNamespace()); + auto s = hash_db.PersistFields(ctx, args_[1], fields_, &ret); + if (!s.ok()) { + return {Status::RedisExecErr, s.ToString()}; + } + + *output = redis::MultiLen(ret.size()); + for (const auto &i : ret) { + output->append(redis::Integer(i)); + } + return Status::OK(); + } +}; + REDIS_REGISTER_COMMANDS(Hash, MakeCmdAttr("hget", 3, "read-only", 1, 1, 1), MakeCmdAttr("hincrby", 4, "write", 1, 1, 1), MakeCmdAttr("hincrbyfloat", 4, "write", 1, 1, 1), @@ -460,6 +693,15 @@ REDIS_REGISTER_COMMANDS(Hash, MakeCmdAttr("hget", 3, "read-only", 1 MakeCmdAttr("hgetall", 2, "read-only slow", 1, 1, 1), MakeCmdAttr("hscan", -3, "read-only", 1, 1, 1), MakeCmdAttr("hrangebylex", -4, "read-only", 1, 1, 1), - MakeCmdAttr("hrandfield", -2, "read-only slow", 1, 1, 1), ) + MakeCmdAttr("hrandfield", -2, "read-only slow", 1, 1, 1), + MakeCmdAttr("hexpire", -6, "write", 1, 1, 1), + MakeCmdAttr("hexpireat", -6, "write", 1, 1, 1), + MakeCmdAttr("hexpiretime", -5, "read-only", 1, 1, 1), + MakeCmdAttr("hpexpire", -6, "write", 1, 1, 1), + MakeCmdAttr("hpexpireat", -6, "write", 1, 1, 1), + MakeCmdAttr("hpexpiretime", -5, "read-only", 1, 1, 1), + MakeCmdAttr("hpersist", -5, "write", 1, 1, 1), + MakeCmdAttr("httl", -5, "read-only", 1, 1, 1), + MakeCmdAttr("hpttl", -5, "read-only", 1, 1, 1), ) } // namespace redis diff --git a/src/config/config.cc b/src/config/config.cc index 9d8e2a79272..4d29c536476 100644 --- a/src/config/config.cc +++ b/src/config/config.cc @@ -240,6 +240,7 @@ Config::Config() { new EnumField(&json_storage_format, json_storage_formats, JsonStorageFormat::JSON)}, {"txn-context-enabled", true, new YesNoField(&txn_context_enabled, false)}, {"skip-block-cache-deallocation-on-close", false, new YesNoField(&skip_block_cache_deallocation_on_close, false)}, + {"hash-field-expiration", false, new YesNoField(&hash_field_expiration, false)}, /* rocksdb options */ {"rocksdb.compression", false, diff --git a/src/config/config.h b/src/config/config.h index 9fa5d4168fc..bcfd5cee1cd 100644 --- a/src/config/config.h +++ b/src/config/config.h @@ -177,6 +177,9 @@ struct Config { bool skip_block_cache_deallocation_on_close = false; + // whether to enable hash field expiration feature + bool hash_field_expiration = false; + struct RocksDB { int block_size; bool cache_index_and_filter_blocks; diff --git a/src/storage/compact_filter.cc b/src/storage/compact_filter.cc index b79219ec423..89ef1387f71 100644 --- a/src/storage/compact_filter.cc +++ b/src/storage/compact_filter.cc @@ -28,6 +28,7 @@ #include "db_util.h" #include "time_util.h" #include "types/redis_bitmap.h" +#include "types/redis_hash.h" namespace engine { @@ -132,7 +133,9 @@ bool SubKeyFilter::Filter([[maybe_unused]] int level, const Slice &key, const Sl return false; } - return IsMetadataExpired(ikey, metadata) || (metadata.Type() == kRedisBitmap && redis::Bitmap::IsEmptySegment(value)); + return IsMetadataExpired(ikey, metadata) || + (metadata.Type() == kRedisBitmap && redis::Bitmap::IsEmptySegment(value)) || + (metadata.Type() == kRedisHash && redis::Hash::IsFieldExpired(cached_metadata_, value)); } } // namespace engine diff --git a/src/storage/redis_db.cc b/src/storage/redis_db.cc index 38845fef8c5..3e71a1f352e 100644 --- a/src/storage/redis_db.cc +++ b/src/storage/redis_db.cc @@ -96,7 +96,21 @@ rocksdb::Status Database::GetMetadata(engine::Context &ctx, RedisTypes types, co auto s = GetRawMetadata(ctx, ns_key, raw_value); *rest = *raw_value; if (!s.ok()) return s; - return ParseMetadataWithStats(types, rest, metadata); + + s = ParseMetadataWithStats(types, rest, metadata); + if (!s.ok()) return s; + + // if type is hash, we still need to check if the all of fields expired. + if (metadata->Type() == kRedisHash) { + HashMetadata hash_metadata(false); + s = hash_metadata.Decode(*raw_value); + if (!s.ok()) return s; + redis::Hash hash_db(storage_, namespace_); + if (!hash_db.ExistValidField(ctx, ns_key, hash_metadata)) { + return rocksdb::Status::NotFound("no element found"); + } + } + return rocksdb::Status::OK(); } rocksdb::Status Database::GetRawMetadata(engine::Context &ctx, const Slice &ns_key, std::string *bytes) { @@ -120,6 +134,15 @@ rocksdb::Status Database::Expire(engine::Context &ctx, const Slice &user_key, ui if (!metadata.IsEmptyableType() && metadata.size == 0) { return rocksdb::Status::NotFound("no elements"); } + if (metadata.Type() == kRedisHash) { + HashMetadata hash_metadata(false); + s = hash_metadata.Decode(value); + if (!s.ok()) return s; + redis::Hash hash_db(storage_, namespace_); + if (!hash_db.ExistValidField(ctx, ns_key, hash_metadata)) { + return rocksdb::Status::NotFound("no element found"); + } + } if (metadata.expire == timestamp) return rocksdb::Status::OK(); // +1 to skip the flags @@ -198,8 +221,21 @@ rocksdb::Status Database::MDel(engine::Context &ctx, const std::vector &k if (metadata.Expired()) continue; s = batch->Delete(metadata_cf_handle_, ns_keys[i]); - if (!s.ok()) return s; + + // if delete a hash object that all of fields expired, + // so this hash object should be treated as empty and should not affect the deleted_cnt. + if (metadata.Type() == kRedisHash) { + HashMetadata hash_metadata(false); + s = hash_metadata.Decode(rocksdb::Slice(pin_values[i].data(), pin_values[i].size())); + if (!s.ok()) continue; + redis::Hash hash_db(storage_, namespace_); + if (hash_db.ExistValidField(ctx, slice_keys[i], hash_metadata)) { + if (!s.ok()) return s; *deleted_cnt += 1; + } + } else { + *deleted_cnt += 1; + } } if (*deleted_cnt == 0) return rocksdb::Status::OK(); @@ -227,6 +263,16 @@ rocksdb::Status Database::TTL(engine::Context &ctx, const Slice &user_key, int64 Metadata metadata(kRedisNone, false); s = metadata.Decode(value); if (!s.ok()) return s; + if (metadata.Type() == kRedisHash) { + HashMetadata hash_metadata(false); + s = hash_metadata.Decode(value); + if (!s.ok()) return s; + redis::Hash hash_db(storage_, namespace_); + if (!hash_db.ExistValidField(ctx, ns_key, hash_metadata)) { + *ttl = -2; + return rocksdb::Status::OK(); + } + } *ttl = metadata.TTL(); return rocksdb::Status::OK(); @@ -282,6 +328,17 @@ rocksdb::Status Database::Keys(engine::Context &ctx, const std::string &prefix, if (stats) stats->n_expired++; continue; } + // if a hash object that all of fields was expired, + // so the key should not be returned. + if (metadata.Type() == kRedisHash) { + HashMetadata hash_metadata(false); + s = hash_metadata.Decode(iter->value()); + if (!s.ok()) continue; + redis::Hash hash_db(storage_, namespace_); + if (!hash_db.ExistValidField(ctx, iter->key(), hash_metadata)) { + continue; + } + } if (stats) { int64_t ttl = metadata.TTL(); stats->n_key++; @@ -362,6 +419,19 @@ rocksdb::Status Database::Scan(engine::Context &ctx, const std::string &cursor, if (type != kRedisNone && type != metadata.Type()) continue; if (metadata.Expired()) continue; + + // if a hash object that all of fields was expired, + // so the key should not be returned. + if (metadata.Type() == kRedisHash) { + HashMetadata hash_metadata(false); + s = hash_metadata.Decode(iter->value()); + if (!s.ok()) continue; + redis::Hash hash_db(storage_, namespace_); + if (!hash_db.ExistValidField(ctx, iter->key(), hash_metadata)) { + continue; + } + } + std::tie(std::ignore, user_key) = ExtractNamespaceKey(iter->key(), storage_->IsSlotIdEncoded()); if (!util::StringMatch(suffix_glob, user_key.substr(prefix.size()))) { @@ -551,9 +621,27 @@ rocksdb::Status SubKeyScanner::Scan(engine::Context &ctx, RedisType type, const uint64_t cnt = 0; std::string ns_key = AppendNamespacePrefix(user_key); Metadata metadata(type, false); - rocksdb::Status s = GetMetadata(ctx, {type}, ns_key, &metadata); + std::string raw_value; + Slice rest; + + rocksdb::Status s = GetMetadata(ctx, {type}, ns_key, &raw_value, &metadata, &rest); if (!s.ok()) return s; + // for hash type, we should filter expired field if encoding is with_ttl + bool is_encoding_field_ttl = false; + if (metadata.Type() == kRedisHash && !rest.empty()) { + HashSubkeyEncoding field_encoding = HashSubkeyEncoding::VALUE_ONLY; + if (!GetFixed8(&rest, reinterpret_cast(&field_encoding))) { + return rocksdb::Status::InvalidArgument(); + } + if (field_encoding > HashSubkeyEncoding::VALUE_WITH_TTL) { + return rocksdb::Status::InvalidArgument("unexpected subkey encoding version"); + } + if (field_encoding == HashSubkeyEncoding::VALUE_WITH_TTL) { + is_encoding_field_ttl = true; + } + } + auto iter = util::UniqueIterator(ctx, ctx.DefaultScanOptions()); std::string match_prefix_key = InternalKey(ns_key, subkey_prefix, metadata.version, storage_->IsSlotIdEncoded()).Encode(); @@ -564,6 +652,7 @@ rocksdb::Status SubKeyScanner::Scan(engine::Context &ctx, RedisType type, const } else { start_key = match_prefix_key; } + auto now = util::GetTimeStampMS(); for (iter->Seek(start_key); iter->Valid(); iter->Next()) { if (!cursor.empty() && iter->key() == start_key) { // if cursor is not empty, then we need to skip start_key @@ -574,9 +663,19 @@ rocksdb::Status SubKeyScanner::Scan(engine::Context &ctx, RedisType type, const break; } InternalKey ikey(iter->key(), storage_->IsSlotIdEncoded()); + auto value = iter->value().ToString(); + if (is_encoding_field_ttl) { + uint64_t expire = 0; + rocksdb::Slice data(value.data(), value.size()); + GetFixed64(&data, &expire); + if (expire != 0 && expire <= now) { + continue; + } + value = data.ToString(); + } keys->emplace_back(ikey.GetSubKey().ToString()); if (values != nullptr) { - values->emplace_back(iter->value().ToString()); + values->emplace_back(value); } cnt++; if (limit > 0 && cnt >= limit) { @@ -622,7 +721,17 @@ rocksdb::Status Database::existsInternal(engine::Context &ctx, const std::vector Metadata metadata(kRedisNone, false); s = metadata.Decode(value); if (!s.ok()) return s; - if (!metadata.Expired()) *ret += 1; + if (metadata.Expired()) continue; + if (metadata.Type() == kRedisHash) { + HashMetadata hash_metadata(false); + s = hash_metadata.Decode(value); + if (!s.ok()) return s; + redis::Hash hash_db(storage_, namespace_); + if (!hash_db.ExistValidField(ctx, key, hash_metadata)) { + continue; + } + } + *ret += 1; } } return rocksdb::Status::OK(); @@ -636,9 +745,19 @@ rocksdb::Status Database::typeInternal(engine::Context &ctx, const Slice &key, R Metadata metadata(kRedisNone, false); s = metadata.Decode(value); - if (!s.ok()) return s; + if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s; if (metadata.Expired()) { *type = kRedisNone; + } else if (metadata.Type() == kRedisHash) { + HashMetadata hash_metadata(false); + s = hash_metadata.Decode(value); + if (!s.ok()) return s; + redis::Hash hash_db(storage_, namespace_); + if (hash_db.ExistValidField(ctx, key, hash_metadata)) { + *type = metadata.Type(); + } else { + *type = kRedisNone; + } } else { *type = metadata.Type(); } diff --git a/src/storage/redis_metadata.cc b/src/storage/redis_metadata.cc index 76403faaef3..c2b23e347d8 100644 --- a/src/storage/redis_metadata.cc +++ b/src/storage/redis_metadata.cc @@ -334,6 +334,30 @@ bool Metadata::IsEmptyableType() const { bool Metadata::Expired() const { return ExpireAt(util::GetTimeStampMS()); } +bool HashMetadata::IsFieldExpirationEnabled() const { return field_encoding == HashSubkeyEncoding::VALUE_WITH_TTL; } + +void HashMetadata::Encode(std::string *dst) const { + Metadata::Encode(dst); + PutFixed8(dst, uint8_t(field_encoding)); +} + +rocksdb::Status HashMetadata::Decode(Slice *input) { + if (auto s = Metadata::Decode(input); !s.ok()) { + return s; + } + + if (input->size() >= 1) { + if (!GetFixed8(input, reinterpret_cast(&field_encoding))) { + return rocksdb::Status::InvalidArgument(kErrMetadataTooShort); + } + if (field_encoding > HashSubkeyEncoding::VALUE_WITH_TTL) { + return rocksdb::Status::InvalidArgument("unexpected subkey encoding version"); + } + } + + return rocksdb::Status::OK(); +} + ListMetadata::ListMetadata(bool generate_version) : Metadata(kRedisList, generate_version), head(UINT64_MAX / 2), tail(head) {} diff --git a/src/storage/redis_metadata.h b/src/storage/redis_metadata.h index 5590609be37..9589d51622d 100644 --- a/src/storage/redis_metadata.h +++ b/src/storage/redis_metadata.h @@ -202,9 +202,22 @@ class Metadata { static uint64_t generateVersion(); }; +enum class HashSubkeyEncoding : uint8_t { + VALUE_ONLY = 0, + VALUE_WITH_TTL = 1, +}; + class HashMetadata : public Metadata { public: + HashSubkeyEncoding field_encoding = HashSubkeyEncoding::VALUE_ONLY; + explicit HashMetadata(bool generate_version = true) : Metadata(kRedisHash, generate_version) {} + + void Encode(std::string *dst) const override; + using Metadata::Decode; + rocksdb::Status Decode(Slice *input) override; + + bool IsFieldExpirationEnabled() const; }; class SetMetadata : public Metadata { diff --git a/src/types/redis_hash.cc b/src/types/redis_hash.cc index 905efdadd37..c4732769f1f 100644 --- a/src/types/redis_hash.cc +++ b/src/types/redis_hash.cc @@ -21,6 +21,7 @@ #include "redis_hash.h" #include +#include #include #include @@ -31,6 +32,7 @@ #include "db_util.h" #include "parse_util.h" #include "sample_helper.h" +#include "time_util.h" namespace redis { @@ -45,7 +47,31 @@ rocksdb::Status Hash::Size(engine::Context &ctx, const Slice &user_key, uint64_t HashMetadata metadata(false); rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata); if (!s.ok()) return s; - *size = metadata.size; + // if field expiration is disabled, + // the size field in metadata is the length of hash + if (!metadata.IsFieldExpirationEnabled()) { + *size = metadata.size; + return rocksdb::Status::OK(); + } + + // otherwise, we have to check each field to calc the length + std::string prefix_key = InternalKey(ns_key, "", metadata.version, storage_->IsSlotIdEncoded()).Encode(); + std::string next_version_prefix_key = + InternalKey(ns_key, "", metadata.version + 1, storage_->IsSlotIdEncoded()).Encode(); + + rocksdb::ReadOptions read_options = ctx.DefaultScanOptions(); + rocksdb::Slice upper_bound(next_version_prefix_key); + read_options.iterate_upper_bound = &upper_bound; + + auto iter = util::UniqueIterator(ctx, read_options); + for (iter->Seek(prefix_key); iter->Valid() && iter->key().starts_with(prefix_key); iter->Next()) { + uint64_t expire = 0; + auto value = iter->value().ToString(); + if (!decodeExpireFromValue(metadata, &value, expire).ok()) { + continue; + } + *size += 1; + } return rocksdb::Status::OK(); } @@ -56,7 +82,10 @@ rocksdb::Status Hash::Get(engine::Context &ctx, const Slice &user_key, const Sli if (!s.ok()) return s; rocksdb::ReadOptions read_options; std::string sub_key = InternalKey(ns_key, field, metadata.version, storage_->IsSlotIdEncoded()).Encode(); - return storage_->Get(ctx, ctx.GetReadOptions(), sub_key, value); + s = storage_->Get(ctx, ctx.GetReadOptions(), sub_key, value); + if (!s.ok()) return s; + uint64_t expire = 0; + return decodeExpireFromValue(metadata, value, expire); } rocksdb::Status Hash::IncrBy(engine::Context &ctx, const Slice &user_key, const Slice &field, int64_t increment, @@ -69,13 +98,17 @@ rocksdb::Status Hash::IncrBy(engine::Context &ctx, const Slice &user_key, const HashMetadata metadata; rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata); if (!s.ok() && !s.IsNotFound()) return s; + if (s.IsNotFound()) { + metadata.field_encoding = HashSubkeyEncoding::VALUE_WITH_TTL; + } + uint64_t expire = 0; std::string sub_key = InternalKey(ns_key, field, metadata.version, storage_->IsSlotIdEncoded()).Encode(); if (s.ok()) { std::string value_bytes; s = storage_->Get(ctx, ctx.GetReadOptions(), sub_key, &value_bytes); if (!s.ok() && !s.IsNotFound()) return s; - if (s.ok()) { + if (s.ok() && decodeExpireFromValue(metadata, &value_bytes, expire).ok()) { auto parse_result = ParseInt(value_bytes, 10); if (!parse_result) { return rocksdb::Status::InvalidArgument(parse_result.Msg()); @@ -85,6 +118,9 @@ rocksdb::Status Hash::IncrBy(engine::Context &ctx, const Slice &user_key, const } old_value = *parse_result; exists = true; + } else { + // reset expire time + expire = 0; } } if ((increment < 0 && old_value < 0 && increment < (LLONG_MIN - old_value)) || @@ -97,7 +133,11 @@ rocksdb::Status Hash::IncrBy(engine::Context &ctx, const Slice &user_key, const WriteBatchLogData log_data(kRedisHash); s = batch->PutLogData(log_data.Encode()); if (!s.ok()) return s; - s = batch->Put(sub_key, std::to_string(*new_value)); + auto value_str = std::to_string(*new_value); + if (metadata.IsFieldExpirationEnabled()) { + encodeExpireToValue(&value_str, expire); + } + batch->Put(sub_key, value_str); if (!s.ok()) return s; if (!exists) { metadata.size += 1; @@ -119,19 +159,25 @@ rocksdb::Status Hash::IncrByFloat(engine::Context &ctx, const Slice &user_key, c HashMetadata metadata; rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata); if (!s.ok() && !s.IsNotFound()) return s; + if (s.IsNotFound()) { + metadata.field_encoding = HashSubkeyEncoding::VALUE_WITH_TTL; + } + uint64_t expire = 0; std::string sub_key = InternalKey(ns_key, field, metadata.version, storage_->IsSlotIdEncoded()).Encode(); if (s.ok()) { std::string value_bytes; s = storage_->Get(ctx, ctx.GetReadOptions(), sub_key, &value_bytes); if (!s.ok() && !s.IsNotFound()) return s; - if (s.ok()) { + if (s.ok() && decodeExpireFromValue(metadata, &value_bytes, expire).ok()) { auto value_stat = ParseFloat(value_bytes); if (!value_stat || isspace(value_bytes[0])) { return rocksdb::Status::InvalidArgument("value is not a number"); } old_value = *value_stat; exists = true; + } else { + expire = 0; } } double n = old_value + increment; @@ -144,7 +190,11 @@ rocksdb::Status Hash::IncrByFloat(engine::Context &ctx, const Slice &user_key, c WriteBatchLogData log_data(kRedisHash); s = batch->PutLogData(log_data.Encode()); if (!s.ok()) return s; - s = batch->Put(sub_key, std::to_string(*new_value)); + auto value_str = std::to_string(*new_value); + if (metadata.IsFieldExpirationEnabled()) { + encodeExpireToValue(&value_str, expire); + } + batch->Put(sub_key, value_str); if (!s.ok()) return s; if (!exists) { metadata.size += 1; @@ -186,10 +236,17 @@ rocksdb::Status Hash::MGet(engine::Context &ctx, const Slice &user_key, const st statuses_vector.resize(keys.size()); storage_->MultiGet(ctx, read_options, storage_->GetDB()->DefaultColumnFamily(), keys.size(), keys.data(), values_vector.data(), statuses_vector.data()); + for (size_t i = 0; i < keys.size(); i++) { if (!statuses_vector[i].ok() && !statuses_vector[i].IsNotFound()) return statuses_vector[i]; - values->emplace_back(values_vector[i].ToString()); - statuses->emplace_back(statuses_vector[i]); + auto value = values_vector[i].ToString(); + auto status = statuses_vector[i]; + if (!status.IsNotFound()) { + uint64_t expire = 0; + status = decodeExpireFromValue(metadata, &value, expire); + } + values->emplace_back(value); + statuses->emplace_back(status); } return rocksdb::Status::OK(); } @@ -213,15 +270,16 @@ rocksdb::Status Hash::Delete(engine::Context &ctx, const Slice &user_key, const s = GetMetadata(ctx, ns_key, &metadata); if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s; - std::string value; std::unordered_set field_set; for (const auto &field : fields) { if (!field_set.emplace(field.ToStringView()).second) { continue; } std::string sub_key = InternalKey(ns_key, field, metadata.version, storage_->IsSlotIdEncoded()).Encode(); + std::string value; s = storage_->Get(ctx, ctx.GetReadOptions(), sub_key, &value); - if (s.ok()) { + uint64_t expire = 0; + if (s.ok() && decodeExpireFromValue(metadata, &value, expire).ok()) { *deleted_cnt += 1; s = batch->Delete(sub_key); if (!s.ok()) return s; @@ -247,6 +305,12 @@ rocksdb::Status Hash::MSet(engine::Context &ctx, const Slice &user_key, const st rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata); if (!s.ok() && !s.IsNotFound()) return s; + // For avoid affect existing data, we only encode ttl of field + // on new hash object when hash_field_expiration option is yes. + if (s.IsNotFound() && storage_->GetConfig()->hash_field_expiration) { + metadata.field_encoding = HashSubkeyEncoding::VALUE_WITH_TTL; + } + int added = 0; auto batch = storage_->GetWriteBatchBase(); WriteBatchLogData log_data(kRedisHash); @@ -261,6 +325,7 @@ rocksdb::Status Hash::MSet(engine::Context &ctx, const Slice &user_key, const st bool exists = false; std::string sub_key = InternalKey(ns_key, it->field, metadata.version, storage_->IsSlotIdEncoded()).Encode(); + uint64_t expire = 0; if (metadata.size > 0) { std::string field_value; s = storage_->Get(ctx, ctx.GetReadOptions(), sub_key, &field_value); @@ -268,14 +333,20 @@ rocksdb::Status Hash::MSet(engine::Context &ctx, const Slice &user_key, const st if (s.ok()) { if (nx || field_value == it->value) continue; - - exists = true; + exists = decodeExpireFromValue(metadata, &field_value, expire).ok(); } } - if (!exists) added++; + if (!exists) { + added++; + expire = 0; + } - s = batch->Put(sub_key, it->value); + auto value = it->value; + if (metadata.IsFieldExpirationEnabled()) { + encodeExpireToValue(&value, expire); + } + s = batch->Put(sub_key, value); if (!s.ok()) return s; } @@ -341,8 +412,13 @@ rocksdb::Status Hash::RangeByLex(engine::Context &ctx, const Slice &user_key, co break; } if (spec.offset >= 0 && pos++ < spec.offset) continue; - - field_values->emplace_back(ikey.GetSubKey().ToString(), iter->value().ToString()); + // filte expired field + auto value = iter->value().ToString(); + uint64_t expire = 0; + if (!decodeExpireFromValue(metadata, &value, expire).ok()) { + continue; + } + field_values->emplace_back(ikey.GetSubKey().ToString(), value); if (spec.count > 0 && field_values->size() >= static_cast(spec.count)) break; } return rocksdb::Status::OK(); @@ -367,14 +443,20 @@ rocksdb::Status Hash::GetAll(engine::Context &ctx, const Slice &user_key, std::v auto iter = util::UniqueIterator(ctx, read_options); for (iter->Seek(prefix_key); iter->Valid() && iter->key().starts_with(prefix_key); iter->Next()) { + // filte expired field + uint64_t expire = 0; + auto value = iter->value().ToString(); + if (!decodeExpireFromValue(metadata, &value, expire).ok()) { + continue; + } if (type == HashFetchType::kOnlyKey) { InternalKey ikey(iter->key(), storage_->IsSlotIdEncoded()); field_values->emplace_back(ikey.GetSubKey().ToString(), ""); } else if (type == HashFetchType::kOnlyValue) { - field_values->emplace_back("", iter->value().ToString()); + field_values->emplace_back("", value); } else { InternalKey ikey(iter->key(), storage_->IsSlotIdEncoded()); - field_values->emplace_back(ikey.GetSubKey().ToString(), iter->value().ToString()); + field_values->emplace_back(ikey.GetSubKey().ToString(), value); } } return rocksdb::Status::OK(); @@ -426,4 +508,271 @@ rocksdb::Status Hash::RandField(engine::Context &ctx, const Slice &user_key, int return rocksdb::Status::OK(); } +rocksdb::Status Hash::ExpireFields(engine::Context &ctx, const Slice &user_key, uint64_t expire_ms, + const std::vector &fields, HashFieldExpireType type, + std::vector *ret) { + std::string ns_key = AppendNamespacePrefix(user_key); + HashMetadata metadata(false); + rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata); + if (!s.ok() && !s.IsNotFound()) return s; + if (s.IsNotFound()) { + ret->resize(fields.size(), -2); + return rocksdb::Status::OK(); + } + + // we don't support encode ttl on existing hash object + if (!metadata.IsFieldExpirationEnabled()) { + return rocksdb::Status::NotSupported( + "can't expire fields on hash object whose field expiration feature is disabled"); + } + + std::vector keys; + keys.reserve(fields.size()); + std::vector sub_keys; + sub_keys.resize(fields.size()); + for (size_t i = 0; i < fields.size(); i++) { + auto &field = fields[i]; + sub_keys[i] = InternalKey(ns_key, field, metadata.version, storage_->IsSlotIdEncoded()).Encode(); + keys.emplace_back(sub_keys[i]); + } + + auto batch = storage_->GetWriteBatchBase(); + WriteBatchLogData log_data(kRedisHash); + batch->PutLogData(log_data.Encode()); + + // expire special field + std::vector values; + values.resize(sub_keys.size()); + std::vector statuses; + statuses.resize(sub_keys.size()); + + rocksdb::ReadOptions read_options = storage_->DefaultMultiGetOptions(); + storage_->MultiGet(ctx, read_options, storage_->GetDB()->DefaultColumnFamily(), keys.size(), keys.data(), + values.data(), statuses.data()); + + auto now = util::GetTimeStampMS(); + for (size_t i = 0; i < keys.size(); i++) { + if (!statuses[i].ok() && !statuses[i].IsNotFound()) return statuses[i]; + + // no such field exists + if (statuses[i].IsNotFound()) { + ret->emplace_back(-2); + continue; + } + + // expire with a pass time + if (expire_ms <= now) { + batch->Delete(sub_keys[i]); + ret->emplace_back(2); + metadata.size -= 1; + continue; + } + + auto value = values[i].ToString(); + uint64_t field_expire = 0; + decodeExpireFromValue(metadata, &value, field_expire); + + // if a field has no associated expiration, we treated it expiration is infinite + auto treated_expire = field_expire == 0 ? UINT64_MAX : field_expire; + if (type == HashFieldExpireType::None || (type == HashFieldExpireType::NX && field_expire == 0) || + (type == HashFieldExpireType::XX && field_expire != 0) || + (type == HashFieldExpireType::GT && expire_ms > treated_expire) || + (type == HashFieldExpireType::LT && expire_ms < treated_expire)) { + encodeExpireToValue(&value, expire_ms); + batch->Put(sub_keys[i], value); + // 1 if expiration was updated + ret->emplace_back(1); + } else { + // 0 if condition has not been met + ret->emplace_back(0); + } + } + + std::string bytes; + metadata.Encode(&bytes); + batch->Put(metadata_cf_handle_, ns_key, bytes); + + return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch()); +} + +rocksdb::Status Hash::PersistFields(engine::Context &ctx, const Slice &user_key, const std::vector &fields, + std::vector *ret) { + std::string ns_key = AppendNamespacePrefix(user_key); + HashMetadata metadata(false); + rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata); + if (!s.ok() && !s.IsNotFound()) return s; + if (s.IsNotFound()) { + ret->resize(fields.size(), -2); + return rocksdb::Status::OK(); + } + if (!metadata.IsFieldExpirationEnabled()) { + ret->resize(fields.size(), -1); + return rocksdb::Status::OK(); + } + + std::vector keys; + keys.reserve(fields.size()); + std::vector sub_keys; + sub_keys.resize(fields.size()); + for (size_t i = 0; i < fields.size(); i++) { + auto &field = fields[i]; + sub_keys[i] = InternalKey(ns_key, field, metadata.version, storage_->IsSlotIdEncoded()).Encode(); + keys.emplace_back(sub_keys[i]); + } + + auto batch = storage_->GetWriteBatchBase(); + WriteBatchLogData log_data(kRedisHash); + batch->PutLogData(log_data.Encode()); + + std::vector values; + values.resize(sub_keys.size()); + std::vector statuses; + statuses.resize(sub_keys.size()); + rocksdb::ReadOptions read_options = storage_->DefaultMultiGetOptions(); + storage_->MultiGet(ctx, read_options, storage_->GetDB()->DefaultColumnFamily(), keys.size(), keys.data(), + values.data(), statuses.data()); + + bool removed = false; + for (size_t i = 0; i < keys.size(); i++) { + if (!statuses[i].ok() && !statuses[i].IsNotFound()) return statuses[i]; + + // no such field exists + if (statuses[i].IsNotFound()) { + ret->emplace_back(-2); + continue; + } + + auto value = values[i].ToString(); + uint64_t field_expire = 0; + decodeExpireFromValue(metadata, &value, field_expire); + if (field_expire == 0) { + // -1 if the field exists but has no associated expiration + ret->emplace_back(-1); + } else { + removed = true; + encodeExpireToValue(&value, 0); + batch->Put(sub_keys[i], value); + // 1 if expiration was removed + ret->emplace_back(1); + } + } + + return removed ? storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch()) + : rocksdb::Status::OK(); +} + +rocksdb::Status Hash::TTLFields(engine::Context &ctx, const Slice &user_key, const std::vector &fields, + std::vector *ret) { + std::string ns_key = AppendNamespacePrefix(user_key); + HashMetadata metadata(false); + rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata); + if (!s.ok() && !s.IsNotFound()) return s; + if (s.IsNotFound()) { + ret->resize(fields.size(), -2); + return rocksdb::Status::OK(); + } + + std::vector keys; + keys.reserve(fields.size()); + std::vector sub_keys; + sub_keys.resize(fields.size()); + for (size_t i = 0; i < fields.size(); i++) { + auto &field = fields[i]; + sub_keys[i] = InternalKey(ns_key, field, metadata.version, storage_->IsSlotIdEncoded()).Encode(); + keys.emplace_back(sub_keys[i]); + } + + std::vector values; + values.resize(sub_keys.size()); + std::vector statuses; + statuses.resize(sub_keys.size()); + rocksdb::ReadOptions read_options = storage_->DefaultMultiGetOptions(); + storage_->MultiGet(ctx, read_options, storage_->GetDB()->DefaultColumnFamily(), keys.size(), keys.data(), + values.data(), statuses.data()); + + ret->reserve(fields.size()); + auto now = util::GetTimeStampMS(); + for (size_t i = 0; i < keys.size(); i++) { + if (!statuses[i].ok() && !statuses[i].IsNotFound()) return statuses[i]; + auto value = values[i].ToString(); + auto status = statuses[i]; + + if (status.IsNotFound()) { + ret->emplace_back(-2); + continue; + } + + uint64_t expire = 0; + status = decodeExpireFromValue(metadata, &value, expire); + if (status.IsNotFound()) { + ret->emplace_back(-2); + } else if (expire == 0) { + ret->emplace_back(-1); + } else { + ret->emplace_back(int64_t(expire - now)); + } + } + return rocksdb::Status::OK(); +} + +bool Hash::IsFieldExpired(const Slice &metadata_key, const Slice &value) { + HashMetadata hash_metadata(false); + if (!hash_metadata.Decode(metadata_key).ok()) { + return false; + } + if (!hash_metadata.IsFieldExpirationEnabled()) { + return false; + } + uint64_t expire = 0; + rocksdb::Slice data(value); + GetFixed64(&data, &expire); + return expire != 0 && expire < util::GetTimeStampMS(); +} + +bool Hash::ExistValidField(engine::Context &ctx, const Slice &ns_key, const HashMetadata &metadata) { + if (metadata.Expired()) { + return false; + } + if (!metadata.IsFieldExpirationEnabled()) { + return true; + } + + std::string prefix_key = InternalKey(ns_key, "", metadata.version, storage_->IsSlotIdEncoded()).Encode(); + std::string next_version_prefix_key = + InternalKey(ns_key, "", metadata.version + 1, storage_->IsSlotIdEncoded()).Encode(); + + rocksdb::ReadOptions read_options = ctx.DefaultScanOptions(); + rocksdb::Slice upper_bound(next_version_prefix_key); + read_options.iterate_upper_bound = &upper_bound; + + auto iter = util::UniqueIterator(ctx, read_options); + for (iter->Seek(prefix_key); iter->Valid() && iter->key().starts_with(prefix_key); iter->Next()) { + uint64_t expire = 0; + auto value = iter->value().ToString(); + if (!decodeExpireFromValue(metadata, &value, expire).ok()) { + continue; + } + return true; + } + return false; +} + +rocksdb::Status Hash::decodeExpireFromValue(const HashMetadata &metadata, std::string *value, uint64_t &expire) { + if (!metadata.IsFieldExpirationEnabled()) { + return rocksdb::Status::OK(); + } + rocksdb::Slice data(value->data(), value->size()); + GetFixed64(&data, &expire); + *value = data.ToString(); + return (expire == 0 || expire > util::GetTimeStampMS()) ? rocksdb::Status::OK() : rocksdb::Status::NotFound(); +} + +rocksdb::Status Hash::encodeExpireToValue(std::string *value, uint64_t expire) { + std::string buf; + PutFixed64(&buf, expire); + buf.append(*value); + value->assign(buf.data(), buf.size()); + return rocksdb::Status::OK(); +} + } // namespace redis diff --git a/src/types/redis_hash.h b/src/types/redis_hash.h index e5779c7c49c..b1a19845bf2 100644 --- a/src/types/redis_hash.h +++ b/src/types/redis_hash.h @@ -39,6 +39,8 @@ struct FieldValue { enum class HashFetchType { kAll = 0, kOnlyKey = 1, kOnlyValue = 2 }; +enum class HashFieldExpireType { None, NX, XX, GT, LT }; + namespace redis { class Hash : public SubKeyScanner { @@ -68,9 +70,19 @@ class Hash : public SubKeyScanner { std::vector *values = nullptr); rocksdb::Status RandField(engine::Context &ctx, const Slice &user_key, int64_t command_count, std::vector *field_values, HashFetchType type = HashFetchType::kOnlyKey); + rocksdb::Status ExpireFields(engine::Context &ctx, const Slice &user_key, uint64_t expire_ms, + const std::vector &fields, HashFieldExpireType type, std::vector *ret); + rocksdb::Status PersistFields(engine::Context &ctx, const Slice &user_key, const std::vector &fields, + std::vector *ret); + rocksdb::Status TTLFields(engine::Context &ctx, const Slice &user_key, const std::vector &fields, + std::vector *ret); + bool ExistValidField(engine::Context &ctx, const Slice &ns_key, const HashMetadata &metadata); + static bool IsFieldExpired(const Slice &metadata_key, const Slice &value); private: rocksdb::Status GetMetadata(engine::Context &ctx, const Slice &ns_key, HashMetadata *metadata); + static rocksdb::Status decodeExpireFromValue(const HashMetadata &metadata, std::string *value, uint64_t &expire); + static rocksdb::Status encodeExpireToValue(std::string *value, uint64_t expire); friend struct FieldValueRetriever; }; diff --git a/tests/gocase/unit/type/hash/hash_test.go b/tests/gocase/unit/type/hash/hash_test.go index caab802af77..7e5da1e32a2 100644 --- a/tests/gocase/unit/type/hash/hash_test.go +++ b/tests/gocase/unit/type/hash/hash_test.go @@ -62,6 +62,16 @@ func TestHash(t *testing.T) { Options: []string{"yes", "no"}, ConfigType: util.YesNo, }, + { + Name: "hash-field-expiration", + Options: []string{"yes", "no"}, + ConfigType: util.YesNo, + }, + { + Name: "rocksdb.read_options.async_io", + Options: []string{"yes", "no"}, + ConfigType: util.YesNo, + }, } configsMatrix, err := util.GenerateConfigsMatrix(configOptions) @@ -72,7 +82,7 @@ func TestHash(t *testing.T) { } } -var testHash = func(t *testing.T, configs util.KvrocksServerConfigs) { +var testHash = func(t *testing.T, configs map[string]string) { srv := util.StartServer(t, configs) defer srv.Close() ctx := context.Background() @@ -840,12 +850,54 @@ var testHash = func(t *testing.T, configs util.KvrocksServerConfigs) { // TODO: Add test to verify randomness of the selected random fields }) + t.Run("HGetAll support map type", func(t *testing.T) { + testKey := "test-hash-1" + require.NoError(t, rdb.Del(ctx, testKey).Err()) + require.NoError(t, rdb.HSet(ctx, testKey, "key1", "value1", "key2", "value2", "key3", "value3").Err()) + result, err := rdb.HGetAll(ctx, testKey).Result() + require.NoError(t, err) + require.Len(t, result, 3) + require.EqualValues(t, map[string]string{ + "key1": "value1", + "key2": "value2", + "key3": "value3", + }, result) + }) + + t.Run("Test bug with large value after compaction", func(t *testing.T) { + testKey := "test-hash-1" + require.NoError(t, rdb.Del(ctx, testKey).Err()) + + src := rand.NewSource(time.Now().UnixNano()) + dd := make([]byte, 5000) + for i := 1; i <= 50; i++ { + for j := range dd { + dd[j] = byte(src.Int63()) + } + key := util.RandString(10, 20, util.Alpha) + require.NoError(t, rdb.HSet(ctx, testKey, key, string(dd)).Err()) + } + + require.EqualValues(t, 50, rdb.HLen(ctx, testKey).Val()) + require.Len(t, rdb.HGetAll(ctx, testKey).Val(), 50) + require.Len(t, rdb.HKeys(ctx, testKey).Val(), 50) + require.Len(t, rdb.HVals(ctx, testKey).Val(), 50) + + require.NoError(t, rdb.Do(ctx, "COMPACT").Err()) + + time.Sleep(5 * time.Second) + + require.EqualValues(t, 50, rdb.HLen(ctx, testKey).Val()) + require.Len(t, rdb.HGetAll(ctx, testKey).Val(), 50) + require.Len(t, rdb.HKeys(ctx, testKey).Val(), 50) + require.Len(t, rdb.HVals(ctx, testKey).Val(), 50) + }) } } -func TestHGetAllWithRESP3(t *testing.T) { +func TestDisableExpireField(t *testing.T) { srv := util.StartServer(t, map[string]string{ - "resp3-enabled": "yes", + "hash-field-expiration": "no", }) defer srv.Close() @@ -854,22 +906,28 @@ func TestHGetAllWithRESP3(t *testing.T) { ctx := context.Background() - testKey := "test-hash-1" - require.NoError(t, rdb.Del(ctx, testKey).Err()) - require.NoError(t, rdb.HSet(ctx, testKey, "key1", "value1", "key2", "value2", "key3", "value3").Err()) - result, err := rdb.HGetAll(ctx, testKey).Result() - require.NoError(t, err) - require.Len(t, result, 3) - require.EqualValues(t, map[string]string{ - "key1": "value1", - "key2": "value2", - "key3": "value3", - }, result) + // can't expire fields when hash-field-expiration option is no + expectedErrMsg := "ERR field expiration feature is disabled" + require.ErrorContains(t, rdb.HExpire(ctx, "foo", time.Second, "f").Err(), expectedErrMsg) + require.ErrorContains(t, rdb.HPExpire(ctx, "foo", time.Second, "f").Err(), expectedErrMsg) + require.ErrorContains(t, rdb.HExpireAt(ctx, "foo", time.Now().Add(1*time.Second), "f").Err(), expectedErrMsg) + require.ErrorContains(t, rdb.HPExpireAt(ctx, "foo", time.Now().Add(1*time.Second), "f").Err(), expectedErrMsg) + + rdb.HSet(ctx, "foo", "f", "v") + require.NoError(t, rdb.ConfigSet(ctx, "hash-field-expiration", "yes").Err()) + require.Equal(t, "v", rdb.HGet(ctx, "foo", "f").Val()) + + // can't expire fields on hash object whose field expiration feature is disabled + expectedErrMsg = "can't expire fields on hash object whose field expiration feature is disabled" + require.ErrorContains(t, rdb.HExpire(ctx, "foo", time.Second, "f").Err(), expectedErrMsg) + require.ErrorContains(t, rdb.HPExpire(ctx, "foo", time.Second, "f").Err(), expectedErrMsg) + require.ErrorContains(t, rdb.HExpireAt(ctx, "foo", time.Now().Add(1*time.Second), "f").Err(), expectedErrMsg) + require.ErrorContains(t, rdb.HPExpireAt(ctx, "foo", time.Now().Add(1*time.Second), "f").Err(), expectedErrMsg) } -func TestHashWithAsyncIOEnabled(t *testing.T) { +func TestHashFieldExpiration(t *testing.T) { srv := util.StartServer(t, map[string]string{ - "rocksdb.read_options.async_io": "yes", + "hash-field-expiration": "yes", }) defer srv.Close() @@ -878,73 +936,201 @@ func TestHashWithAsyncIOEnabled(t *testing.T) { ctx := context.Background() - t.Run("Test bug with large value after compaction", func(t *testing.T) { - testKey := "test-hash-1" - require.NoError(t, rdb.Del(ctx, testKey).Err()) + t.Run("HFE expire a field of hash", func(t *testing.T) { + require.NoError(t, rdb.Del(ctx, "hfe-key").Err()) + require.Equal(t, true, rdb.HMSet(ctx, "hfe-key", "f1", "v1").Val()) + require.Equal(t, int64(1), rdb.Do(ctx, "HEXPIRE", "hfe-key", 1, "FIELDS", 1, "f1").Val().([]interface{})[0]) - src := rand.NewSource(time.Now().UnixNano()) - dd := make([]byte, 5000) - for i := 1; i <= 50; i++ { - for j := range dd { - dd[j] = byte(src.Int63()) - } - key := util.RandString(10, 20, util.Alpha) - require.NoError(t, rdb.HSet(ctx, testKey, key, string(dd)).Err()) + require.LessOrEqual(t, int64(0), rdb.Do(ctx, "HTTL", "hfe-key", "FIELDS", 1, "f1").Val().([]interface{})[0]) + require.Equal(t, "v1", rdb.HGet(ctx, "hfe-key", "f1").Val()) + time.Sleep(1 * time.Second) + + require.Equal(t, int64(-2), rdb.Do(ctx, "HTTL", "hfe-key", "FIELDS", 1, "f1").Val().([]interface{})[0]) + require.Equal(t, "", rdb.HGet(ctx, "hfe-key", "f1").Val()) + }) + + t.Run("HFE expireat a field of hash", func(t *testing.T) { + require.NoError(t, rdb.Del(ctx, "hfe-key").Err()) + require.Equal(t, true, rdb.HMSet(ctx, "hfe-key", "f1", "v1").Val()) + + expireTime := time.Now().Add(1 * time.Second).Unix() + require.Equal(t, int64(1), rdb.Do(ctx, "HEXPIREAT", "hfe-key", expireTime, "FIELDS", 1, "f1").Val().([]interface{})[0]) + require.Equal(t, expireTime, rdb.Do(ctx, "HEXPIRETIME", "hfe-key", "FIELDS", 1, "f1").Val().([]interface{})[0]) + require.Equal(t, "v1", rdb.HGet(ctx, "hfe-key", "f1").Val()) + time.Sleep(1 * time.Second) + + require.Equal(t, int64(-2), rdb.Do(ctx, "HEXPIRETIME", "hfe-key", "FIELDS", 1, "f1").Val().([]interface{})[0]) + require.Equal(t, "", rdb.HGet(ctx, "hfe-key", "f1").Val()) + }) + + t.Run("HFE check the ttl of field that no associated expiration set and not exist", func(t *testing.T) { + require.NoError(t, rdb.Del(ctx, "hfe-key").Err()) + require.Equal(t, true, rdb.HMSet(ctx, "hfe-key", "f1", "v1").Val()) + + ttl := rdb.Do(ctx, "HTTL", "hfe-key", "FIELDS", 2, "f1", "not-exist-field").Val().([]interface{}) + require.EqualValues(t, []interface{}{int64(-1), int64(-2)}, ttl) + pttl := rdb.Do(ctx, "HPTTL", "hfe-key", "FIELDS", 2, "f1", "not-exist-field").Val().([]interface{}) + require.EqualValues(t, []interface{}{int64(-1), int64(-2)}, pttl) + + expireTime := rdb.Do(ctx, "HEXPIRETIME", "hfe-key", "FIELDS", 2, "f1", "not-exist-field").Val().([]interface{}) + require.EqualValues(t, []interface{}{int64(-1), int64(-2)}, expireTime) + pexpireTime := rdb.Do(ctx, "HPEXPIRETIME", "hfe-key", "FIELDS", 2, "f1", "not-exist-field").Val().([]interface{}) + require.EqualValues(t, []interface{}{int64(-1), int64(-2)}, pexpireTime) + }) + + t.Run("HFE can not get expired field", func(t *testing.T) { + require.NoError(t, rdb.Del(ctx, "hfe-key").Err()) + fieldValue := map[string]string{ + "f1": "v1", + "f2": "v2", } + require.Equal(t, int64(2), rdb.HSet(ctx, "hfe-key", fieldValue).Val()) + require.Equal(t, int64(1), rdb.Do(ctx, "HEXPIRE", "hfe-key", 1, "FIELDS", 1, "f1").Val().([]interface{})[0]) - require.EqualValues(t, 50, rdb.HLen(ctx, testKey).Val()) - require.Len(t, rdb.HGetAll(ctx, testKey).Val(), 50) - require.Len(t, rdb.HKeys(ctx, testKey).Val(), 50) - require.Len(t, rdb.HVals(ctx, testKey).Val(), 50) + require.Equal(t, fieldValue, rdb.HGetAll(ctx, "hfe-key").Val()) + require.Equal(t, []interface{}{"v1", "v2"}, rdb.HMGet(ctx, "hfe-key", "f1", "f2").Val()) - require.NoError(t, rdb.Do(ctx, "COMPACT").Err()) + time.Sleep(1 * time.Second) + delete(fieldValue, "f1") - time.Sleep(5 * time.Second) + require.Equal(t, fieldValue, rdb.HGetAll(ctx, "hfe-key").Val()) + require.Equal(t, []interface{}{nil, "v2"}, rdb.HMGet(ctx, "hfe-key", "f1", "f2").Val()) + }) - require.EqualValues(t, 50, rdb.HLen(ctx, testKey).Val()) - require.Len(t, rdb.HGetAll(ctx, testKey).Val(), 50) - require.Len(t, rdb.HKeys(ctx, testKey).Val(), 50) - require.Len(t, rdb.HVals(ctx, testKey).Val(), 50) + t.Run("HFE check hash metadata after all of fields expired", func(t *testing.T) { + require.NoError(t, rdb.Del(ctx, "hfe-key").Err()) + require.Equal(t, true, rdb.HMSet(ctx, "hfe-key", "f1", "v1", "f2", "v2").Val()) + + require.Equal(t, "hash", rdb.Type(ctx, "hfe-key").Val()) + require.Equal(t, int64(2), rdb.HLen(ctx, "hfe-key").Val()) + + rdb.Do(ctx, "HEXPIRE", "hfe-key", 1, "FIELDS", 2, "f1", "f2") + time.Sleep(1 * time.Second) + + // if a hash object than all of fields was expired, + // the hash object should be treated not exist. + require.Equal(t, int64(0), rdb.HLen(ctx, "hfe-key").Val()) + require.Equal(t, "none", rdb.Type(ctx, "hfe-key").Val()) + require.Equal(t, int64(0), rdb.Exists(ctx, "hfe-key").Val()) + require.Equal(t, time.Duration(-2), rdb.TTL(ctx, "hfe-key").Val()) + require.Equal(t, time.Duration(-2), rdb.PTTL(ctx, "hfe-key").Val()) + require.Equal(t, time.Duration(-2), rdb.ExpireTime(ctx, "hfe-key").Val()) + require.Equal(t, time.Duration(-2), rdb.PExpireTime(ctx, "hfe-key").Val()) + require.Equal(t, false, rdb.ExpireAt(ctx, "hfe-key", time.Now().Add(1*time.Second)).Val()) + require.Equal(t, false, rdb.PExpireAt(ctx, "hfe-key", time.Unix(time.Now().Unix()+1, 0)).Val()) + require.Equal(t, int64(0), rdb.Copy(ctx, "hfe-key", "dst", 0, true).Val()) + require.Equal(t, "", rdb.Dump(ctx, "hfe-key").Val()) + require.Equal(t, int64(0), rdb.Del(ctx, "hfe-key").Val()) }) -} -func TestHashWithAsyncIODisabled(t *testing.T) { - srv := util.StartServer(t, map[string]string{ - "rocksdb.read_options.async_io": "no", + t.Run("HFE expected 0 if delete a expired field", func(t *testing.T) { + require.NoError(t, rdb.Del(ctx, "hfe-key").Err()) + require.Equal(t, true, rdb.HMSet(ctx, "hfe-key", "f1", "v1", "f2", "v2").Val()) + rdb.Do(ctx, "HPEXPIRE", "hfe-key", 100, "FIELDS", 1, "f1") + time.Sleep(500 * time.Millisecond) + require.Equal(t, int64(0), rdb.HDel(ctx, "hfe-key", "f1").Val()) }) - defer srv.Close() - rdb := srv.NewClient() - defer func() { require.NoError(t, rdb.Close()) }() + t.Run("HFE perist a field of hash", func(t *testing.T) { + require.NoError(t, rdb.Del(ctx, "hfe-key").Err()) + require.Equal(t, true, rdb.HMSet(ctx, "hfe-key", "f1", "v1", "f2", "v2").Val()) + rdb.Do(ctx, "HPEXPIRE", "hfe-key", 100, "FIELDS", 1, "f1") - ctx := context.Background() + result := rdb.Do(ctx, "HPERSIST", "hfe-key", "FIELDS", 3, "f1", "f2", "not-exist-field").Val().([]interface{}) + require.EqualValues(t, []interface{}{int64(1), int64(-1), int64(-2)}, result) + require.Equal(t, int64(-1), rdb.Do(ctx, "HTTL", "hfe-key", "FIELDS", 1, "f1").Val().([]interface{})[0]) + }) - t.Run("Test bug with large value after compaction", func(t *testing.T) { - testKey := "test-hash-1" - require.NoError(t, rdb.Del(ctx, testKey).Err()) + t.Run("HFE expired field should not be scan", func(t *testing.T) { + require.NoError(t, rdb.Del(ctx, "hfe-key").Err()) + require.Equal(t, int64(2), rdb.HSet(ctx, "hfe-key", "f1", "v1", "f2", "v2").Val()) - src := rand.NewSource(time.Now().UnixNano()) - dd := make([]byte, 5000) - for i := 1; i <= 50; i++ { - for j := range dd { - dd[j] = byte(src.Int63()) - } - key := util.RandString(10, 20, util.Alpha) - require.NoError(t, rdb.HSet(ctx, testKey, key, string(dd)).Err()) - } + keys, cursor := rdb.HScan(ctx, "hfe-key", 0, "*", 10).Val() + require.Equal(t, []string{"f1", "v1", "f2", "v2"}, keys) + require.Equal(t, uint64(0), cursor) + + rdb.Do(ctx, "HEXPIRE", "hfe-key", 1, "FIELDS", 1, "f1") + time.Sleep(1 * time.Second) - require.EqualValues(t, 50, rdb.HLen(ctx, testKey).Val()) - require.Len(t, rdb.HGetAll(ctx, testKey).Val(), 50) - require.Len(t, rdb.HKeys(ctx, testKey).Val(), 50) - require.Len(t, rdb.HVals(ctx, testKey).Val(), 50) + keys, cursor = rdb.HScan(ctx, "hfe-key", 0, "*", 10).Val() + require.Equal(t, []string{"f2", "v2"}, keys) + require.Equal(t, uint64(0), cursor) + }) - require.NoError(t, rdb.Do(ctx, "COMPACT").Err()) + t.Run("HFE expire or ttl a not hash object", func(t *testing.T) { + require.Equal(t, "OK", rdb.Set(ctx, "k", "v", 0).Val()) + require.ErrorContains(t, rdb.Do(ctx, "HEXPIRE", "k", 1, "FIELDS", 1, "f").Err(), "WRONGTYPE") + require.ErrorContains(t, rdb.Do(ctx, "HPEXPIRE", "k", 1, "FIELDS", 1, "f").Err(), "WRONGTYPE") + require.ErrorContains(t, rdb.Do(ctx, "HEXPIREAT", "k", 1, "FIELDS", 1, "f").Err(), "WRONGTYPE") + require.ErrorContains(t, rdb.Do(ctx, "HPEXPIREAT", "k", 1, "FIELDS", 1, "f").Err(), "WRONGTYPE") + require.ErrorContains(t, rdb.Do(ctx, "HTTL", "k", "FIELDS", 1, "f").Err(), "WRONGTYPE") + require.ErrorContains(t, rdb.Do(ctx, "HPTTL", "k", "FIELDS", 1, "f").Err(), "WRONGTYPE") + require.ErrorContains(t, rdb.Do(ctx, "HEXPIRETIME", "k", "FIELDS", 1, "f").Err(), "WRONGTYPE") + require.ErrorContains(t, rdb.Do(ctx, "HPEXPIRETIME", "k", "FIELDS", 1, "f").Err(), "WRONGTYPE") + }) - time.Sleep(5 * time.Second) + t.Run("HEF syntax check", func(t *testing.T) { + require.ErrorContains(t, rdb.Do(ctx, "HEXPIRE", "k", 1, "FIELDSS", 1, "f1").Err(), "syntax error") + require.ErrorContains(t, rdb.Do(ctx, "HTTL", "k", 1, "FIELDSS", 1, "f1").Err(), "syntax error") + require.ErrorContains(t, rdb.Do(ctx, "HEXPIRE", "k", "FIELDSS", 1, "f1").Err(), "wrong number of arguments") + require.ErrorContains(t, rdb.Do(ctx, "HEXPIRE", "k", 1, "FIELDS", 1, "f1", "f2").Err(), "wrong number of arguments") + require.ErrorContains(t, rdb.Do(ctx, "HEXPIRE", "k", 1, "FIELDS", 2, "f1").Err(), "wrong number of arguments") + require.ErrorContains(t, rdb.Do(ctx, "HEXPIRE", "k", 1, "FIELDS", 0, "f1").Err(), "wrong number of arguments") + require.ErrorContains(t, rdb.Do(ctx, "HEXPIRE", "k", 1, "FIELDS", 0).Err(), "wrong number of arguments") + require.ErrorContains(t, rdb.Do(ctx, "HEXPIRE", "k", 1, "FIELDS", -1, "f1").Err(), "wrong number of arguments") + }) + + t.Run("HFE expire or expireat with a pass time", func(t *testing.T) { + require.NoError(t, rdb.Del(ctx, "hfe-key").Err()) + + require.Equal(t, int64(1), rdb.HSet(ctx, "hfe-key", "f1", "v1").Val()) + require.Equal(t, int64(2), rdb.Do(ctx, "HEXPIRE", "hfe-key", 0, "FIELDS", 1, "f1").Val().([]interface{})[0]) + require.Equal(t, int64(-2), rdb.Do(ctx, "HTTL", "hfe-key", "FIELDS", 1, "f1").Val().([]interface{})[0]) + require.Equal(t, "", rdb.HGet(ctx, "hfe-key", "f1").Val()) + + require.Equal(t, int64(1), rdb.HSet(ctx, "hfe-key", "f1", "v1").Val()) + require.Equal(t, int64(2), rdb.Do(ctx, "HEXPIREAT", "hfe-key", 0, "FIELDS", 1, "f1").Val().([]interface{})[0]) + require.Equal(t, int64(-2), rdb.Do(ctx, "HTTL", "hfe-key", "FIELDS", 1, "f1").Val().([]interface{})[0]) + require.Equal(t, "", rdb.HGet(ctx, "hfe-key", "f1").Val()) + }) + + t.Run("HFE Test hincrby and hincrbyfloat a field with expiration", func(t *testing.T) { + require.NoError(t, rdb.Del(ctx, "hfe-key").Err()) + require.Equal(t, int64(5), rdb.HIncrBy(ctx, "hfe-key", "f", 5).Val()) + require.Equal(t, int64(1), rdb.Do(ctx, "HEXPIRE", "hfe-key", 1, "FIELDS", 1, "f").Val().([]interface{})[0]) + require.Equal(t, "5", rdb.HGet(ctx, "hfe-key", "f").Val()) + require.Equal(t, float64(6.5), rdb.HIncrByFloat(ctx, "hfe-key", "f", 1.5).Val()) + f, _ := rdb.HGet(ctx, "hfe-key", "f").Float64() + require.Equal(t, float64(6.5), f) + time.Sleep(1 * time.Second) + require.Equal(t, "", rdb.HGet(ctx, "hfe-key", "f").Val()) + }) - require.EqualValues(t, 50, rdb.HLen(ctx, testKey).Val()) - require.Len(t, rdb.HGetAll(ctx, testKey).Val(), 50) - require.Len(t, rdb.HKeys(ctx, testKey).Val(), 50) - require.Len(t, rdb.HVals(ctx, testKey).Val(), 50) + t.Run("HFE expire a field with NX/XX/GT/LT option", func(t *testing.T) { + require.NoError(t, rdb.Del(ctx, "hfe-key").Err()) + require.Equal(t, true, rdb.HMSet(ctx, "hfe-key", "f1", "v1", "f2", "v2").Val()) + require.Equal(t, int64(1), rdb.Do(ctx, "HEXPIRE", "hfe-key", 1, "FIELDS", 1, "f1").Val().([]interface{})[0]) + nxResult := rdb.Do(ctx, "HEXPIRE", "hfe-key", 1, "NX", "FIELDS", 2, "f1", "f2").Val().([]interface{}) + require.Equal(t, []interface{}{int64(0), int64(1)}, nxResult) + + require.NoError(t, rdb.Del(ctx, "hfe-key").Err()) + require.Equal(t, true, rdb.HMSet(ctx, "hfe-key", "f1", "v1", "f2", "v2").Val()) + require.Equal(t, int64(1), rdb.Do(ctx, "HEXPIRE", "hfe-key", 1, "FIELDS", 1, "f1").Val().([]interface{})[0]) + xxResult := rdb.Do(ctx, "HEXPIRE", "hfe-key", 1, "XX", "FIELDS", 2, "f1", "f2").Val().([]interface{}) + require.Equal(t, []interface{}{int64(1), int64(0)}, xxResult) + + require.NoError(t, rdb.Del(ctx, "hfe-key").Err()) + require.Equal(t, true, rdb.HMSet(ctx, "hfe-key", "f1", "v1", "f2", "v2").Val()) + require.Equal(t, int64(1), rdb.Do(ctx, "HEXPIRE", "hfe-key", 10, "FIELDS", 1, "f1").Val().([]interface{})[0]) + require.Equal(t, int64(1), rdb.Do(ctx, "HEXPIRE", "hfe-key", 20, "FIELDS", 1, "f2").Val().([]interface{})[0]) + gtResult := rdb.Do(ctx, "HEXPIRE", "hfe-key", 15, "GT", "FIELDS", 2, "f1", "f2").Val().([]interface{}) + require.Equal(t, []interface{}{int64(1), int64(0)}, gtResult) + + require.NoError(t, rdb.Del(ctx, "hfe-key").Err()) + require.Equal(t, true, rdb.HMSet(ctx, "hfe-key", "f1", "v1", "f2", "v2").Val()) + require.Equal(t, int64(1), rdb.Do(ctx, "HEXPIRE", "hfe-key", 10, "FIELDS", 1, "f1").Val().([]interface{})[0]) + require.Equal(t, int64(1), rdb.Do(ctx, "HEXPIRE", "hfe-key", 20, "FIELDS", 1, "f2").Val().([]interface{})[0]) + ltResult := rdb.Do(ctx, "HEXPIRE", "hfe-key", 15, "LT", "FIELDS", 2, "f1", "f2").Val().([]interface{}) + require.Equal(t, []interface{}{int64(0), int64(1)}, ltResult) }) }