Skip to content

Commit

Permalink
Final api polish (#107)
Browse files Browse the repository at this point in the history
* Add Encoding: from_string, with_schema, remove into_encoding, rename Handler size -> Capacity, add Parameters.empty, Parameters.equals

* add declare_keyexpr, Query toString, default comment for QueryTarget

* Added callback handler to querier.get as part of QuerierGetOptions

* sync main
  • Loading branch information
Charles-Schleich authored Jan 29, 2025
1 parent 3b90efb commit 20825e4
Show file tree
Hide file tree
Showing 11 changed files with 159 additions and 88 deletions.
45 changes: 34 additions & 11 deletions zenoh-plugin-remote-api/src/handle_control_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,7 @@ pub(crate) async fn handle_control_message(
encoding,
payload,
attachment,
handler,
} => {
if let Some(querier) = state_map.queriers.get(&querier_id) {
let mut get_builder = querier.get();
Expand All @@ -493,22 +494,44 @@ pub(crate) async fn handle_control_message(
add_if_some!(encoding, get_builder);
add_if_some!(payload, get_builder);
add_if_some!(attachment, get_builder);
let receiver = get_builder.await?;

let ws_tx = state_map.websocket_tx.clone();
let finish_msg = RemoteAPIMsg::Control(ControlMsg::GetFinished { id: get_id });

spawn_future(async move {
while let Ok(reply) = receiver.recv_async().await {
let reply_ws = ReplyWS::from((reply, get_id));
let remote_api_msg = RemoteAPIMsg::Data(DataMsg::GetReply(reply_ws));
if let Err(err) = ws_tx.send(remote_api_msg) {
tracing::error!("{}", err);
}
match handler {
HandlerChannel::Fifo(size) => {
let receiver = get_builder.with(FifoChannel::new(size)).await?;
spawn_future(async move {
while let Ok(reply) = receiver.recv_async().await {
let reply_ws = ReplyWS::from((reply, get_id));
let remote_api_msg =
RemoteAPIMsg::Data(DataMsg::GetReply(reply_ws));
if let Err(err) = ws_tx.send(remote_api_msg) {
tracing::error!("{}", err);
}
}
if let Err(err) = ws_tx.send(finish_msg) {
tracing::error!("{}", err);
}
});
}
if let Err(err) = ws_tx.send(finish_msg) {
tracing::error!("{}", err);
HandlerChannel::Ring(size) => {
let receiver = get_builder.with(RingChannel::new(size)).await?;
spawn_future(async move {
while let Ok(reply) = receiver.recv_async().await {
let reply_ws = ReplyWS::from((reply, get_id));
let remote_api_msg =
RemoteAPIMsg::Data(DataMsg::GetReply(reply_ws));
if let Err(err) = ws_tx.send(remote_api_msg) {
tracing::error!("{}", err);
}
}
if let Err(err) = ws_tx.send(finish_msg) {
tracing::error!("{}", err);
}
});
}
});
};
} else {
// TODO: Do we want to add an error here ?
warn!("No Querier With ID {querier_id} found")
Expand Down
1 change: 1 addition & 0 deletions zenoh-plugin-remote-api/src/interface/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ pub enum ControlMsg {
payload: Option<B64String>,
#[ts(type = "string | undefined")]
attachment: Option<B64String>,
handler: HandlerChannel,
},

// Liveliness
Expand Down
10 changes: 3 additions & 7 deletions zenoh-ts/src/encoding.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,8 @@ export class Encoding {
this._schema = str_rep;
}

static intoEncoding(input: IntoEncoding): Encoding {
if (input instanceof Encoding) {
return input;
} else {
return new Encoding(input.toString());
}
with_schema(input: string){
this._schema = input;
}

static default(): Encoding {
Expand All @@ -95,7 +91,7 @@ export class Encoding {
toString(): string {
return this._schema;
}
static from_str(input: string): Encoding {
static from_string(input: string): Encoding {
return new Encoding(input);
}

Expand Down
Binary file modified zenoh-ts/src/key_expr/zenoh_keyexpr_wrapper_bg.wasm
Binary file not shown.
45 changes: 38 additions & 7 deletions zenoh-ts/src/pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import {
} from "./sample.js";
import { Encoding, IntoEncoding } from "./encoding.js";
import { Timestamp } from "./timestamp.js";
import { HandlerChannel } from "./remote_api/interface/HandlerChannel.js";


// ███████ ██ ██ ██████ ███████ ██████ ██████ ██ ██████ ███████ ██████
Expand Down Expand Up @@ -149,7 +150,7 @@ export enum ChannelType {
* @ignore
*/
export interface Handler {
size: number;
capacity: number;
channel_type: ChannelType;
}

Expand All @@ -158,10 +159,10 @@ export interface Handler {
* Semantic: will drop oldest data when full
*/
export class RingChannel implements Handler {
size: number
capacity: number
channel_type: ChannelType = ChannelType.Ring;
constructor(size: number) {
this.size = size;
this.capacity = size;
}
}

Expand All @@ -170,13 +171,43 @@ export class RingChannel implements Handler {
* Semantic: will block incoming messages when full
*/
export class FifoChannel implements Handler {
size: number
capacity: number
channel_type: ChannelType = ChannelType.Fifo;
constructor(size: number) {
this.size = size;
this.capacity = size;
}
}

/**
* @ignore internal function for handlers
*/
export function check_handler_or_callback<T>(handler?: FifoChannel | RingChannel | ((sample: T) => Promise<void>)):
[undefined | ((callback: T) => Promise<void>), HandlerChannel] {

let handler_type: HandlerChannel;
let callback = undefined;
if (handler instanceof FifoChannel || handler instanceof RingChannel) {
switch (handler.channel_type) {
case ChannelType.Ring: {
handler_type = { "Ring": handler.capacity };
break;
}
case ChannelType.Fifo: {
handler_type = { "Fifo": handler.capacity };
break;
}
default: {
throw "channel type undetermined"
}
}
} else {
handler_type = { "Fifo": 256 };
callback = handler;
}
return [callback, handler_type]
}


// ██████ ██ ██ ██████ ██ ██ ███████ ██ ██ ███████ ██████
// ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██
// ██████ ██ ██ ██████ ██ ██ ███████ ███████ █████ ██████
Expand Down Expand Up @@ -288,7 +319,7 @@ export class Publisher {
}

if (put_options?.encoding != null) {
_encoding = Encoding.intoEncoding(put_options.encoding);
_encoding = Encoding.from_string(put_options.encoding.toString());
} else {
_encoding = Encoding.default();
}
Expand Down Expand Up @@ -361,7 +392,7 @@ export class Publisher {
if (delete_options.timestamp != null) {
_timestamp = delete_options.timestamp.get_resource_uuid() as unknown as string;
}

return this._remote_publisher.delete(
_attachment,
_timestamp
Expand Down
65 changes: 42 additions & 23 deletions zenoh-ts/src/querier.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,13 @@ import { RemoteQuerier } from "./remote_api/querier.js";
import { KeyExpr } from "./key_expr.js";
import { Encoding } from "crypto";
import { Receiver } from "./session.js";
import { Parameters } from "./query.js";
import { Parameters, Reply } from "./query.js";
import { check_handler_or_callback, FifoChannel, Handler } from "./pubsub.js";

/**
* Target For Get queries
* @default BestMatching
*/
export enum QueryTarget {
/// Let Zenoh find the BestMatching queryable capabale of serving the query.
BestMatching,
Expand Down Expand Up @@ -100,7 +105,10 @@ export function reply_key_expr_to_int(query_target?: ReplyKeyExpr): number {
}
}


/**
* QuerierOptions When initializing a Querier
*
*/
export interface QuerierOptions {
congestion_control?: CongestionControl,
consolidation?: ConsolidationMode,
Expand All @@ -117,6 +125,7 @@ export interface QuerierGetOptions {
encoding?: Encoding,
payload?: IntoZBytes,
attachment?: IntoZBytes,
handler?: ((sample: Reply) => Promise<void>) | Handler
}

/**
Expand Down Expand Up @@ -226,36 +235,46 @@ export class Querier {
_parameters = parameters.toString();
}

let handler;
if (get_options?.handler !== undefined) {
handler = get_options?.handler;
} else {
handler = new FifoChannel(256);
}
let [callback, handler_type] = check_handler_or_callback<Reply>(handler);

let chan: SimpleChannel<ReplyWS> = this._remote_querier.get(
handler_type,
_encoding,
_parameters,
_attachment,
_payload,
);

let receiver = new Receiver(chan);

return receiver;
// if (callback != undefined) {
// executeAsync(async () => {
// for await (const message of chan) {
// // This horribleness comes from SimpleChannel sending a 0 when the channel is closed
// if (message != undefined && (message as unknown as number) != 0) {
// let reply = new Reply(message);
// if (callback != undefined) {
// callback(reply);
// }
// } else {
// break
// }
// }
// });
// return undefined;
// } else {
// return receiver;
// }
if (callback != undefined) {
executeAsync(async () => {
for await (const message of chan) {
// This horribleness comes from SimpleChannel sending a 0 when the channel is closed
if (message != undefined && (message as unknown as number) != 0) {
let reply = new Reply(message);
if (callback != undefined) {
callback(reply);
}
} else {
break
}
}
});
return undefined;
} else {
let receiver = new Receiver(chan);
return receiver;
}

}
}


function executeAsync(func: any) {
setTimeout(func, 0);
}
26 changes: 22 additions & 4 deletions zenoh-ts/src/query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ export function QueryWS_to_Query(
attachment = new ZBytes(query_ws.attachment);
}
if (query_ws.encoding != null) {
encoding = Encoding.from_str(query_ws.encoding);
encoding = Encoding.from_string(query_ws.encoding);
}

return new Query(
Expand Down Expand Up @@ -276,6 +276,9 @@ export class Query {
this.reply_ws(qr_variant);
}

toString(): string {
return this.key_expr.toString() + "?" + this.parameters.toString()
}
}


Expand Down Expand Up @@ -316,6 +319,21 @@ export class Parameters {
}
}

/**
* Creates empty Parameters Structs
* @returns void
*/
static empty() {
return new Parameters("");
}

/**
* Creates empty Parameters Structs
* @returns void
*/
static equals() {
return new Parameters("");
}

/**
* removes a key from the parameters
Expand Down Expand Up @@ -431,7 +449,7 @@ export class ReplyError {
*/
constructor(reply_err_ws: ReplyErrorWS) {
let payload = new ZBytes(new Uint8Array(b64_bytes_from_str(reply_err_ws.payload)));
let encoding = Encoding.from_str(reply_err_ws.encoding);
let encoding = Encoding.from_string(reply_err_ws.encoding);
this._encoding = encoding;
this._payload = payload;
}
Expand Down Expand Up @@ -510,7 +528,7 @@ export class Selector {
}

toString(): string {
if(this._parameters !=undefined) {
if (this._parameters != undefined) {
return this._key_expr.toString() + "?" + this._parameters?.toString()
} else {
return this._key_expr.toString()
Expand All @@ -526,7 +544,7 @@ export class Selector {
if (selector instanceof Selector) {
this._key_expr = selector._key_expr;
this._parameters = selector._parameters;
return ;
return;
} else if (selector instanceof KeyExpr) {
key_expr = selector;
} else {
Expand Down
2 changes: 1 addition & 1 deletion zenoh-ts/src/remote_api/interface/ControlMsg.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ import type { HandlerChannel } from "./HandlerChannel.js";
import type { LivelinessMsg } from "./LivelinessMsg.js";
import type { OwnedKeyExprWrapper } from "./OwnedKeyExprWrapper.js";

export type ControlMsg = "OpenSession" | "CloseSession" | { "Session": string } | { "NewTimestamp": string } | "SessionInfo" | { "Get": { key_expr: OwnedKeyExprWrapper, parameters: string | null, handler: HandlerChannel, id: string, consolidation: number | undefined, timeout: number | undefined, congestion_control: number | undefined, priority: number | undefined, target: number | undefined, express: boolean | undefined, encoding: string | undefined, payload: string | undefined, attachment: string | undefined, } } | { "GetFinished": { id: string, } } | { "Put": { key_expr: OwnedKeyExprWrapper, payload: B64String, encoding: string | undefined, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, attachment: string | undefined, timestamp: string | undefined, } } | { "Delete": { key_expr: OwnedKeyExprWrapper, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, attachment: string | undefined, timestamp: string | undefined, } } | { "DeclareSubscriber": { key_expr: OwnedKeyExprWrapper, handler: HandlerChannel, id: string, } } | { "Subscriber": string } | { "UndeclareSubscriber": string } | { "DeclarePublisher": { key_expr: OwnedKeyExprWrapper, encoding: string | undefined, congestion_control: number | undefined, priority: number | undefined, reliability: number | undefined, express: boolean | undefined, id: string, } } | { "UndeclarePublisher": string } | { "DeclareQueryable": { key_expr: OwnedKeyExprWrapper, id: string, complete: boolean, handler: HandlerChannel, } } | { "UndeclareQueryable": string } | { "DeclareQuerier": { id: string, key_expr: OwnedKeyExprWrapper, target: number | undefined, timeout: number | undefined, accept_replies: number | undefined, allowed_destination: number | undefined, congestion_control: number | undefined, priority: number | undefined, consolidation: number | undefined, express: boolean | undefined, } } | { "UndeclareQuerier": string } | { "QuerierGet": { querier_id: string, get_id: string, encoding: string | undefined, payload: string | undefined, attachment: string | undefined, } } | { "Liveliness": LivelinessMsg };
export type ControlMsg = "OpenSession" | "CloseSession" | { "Session": string } | { "NewTimestamp": string } | "SessionInfo" | { "Get": { key_expr: OwnedKeyExprWrapper, parameters: string | null, handler: HandlerChannel, id: string, consolidation: number | undefined, timeout: number | undefined, congestion_control: number | undefined, priority: number | undefined, target: number | undefined, express: boolean | undefined, encoding: string | undefined, payload: string | undefined, attachment: string | undefined, } } | { "GetFinished": { id: string, } } | { "Put": { key_expr: OwnedKeyExprWrapper, payload: B64String, encoding: string | undefined, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, attachment: string | undefined, timestamp: string | undefined, } } | { "Delete": { key_expr: OwnedKeyExprWrapper, congestion_control: number | undefined, priority: number | undefined, express: boolean | undefined, attachment: string | undefined, timestamp: string | undefined, } } | { "DeclareSubscriber": { key_expr: OwnedKeyExprWrapper, handler: HandlerChannel, id: string, } } | { "Subscriber": string } | { "UndeclareSubscriber": string } | { "DeclarePublisher": { key_expr: OwnedKeyExprWrapper, encoding: string | undefined, congestion_control: number | undefined, priority: number | undefined, reliability: number | undefined, express: boolean | undefined, id: string, } } | { "UndeclarePublisher": string } | { "DeclareQueryable": { key_expr: OwnedKeyExprWrapper, id: string, complete: boolean, handler: HandlerChannel, } } | { "UndeclareQueryable": string } | { "DeclareQuerier": { id: string, key_expr: OwnedKeyExprWrapper, target: number | undefined, timeout: number | undefined, accept_replies: number | undefined, allowed_destination: number | undefined, congestion_control: number | undefined, priority: number | undefined, consolidation: number | undefined, express: boolean | undefined, } } | { "UndeclareQuerier": string } | { "QuerierGet": { querier_id: string, get_id: string, encoding: string | undefined, payload: string | undefined, attachment: string | undefined, handler: HandlerChannel, } } | { "Liveliness": LivelinessMsg };
3 changes: 3 additions & 0 deletions zenoh-ts/src/remote_api/querier.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { ControlMsg } from "./interface/ControlMsg.js"
import { SimpleChannel } from "channel-ts";
import { ReplyWS } from "./interface/ReplyWS.js";
import { encode as b64_str_from_bytes } from "base64-arraybuffer";
import { HandlerChannel } from "./interface/HandlerChannel.js";

export class RemoteQuerier {
private querier_id: UUIDv4;
Expand All @@ -41,6 +42,7 @@ export class RemoteQuerier {
}

get(
_handler_type: HandlerChannel,
_encoding?: string,
_parameters?: string,
_attachment?: Array<number>,
Expand All @@ -66,6 +68,7 @@ export class RemoteQuerier {
encoding: _encoding,
payload: payload,
attachment: attachment,
handler: _handler_type
}
};

Expand Down
Loading

0 comments on commit 20825e4

Please sign in to comment.