-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat:zmax && zmin #2966
feat:zmax && zmin #2966
Conversation
WalkthroughThe changes in this pull request enhance the Changes
Possibly related PRs
Suggested labels
Suggested reviewers
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Outside diff range and nitpick comments (1)
tests/integration/zset_test.go (1)
Line range hint
400-600
: Consider adding more test scenarios for ZPopMax/ZPopMin.While the existing tests cover basic functionality well, consider adding tests for:
- Empty sets
- Concurrent access patterns
- Large sets with performance implications
- Edge cases with identical scores
This would strengthen the test coverage for the new ZPopMax/ZPopMin functionality mentioned in the PR title.
Example test case for empty sets:
It("should handle ZPopMax/ZPopMin on empty sets", func() { members, err := client.ZPopMax(ctx, "empty_set").Result() Expect(err).NotTo(HaveOccurred()) Expect(members).To(BeEmpty()) members, err = client.ZPopMin(ctx, "empty_set").Result() Expect(err).NotTo(HaveOccurred()) Expect(members).To(BeEmpty()) })
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (7)
include/pika_cache.h
(1 hunks)include/pika_zset.h
(2 hunks)src/cache/src/zset.cc
(1 hunks)src/pika_cache.cc
(1 hunks)src/pika_command.cc
(1 hunks)src/pika_zset.cc
(2 hunks)tests/integration/zset_test.go
(1 hunks)
🔇 Additional comments (11)
include/pika_cache.h (2)
169-172
: LGTM: Method signature formatting changes
The formatting changes to ZRevrank
and ZRangebylex
improve readability while maintaining functionality.
176-179
: LGTM: Well-structured new method declarations
The new ZPopMin
and ZPopMax
method declarations follow the established pattern of other Z* methods in the class, maintaining consistency in:
- Parameter types and order
- Return type (rocksdb::Status)
- Use of shared_ptr
include/pika_zset.h (1)
606-607
: LGTM: Consistent command pattern implementation
The addition of DoThroughDB
and DoUpdateCache
methods to both command classes follows the established command pattern and properly overrides the base class methods.
Also applies to: 628-629
src/pika_zset.cc (4)
1510-1512
: LGTM!
The implementation of DoThroughDB
for ZPopmaxCmd
correctly delegates to the Do
method.
1514-1519
: LGTM!
The DoUpdateCache
implementation for ZPopmaxCmd
correctly:
- Checks for successful operation status
- Updates cache using ZPopMax with proper parameters
1537-1539
: LGTM!
The implementation of DoThroughDB
for ZPopminCmd
correctly delegates to the Do
method.
1541-1546
: LGTM!
The DoUpdateCache
implementation for ZPopminCmd
correctly:
- Checks for successful operation status
- Updates cache using ZPopMin with proper parameters
src/pika_cache.cc (2)
1468-1480
: LGTM!
The ZPopMin
implementation correctly:
- Uses proper locking mechanism for thread safety
- Handles cache miss cases appropriately
- Returns proper status codes
1482-1494
: LGTM!
The ZPopMax
implementation correctly:
- Uses proper locking mechanism for thread safety
- Handles cache miss cases appropriately
- Returns proper status codes
src/pika_command.cc (1)
603-603
: LGTM!
The command flags for both ZPopmaxCmd and ZPopminCmd are correctly updated to include:
- kCmdFlagsDoThroughDB: Enables database operations
- kCmdFlagsUpdateCache: Enables cache updates
Also applies to: 607-607
tests/integration/zset_test.go (1)
1443-1468
: LGTM! Well-structured test case.
The test case is well-implemented with:
- Clear setup of test data
- Proper error handling
- Comprehensive assertions for both return value and remaining set members
- Consistent style with other test cases
Status RedisCache::ZPopMin(std::string& key, int64_t count, std::vector<storage::ScoreMember>* score_members) { | ||
zitem* items = nullptr; | ||
unsigned long items_size = 0; | ||
robj* kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size())); | ||
DEFER { | ||
DecrObjectsRefCount(kobj); | ||
}; | ||
|
||
int ret = RcZrange(cache_, kobj, 0, -1, &items, &items_size); | ||
if (C_OK != ret) { | ||
if (REDIS_KEY_NOT_EXIST == ret) { | ||
return Status::NotFound("key not in cache"); | ||
} | ||
return Status::Corruption("RcZrange failed"); | ||
} | ||
|
||
unsigned long to_return = std::min(static_cast<unsigned long>(count), items_size); | ||
for (unsigned long i = 0; i < to_return; ++i) { | ||
storage::ScoreMember sm; | ||
sm.score = items[i].score; | ||
sm.member.assign(items[i].member, sdslen(items[i].member)); | ||
score_members->push_back(sm); | ||
} | ||
|
||
robj** members_obj = (robj**)zcallocate(sizeof(robj*) * items_size); | ||
for (unsigned long i = 0; i < items_size; ++i) { | ||
members_obj[i] = createObject(OBJ_STRING, sdsnewlen(items[i].member, sdslen(items[i].member))); | ||
} | ||
DEFER { | ||
FreeObjectList(members_obj, items_size); | ||
}; | ||
|
||
RcZRem(cache_, kobj, members_obj, to_return); | ||
|
||
FreeZitemList(items, items_size); | ||
return Status::OK(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Performance concern: Consider optimizing range retrieval
The implementation fetches the entire sorted set using RcZrange(0, -1)
even when only a small count is needed. This could be inefficient for large sets.
Consider implementing a more efficient approach:
- int ret = RcZrange(cache_, kobj, 0, -1, &items, &items_size);
+ int ret = RcZrange(cache_, kobj, 0, count - 1, &items, &items_size);
Committable suggestion skipped: line range outside the PR's diff.
Status RedisCache::ZPopMax(std::string& key, int64_t count, std::vector<storage::ScoreMember>* score_members) { | ||
zitem* items = nullptr; | ||
unsigned long items_size = 0; | ||
robj* kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size())); | ||
DEFER { | ||
DecrObjectsRefCount(kobj); | ||
}; | ||
|
||
int ret = RcZrange(cache_, kobj, 0, -1, &items, &items_size); | ||
if (C_OK != ret) { | ||
if (REDIS_KEY_NOT_EXIST == ret) { | ||
return Status::NotFound("key not in cache"); | ||
} | ||
return Status::Corruption("RcZrange failed"); | ||
} | ||
|
||
unsigned long to_return = std::min(static_cast<unsigned long>(count), items_size); | ||
for (unsigned long i = items_size - to_return; i < items_size; ++i) { | ||
storage::ScoreMember sm; | ||
sm.score = items[i].score; | ||
sm.member.assign(items[i].member, sdslen(items[i].member)); | ||
score_members->push_back(sm); | ||
} | ||
|
||
robj** members_obj = (robj**)zcallocate(sizeof(robj*) * items_size); | ||
for (unsigned long i = items_size - 1; i >= 0; --i) { | ||
members_obj[items_size - 1 - i] = createObject(OBJ_STRING, sdsnewlen(items[i].member, sdslen(items[i].member))); | ||
} | ||
|
||
DEFER { | ||
FreeObjectList(members_obj, items_size); | ||
}; | ||
|
||
RcZRem(cache_, kobj, members_obj, to_return); | ||
|
||
FreeZitemList(items, items_size); | ||
return Status::OK(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Critical: Potential integer underflow in ZPopMax
The loop for creating member objects in ZPopMax
has a potential integer underflow issue when decrementing an unsigned long.
Fix the loop to prevent integer underflow:
- for (unsigned long i = items_size - 1; i >= 0; --i) {
+ for (unsigned long i = items_size; i > 0; --i) {
+ unsigned long idx = i - 1;
- members_obj[items_size - 1 - i] = createObject(OBJ_STRING, sdsnewlen(items[i].member, sdslen(items[i].member)));
+ members_obj[items_size - i] = createObject(OBJ_STRING, sdsnewlen(items[idx].member, sdslen(items[idx].member)));
}
Additionally, consider the same performance optimization as suggested for ZPopMin
:
- int ret = RcZrange(cache_, kobj, 0, -1, &items, &items_size);
+ int ret = RcZrange(cache_, kobj, -count, -1, &items, &items_size);
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
Status RedisCache::ZPopMax(std::string& key, int64_t count, std::vector<storage::ScoreMember>* score_members) { | |
zitem* items = nullptr; | |
unsigned long items_size = 0; | |
robj* kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size())); | |
DEFER { | |
DecrObjectsRefCount(kobj); | |
}; | |
int ret = RcZrange(cache_, kobj, 0, -1, &items, &items_size); | |
if (C_OK != ret) { | |
if (REDIS_KEY_NOT_EXIST == ret) { | |
return Status::NotFound("key not in cache"); | |
} | |
return Status::Corruption("RcZrange failed"); | |
} | |
unsigned long to_return = std::min(static_cast<unsigned long>(count), items_size); | |
for (unsigned long i = items_size - to_return; i < items_size; ++i) { | |
storage::ScoreMember sm; | |
sm.score = items[i].score; | |
sm.member.assign(items[i].member, sdslen(items[i].member)); | |
score_members->push_back(sm); | |
} | |
robj** members_obj = (robj**)zcallocate(sizeof(robj*) * items_size); | |
for (unsigned long i = items_size - 1; i >= 0; --i) { | |
members_obj[items_size - 1 - i] = createObject(OBJ_STRING, sdsnewlen(items[i].member, sdslen(items[i].member))); | |
} | |
DEFER { | |
FreeObjectList(members_obj, items_size); | |
}; | |
RcZRem(cache_, kobj, members_obj, to_return); | |
FreeZitemList(items, items_size); | |
return Status::OK(); | |
} | |
Status RedisCache::ZPopMax(std::string& key, int64_t count, std::vector<storage::ScoreMember>* score_members) { | |
zitem* items = nullptr; | |
unsigned long items_size = 0; | |
robj* kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size())); | |
DEFER { | |
DecrObjectsRefCount(kobj); | |
}; | |
int ret = RcZrange(cache_, kobj, -count, -1, &items, &items_size); | |
if (C_OK != ret) { | |
if (REDIS_KEY_NOT_EXIST == ret) { | |
return Status::NotFound("key not in cache"); | |
} | |
return Status::Corruption("RcZrange failed"); | |
} | |
unsigned long to_return = std::min(static_cast<unsigned long>(count), items_size); | |
for (unsigned long i = items_size - to_return; i < items_size; ++i) { | |
storage::ScoreMember sm; | |
sm.score = items[i].score; | |
sm.member.assign(items[i].member, sdslen(items[i].member)); | |
score_members->push_back(sm); | |
} | |
robj** members_obj = (robj**)zcallocate(sizeof(robj*) * items_size); | |
for (unsigned long i = items_size; i > 0; --i) { | |
unsigned long idx = i - 1; | |
members_obj[items_size - i] = createObject(OBJ_STRING, sdsnewlen(items[idx].member, sdslen(items[idx].member))); | |
} | |
DEFER { | |
FreeObjectList(members_obj, items_size); | |
}; | |
RcZRem(cache_, kobj, members_obj, to_return); | |
FreeZitemList(items, items_size); | |
return Status::OK(); | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Outside diff range and nitpick comments (1)
src/cache/include/cache.h (1)
150-151
: LGTM! Consider adding method documentation.The new
ZPopMin
andZPopMax
methods have well-designed signatures that align with the existing Zset command patterns. The parameter types and order are consistent with other methods in the class.Consider adding documentation comments to explain:
- The behavior when count > number of available elements
- Whether the operations are atomic
- The ordering of elements with equal scores
+ // Removes and returns up to 'count' members with the lowest scores from the sorted set at 'key'. + // Returns removed members and their scores in 'score_members'. Status ZPopMin(std::string& key, int64_t count, std::vector<storage::ScoreMember>* score_members); + // Removes and returns up to 'count' members with the highest scores from the sorted set at 'key'. + // Returns removed members and their scores in 'score_members'. Status ZPopMax(std::string& key, int64_t count, std::vector<storage::ScoreMember>* score_members);
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (3)
src/cache/include/cache.h
(1 hunks)src/pika_cache.cc
(1 hunks)tests/integration/zset_test.go
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- tests/integration/zset_test.go
🔇 Additional comments (2)
src/cache/include/cache.h (1)
150-151
: Verify integration with existing Zset methods.
Let's verify the implementation details and integration with other Zset methods.
✅ Verification successful
Let me gather more information to verify the implementation details of ZPopMin/ZPopMax.
Integration with Zset methods is properly implemented
Based on the code analysis:
- ZPopMin/ZPopMax implementations correctly use existing Zset methods (ZRange internally)
- Error handling is consistent with other Zset operations, following the same Status patterns
- The implementation is well-tested as evidenced by comprehensive test coverage in storage/tests/zsets_test.cc
- The methods are properly integrated across different layers (storage, cache, and pika)
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check implementation details and integration with other Zset methods
# Check if ZPopMin/ZPopMax use existing Zset methods internally
ast-grep --pattern 'ZPopMin($$$) {
$$$
ZRange($$$)
$$$
ZRem($$$)
$$$
}'
# Check for consistent error handling with other Zset methods
rg -A 2 'return Status::' src/cache/src/zset.cc
# Check test coverage
fd -e cc -e cpp test_zset
Length of output: 3379
Script:
#!/bin/bash
# Search for ZPopMin/ZPopMax implementations
rg -A 10 "Status.*ZPopMin|Status.*ZPopMax" src/cache/src/
# Check for test files specifically related to ZPop operations
rg -l "ZPopMin|ZPopMax" tests/
# Look for any existing usage patterns
rg -A 3 "ZPopMin|ZPopMax" src/ --type cpp
Length of output: 12317
src/pika_cache.cc (1)
1468-1496
: Implementation follows good practices
The new methods demonstrate several positive aspects:
- Proper thread safety using mutex locks
- Consistent error handling using Status objects
- Clean integration with the existing cache architecture
Status PikaCache::ZPopMax(std::string &key, int64_t count, std::vector<storage::ScoreMember> *score_members, | ||
const std::shared_ptr<DB> &db) { | ||
int cache_index = CacheIndex(key); | ||
std::lock_guard lm(*cache_mutexs_[cache_index]); | ||
|
||
auto cache_obj = caches_[cache_index]; | ||
Status s; | ||
|
||
if (cache_obj->Exists(key)) { | ||
return cache_obj->ZPopMax(key, count, score_members); | ||
} else { | ||
return Status::NotFound("key not in cache"); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Add parameter validation and cache state management
The implementation has the same issues as ZPopMin and needs similar improvements for robustness and consistency:
- Validate that count is positive
- Handle potential memory allocation failures
- Add cache state reload after pop operation
Consider this implementation:
Status PikaCache::ZPopMax(std::string &key, int64_t count, std::vector<storage::ScoreMember> *score_members,
const std::shared_ptr<DB> &db) {
int cache_index = CacheIndex(key);
std::lock_guard lm(*cache_mutexs_[cache_index]);
auto cache_obj = caches_[cache_index];
- Status s;
+ if (count <= 0) {
+ return Status::InvalidArgument("count should be positive");
+ }
if (cache_obj->Exists(key)) {
- return cache_obj->ZPopMax(key, count, score_members);
+ auto s = cache_obj->ZPopMax(key, count, score_members);
+ if (s.ok()) {
+ ReloadCacheKeyIfNeeded(cache_obj, key, -1, -1, db);
+ }
+ return s;
} else {
return Status::NotFound("key not in cache");
}
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
Status PikaCache::ZPopMax(std::string &key, int64_t count, std::vector<storage::ScoreMember> *score_members, | |
const std::shared_ptr<DB> &db) { | |
int cache_index = CacheIndex(key); | |
std::lock_guard lm(*cache_mutexs_[cache_index]); | |
auto cache_obj = caches_[cache_index]; | |
Status s; | |
if (cache_obj->Exists(key)) { | |
return cache_obj->ZPopMax(key, count, score_members); | |
} else { | |
return Status::NotFound("key not in cache"); | |
} | |
} | |
Status PikaCache::ZPopMax(std::string &key, int64_t count, std::vector<storage::ScoreMember> *score_members, | |
const std::shared_ptr<DB> &db) { | |
int cache_index = CacheIndex(key); | |
std::lock_guard lm(*cache_mutexs_[cache_index]); | |
auto cache_obj = caches_[cache_index]; | |
if (count <= 0) { | |
return Status::InvalidArgument("count should be positive"); | |
} | |
if (cache_obj->Exists(key)) { | |
auto s = cache_obj->ZPopMax(key, count, score_members); | |
if (s.ok()) { | |
ReloadCacheKeyIfNeeded(cache_obj, key, -1, -1, db); | |
} | |
return s; | |
} else { | |
return Status::NotFound("key not in cache"); | |
} | |
} |
Status PikaCache::ZPopMin(std::string &key, int64_t count, std::vector<storage::ScoreMember> *score_members, | ||
const std::shared_ptr<DB> &db) { | ||
int cache_index = CacheIndex(key); | ||
std::lock_guard lm(*cache_mutexs_[cache_index]); | ||
|
||
auto cache_obj = caches_[cache_index]; | ||
Status s; | ||
|
||
if (cache_obj->Exists(key)) { | ||
return cache_obj->ZPopMin(key, count, score_members); | ||
} else { | ||
return Status::NotFound("key not in cache"); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Add parameter validation and cache state management
The implementation needs additional robustness checks and consistency with other cache operations:
- Validate that count is positive
- Handle potential memory allocation failures
- Add cache state reload after pop operation like other modifying operations
Consider this implementation:
Status PikaCache::ZPopMin(std::string &key, int64_t count, std::vector<storage::ScoreMember> *score_members,
const std::shared_ptr<DB> &db) {
int cache_index = CacheIndex(key);
std::lock_guard lm(*cache_mutexs_[cache_index]);
auto cache_obj = caches_[cache_index];
- Status s;
+ if (count <= 0) {
+ return Status::InvalidArgument("count should be positive");
+ }
if (cache_obj->Exists(key)) {
- return cache_obj->ZPopMin(key, count, score_members);
+ auto s = cache_obj->ZPopMin(key, count, score_members);
+ if (s.ok()) {
+ ReloadCacheKeyIfNeeded(cache_obj, key, -1, -1, db);
+ }
+ return s;
} else {
return Status::NotFound("key not in cache");
}
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
Status PikaCache::ZPopMin(std::string &key, int64_t count, std::vector<storage::ScoreMember> *score_members, | |
const std::shared_ptr<DB> &db) { | |
int cache_index = CacheIndex(key); | |
std::lock_guard lm(*cache_mutexs_[cache_index]); | |
auto cache_obj = caches_[cache_index]; | |
Status s; | |
if (cache_obj->Exists(key)) { | |
return cache_obj->ZPopMin(key, count, score_members); | |
} else { | |
return Status::NotFound("key not in cache"); | |
} | |
} | |
Status PikaCache::ZPopMin(std::string &key, int64_t count, std::vector<storage::ScoreMember> *score_members, | |
const std::shared_ptr<DB> &db) { | |
int cache_index = CacheIndex(key); | |
std::lock_guard lm(*cache_mutexs_[cache_index]); | |
auto cache_obj = caches_[cache_index]; | |
if (count <= 0) { | |
return Status::InvalidArgument("count should be positive"); | |
} | |
if (cache_obj->Exists(key)) { | |
auto s = cache_obj->ZPopMin(key, count, score_members); | |
if (s.ok()) { | |
ReloadCacheKeyIfNeeded(cache_obj, key, -1, -1, db); | |
} | |
return s; | |
} else { | |
return Status::NotFound("key not in cache"); | |
} | |
} |
* zmax && zmin * zopmax && zpopmin --------- Co-authored-by: chejinge <[email protected]>
#2892
Summary by CodeRabbit
ZPopMin
andZPopMax
methods to remove and return elements from sorted sets.ZPopmaxCmd
andZPopminCmd
with new methods for database interaction and cache updates.ZPopMax
andZPopMin
methods for sorted sets.