From 63aaf144a9f8ca9082fedcdf1dba3a1bf977c8a3 Mon Sep 17 00:00:00 2001 From: Michael Ilyin Date: Thu, 30 Jan 2025 00:41:25 +0100 Subject: [PATCH 1/2] API alignment: handler is always in options --- README.md | 2 +- zenoh-ts/examples/chat/src/chat_session.ts | 24 ++++++++++++---------- zenoh-ts/examples/deno/src/z_sub.ts | 2 +- zenoh-ts/src/liveliness.ts | 10 ++++----- zenoh-ts/src/session.ts | 17 ++++++++++++--- 5 files changed, 34 insertions(+), 21 deletions(-) diff --git a/README.md b/README.md index 64e0a66..28c06c7 100644 --- a/README.md +++ b/README.md @@ -145,7 +145,7 @@ The file `EXAMPLE_CONFIG.json5` references the `zenoh-plugin-remote-api\EXAMPLE_ yarn build ``` - The result are placed into the `zenoh-ts/dist` directory. + The result is placed into the `zenoh-ts/dist` directory. This library is currently compatible with browsers, but not with NodeJS due to websocket library limitations. diff --git a/zenoh-ts/examples/chat/src/chat_session.ts b/zenoh-ts/examples/chat/src/chat_session.ts index 4219976..ba04660 100644 --- a/zenoh-ts/examples/chat/src/chat_session.ts +++ b/zenoh-ts/examples/chat/src/chat_session.ts @@ -121,18 +121,20 @@ export class ChatSession { log(`[Session] Declare publisher on ${keyexpr}`); this.message_subscriber = await this.session.declare_subscriber(KEYEXPR_CHAT_USER.join("*"), - (sample: Sample) => { - let message = deserialize_string(sample.payload().to_bytes()); - log(`[Subscriber] Received message: ${message} from ${sample.keyexpr().toString()}`); - let user = ChatUser.fromKeyexpr(sample.keyexpr()); - if (user) { - const timestamp = new Date().toISOString(); - this.messages.push({ t: timestamp, u: user.username, m: message }); - if (this.messageCallback) { - this.messageCallback(user, message); + { + handler: (sample: Sample) => { + let message = deserialize_string(sample.payload().to_bytes()); + log(`[Subscriber] Received message: ${message} from ${sample.keyexpr().toString()}`); + let user = ChatUser.fromKeyexpr(sample.keyexpr()); + if (user) { + const timestamp = new Date().toISOString(); + this.messages.push({ t: timestamp, u: user.username, m: message }); + if (this.messageCallback) { + this.messageCallback(user, message); + } } + return Promise.resolve(); } - return Promise.resolve(); } ); log(`[Session] Declare Subscriber on ${KEYEXPR_CHAT_USER.join("*").toString()}`); @@ -142,7 +144,7 @@ export class ChatSession { // Subscribe to changes of users presence this.liveliness_subscriber = this.session.liveliness().declare_subscriber(KEYEXPR_CHAT_USER.join("*"), { - callback: (sample: Sample) => { + handler: (sample: Sample) => { let keyexpr = sample.keyexpr(); let user = ChatUser.fromKeyexpr(keyexpr); if (!user) { diff --git a/zenoh-ts/examples/deno/src/z_sub.ts b/zenoh-ts/examples/deno/src/z_sub.ts index c14574e..c9b41d4 100644 --- a/zenoh-ts/examples/deno/src/z_sub.ts +++ b/zenoh-ts/examples/deno/src/z_sub.ts @@ -24,7 +24,7 @@ export async function main() { const session = await Session.open(new Config("ws/127.0.0.1:10000")); const key_expr = new KeyExpr(args.key); - const poll_subscriber: Subscriber = session.declare_subscriber(key_expr, new RingChannel(10)); + const poll_subscriber: Subscriber = session.declare_subscriber(key_expr, { handler: new RingChannel(10) }); let sample = await poll_subscriber.receive(); diff --git a/zenoh-ts/src/liveliness.ts b/zenoh-ts/src/liveliness.ts index b2d0a88..3692cc2 100644 --- a/zenoh-ts/src/liveliness.ts +++ b/zenoh-ts/src/liveliness.ts @@ -25,12 +25,12 @@ function executeAsync(func: any) { } interface LivelinessSubscriberOptions { - callback?: (sample: Sample) => Promise, + handler?: (sample: Sample) => Promise, // TODO: add | Handler, history: boolean, } interface LivelinessGetOptions { - callback?: (reply: Reply) => Promise, + handler?: (reply: Reply) => Promise, // TODO: add | Handler, timeout?: TimeDuration, } @@ -61,8 +61,8 @@ export class Liveliness { let remote_subscriber; let callback_subscriber = false; - if (options?.callback !== undefined) { - let callback = options?.callback; + if (options?.handler !== undefined) { + let callback = options?.handler; callback_subscriber = true; const callback_conversion = async function (sample_ws: SampleWS,): Promise { let sample: Sample = Sample_from_SampleWS(sample_ws); @@ -102,7 +102,7 @@ export class Liveliness { let receiver = Receiver.new(chan); - let callback = options?.callback; + let callback = options?.handler; if (callback !== undefined) { executeAsync(async () => { for await (const message of chan) { diff --git a/zenoh-ts/src/session.ts b/zenoh-ts/src/session.ts index 934ddef..68434d8 100644 --- a/zenoh-ts/src/session.ts +++ b/zenoh-ts/src/session.ts @@ -150,6 +150,14 @@ export interface PublisherOptions { reliability?: Reliability, } +/** + * Options for a Subscriber + * @prop handler - Callback function for this subscriber + */ +export interface SubscriberOptions { + handler?: ((sample: Sample) => Promise) | Handler, +} + // ███████ ███████ ███████ ███████ ██ ██████ ███ ██ // ██ ██ ██ ██ ██ ██ ██ ████ ██ // ███████ █████ ███████ ███████ ██ ██ ██ ██ ██ ██ @@ -422,20 +430,23 @@ export class Session { * If a Subscriber is created with a callback, it cannot be simultaneously polled for new values * * @param {IntoKeyExpr} key_expr - string of key_expression - * @param {((sample: Sample) => Promise) | Handler} handler - Either a HandlerChannel or a Callback Function to be called for all samples + * @param {SubscriberOptions} subscriber_opts - Options for the subscriber, including a handler * * @returns Subscriber */ // Handler size : This is to match the API_DATA_RECEPTION_CHANNEL_SIZE of zenoh internally declare_subscriber( key_expr: IntoKeyExpr, - handler?: ((sample: Sample) => Promise) | Handler + subscriber_opts?: SubscriberOptions ): Subscriber { let _key_expr = new KeyExpr(key_expr); let remote_subscriber: RemoteSubscriber; let callback_subscriber = false; - if (handler === undefined) { + let handler; + if (subscriber_opts?.handler !== undefined) { + handler = subscriber_opts?.handler; + } else { handler = new FifoChannel(256); } let [callback, handler_type] = check_handler_or_callback(handler); From caaa6566ca598c07e4f9d2cd7be0a3367ef8da3a Mon Sep 17 00:00:00 2001 From: Michael Ilyin Date: Thu, 30 Jan 2025 00:51:46 +0100 Subject: [PATCH 2/2] build fix --- zenoh-ts/examples/deno/src/z_ping.ts | 2 +- zenoh-ts/examples/deno/src/z_pong.ts | 2 +- zenoh-ts/examples/deno/src/z_sub_thr.ts | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/zenoh-ts/examples/deno/src/z_ping.ts b/zenoh-ts/examples/deno/src/z_ping.ts index 8dbee5a..b269f96 100644 --- a/zenoh-ts/examples/deno/src/z_ping.ts +++ b/zenoh-ts/examples/deno/src/z_ping.ts @@ -18,7 +18,7 @@ import { Encoding, CongestionControl, Config, Session } from "@eclipse-zenoh/zen export async function main() { const session = await Session.open(new Config("ws/127.0.0.1:10000")); - const sub = session.declare_subscriber("test/pong", new FifoChannel(256) ); + const sub = session.declare_subscriber("test/pong", { handler: new FifoChannel(256) } ); const pub = session.declare_publisher( "test/ping", { diff --git a/zenoh-ts/examples/deno/src/z_pong.ts b/zenoh-ts/examples/deno/src/z_pong.ts index c5c27c8..83e68be 100644 --- a/zenoh-ts/examples/deno/src/z_pong.ts +++ b/zenoh-ts/examples/deno/src/z_pong.ts @@ -29,7 +29,7 @@ export async function main() { pub.put(sample.payload()); }; - session.declare_subscriber("test/pong", subscriber_callback); + session.declare_subscriber("test/pong", { handler: subscriber_callback } ); let count = 0; while (true) { diff --git a/zenoh-ts/examples/deno/src/z_sub_thr.ts b/zenoh-ts/examples/deno/src/z_sub_thr.ts index 1a904dc..07c2274 100644 --- a/zenoh-ts/examples/deno/src/z_sub_thr.ts +++ b/zenoh-ts/examples/deno/src/z_sub_thr.ts @@ -63,7 +63,7 @@ export async function main() { console.warn("Declare subscriber"); session.declare_subscriber( "test/thr", - subscriber_callback + { handler: subscriber_callback } ); let count = 0;