From 20825e4ad23b62ce13f0a82726b0d7730d4521c1 Mon Sep 17 00:00:00 2001 From: Charles Schleich Date: Wed, 29 Jan 2025 17:06:12 +0100 Subject: [PATCH] Final api polish (#107) * Add Encoding: from_string, with_schema, remove into_encoding, rename Handler size -> Capacity, add Parameters.empty, Parameters.equals * add declare_keyexpr, Query toString, default comment for QueryTarget * Added callback handler to querier.get as part of QuerierGetOptions * sync main --- .../src/handle_control_message.rs | 45 +++++++++--- zenoh-plugin-remote-api/src/interface/mod.rs | 1 + zenoh-ts/src/encoding.ts | 10 +-- .../key_expr/zenoh_keyexpr_wrapper_bg.wasm | Bin 99230 -> 99242 bytes zenoh-ts/src/pubsub.ts | 45 ++++++++++-- zenoh-ts/src/querier.ts | 65 +++++++++++------- zenoh-ts/src/query.ts | 26 +++++-- .../src/remote_api/interface/ControlMsg.ts | 2 +- zenoh-ts/src/remote_api/querier.ts | 3 + zenoh-ts/src/sample.ts | 3 +- zenoh-ts/src/session.ts | 47 ++++--------- 11 files changed, 159 insertions(+), 88 deletions(-) diff --git a/zenoh-plugin-remote-api/src/handle_control_message.rs b/zenoh-plugin-remote-api/src/handle_control_message.rs index b45155f..5fcb100 100644 --- a/zenoh-plugin-remote-api/src/handle_control_message.rs +++ b/zenoh-plugin-remote-api/src/handle_control_message.rs @@ -467,6 +467,7 @@ pub(crate) async fn handle_control_message( encoding, payload, attachment, + handler, } => { if let Some(querier) = state_map.queriers.get(&querier_id) { let mut get_builder = querier.get(); @@ -493,22 +494,44 @@ pub(crate) async fn handle_control_message( add_if_some!(encoding, get_builder); add_if_some!(payload, get_builder); add_if_some!(attachment, get_builder); - let receiver = get_builder.await?; + let ws_tx = state_map.websocket_tx.clone(); let finish_msg = RemoteAPIMsg::Control(ControlMsg::GetFinished { id: get_id }); - spawn_future(async move { - while let Ok(reply) = receiver.recv_async().await { - let reply_ws = ReplyWS::from((reply, get_id)); - let remote_api_msg = RemoteAPIMsg::Data(DataMsg::GetReply(reply_ws)); - if let Err(err) = ws_tx.send(remote_api_msg) { - tracing::error!("{}", err); - } + match handler { + HandlerChannel::Fifo(size) => { + let receiver = get_builder.with(FifoChannel::new(size)).await?; + spawn_future(async move { + while let Ok(reply) = receiver.recv_async().await { + let reply_ws = ReplyWS::from((reply, get_id)); + let remote_api_msg = + RemoteAPIMsg::Data(DataMsg::GetReply(reply_ws)); + if let Err(err) = ws_tx.send(remote_api_msg) { + tracing::error!("{}", err); + } + } + if let Err(err) = ws_tx.send(finish_msg) { + tracing::error!("{}", err); + } + }); } - if let Err(err) = ws_tx.send(finish_msg) { - tracing::error!("{}", err); + HandlerChannel::Ring(size) => { + let receiver = get_builder.with(RingChannel::new(size)).await?; + spawn_future(async move { + while let Ok(reply) = receiver.recv_async().await { + let reply_ws = ReplyWS::from((reply, get_id)); + let remote_api_msg = + RemoteAPIMsg::Data(DataMsg::GetReply(reply_ws)); + if let Err(err) = ws_tx.send(remote_api_msg) { + tracing::error!("{}", err); + } + } + if let Err(err) = ws_tx.send(finish_msg) { + tracing::error!("{}", err); + } + }); } - }); + }; } else { // TODO: Do we want to add an error here ? warn!("No Querier With ID {querier_id} found") diff --git a/zenoh-plugin-remote-api/src/interface/mod.rs b/zenoh-plugin-remote-api/src/interface/mod.rs index bc4044e..c05b32d 100644 --- a/zenoh-plugin-remote-api/src/interface/mod.rs +++ b/zenoh-plugin-remote-api/src/interface/mod.rs @@ -357,6 +357,7 @@ pub enum ControlMsg { payload: Option, #[ts(type = "string | undefined")] attachment: Option, + handler: HandlerChannel, }, // Liveliness diff --git a/zenoh-ts/src/encoding.ts b/zenoh-ts/src/encoding.ts index 2398825..9b3948f 100644 --- a/zenoh-ts/src/encoding.ts +++ b/zenoh-ts/src/encoding.ts @@ -80,12 +80,8 @@ export class Encoding { this._schema = str_rep; } - static intoEncoding(input: IntoEncoding): Encoding { - if (input instanceof Encoding) { - return input; - } else { - return new Encoding(input.toString()); - } + with_schema(input: string){ + this._schema = input; } static default(): Encoding { @@ -95,7 +91,7 @@ export class Encoding { toString(): string { return this._schema; } - static from_str(input: string): Encoding { + static from_string(input: string): Encoding { return new Encoding(input); } diff --git a/zenoh-ts/src/key_expr/zenoh_keyexpr_wrapper_bg.wasm b/zenoh-ts/src/key_expr/zenoh_keyexpr_wrapper_bg.wasm index 3817a9a37c176ce2d73478483685e9eab037118d..c2d9e93ca777834b325544e4dd6a1d2fac116e7c 100644 GIT binary patch delta 510 zcmbQ&&bF$ZZ9>>_4T#&^$ZCt^|cc>{9$1e6z15xf$=1xNE2fdV=H4&D`Nv= zCy?33*v#0{+{?VZmzi;`^z;&WM%n4%AYvkjumTd>*U2+>amG(weRKA{11}r@F_<%R zC@?rZ+1lOw-i#o0OTClAf9; cWT0oHXKAdUk(6d>VPKwYny5KFxS7!#08AgdCIA2c delta 498 zcmZ40&Ni=|Z9*4!ZGAmM0!uxJn%E{kamx=D4na2I%^Mg`GK#b?wlFp^1~oA@Fm?i& z&5W&#ZOy&R+k2TA*Gf;HD9frt_yv3;F9V;5)qf~CvWHcxMExy4}4%%Q;G zwDt4cd+nR9z20AM&dk8A!~hhU|D@~KyGwhvt_KQ%l+69pwPyaD!+qPpN)$m#?%eKv zx?}BxqgSD(bX~u7_0Xde4-&Ue*JM1X!4q1XT2!o`o0(IYnK#{`i7|fr;ULECOsL`? zVi|SWQ;f~ik}V9j?+Rt~Va6>#{Z%xh+Vq@A#xgu=fM%#ncZ*`QA*AM66yp)xYPJLI z`o)MxUL}rE2hFWOKbmcyl*qV90aYJRd@*DG^e0V>QKH5?<%z|)x=ERNDe0+sYzBHp MdX~o1(handler?: FifoChannel | RingChannel | ((sample: T) => Promise)): + [undefined | ((callback: T) => Promise), HandlerChannel] { + + let handler_type: HandlerChannel; + let callback = undefined; + if (handler instanceof FifoChannel || handler instanceof RingChannel) { + switch (handler.channel_type) { + case ChannelType.Ring: { + handler_type = { "Ring": handler.capacity }; + break; + } + case ChannelType.Fifo: { + handler_type = { "Fifo": handler.capacity }; + break; + } + default: { + throw "channel type undetermined" + } + } + } else { + handler_type = { "Fifo": 256 }; + callback = handler; + } + return [callback, handler_type] +} + + // ██████ ██ ██ ██████ ██ ██ ███████ ██ ██ ███████ ██████ // ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ // ██████ ██ ██ ██████ ██ ██ ███████ ███████ █████ ██████ @@ -288,7 +319,7 @@ export class Publisher { } if (put_options?.encoding != null) { - _encoding = Encoding.intoEncoding(put_options.encoding); + _encoding = Encoding.from_string(put_options.encoding.toString()); } else { _encoding = Encoding.default(); } @@ -361,7 +392,7 @@ export class Publisher { if (delete_options.timestamp != null) { _timestamp = delete_options.timestamp.get_resource_uuid() as unknown as string; } - + return this._remote_publisher.delete( _attachment, _timestamp diff --git a/zenoh-ts/src/querier.ts b/zenoh-ts/src/querier.ts index 0955739..4072e1f 100644 --- a/zenoh-ts/src/querier.ts +++ b/zenoh-ts/src/querier.ts @@ -24,8 +24,13 @@ import { RemoteQuerier } from "./remote_api/querier.js"; import { KeyExpr } from "./key_expr.js"; import { Encoding } from "crypto"; import { Receiver } from "./session.js"; -import { Parameters } from "./query.js"; +import { Parameters, Reply } from "./query.js"; +import { check_handler_or_callback, FifoChannel, Handler } from "./pubsub.js"; +/** + * Target For Get queries + * @default BestMatching + */ export enum QueryTarget { /// Let Zenoh find the BestMatching queryable capabale of serving the query. BestMatching, @@ -100,7 +105,10 @@ export function reply_key_expr_to_int(query_target?: ReplyKeyExpr): number { } } - +/** + * QuerierOptions When initializing a Querier + * + */ export interface QuerierOptions { congestion_control?: CongestionControl, consolidation?: ConsolidationMode, @@ -117,6 +125,7 @@ export interface QuerierGetOptions { encoding?: Encoding, payload?: IntoZBytes, attachment?: IntoZBytes, + handler?: ((sample: Reply) => Promise) | Handler } /** @@ -226,36 +235,46 @@ export class Querier { _parameters = parameters.toString(); } + let handler; + if (get_options?.handler !== undefined) { + handler = get_options?.handler; + } else { + handler = new FifoChannel(256); + } + let [callback, handler_type] = check_handler_or_callback(handler); + let chan: SimpleChannel = this._remote_querier.get( + handler_type, _encoding, _parameters, _attachment, _payload, ); - let receiver = new Receiver(chan); - return receiver; - // if (callback != undefined) { - // executeAsync(async () => { - // for await (const message of chan) { - // // This horribleness comes from SimpleChannel sending a 0 when the channel is closed - // if (message != undefined && (message as unknown as number) != 0) { - // let reply = new Reply(message); - // if (callback != undefined) { - // callback(reply); - // } - // } else { - // break - // } - // } - // }); - // return undefined; - // } else { - // return receiver; - // } + if (callback != undefined) { + executeAsync(async () => { + for await (const message of chan) { + // This horribleness comes from SimpleChannel sending a 0 when the channel is closed + if (message != undefined && (message as unknown as number) != 0) { + let reply = new Reply(message); + if (callback != undefined) { + callback(reply); + } + } else { + break + } + } + }); + return undefined; + } else { + let receiver = new Receiver(chan); + return receiver; + } } +} - +function executeAsync(func: any) { + setTimeout(func, 0); } diff --git a/zenoh-ts/src/query.ts b/zenoh-ts/src/query.ts index d2362c9..c519db2 100644 --- a/zenoh-ts/src/query.ts +++ b/zenoh-ts/src/query.ts @@ -124,7 +124,7 @@ export function QueryWS_to_Query( attachment = new ZBytes(query_ws.attachment); } if (query_ws.encoding != null) { - encoding = Encoding.from_str(query_ws.encoding); + encoding = Encoding.from_string(query_ws.encoding); } return new Query( @@ -276,6 +276,9 @@ export class Query { this.reply_ws(qr_variant); } + toString(): string { + return this.key_expr.toString() + "?" + this.parameters.toString() + } } @@ -316,6 +319,21 @@ export class Parameters { } } + /** + * Creates empty Parameters Structs + * @returns void + */ + static empty() { + return new Parameters(""); + } + + /** + * Creates empty Parameters Structs + * @returns void + */ + static equals() { + return new Parameters(""); + } /** * removes a key from the parameters @@ -431,7 +449,7 @@ export class ReplyError { */ constructor(reply_err_ws: ReplyErrorWS) { let payload = new ZBytes(new Uint8Array(b64_bytes_from_str(reply_err_ws.payload))); - let encoding = Encoding.from_str(reply_err_ws.encoding); + let encoding = Encoding.from_string(reply_err_ws.encoding); this._encoding = encoding; this._payload = payload; } @@ -510,7 +528,7 @@ export class Selector { } toString(): string { - if(this._parameters !=undefined) { + if (this._parameters != undefined) { return this._key_expr.toString() + "?" + this._parameters?.toString() } else { return this._key_expr.toString() @@ -526,7 +544,7 @@ export class Selector { if (selector instanceof Selector) { this._key_expr = selector._key_expr; this._parameters = selector._parameters; - return ; + return; } else if (selector instanceof KeyExpr) { key_expr = selector; } else { diff --git a/zenoh-ts/src/remote_api/interface/ControlMsg.ts b/zenoh-ts/src/remote_api/interface/ControlMsg.ts index 36b0ea7..be8e91f 100644 --- a/zenoh-ts/src/remote_api/interface/ControlMsg.ts +++ b/zenoh-ts/src/remote_api/interface/ControlMsg.ts @@ -4,4 +4,4 @@ import type { HandlerChannel } from "./HandlerChannel.js"; import type { LivelinessMsg } from "./LivelinessMsg.js"; import type { OwnedKeyExprWrapper } from "./OwnedKeyExprWrapper.js"; -export type ControlMsg = "OpenSession" | "CloseSession" | { "Session": string } | { "NewTimestamp": string } | "SessionInfo" | { "Get": { key_expr: OwnedKeyExprWrapper, parameters: string | null, handler: HandlerChannel, id: string, consolidation: number | undefined, timeout: number | undefined, congestion_control: number | undefined, priority: number | undefined, target: number | undefined, express: boolean | undefined, encoding: string | undefined, payload: string | undefined, attachment: string | undefined, } } | { "GetFinished": { id: string, } } | { "Put": { key_expr: OwnedKeyExprWrapper, payload: B64String, encoding: string | undefined, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, attachment: string | undefined, timestamp: string | undefined, } } | { "Delete": { key_expr: OwnedKeyExprWrapper, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, attachment: string | undefined, timestamp: string | undefined, } } | { "DeclareSubscriber": { key_expr: OwnedKeyExprWrapper, handler: HandlerChannel, id: string, } } | { "Subscriber": string } | { "UndeclareSubscriber": string } | { "DeclarePublisher": { key_expr: OwnedKeyExprWrapper, encoding: string | undefined, congestion_control: number | undefined, priority: number | undefined, reliability: number | undefined, express: boolean | undefined, id: string, } } | { "UndeclarePublisher": string } | { "DeclareQueryable": { key_expr: OwnedKeyExprWrapper, id: string, complete: boolean, handler: HandlerChannel, } } | { "UndeclareQueryable": string } | { "DeclareQuerier": { id: string, key_expr: OwnedKeyExprWrapper, target: number | undefined, timeout: number | undefined, accept_replies: number | undefined, allowed_destination: number | undefined, congestion_control: number | undefined, priority: number | undefined, consolidation: number | undefined, express: boolean | undefined, } } | { "UndeclareQuerier": string } | { "QuerierGet": { querier_id: string, get_id: string, encoding: string | undefined, payload: string | undefined, attachment: string | undefined, } } | { "Liveliness": LivelinessMsg }; +export type ControlMsg = "OpenSession" | "CloseSession" | { "Session": string } | { "NewTimestamp": string } | "SessionInfo" | { "Get": { key_expr: OwnedKeyExprWrapper, parameters: string | null, handler: HandlerChannel, id: string, consolidation: number | undefined, timeout: number | undefined, congestion_control: number | undefined, priority: number | undefined, target: number | undefined, express: boolean | undefined, encoding: string | undefined, payload: string | undefined, attachment: string | undefined, } } | { "GetFinished": { id: string, } } | { "Put": { key_expr: OwnedKeyExprWrapper, payload: B64String, encoding: string | undefined, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, attachment: string | undefined, timestamp: string | undefined, } } | { "Delete": { key_expr: OwnedKeyExprWrapper, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, attachment: string | undefined, timestamp: string | undefined, } } | { "DeclareSubscriber": { key_expr: OwnedKeyExprWrapper, handler: HandlerChannel, id: string, } } | { "Subscriber": string } | { "UndeclareSubscriber": string } | { "DeclarePublisher": { key_expr: OwnedKeyExprWrapper, encoding: string | undefined, congestion_control: number | undefined, priority: number | undefined, reliability: number | undefined, express: boolean | undefined, id: string, } } | { "UndeclarePublisher": string } | { "DeclareQueryable": { key_expr: OwnedKeyExprWrapper, id: string, complete: boolean, handler: HandlerChannel, } } | { "UndeclareQueryable": string } | { "DeclareQuerier": { id: string, key_expr: OwnedKeyExprWrapper, target: number | undefined, timeout: number | undefined, accept_replies: number | undefined, allowed_destination: number | undefined, congestion_control: number | undefined, priority: number | undefined, consolidation: number | undefined, express: boolean | undefined, } } | { "UndeclareQuerier": string } | { "QuerierGet": { querier_id: string, get_id: string, encoding: string | undefined, payload: string | undefined, attachment: string | undefined, handler: HandlerChannel, } } | { "Liveliness": LivelinessMsg }; diff --git a/zenoh-ts/src/remote_api/querier.ts b/zenoh-ts/src/remote_api/querier.ts index 99250c6..7fd00b2 100644 --- a/zenoh-ts/src/remote_api/querier.ts +++ b/zenoh-ts/src/remote_api/querier.ts @@ -18,6 +18,7 @@ import { ControlMsg } from "./interface/ControlMsg.js" import { SimpleChannel } from "channel-ts"; import { ReplyWS } from "./interface/ReplyWS.js"; import { encode as b64_str_from_bytes } from "base64-arraybuffer"; +import { HandlerChannel } from "./interface/HandlerChannel.js"; export class RemoteQuerier { private querier_id: UUIDv4; @@ -41,6 +42,7 @@ export class RemoteQuerier { } get( + _handler_type: HandlerChannel, _encoding?: string, _parameters?: string, _attachment?: Array, @@ -66,6 +68,7 @@ export class RemoteQuerier { encoding: _encoding, payload: payload, attachment: attachment, + handler: _handler_type } }; diff --git a/zenoh-ts/src/sample.ts b/zenoh-ts/src/sample.ts index aab1653..bf8db3f 100644 --- a/zenoh-ts/src/sample.ts +++ b/zenoh-ts/src/sample.ts @@ -25,6 +25,7 @@ import { encode as b64_str_from_bytes, decode as b64_bytes_from_str, } from "bas /** * Kinds of Samples that can be received from Zenoh * @enum + * @default PUT */ export enum SampleKind { PUT = "PUT", @@ -285,7 +286,7 @@ export function Sample_from_SampleWS(sample_ws: SampleWS) { let key_exr = new KeyExpr(sample_ws.key_expr); - let encoding = Encoding.from_str(sample_ws.encoding); + let encoding = Encoding.from_string(sample_ws.encoding); let priority = priority_from_int(sample_ws.priority); diff --git a/zenoh-ts/src/session.ts b/zenoh-ts/src/session.ts index fef4740..a671748 100644 --- a/zenoh-ts/src/session.ts +++ b/zenoh-ts/src/session.ts @@ -35,7 +35,7 @@ import { Reply, Selector, } from "./query.js"; -import { ChannelType, FifoChannel, Handler, NewSubscriber, Publisher, RingChannel, Subscriber } from "./pubsub.js"; +import { check_handler_or_callback, FifoChannel, Handler, NewSubscriber, Publisher, Subscriber } from "./pubsub.js"; import { priority_to_int, congestion_control_to_int, @@ -52,7 +52,6 @@ import { ChannelState } from "channel-ts"; import { Config } from "./config.js"; import { Encoding } from "./encoding.js"; import { QueryReplyWS } from "./remote_api/interface/QueryReplyWS.js"; -import { HandlerChannel } from "./remote_api/interface/HandlerChannel.js"; import { SessionInfo as SessionInfoIface } from "./remote_api/interface/SessionInfo.js"; // External deps import { Duration, TimeDuration } from 'typed-duration' @@ -252,6 +251,15 @@ export class Session { ); } + /** + * Creates a Key Expression + * + * @returns KeyExpr + */ + declare_keyexpr(key_expr: IntoKeyExpr): KeyExpr { + return new KeyExpr(key_expr) + } + /** * Returns the Zenoh SessionInfo Object * @@ -306,35 +314,6 @@ export class Session { ); } - /** - * @ignore internal function for handlers - */ - private check_handler_or_callback(handler?: FifoChannel | RingChannel | ((sample: T) => Promise)): - [undefined | ((callback: T) => Promise), HandlerChannel] { - - let handler_type: HandlerChannel; - let callback = undefined; - if (handler instanceof FifoChannel || handler instanceof RingChannel) { - switch (handler.channel_type) { - case ChannelType.Ring: { - handler_type = { "Ring": handler.size }; - break; - } - case ChannelType.Fifo: { - handler_type = { "Fifo": handler.size }; - break; - } - default: { - throw "channel type undetermined" - } - } - } else { - handler_type = { "Fifo": 256 }; - callback = handler; - } - return [callback, handler_type] - } - /** * Issues a get query on a Zenoh session * @@ -373,7 +352,7 @@ export class Session { handler = new FifoChannel(256); } - let [callback, handler_type] = this.check_handler_or_callback(handler); + let [callback, handler_type] = check_handler_or_callback(handler); // Optional Parameters @@ -457,7 +436,7 @@ export class Session { if (handler === undefined) { handler = new FifoChannel(256); } - let [callback, handler_type] = this.check_handler_or_callback(handler); + let [callback, handler_type] = check_handler_or_callback(handler); if (callback !== undefined) { callback_subscriber = true; @@ -535,7 +514,7 @@ export class Session { } else { handler = new FifoChannel(256); } - let [callback, handler_type] = this.check_handler_or_callback(handler); + let [callback, handler_type] = check_handler_or_callback(handler); let callback_queryable = false; if (callback != undefined) {