Skip to content

Commit

Permalink
Support hash field expiration
Browse files Browse the repository at this point in the history
  • Loading branch information
jjz921024 committed Dec 20, 2024
1 parent a45b2c6 commit 2208cdb
Show file tree
Hide file tree
Showing 11 changed files with 1,050 additions and 93 deletions.
5 changes: 5 additions & 0 deletions kvrocks.conf
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,11 @@ json-storage-format json
# Default: no
txn-context-enabled no

# Whether to enable hash field expiration feature.
# NOTE: This option only affects newly hash object
# Default: no
hash-field-expiration no

################################## TLS ###################################

# By default, TLS/SSL is disabled, i.e. `tls-port` is set to 0.
Expand Down
244 changes: 243 additions & 1 deletion src/commands/cmd_hash.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "error_constants.h"
#include "scan_base.h"
#include "server/server.h"
#include "time_util.h"
#include "types/redis_hash.h"

namespace redis {
Expand Down Expand Up @@ -444,6 +445,238 @@ class CommandHRandField : public Commander {
bool no_parameters_ = true;
};

class CommandFieldExpireBase : public Commander {
protected:
Status commonParse(const std::vector<std::string> &args, int start_idx) {
CommandParser parser(args, start_idx);
std::string_view expire_flag, num_flag;
uint64_t fields_num = 0;
while (parser.Good()) {
if (parser.EatEqICaseFlag("FIELDS", num_flag)) {
fields_num = GET_OR_RET(parser.template TakeInt<uint64_t>());
break;
} else if (parser.EatEqICaseFlag("NX", expire_flag)) {
field_expire_type_ = HashFieldExpireType::NX;
} else if (parser.EatEqICaseFlag("XX", expire_flag)) {
field_expire_type_ = HashFieldExpireType::XX;
} else if (parser.EatEqICaseFlag("GT", expire_flag)) {
field_expire_type_ = HashFieldExpireType::GT;
} else if (parser.EatEqICaseFlag("LT", expire_flag)) {
field_expire_type_ = HashFieldExpireType::LT;
} else {
return parser.InvalidSyntax();
}
}

auto remains = parser.Remains();
auto size = args.size();
if (remains != fields_num) {
return {Status::RedisParseErr, errWrongNumOfArguments};
}

for (size_t i = size - remains; i < size; i++) {
fields_.emplace_back(args_[i]);
}

return Status::OK();
}

Status expireFieldExecute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) {
if (!srv->storage->GetConfig()->hash_field_expiration) {
return {Status::RedisExecErr, "field expiration feature is disabled"};
}

std::vector<int8_t> ret;
redis::Hash hash_db(srv->storage, conn->GetNamespace());
auto s = hash_db.ExpireFields(ctx, args_[1], expire_, fields_, field_expire_type_, &ret);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}

*output = redis::MultiLen(ret.size());
for (const auto &i : ret) {
output->append(redis::Integer(i));
}

return Status::OK();
}

Status ttlExpireExecute(engine::Context &ctx, Server *srv, Connection *conn, std::vector<int64_t> &ret) {
redis::Hash hash_db(srv->storage, conn->GetNamespace());
auto s = hash_db.TTLFields(ctx, args_[1], fields_, &ret);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}
return Status::OK();
}

uint64_t expire_ = 0;
HashFieldExpireType field_expire_type_ = HashFieldExpireType::None;
std::vector<Slice> fields_;
};

class CommandHExpire : public CommandFieldExpireBase {
public:
Status Parse(const std::vector<std::string> &args) override {
auto parse_result = ParseInt<uint64_t>(args[2], 10);
if (!parse_result) return {Status::RedisParseErr, errValueNotInteger};

expire_ = *parse_result * 1000 + util::GetTimeStampMS();
return CommandFieldExpireBase::commonParse(args, 3);
}

Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
return expireFieldExecute(ctx, srv, conn, output);
}
};

