Skip to content

Commit

Permalink
Providing function overloads that receive String params instead of ZB…
Browse files Browse the repository at this point in the history
…ytes (#338)

Allowing users to directly pass strings instead of having to convert them into ZBytes using ZBytes.from(string) each time.

This affects:
- session.get/put/delete
- publisher.put/delete
- query.reply/replyErr/replyDel
- querier.get
  • Loading branch information
DariusIMP authored Jan 15, 2025
1 parent 434a9af commit 869dbad
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 2 deletions.
93 changes: 92 additions & 1 deletion zenoh-kotlin/src/commonMain/kotlin/io/zenoh/Session.kt
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,28 @@ class Session private constructor(private val config: Config) : AutoCloseable {
)
}

fun get(
selector: Selector,
callback: Callback<Reply>,
payload: String,
encoding: Encoding? = null,
attachment: String? = null,
timeout: Duration = Duration.ofMillis(10000),
target: QueryTarget = QueryTarget.BEST_MATCHING,
consolidation: ConsolidationMode = ConsolidationMode.AUTO,
onClose: (() -> Unit)? = null
): Result<Unit> = get(
selector,
callback,
ZBytes.from(payload),
encoding,
attachment?.let { ZBytes.from(it) },
timeout,
target,
consolidation,
onClose
)

/**
* Performs a Get query on the [selector], handling the replies with a [Handler].
*
Expand Down Expand Up @@ -599,6 +621,28 @@ class Session private constructor(private val config: Config) : AutoCloseable {
)
}

fun <R> get(
selector: Selector,
handler: Handler<Reply, R>,
payload: String,
encoding: Encoding? = null,
attachment: String? = null,
timeout: Duration = Duration.ofMillis(10000),
target: QueryTarget = QueryTarget.BEST_MATCHING,
consolidation: ConsolidationMode = ConsolidationMode.AUTO,
onClose: (() -> Unit)? = null
): Result<R> = get(
selector,
handler,
ZBytes.from(payload),
encoding,
attachment?.let { ZBytes.from(it) },
timeout,
target,
consolidation,
onClose
)

/**
* Performs a Get query on the [selector], handling the replies with a blocking [Channel].
*
Expand Down Expand Up @@ -663,6 +707,28 @@ class Session private constructor(private val config: Config) : AutoCloseable {
)
}

fun get(
selector: Selector,
channel: Channel<Reply>,
payload: String,
encoding: Encoding? = null,
attachment: String? = null,
timeout: Duration = Duration.ofMillis(10000),
target: QueryTarget = QueryTarget.BEST_MATCHING,
consolidation: ConsolidationMode = ConsolidationMode.AUTO,
onClose: (() -> Unit)? = null
): Result<Channel<Reply>> = get(
selector,
channel,
ZBytes.from(payload),
encoding,
attachment?.let { ZBytes.from(it) },
timeout,
target,
consolidation,
onClose
)

/**
* Declare a [Put] with the provided value on the specified key expression.
*
Expand Down Expand Up @@ -698,6 +764,16 @@ class Session private constructor(private val config: Config) : AutoCloseable {
return resolvePut(keyExpr, put)
}

fun put(
keyExpr: KeyExpr,
payload: String,
encoding: Encoding = Encoding.default(),
qos: QoS = QoS.default(),
attachment: String? = null,
reliability: Reliability = Reliability.RELIABLE
): Result<Unit> =
put(keyExpr, ZBytes.from(payload), encoding, qos, attachment?.let { ZBytes.from(it) }, reliability)

/**
* Perform a delete operation.
*
Expand Down Expand Up @@ -730,6 +806,16 @@ class Session private constructor(private val config: Config) : AutoCloseable {
return resolveDelete(keyExpr, delete)
}

fun delete(
keyExpr: KeyExpr,
qos: QoS = QoS.default(),
attachment: String,
reliability: Reliability = Reliability.RELIABLE
): Result<Unit> {
val delete = Delete(keyExpr, qos, ZBytes.from(attachment), reliability)
return resolveDelete(keyExpr, delete)
}

/**
* Obtain a [Liveliness] instance tied to this Zenoh session.
*/
Expand All @@ -749,7 +835,12 @@ class Session private constructor(private val config: Config) : AutoCloseable {
return SessionInfo(this)
}

