diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/Session.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/Session.kt index 4ad182514..db7797290 100644 --- a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/Session.kt +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/Session.kt @@ -519,6 +519,28 @@ class Session private constructor(private val config: Config) : AutoCloseable { ) } + fun get( + selector: Selector, + callback: Callback, + payload: String, + encoding: Encoding? = null, + attachment: String? = null, + timeout: Duration = Duration.ofMillis(10000), + target: QueryTarget = QueryTarget.BEST_MATCHING, + consolidation: ConsolidationMode = ConsolidationMode.AUTO, + onClose: (() -> Unit)? = null + ): Result = get( + selector, + callback, + ZBytes.from(payload), + encoding, + attachment?.let { ZBytes.from(it) }, + timeout, + target, + consolidation, + onClose + ) + /** * Performs a Get query on the [selector], handling the replies with a [Handler]. * @@ -599,6 +621,28 @@ class Session private constructor(private val config: Config) : AutoCloseable { ) } + fun get( + selector: Selector, + handler: Handler, + payload: String, + encoding: Encoding? = null, + attachment: String? = null, + timeout: Duration = Duration.ofMillis(10000), + target: QueryTarget = QueryTarget.BEST_MATCHING, + consolidation: ConsolidationMode = ConsolidationMode.AUTO, + onClose: (() -> Unit)? = null + ): Result = get( + selector, + handler, + ZBytes.from(payload), + encoding, + attachment?.let { ZBytes.from(it) }, + timeout, + target, + consolidation, + onClose + ) + /** * Performs a Get query on the [selector], handling the replies with a blocking [Channel]. * @@ -663,6 +707,28 @@ class Session private constructor(private val config: Config) : AutoCloseable { ) } + fun get( + selector: Selector, + channel: Channel, + payload: String, + encoding: Encoding? = null, + attachment: String? = null, + timeout: Duration = Duration.ofMillis(10000), + target: QueryTarget = QueryTarget.BEST_MATCHING, + consolidation: ConsolidationMode = ConsolidationMode.AUTO, + onClose: (() -> Unit)? = null + ): Result> = get( + selector, + channel, + ZBytes.from(payload), + encoding, + attachment?.let { ZBytes.from(it) }, + timeout, + target, + consolidation, + onClose + ) + /** * Declare a [Put] with the provided value on the specified key expression. * @@ -698,6 +764,16 @@ class Session private constructor(private val config: Config) : AutoCloseable { return resolvePut(keyExpr, put) } + fun put( + keyExpr: KeyExpr, + payload: String, + encoding: Encoding = Encoding.default(), + qos: QoS = QoS.default(), + attachment: String? = null, + reliability: Reliability = Reliability.RELIABLE + ): Result = + put(keyExpr, ZBytes.from(payload), encoding, qos, attachment?.let { ZBytes.from(it) }, reliability) + /** * Perform a delete operation. * @@ -730,6 +806,16 @@ class Session private constructor(private val config: Config) : AutoCloseable { return resolveDelete(keyExpr, delete) } + fun delete( + keyExpr: KeyExpr, + qos: QoS = QoS.default(), + attachment: String, + reliability: Reliability = Reliability.RELIABLE + ): Result { + val delete = Delete(keyExpr, qos, ZBytes.from(attachment), reliability) + return resolveDelete(keyExpr, delete) + } + /** * Obtain a [Liveliness] instance tied to this Zenoh session. */ @@ -749,7 +835,12 @@ class Session private constructor(private val config: Config) : AutoCloseable { return SessionInfo(this) } - private fun resolvePublisher(keyExpr: KeyExpr, qos: QoS, encoding: Encoding, reliability: Reliability): Result { + private fun resolvePublisher( + keyExpr: KeyExpr, + qos: QoS, + encoding: Encoding, + reliability: Reliability + ): Result { return jniSession?.run { declarePublisher(keyExpr, qos, encoding, reliability).onSuccess { declarations.add(it) } } ?: Result.failure(sessionClosedException) diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/pubsub/Publisher.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/pubsub/Publisher.kt index 270b53b0a..20b973ab2 100644 --- a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/pubsub/Publisher.kt +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/pubsub/Publisher.kt @@ -81,13 +81,19 @@ class Publisher internal constructor( fun priority() = qos.priority /** Performs a PUT operation on the specified [keyExpr] with the specified [payload]. */ - fun put(payload: IntoZBytes, encoding: Encoding? = null, attachment: IntoZBytes? = null) = jniPublisher?.put(payload, encoding ?: this.encoding, attachment) ?: InvalidPublisherResult + fun put(payload: IntoZBytes, encoding: Encoding? = null, attachment: IntoZBytes? = null) = + jniPublisher?.put(payload, encoding ?: this.encoding, attachment) ?: InvalidPublisherResult + + fun put(payload: String, encoding: Encoding? = null, attachment: String? = null) = + put(ZBytes.from(payload), encoding, attachment?.let { ZBytes.from(attachment) }) /** * Performs a DELETE operation on the specified [keyExpr]. */ fun delete(attachment: IntoZBytes? = null) = jniPublisher?.delete(attachment) ?: InvalidPublisherResult + fun delete(attachment: String) = delete(ZBytes.from(attachment)) + /** * Returns `true` if the publisher is still running. */ diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/query/Querier.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/query/Querier.kt index a8503865e..5d0fed18f 100644 --- a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/query/Querier.kt +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/query/Querier.kt @@ -17,6 +17,7 @@ package io.zenoh.query import io.zenoh.annotations.Unstable import io.zenoh.bytes.Encoding import io.zenoh.bytes.IntoZBytes +import io.zenoh.bytes.ZBytes import io.zenoh.exceptions.ZError import io.zenoh.handlers.Callback import io.zenoh.handlers.ChannelHandler @@ -84,6 +85,14 @@ class Querier internal constructor(val keyExpr: KeyExpr, val qos: QoS, private v ) ?: throw ZError("Querier is not valid.") } + fun get( + channel: Channel, + parameters: Parameters? = null, + payload: String, + encoding: Encoding? = null, + attachment: String? = null + ): Result> = get(channel, parameters, ZBytes.from(payload), encoding, attachment?.let { ZBytes.from(it) }) + /** * Perform a get operation to the [keyExpr] from the Querier and handle the incoming replies * with the [callback] provided. @@ -114,6 +123,14 @@ class Querier internal constructor(val keyExpr: KeyExpr, val qos: QoS, private v ) ?: throw ZError("Querier is not valid.") } + fun get( + callback: Callback, + parameters: Parameters? = null, + payload: String, + encoding: Encoding? = null, + attachment: String? = null + ): Result = get(callback, parameters, ZBytes.from(payload), encoding, attachment?.let { ZBytes.from(it) }) + /** * Perform a get operation to the [keyExpr] from the Querier and handle the incoming replies * with the [handler] provided. @@ -144,6 +161,14 @@ class Querier internal constructor(val keyExpr: KeyExpr, val qos: QoS, private v ) ?: throw ZError("Querier is not valid.") } + fun get( + handler: Handler, + parameters: Parameters? = null, + payload: String, + encoding: Encoding? = null, + attachment: String? = null + ): Result = get(handler, parameters, ZBytes.from(payload), encoding, attachment?.let { ZBytes.from(it) }) + /** * Get the [QoS.congestionControl] of the querier. */ diff --git a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/query/Query.kt b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/query/Query.kt index 84d4ca19e..6fa9f43bf 100644 --- a/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/query/Query.kt +++ b/zenoh-kotlin/src/commonMain/kotlin/io/zenoh/query/Query.kt @@ -79,6 +79,16 @@ class Query internal constructor( } ?: Result.failure(ZError("Query is invalid")) } + fun reply( + keyExpr: KeyExpr, + payload: String, + encoding: Encoding = Encoding.default(), + qos: QoS = QoS.default(), + timestamp: TimeStamp? = null, + attachment: String? = null + ): Result = + reply(keyExpr, ZBytes.from(payload), encoding, qos, timestamp, attachment?.let { ZBytes.from(it) }) + /** * Reply error to the remote [Query]. * @@ -96,6 +106,10 @@ class Query internal constructor( } ?: Result.failure(ZError("Query is invalid")) } + + fun replyErr(error: String, encoding: Encoding = Encoding.default()): Result = + replyErr(ZBytes.from(error), encoding) + /** * Perform a delete reply operation to the remote [Query]. * @@ -121,6 +135,13 @@ class Query internal constructor( } ?: Result.failure(ZError("Query is invalid")) } + fun replyDel( + keyExpr: KeyExpr, + qos: QoS = QoS.default(), + timestamp: TimeStamp? = null, + attachment: String, + ): Result = replyDel(keyExpr, qos, timestamp, ZBytes.from(attachment)) + override fun close() { jniQuery?.apply { this.close()