class CommandHExpireAt : public CommandFieldExpireBase {
public:
Status Parse(const std::vector<std::string> &args) override {
auto parse_result = ParseInt<uint64_t>(args[2], 10);
if (!parse_result) return {Status::RedisParseErr, errValueNotInteger};

expire_ = *parse_result * 1000;
return CommandFieldExpireBase::commonParse(args, 3);
}

Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
return expireFieldExecute(ctx, srv, conn, output);
}
};

class CommandHPExpire : public CommandFieldExpireBase {
public:
Status Parse(const std::vector<std::string> &args) override {
auto parse_result = ParseInt<uint64_t>(args[2], 10);
if (!parse_result) return {Status::RedisParseErr, errValueNotInteger};

expire_ = *parse_result + util::GetTimeStampMS();
return CommandFieldExpireBase::commonParse(args, 3);
}

Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
return expireFieldExecute(ctx, srv, conn, output);
}
};

class CommandHPExpireAt : public CommandFieldExpireBase {
public:
Status Parse(const std::vector<std::string> &args) override {
auto parse_result = ParseInt<uint64_t>(args[2], 10);
if (!parse_result) return {Status::RedisParseErr, errValueNotInteger};

expire_ = *parse_result;
return CommandFieldExpireBase::commonParse(args, 3);
}

Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
return expireFieldExecute(ctx, srv, conn, output);
}
};

class CommandHExpireTime : public CommandFieldExpireBase {
public:
Status Parse(const std::vector<std::string> &args) override { return CommandFieldExpireBase::commonParse(args, 2); }

Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
std::vector<int64_t> ret;
auto s = ttlExpireExecute(ctx, srv, conn, ret);
if (!s.IsOK()) {
return {Status::RedisExecErr, s.Msg()};
}
auto now = util::GetTimeStampMS();
*output = redis::MultiLen(ret.size());
for (const auto &ttl : ret) {
if (ttl > 0) {
output->append(redis::Integer((now + ttl) / 1000));
} else {
output->append(redis::Integer(ttl));
}
}
return Status::OK();
}
};

class CommandHPExpireTime : public CommandFieldExpireBase {
public:
Status Parse(const std::vector<std::string> &args) override { return CommandFieldExpireBase::commonParse(args, 2); }

Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
std::vector<int64_t> ret;
auto s = ttlExpireExecute(ctx, srv, conn, ret);
if (!s.IsOK()) {
return {Status::RedisExecErr, s.Msg()};
}
auto now = util::GetTimeStampMS();
*output = redis::MultiLen(ret.size());
for (const auto &ttl : ret) {
if (ttl > 0) {
output->append(redis::Integer(now + ttl));
} else {
output->append(redis::Integer(ttl));
}
}
return Status::OK();
}
};

class CommandHTTL : public CommandFieldExpireBase {
public:
Status Parse(const std::vector<std::string> &args) override { return CommandFieldExpireBase::commonParse(args, 2); }

Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
std::vector<int64_t> ret;
auto s = ttlExpireExecute(ctx, srv, conn, ret);
if (!s.IsOK()) {
return {Status::RedisExecErr, s.Msg()};
}
*output = redis::MultiLen(ret.size());
for (const auto &ttl : ret) {
output->append(redis::Integer(ttl > 0 ? ttl / 1000 : ttl));
}
return Status::OK();
}
};

class CommandHPTTL : public CommandFieldExpireBase {
public:
Status Parse(const std::vector<std::string> &args) override { return CommandFieldExpireBase::commonParse(args, 2); }

Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
std::vector<int64_t> ret;
auto s = ttlExpireExecute(ctx, srv, conn, ret);
if (!s.IsOK()) {
return {Status::RedisExecErr, s.Msg()};
}
*output = redis::MultiLen(ret.size());
for (const auto &ttl : ret) {
output->append(redis::Integer(ttl));
}
return Status::OK();
}
};

class CommandHPersist : public CommandFieldExpireBase {
public:
Status Parse(const std::vector<std::string> &args) override { return CommandFieldExpireBase::commonParse(args, 2); }

Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
std::vector<int8_t> ret;
redis::Hash hash_db(srv->storage, conn->GetNamespace());
auto s = hash_db.PersistFields(ctx, args_[1], fields_, &ret);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}

