Skip to content

Commit

Permalink
implementation of cluster scan (#2257)
Browse files Browse the repository at this point in the history
* implementation of cluster scan

Signed-off-by: avifenesh <[email protected]>

* fix linter

Signed-off-by: avifenesh <[email protected]>

* added cluster scan tests

Signed-off-by: lior sventitzky <[email protected]>
Signed-off-by: avifenesh <[email protected]>

* added scan standalone impl

Signed-off-by: avifenesh <[email protected]>

* fix decoding

Signed-off-by: avifenesh <[email protected]>

* add standalone scan tests and fix tests

Signed-off-by: lior sventitzky <[email protected]>

* fix lints

Signed-off-by: avifenesh <[email protected]>

* round 1

Signed-off-by: avifenesh <[email protected]>

* round 2

Signed-off-by: avifenesh <[email protected]>

* round 4

Signed-off-by: avifenesh <[email protected]>

---------

Signed-off-by: avifenesh <[email protected]>
Signed-off-by: lior sventitzky <[email protected]>
Co-authored-by: lior sventitzky <[email protected]>
  • Loading branch information
avifenesh and liorsve authored Sep 12, 2024
1 parent 747cd14 commit 13a2a86
Show file tree
Hide file tree
Showing 8 changed files with 1,033 additions and 13 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@
* Java, Node, Python: Add SCRIPT SHOW command (Valkey-8) ([#2171](https://github.com/valkey-io/valkey-glide/pull/2171))
* Java, Node, Python: Change BITCOUNT end param to optional (Valkey-8) ([#2248](https://github.com/valkey-io/valkey-glide/pull/2248))
* Java, Node, Python: Add NOSCORES option to ZSCAN & NOVALUES option to HSCAN (Valkey-8) ([#2174](https://github.com/valkey-io/valkey-glide/pull/2174))
* Node: Add SCAN command ([#2257](https://github.com/valkey-io/valkey-glide/pull/2257))

#### Breaking Changes
* Java: Update INFO command ([#2274](https://github.com/valkey-io/valkey-glide/pull/2274))
Expand Down
2 changes: 1 addition & 1 deletion node/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0
*/

export { Script } from "glide-rs";
export { ClusterScanCursor, Script } from "glide-rs";
export * from "./src/BaseClient";
export * from "./src/Commands";
export * from "./src/Errors";
Expand Down
60 changes: 59 additions & 1 deletion node/rust-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use tikv_jemallocator::Jemalloc;
#[cfg(not(target_env = "msvc"))]
#[global_allocator]
static GLOBAL: Jemalloc = Jemalloc;

/// Cursor value that marks a scan as finished; `ClusterScanCursor::is_finished`
/// compares the stored cursor id against this string.
pub const FINISHED_SCAN_CURSOR: &str = "finished";
use byteorder::{LittleEndian, WriteBytesExt};
use bytes::Bytes;
use glide_core::start_socket_listener;
Expand Down Expand Up @@ -420,3 +420,61 @@ impl Drop for Script {
glide_core::scripts_container::remove_script(&self.hash);
}
}

/// This struct is used to keep track of the cursor of a cluster scan.
///
/// We want to avoid passing the cursor state between layers of the application,
/// so we keep the state in the container and only pass the id of the cursor.
/// The cursor is stored in the container and can be retrieved using the id.
/// The cursor is removed from the container when the object is deleted (dropped).
///
/// To create a cursor:
/// ```typescript
/// // For a new cursor
/// let cursor = new ClusterScanCursor();
/// // Using an existing id
/// let cursor = new ClusterScanCursor("cursor_id");
/// ```
/// To get the cursor id:
/// ```typescript
/// let cursorId = cursor.getCursor();
/// ```
/// To check if the scan is finished:
/// ```typescript
/// let isFinished = cursor.isFinished(); // true if the scan is finished
/// ```
#[napi]
#[derive(Default)]
pub struct ClusterScanCursor {
// Id of the cursor. The derived `Default` leaves it as an empty string,
// which is what the constructor produces when no id is supplied.
cursor: String,
}

#[napi]
impl ClusterScanCursor {
    /// Creates a cursor holding `new_cursor` as its id, or the default
    /// (empty) id when `None` is passed.
    #[napi(constructor)]
    #[allow(dead_code)]
    pub fn new(new_cursor: Option<String>) -> Self {
        ClusterScanCursor {
            cursor: new_cursor.unwrap_or_default(),
        }
    }

    /// Returns the cursor id.
    #[napi]
    #[allow(dead_code)]
    pub fn get_cursor(&self) -> String {
        self.cursor.to_owned()
    }

    /// Returns true if the scan is finished.
    #[napi]
    #[allow(dead_code)]
    pub fn is_finished(&self) -> bool {
        self.cursor == FINISHED_SCAN_CURSOR
    }
}

impl Drop for ClusterScanCursor {
    /// Removes the scan state registered under this cursor id from the
    /// cluster-scan container.
    fn drop(&mut self) {
        // The field is never read again once `drop` returns, so move the id
        // out instead of allocating a clone of it.
        glide_core::cluster_scan_container::remove_scan_state_cursor(std::mem::take(
            &mut self.cursor,
        ));
    }
}
60 changes: 50 additions & 10 deletions node/src/BaseClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
* Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0
*/
import {
ClusterScanCursor,
DEFAULT_TIMEOUT_IN_MILLISECONDS,
Script,
StartSocketConnection,
Expand Down Expand Up @@ -575,6 +576,24 @@ export interface ScriptOptions {
args?: GlideString[];
}

/**
* Enum of Valkey data types, accepted as the `type` filter of the scan options.
* The string value of the selected member is what gets sent as the `TYPE` argument.
* `STRING`
* `LIST`
* `SET`
* `ZSET`
* `HASH`
* `STREAM`
*/
export enum ObjectType {
STRING = "String",
LIST = "List",
SET = "Set",
ZSET = "ZSet",
HASH = "Hash",
STREAM = "Stream",
}

function getRequestErrorClass(
type: response.RequestErrorType | null | undefined,
): typeof RequestError {
Expand Down Expand Up @@ -686,7 +705,7 @@ export type WritePromiseOptions = RouteOption & DecoderOption;

export class BaseClient {
private socket: net.Socket;
private readonly promiseCallbackFunctions: [
protected readonly promiseCallbackFunctions: [
PromiseFunction,
ErrorFunction,
][] = [];
Expand All @@ -695,7 +714,7 @@ export class BaseClient {
private writeInProgress = false;
private remainingReadData: Uint8Array | undefined;
private readonly requestTimeout: number; // Timeout in milliseconds
private isClosed = false;
protected isClosed = false;
protected defaultDecoder = Decoder.String;
private readonly pubsubFutures: [PromiseFunction, ErrorFunction][] = [];
private pendingPushNotification: response.Response[] = [];
Expand Down Expand Up @@ -867,7 +886,7 @@ export class BaseClient {
this.defaultDecoder = options?.defaultDecoder ?? Decoder.String;
}

private getCallbackIndex(): number {
protected getCallbackIndex(): number {
return (
this.availableCallbackSlots.pop() ??
this.promiseCallbackFunctions.length
Expand Down Expand Up @@ -895,7 +914,8 @@ export class BaseClient {
command:
| command_request.Command
| command_request.Command[]
| command_request.ScriptInvocation,
| command_request.ScriptInvocation
| command_request.ClusterScan,
options: WritePromiseOptions = {},
): Promise<T> {
const route = toProtobufRoute(options?.route);
Expand All @@ -914,6 +934,10 @@ export class BaseClient {
(resolveAns: T) => {
try {
if (resolveAns instanceof PointerResponse) {
// valueFromSplitPointer method is used to convert a pointer from a protobuf response into a TypeScript object.
// The protobuf response is received on a socket and the value in the response is a pointer to a Rust object.
// The pointer is a split pointer because JavaScript doesn't support `u64` and pointers in Rust can be `u64`,
// so we represent it with two `u32` (`high` and `low`).
if (typeof resolveAns === "number") {
resolveAns = valueFromSplitPointer(
0,
Expand All @@ -929,6 +953,16 @@ export class BaseClient {
}
}

if (command instanceof command_request.ClusterScan) {
const resolveAnsArray = resolveAns as [
ClusterScanCursor,
GlideString[],
];
resolveAnsArray[0] = new ClusterScanCursor(
resolveAnsArray[0].toString(),
);
}

resolve(resolveAns);
} catch (err) {
Logger.log(
Expand All @@ -945,12 +979,13 @@ export class BaseClient {
});
}

private writeOrBufferCommandRequest(
protected writeOrBufferCommandRequest(
callbackIdx: number,
command:
| command_request.Command
| command_request.Command[]
| command_request.ScriptInvocation,
| command_request.ScriptInvocation
| command_request.ClusterScan,
route?: command_request.Routes,
) {
const message = Array.isArray(command)
Expand All @@ -965,10 +1000,15 @@ export class BaseClient {
callbackIdx,
singleCommand: command,
})
: command_request.CommandRequest.create({
callbackIdx,
scriptInvocation: command,
});
: command instanceof command_request.ClusterScan
? command_request.CommandRequest.create({
callbackIdx,
clusterScan: command,
})
: command_request.CommandRequest.create({
callbackIdx,
scriptInvocation: command,
});
message.route = route;

this.writeOrBufferRequest(
Expand Down
32 changes: 32 additions & 0 deletions node/src/Commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
GlideRecord,
GlideString,
HashDataType,
ObjectType,
SortedSetDataType,
} from "./BaseClient";
/* eslint-disable-next-line @typescript-eslint/no-unused-vars */
Expand Down Expand Up @@ -1655,6 +1656,26 @@ export function createZMScore(
return createCommand(RequestType.ZMScore, [key, ...members]);
}

/**
 * Builds the `SCAN` command arguments: the cursor first, then whatever
 * `convertBaseScanOptionsToArgsArray` yields when `options` is given,
 * then `TYPE <type>` when a type filter is set.
 *
 * @internal
 */
export function createScan(
    cursor: GlideString,
    options?: ScanOptions,
): command_request.Command {
    const baseArgs: GlideString[] = options
        ? convertBaseScanOptionsToArgsArray(options)
        : [];
    const typeArgs: GlideString[] = options?.type
        ? ["TYPE", options.type]
        : [];

    return createCommand(RequestType.Scan, [
        cursor,
        ...baseArgs,
        ...typeArgs,
    ]);
}

export enum InfBoundary {
/**
* Positive infinity bound.
Expand Down Expand Up @@ -3810,6 +3831,17 @@ export interface BaseScanOptions {
readonly count?: number;
}

/**
* Options for the SCAN command.
* `match`: The match filter is applied to the result of the command and will only include keys that match the pattern specified.
* `count`: `COUNT` is just a hint for the command for how many elements to fetch from the server, the default is 10.
* `type`: The type of the object to scan.
* Types are the data types of Valkey: `string`, `list`, `set`, `zset`, `hash`, `stream`.
*/
export interface ScanOptions extends BaseScanOptions {
type?: ObjectType;
}

/**
* Options specific to the ZSCAN command, extending from the base scan options.
*/
Expand Down
50 changes: 50 additions & 0 deletions node/src/GlideClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import {
createPing,
createPublish,
createRandomKey,
createScan,
createScriptExists,
createScriptFlush,
createScriptKill,
Expand All @@ -55,6 +56,7 @@ import {
FunctionStatsFullResponse,
InfoOptions,
LolwutOptions,
ScanOptions,
} from "./Commands";
import { connection_request } from "./ProtobufMessage";
import { Transaction } from "./Transaction";
Expand Down Expand Up @@ -976,4 +978,52 @@ export class GlideClient extends BaseClient {
decoder: Decoder.String,
});
}

/**
* Incrementally iterate over a collection of keys.
* `SCAN` is a cursor based iterator. This means that at every call of the method,
* the server returns an updated cursor that the user needs to use as the cursor argument in the next call.
* An iteration starts when the cursor is set to "0", and terminates when the cursor returned by the server is "0".
*
* A full iteration always retrieves all the elements that were present
* in the collection from the start to the end of a full iteration.
* Elements that were not constantly present in the collection during a full iteration, may be returned or not.
*
* @see {@link https://valkey.io/commands/scan|valkey.io} for more details.
*
* @param cursor - The `cursor` used for iteration. For the first iteration, the cursor should be set to "0".
* Using a non-zero cursor in the first iteration,
* or an invalid cursor at any iteration, will lead to undefined results.
* Using the same cursor in multiple iterations will, in case nothing changed between the iterations,
* return the same elements multiple times.
* If the db has changed, it may result in undefined behavior.
* @param options - (Optional) The options to use for the scan operation, see {@link ScanOptions} and {@link DecoderOption}.
* @returns A List containing the next cursor value and a list of keys,
* formatted as [cursor, [key1, key2, ...]]
*
* @example
* ```typescript
* // Example usage of scan method
* let result = await client.scan('0');
* console.log(result); // Output: ['17', ['key1', 'key2', 'key3', 'key4', 'key5', 'set1', 'set2', 'set3']]
* let firstCursorResult = result[0];
* result = await client.scan(firstCursorResult);
* console.log(result); // Output: ['349', ['key4', 'key5', 'set1', 'hash1', 'zset1', 'list1', 'list2',
* // 'list3', 'zset2', 'zset3', 'zset4', 'zset5', 'zset6']]
* result = await client.scan(result[0]);
* console.log(result); // Output: ['0', ['key6', 'key7']]
*
* result = await client.scan(firstCursorResult, {match: 'key*', count: 2});
* console.log(result); // Output: ['6', ['key4', 'key5']]
*
* result = await client.scan("0", {type: ObjectType.SET});
* console.log(result); // Output: ['362', ['set1', 'set2', 'set3']]
* ```
*/
public async scan(
cursor: GlideString,
options?: ScanOptions & DecoderOption,
): Promise<[GlideString, GlideString[]]> {
// The same `options` object supplies both the SCAN arguments and the write-promise options.
return this.createWritePromise(createScan(cursor, options), options);
}
}
Loading

0 comments on commit 13a2a86

Please sign in to comment.