Skip to content

Commit

Permalink
Providing function overloads that receive String params instead of ZB…
Browse files Browse the repository at this point in the history
…ytes

Allowing users to directly pass strings instead of having to convert them into ZBytes using ZBytes.from(string) each time.

This affects:
- session.get/put/delete
- publisher.put/delete
- query.reply/replyErr/replyDel
- querier.get
  • Loading branch information
DariusIMP committed Jan 9, 2025
1 parent 64593b7 commit e85975f
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 2 deletions.
93 changes: 92 additions & 1 deletion zenoh-kotlin/src/commonMain/kotlin/io/zenoh/Session.kt
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,28 @@ class Session private constructor(private val config: Config) : AutoCloseable {
)
}

fun get(
selector: Selector,
callback: Callback<Reply>,
payload: String,
encoding: Encoding? = null,
attachment: String? = null,
timeout: Duration = Duration.ofMillis(10000),
target: QueryTarget = QueryTarget.BEST_MATCHING,
consolidation: ConsolidationMode = ConsolidationMode.AUTO,
onClose: (() -> Unit)? = null
): Result<Unit> = get(
selector,
callback,
ZBytes.from(payload),
encoding,
attachment?.let { ZBytes.from(it) },
timeout,
target,
consolidation,
onClose
)

/**
* Performs a Get query on the [selector], handling the replies with a [Handler].
*
Expand Down Expand Up @@ -599,6 +621,28 @@ class Session private constructor(private val config: Config) : AutoCloseable {
)
}

fun <R> get(
selector: Selector,
handler: Handler<Reply, R>,
payload: String,
encoding: Encoding? = null,
attachment: String? = null,
timeout: Duration = Duration.ofMillis(10000),
target: QueryTarget = QueryTarget.BEST_MATCHING,
consolidation: ConsolidationMode = ConsolidationMode.AUTO,
onClose: (() -> Unit)? = null
): Result<R> = get(
selector,
handler,
ZBytes.from(payload),
encoding,
attachment?.let { ZBytes.from(it) },
timeout,
target,
consolidation,
onClose
)

/**
* Performs a Get query on the [selector], handling the replies with a blocking [Channel].
*
Expand Down Expand Up @@ -663,6 +707,28 @@ class Session private constructor(private val config: Config) : AutoCloseable {
)
}

fun get(
selector: Selector,
channel: Channel<Reply>,
payload: String,
encoding: Encoding? = null,
attachment: String? = null,
timeout: Duration = Duration.ofMillis(10000),
target: QueryTarget = QueryTarget.BEST_MATCHING,
consolidation: ConsolidationMode = ConsolidationMode.AUTO,
onClose: (() -> Unit)? = null
): Result<Channel<Reply>> = get(
selector,
channel,
ZBytes.from(payload),
encoding,
attachment?.let { ZBytes.from(it) },
timeout,
target,
consolidation,
onClose
)

/**
* Declare a [Put] with the provided value on the specified key expression.
*
Expand Down Expand Up @@ -698,6 +764,16 @@ class Session private constructor(private val config: Config) : AutoCloseable {
return resolvePut(keyExpr, put)
}

fun put(
keyExpr: KeyExpr,
payload: String,
encoding: Encoding = Encoding.default(),
qos: QoS = QoS.default(),
attachment: String? = null,
reliability: Reliability = Reliability.RELIABLE
): Result<Unit> =
put(keyExpr, ZBytes.from(payload), encoding, qos, attachment?.let { ZBytes.from(it) }, reliability)

/**
* Perform a delete operation.
*
Expand Down Expand Up @@ -730,6 +806,16 @@ class Session private constructor(private val config: Config) : AutoCloseable {
return resolveDelete(keyExpr, delete)
}

fun delete(
keyExpr: KeyExpr,
qos: QoS = QoS.default(),
attachment: String,
reliability: Reliability = Reliability.RELIABLE
): Result<Unit> {
val delete = Delete(keyExpr, qos, ZBytes.from(attachment), reliability)
return resolveDelete(keyExpr, delete)
}

/**
* Obtain a [Liveliness] instance tied to this Zenoh session.
*/
Expand All @@ -749,7 +835,12 @@ class Session private constructor(private val config: Config) : AutoCloseable {
return SessionInfo(this)
}