*output = redis::MultiLen(ret.size());
for (const auto &i : ret) {
output->append(redis::Integer(i));
}
return Status::OK();
}
};

REDIS_REGISTER_COMMANDS(Hash, MakeCmdAttr<CommandHGet>("hget", 3, "read-only", 1, 1, 1),
MakeCmdAttr<CommandHIncrBy>("hincrby", 4, "write", 1, 1, 1),
MakeCmdAttr<CommandHIncrByFloat>("hincrbyfloat", 4, "write", 1, 1, 1),
Expand All @@ -460,6 +693,15 @@ REDIS_REGISTER_COMMANDS(Hash, MakeCmdAttr<CommandHGet>("hget", 3, "read-only", 1
MakeCmdAttr<CommandHGetAll>("hgetall", 2, "read-only slow", 1, 1, 1),
MakeCmdAttr<CommandHScan>("hscan", -3, "read-only", 1, 1, 1),
MakeCmdAttr<CommandHRangeByLex>("hrangebylex", -4, "read-only", 1, 1, 1),
MakeCmdAttr<CommandHRandField>("hrandfield", -2, "read-only slow", 1, 1, 1), )
MakeCmdAttr<CommandHRandField>("hrandfield", -2, "read-only slow", 1, 1, 1),
MakeCmdAttr<CommandHExpire>("hexpire", -6, "write", 1, 1, 1),
MakeCmdAttr<CommandHExpireAt>("hexpireat", -6, "write", 1, 1, 1),
MakeCmdAttr<CommandHExpireTime>("hexpiretime", -5, "read-only", 1, 1, 1),
MakeCmdAttr<CommandHPExpire>("hpexpire", -6, "write", 1, 1, 1),
MakeCmdAttr<CommandHPExpireAt>("hpexpireat", -6, "write", 1, 1, 1),
MakeCmdAttr<CommandHPExpireTime>("hpexpiretime", -5, "read-only", 1, 1, 1),
MakeCmdAttr<CommandHPersist>("hpersist", -5, "write", 1, 1, 1),
MakeCmdAttr<CommandHTTL>("httl", -5, "read-only", 1, 1, 1),
MakeCmdAttr<CommandHPTTL>("hpttl", -5, "read-only", 1, 1, 1), )

} // namespace redis
1 change: 1 addition & 0 deletions src/config/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ Config::Config() {
new EnumField<JsonStorageFormat>(&json_storage_format, json_storage_formats, JsonStorageFormat::JSON)},
{"txn-context-enabled", true, new YesNoField(&txn_context_enabled, false)},
{"skip-block-cache-deallocation-on-close", false, new YesNoField(&skip_block_cache_deallocation_on_close, false)},
{"hash-field-expiration", false, new YesNoField(&hash_field_expiration, false)},

/* rocksdb options */
{"rocksdb.compression", false,
Expand Down
3 changes: 3 additions & 0 deletions src/config/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,9 @@ struct Config {

bool skip_block_cache_deallocation_on_close = false;

// whether to enable hash field expiration feature
bool hash_field_expiration = false;

struct RocksDB {
int block_size;
bool cache_index_and_filter_blocks;
Expand Down
5 changes: 4 additions & 1 deletion src/storage/compact_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "db_util.h"
#include "time_util.h"
#include "types/redis_bitmap.h"
#include "types/redis_hash.h"

namespace engine {

Expand Down Expand Up @@ -132,7 +133,9 @@ bool SubKeyFilter::Filter([[maybe_unused]] int level, const Slice &key, const Sl
return false;
}

return IsMetadataExpired(ikey, metadata) || (metadata.Type() == kRedisBitmap && redis::Bitmap::IsEmptySegment(value));
return IsMetadataExpired(ikey, metadata) ||
(metadata.Type() == kRedisBitmap && redis::Bitmap::IsEmptySegment(value)) ||
(metadata.Type() == kRedisHash && redis::Hash::IsFieldExpired(cached_metadata_, value));
}

} // namespace engine
Loading

0 comments on commit 2208cdb

Please sign in to comment.