private fun resolvePublisher(keyExpr: KeyExpr, qos: QoS, encoding: Encoding, reliability: Reliability): Result<Publisher> {
private fun resolvePublisher(
keyExpr: KeyExpr,
qos: QoS,
encoding: Encoding,
reliability: Reliability
): Result<Publisher> {
return jniSession?.run {
declarePublisher(keyExpr, qos, encoding, reliability).onSuccess { declarations.add(it) }
} ?: Result.failure(sessionClosedException)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,19 @@ class Publisher internal constructor(
fun priority() = qos.priority

/** Performs a PUT operation on the specified [keyExpr] with the specified [payload]. */
fun put(payload: IntoZBytes, encoding: Encoding? = null, attachment: IntoZBytes? = null) = jniPublisher?.put(payload, encoding ?: this.encoding, attachment) ?: InvalidPublisherResult
fun put(payload: IntoZBytes, encoding: Encoding? = null, attachment: IntoZBytes? = null) =
jniPublisher?.put(payload, encoding ?: this.encoding, attachment) ?: InvalidPublisherResult

fun put(payload: String, encoding: Encoding? = null, attachment: String? = null) =
put(ZBytes.from(payload), encoding, attachment?.let { ZBytes.from(attachment) })

/**
* Performs a DELETE operation on the specified [keyExpr].
*/
fun delete(attachment: IntoZBytes? = null) = jniPublisher?.delete(attachment) ?: InvalidPublisherResult

fun delete(attachment: String) = delete(ZBytes.from(attachment))

/**
* Returns `true` if the publisher is still running.
*/
Expand Down
25 changes: 25 additions & 0 deletions zenoh-kotlin/src/commonMain/kotlin/io/zenoh/query/Querier.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package io.zenoh.query
import io.zenoh.annotations.Unstable
import io.zenoh.bytes.Encoding
import io.zenoh.bytes.IntoZBytes
import io.zenoh.bytes.ZBytes
import io.zenoh.exceptions.ZError
import io.zenoh.handlers.Callback
import io.zenoh.handlers.ChannelHandler
Expand Down Expand Up @@ -84,6 +85,14 @@ class Querier internal constructor(val keyExpr: KeyExpr, val qos: QoS, private v
) ?: throw ZError("Querier is not valid.")
}

fun get(
channel: Channel<Reply>,
parameters: Parameters? = null,
payload: String,
encoding: Encoding? = null,
attachment: String? = null
): Result<Channel<Reply>> = get(channel, parameters, ZBytes.from(payload), encoding, attachment?.let { ZBytes.from(it) })

/**
* Perform a get operation to the [keyExpr] from the Querier and handle the incoming replies
* with the [callback] provided.
Expand Down Expand Up @@ -114,6 +123,14 @@ class Querier internal constructor(val keyExpr: KeyExpr, val qos: QoS, private v
) ?: throw ZError("Querier is not valid.")
}

fun get(
callback: Callback<Reply>,
parameters: Parameters? = null,
payload: String,
encoding: Encoding? = null,
attachment: String? = null
): Result<Unit> = get(callback, parameters, ZBytes.from(payload), encoding, attachment?.let { ZBytes.from(it) })

/**
* Perform a get operation to the [keyExpr] from the Querier and handle the incoming replies
* with the [handler] provided.
Expand Down Expand Up @@ -144,6 +161,14 @@ class Querier internal constructor(val keyExpr: KeyExpr, val qos: QoS, private v
) ?: throw ZError("Querier is not valid.")
}

fun <R> get(
handler: Handler<Reply, R>,
parameters: Parameters? = null,
payload: String,
encoding: Encoding? = null,
attachment: String? = null
): Result<R> = get(handler, parameters, ZBytes.from(payload), encoding, attachment?.let { ZBytes.from(it) })

/**
* Get the [QoS.congestionControl] of the querier.
*/
Expand Down
21 changes: 21 additions & 0 deletions zenoh-kotlin/src/commonMain/kotlin/io/zenoh/query/Query.kt
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,16 @@ class Query internal constructor(
} ?: Result.failure(ZError("Query is invalid"))
}

fun reply(
keyExpr: KeyExpr,
payload: String,
encoding: Encoding = Encoding.default(),
qos: QoS = QoS.default(),
timestamp: TimeStamp? = null,
attachment: String? = null
): Result<Unit> =
reply(keyExpr, ZBytes.from(payload), encoding, qos, timestamp, attachment?.let { ZBytes.from(it) })

/**
* Reply error to the remote [Query].
*
Expand All @@ -96,6 +106,10 @@ class Query internal constructor(
} ?: Result.failure(ZError("Query is invalid"))
}


fun replyErr(error: String, encoding: Encoding = Encoding.default()): Result<Unit> =
replyErr(ZBytes.from(error), encoding)

/**
* Perform a delete reply operation to the remote [Query].
*
Expand All @@ -121,6 +135,13 @@ class Query internal constructor(
} ?: Result.failure(ZError("Query is invalid"))
}

fun replyDel(
keyExpr: KeyExpr,
qos: QoS = QoS.default(),
timestamp: TimeStamp? = null,
attachment: String,
): Result<Unit> = replyDel(keyExpr, qos, timestamp, ZBytes.from(attachment))

override fun close() {
jniQuery?.apply {
this.close()
Expand Down

0 comments on commit 869dbad

Please sign in to comment.