private fun resolvePublisher(keyExpr: KeyExpr, qos: QoS, encoding: Encoding, reliability: Reliability): Result<Publisher> {
private fun resolvePublisher(
keyExpr: KeyExpr,
qos: QoS,
encoding: Encoding,
reliability: Reliability
): Result<Publisher> {
return jniSession?.run {
declarePublisher(keyExpr, qos, encoding, reliability).onSuccess { declarations.add(it) }
} ?: Result.failure(sessionClosedException)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,19 @@ class Publisher internal constructor(
fun priority() = qos.priority

/** Performs a PUT operation on the specified [keyExpr] with the specified [payload]. */
fun put(payload: IntoZBytes, encoding: Encoding? = null, attachment: IntoZBytes? = null) = jniPublisher?.put(payload, encoding ?: this.encoding, attachment) ?: InvalidPublisherResult
fun put(payload: IntoZBytes, encoding: Encoding? = null, attachment: IntoZBytes? = null) =
jniPublisher?.put(payload, encoding ?: this.encoding, attachment) ?: InvalidPublisherResult

fun put(payload: String, encoding: Encoding? = null, attachment: String? = null) =
put(ZBytes.from(payload), encoding, attachment?.let { ZBytes.from(attachment) })

/**
* Performs a DELETE operation on the specified [keyExpr].
*/
fun delete(attachment: IntoZBytes? = null) = jniPublisher?.delete(attachment) ?: InvalidPublisherResult

fun delete(attachment: String) = delete(ZBytes.from(attachment))

/**
* Returns `true` if the publisher is still running.
*/
Expand Down
25 changes: 25 additions & 0 deletions zenoh-kotlin/src/commonMain/kotlin/io/zenoh/query/Querier.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package io.zenoh.query
import io.zenoh.annotations.Unstable
import io.zenoh.bytes.Encoding
import io.zenoh.bytes.IntoZBytes
import io.zenoh.bytes.ZBytes
import io.zenoh.exceptions.ZError
import io.zenoh.handlers.Callback
import io.zenoh.handlers.ChannelHandler
Expand Down Expand Up @@ -84,6 +85,14 @@ class Querier internal constructor(val keyExpr: KeyExpr, val qos: QoS, private v
) ?: throw ZError("Querier is not valid.")
}

fun get(
channel: Channel<Reply>,
parameters: Parameters? = null,
payload: String,
encoding: Encoding? = null,
attachment: String? = null
): Result<Channel<Reply>> = get(channel, parameters, ZBytes.from(payload), encoding, attachment?.let { ZBytes.from(it) })

/**
* Perform a get operation to the [keyExpr] from the Querier and handle the incoming replies
* with the [callback] provided.
Expand Down Expand Up @@ -114,6 +123,14 @@ class Querier internal constructor(val keyExpr: KeyExpr, val qos: QoS, private v
) ?: throw ZError("Querier is not valid.")
}

fun get(
callback: Callback<Reply>,
parameters: Parameters? = null,
payload: String,
encoding: Encoding? = null,
attachment: String? = null
): Result<Unit> = get(callback, parameters, ZBytes.from(payload), encoding, attachment?.let { ZBytes.from(it) })

/**
* Perform a get operation to the [keyExpr] from the Querier and handle the incoming replies
* with the [handler] provided.
Expand Down Expand Up @@ -144,6 +161,14 @@ class Querier internal constructor(val keyExpr: KeyExpr, val qos: QoS, private v
) ?: throw ZError("Querier is not valid.")
}

fun <R> get(
handler: Handler<Reply, R>,
parameters: Parameters? = null,
payload: String,
encoding: Encoding? = null,
attachment: String? = null
): Result<R> = get(handler, parameters, ZBytes.from(payload), encoding, attachment?.let { ZBytes.from(it) })

/**
* Get the [QoS.congestionControl] of the querier.
*/
Expand Down
21 changes: 21 additions & 0 deletions zenoh-kotlin/src/commonMain/kotlin/io/zenoh/query/Query.kt
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,16 @@ class Query internal constructor(
} ?: Result.failure(ZError("Query is invalid"))
}

fun reply(
keyExpr: KeyExpr,
payload: String,
encoding: Encoding = Encoding.default(),
qos: QoS = QoS.default(),
timestamp: TimeStamp? = null,
attachment: String? = null
): Result<Unit> =
reply(keyExpr, ZBytes.from(payload), encoding, qos, timestamp, attachment?.let { ZBytes.from(it) })

/**
* Reply error to the remote [Query].
*
Expand All @@ -96,6 +106,10 @@ class Query internal constructor(
} ?: Result.failure(ZError("Query is invalid"))
}


fun replyErr(error: String, encoding: Encoding = Encoding.default()): Result<Unit> =
replyErr(ZBytes.from(error), encoding)

/**
* Perform a delete reply operation to the remote [Query].
*
Expand All @@ -121,6 +135,13 @@ class Query internal constructor(
} ?: Result.failure(ZError("Query is invalid"))
}

fun replyDel(
keyExpr: KeyExpr,
qos: QoS = QoS.default(),
timestamp: TimeStamp? = null,
attachment: String,
): Result<Unit> = replyDel(keyExpr, qos, timestamp, ZBytes.from(attachment))

override fun close() {
jniQuery?.apply {
this.close()
Expand Down

0 comments on commit e85975f

Please sign in to comment.