Skip to content

Commit

Permalink
Examples refactor (#334)
Browse files Browse the repository at this point in the history
* ZGet refactor

* refactor(examples): refactoring exmaples

- reducing nesting
- showing alternative implementations (with channel, callback and handlers)
  • Loading branch information
DariusIMP authored Jan 15, 2025
1 parent 869dbad commit 9d8744a
Show file tree
Hide file tree
Showing 14 changed files with 390 additions and 175 deletions.
11 changes: 5 additions & 6 deletions examples/src/main/kotlin/io.zenoh/ZDelete.kt
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,11 @@ class ZDelete(private val emptyArgs: Boolean) : CliktCommand(
Zenoh.initLogFromEnvOr("error")

println("Opening session...")
Zenoh.open(config).onSuccess { session ->
session.use {
key.intoKeyExpr().onSuccess { keyExpr ->
println("Deleting resources matching '$keyExpr'...")
session.delete(keyExpr)
}
val session = Zenoh.open(config).getOrThrow()
session.use {
key.intoKeyExpr().onSuccess { keyExpr ->
println("Deleting resources matching '$keyExpr'...")
session.delete(keyExpr)
}
}
}
Expand Down
100 changes: 77 additions & 23 deletions examples/src/main/kotlin/io.zenoh/ZGet.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ import com.github.ajalt.clikt.core.CliktCommand
import com.github.ajalt.clikt.parameters.options.*
import com.github.ajalt.clikt.parameters.types.long
import io.zenoh.bytes.ZBytes
import io.zenoh.handlers.Handler
import io.zenoh.sample.SampleKind
import io.zenoh.query.QueryTarget
import io.zenoh.query.Reply
import io.zenoh.query.Selector
import io.zenoh.query.intoSelector
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking
Expand All @@ -34,32 +37,83 @@ class ZGet(private val emptyArgs: Boolean) : CliktCommand(

Zenoh.initLogFromEnvOr("error")

Zenoh.open(config).onSuccess { session ->
session.use {
selector.intoSelector().onSuccess { selector ->
session.get(selector,
channel = Channel(),
payload = payload?.let { ZBytes.from(it) },
target = target?.let { QueryTarget.valueOf(it.uppercase()) } ?: QueryTarget.BEST_MATCHING,
attachment = attachment?.let { ZBytes.from(it) },
timeout = Duration.ofMillis(timeout))
.onSuccess { channelReceiver ->
runBlocking {
for (reply in channelReceiver) {
reply.result.onSuccess { sample ->
when (sample.kind) {
SampleKind.PUT -> println("Received ('${sample.keyExpr}': '${sample.payload}')")
SampleKind.DELETE -> println("Received (DELETE '${sample.keyExpr}')")
}
}.onFailure { error ->
println("Received (ERROR: '${error.message}')")
}
}
}
}
val session = Zenoh.open(config).getOrThrow()
val selector = selector.intoSelector().getOrThrow()

// Run the Get query through one of the examples below:
runChannelExample(session, selector)
// runCallbackExample(session, selector)
// runHandlerExample(session, selector)

session.close()
}

private fun runChannelExample(session: Session, selector: Selector) {
val channelReceiver = session.get(selector,
channel = Channel(),
payload = payload?.let { ZBytes.from(it) },
target = target?.let { QueryTarget.valueOf(it.uppercase()) } ?: QueryTarget.BEST_MATCHING,
attachment = attachment?.let { ZBytes.from(it) },
timeout = Duration.ofMillis(timeout)
).getOrThrow()
runBlocking {
for (reply in channelReceiver) {
reply.result.onSuccess { sample ->
when (sample.kind) {
SampleKind.PUT -> println("Received ('${sample.keyExpr}': '${sample.payload}')")
SampleKind.DELETE -> println("Received (DELETE '${sample.keyExpr}')")
}
}.onFailure { error ->
println("Received (ERROR: '${error.message}')")
}
}
}
}

private fun runCallbackExample(session: Session, selector: Selector) {
session.get(selector,
payload = payload?.let { ZBytes.from(it) },
target = target?.let { QueryTarget.valueOf(it.uppercase()) } ?: QueryTarget.BEST_MATCHING,
attachment = attachment?.let { ZBytes.from(it) },
timeout = Duration.ofMillis(timeout),
callback = { reply ->
reply.result.onSuccess { sample ->
when (sample.kind) {
SampleKind.PUT -> println("Received ('${sample.keyExpr}': '${sample.payload}')")
SampleKind.DELETE -> println("Received (DELETE '${sample.keyExpr}')")
}
}.onFailure { error ->
println("Received (ERROR: '${error.message}')")
}
},
).getOrThrow()
}

private fun runHandlerExample(session: Session, selector: Selector) {
// Create your own handler implementation
class ExampleHandler : Handler<Reply, Unit> {
override fun handle(t: Reply) {
t.result.onSuccess { sample ->
when (sample.kind) {
SampleKind.PUT -> println("Received ('${sample.keyExpr}': '${sample.payload}')")
SampleKind.DELETE -> println("Received (DELETE '${sample.keyExpr}')")
}
}.onFailure { error ->
println("Received (ERROR: '${error.message}')")
}
}

override fun receiver() {}
override fun onClose() {}
}

session.get(selector,
payload = payload?.let { ZBytes.from(it) },
target = target?.let { QueryTarget.valueOf(it.uppercase()) } ?: QueryTarget.BEST_MATCHING,
attachment = attachment?.let { ZBytes.from(it) },
timeout = Duration.ofMillis(timeout),
handler = ExampleHandler(), // Provide a handler instance
).getOrThrow()
}

private val selector by option(
Expand Down
67 changes: 50 additions & 17 deletions examples/src/main/kotlin/io.zenoh/ZGetLiveliness.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@ package io.zenoh
import com.github.ajalt.clikt.core.CliktCommand
import com.github.ajalt.clikt.parameters.options.*
import com.github.ajalt.clikt.parameters.types.long
import io.zenoh.handlers.Handler
import io.zenoh.keyexpr.KeyExpr
import io.zenoh.keyexpr.intoKeyExpr
import io.zenoh.query.Reply
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking
import java.time.Duration

class ZGetLiveliness(private val emptyArgs: Boolean) : CliktCommand(
help = "Zenoh Sub Liveliness example"
help = "Zenoh Get Liveliness example"
) {

override fun run() {
Expand All @@ -32,24 +35,54 @@ class ZGetLiveliness(private val emptyArgs: Boolean) : CliktCommand(
Zenoh.initLogFromEnvOr("error")

println("Opening session...")
Zenoh.open(config).onSuccess { session ->
session.use {
key.intoKeyExpr().onSuccess { keyExpr ->
session.liveliness().get(keyExpr, channel = Channel(), timeout = Duration.ofMillis(timeout))
.onSuccess { channel ->
runBlocking {
for (reply in channel) {
reply.result.onSuccess {
println(">> Alive token ('${it.keyExpr}')")
}.onFailure {
println(">> Received (ERROR: '${it.message}')")
}
}
}
}
val session = Zenoh.open(config).getOrThrow()
val keyExpr = key.intoKeyExpr().getOrThrow()

runChannelExample(session, keyExpr)

session.close()
}

private fun runChannelExample(session: Session, keyExpr: KeyExpr) {
val channel =
session.liveliness().get(keyExpr, channel = Channel(), timeout = Duration.ofMillis(timeout)).getOrThrow()
runBlocking {
for (reply in channel) {
reply.result.onSuccess {
println(">> Alive token ('${it.keyExpr}')")
}.onFailure {
println(">> Received (ERROR: '${it.message}')")
}
}
}.onFailure { exception -> println(exception.message) }
}
}

private fun runCallbackExample(session: Session, keyExpr: KeyExpr) {
session.liveliness().get(keyExpr, timeout = Duration.ofMillis(timeout), callback = { reply ->
reply.result.onSuccess {
println(">> Alive token ('${it.keyExpr}')")
}.onFailure {
println(">> Received (ERROR: '${it.message}')")
}
}).getOrThrow()
}

private fun runHandlerExample(session: Session, keyExpr: KeyExpr) {
// Create your own handler implementation
class ExampleHandler : Handler<Reply, Unit> {
override fun handle(t: Reply) {
t.result.onSuccess {
println(">> Alive token ('${it.keyExpr}')")
}.onFailure {
println(">> Received (ERROR: '${it.message}')")
}
}

override fun receiver() {}
override fun onClose() {}
}

session.liveliness().get(keyExpr, timeout = Duration.ofMillis(timeout), handler = ExampleHandler()).getOrThrow()
}

private val configFile by option("-c", "--config", help = "A configuration file.", metavar = "config")
Expand Down
16 changes: 6 additions & 10 deletions examples/src/main/kotlin/io.zenoh/ZInfo.kt
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,12 @@ class ZInfo(private val emptyArgs: Boolean) : CliktCommand(
Zenoh.initLogFromEnvOr("error")

println("Opening session...")
Zenoh.open(config).onSuccess { session ->
session.use {
val info = session.info()
println("zid: ${info.zid().getOrThrow()}")

println("routers zid: ${info.routersZid().getOrThrow()}")

println("peers zid: ${info.peersZid().getOrThrow()}")
}
}.onFailure { exception -> println(exception.message) }
val session = Zenoh.open(config).getOrThrow()
val info = session.info()
println("zid: ${info.zid().getOrThrow()}")
println("routers zid: ${info.routersZid().getOrThrow()}")
println("peers zid: ${info.peersZid().getOrThrow()}")
session.close()
}


Expand Down
16 changes: 8 additions & 8 deletions examples/src/main/kotlin/io.zenoh/ZLiveliness.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package io.zenoh
import com.github.ajalt.clikt.core.CliktCommand
import com.github.ajalt.clikt.parameters.options.*
import io.zenoh.keyexpr.intoKeyExpr
import java.util.concurrent.CountDownLatch

class ZLiveliness(private val emptyArgs: Boolean) : CliktCommand(
help = "Zenoh Liveliness example"
Expand All @@ -28,14 +29,13 @@ class ZLiveliness(private val emptyArgs: Boolean) : CliktCommand(
Zenoh.initLogFromEnvOr("error")

println("Opening session...")
Zenoh.open(config).onSuccess { session ->
key.intoKeyExpr().onSuccess { keyExpr ->
session.liveliness().declareToken(keyExpr)
while (true) {
Thread.sleep(1000)
}
}
}.onFailure { exception -> println(exception.message) }
val session = Zenoh.open(config).getOrThrow()
val keyExpr = key.intoKeyExpr().getOrThrow()
session.liveliness().declareToken(keyExpr)

CountDownLatch(1).await() // A countdown latch is used here to block execution while the liveliness token
// is declared. Typically, this wouldn't be needed.
session.close()
}

private val configFile by option("-c", "--config", help = "A configuration file.", metavar = "config")
Expand Down
2 changes: 1 addition & 1 deletion examples/src/main/kotlin/io.zenoh/ZPong.kt
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ import java.util.concurrent.CountDownLatch
class ZPong(private val emptyArgs: Boolean) : CliktCommand(
help = "Zenoh ZPong example"
) {
val latch = CountDownLatch(1)

override fun run() {
val config = loadConfig(emptyArgs, configFile, connect, listen, noMulticastScouting, mode)

Zenoh.initLogFromEnvOr("error")
val latch = CountDownLatch(1)

println("Opening session...")
val session = Zenoh.open(config).getOrThrow()
Expand Down
48 changes: 23 additions & 25 deletions examples/src/main/kotlin/io.zenoh/ZPub.kt
Original file line number Diff line number Diff line change
Expand Up @@ -28,31 +28,29 @@ class ZPub(private val emptyArgs: Boolean) : CliktCommand(
Zenoh.initLogFromEnvOr("error")

println("Opening session...")
Zenoh.open(config).onSuccess { session ->
session.use {
key.intoKeyExpr().onSuccess { keyExpr ->
println("Declaring publisher on '$keyExpr'...")
session.declarePublisher(keyExpr).onSuccess { pub ->
println("Press CTRL-C to quit...")
val attachment = attachment?.toByteArray()
var idx = 0
while (true) {
Thread.sleep(1000)
val payload = "[${
idx.toString().padStart(4, ' ')
}] $value"
println(
"Putting Data ('$keyExpr': '$payload')..."
)
attachment?.let {
pub.put(ZBytes.from(payload), attachment = ZBytes.from(it) )
} ?: let { pub.put(ZBytes.from(payload)) }
idx++
}
}
}
}
}.onFailure { exception -> println(exception.message) }
val session = Zenoh.open(config).getOrThrow()
val keyExpr = key.intoKeyExpr().getOrThrow()

println("Declaring publisher on '$keyExpr'...")
val publisher = session.declarePublisher(keyExpr).getOrThrow()

println("Press CTRL-C to quit...")
val attachment = attachment?.toByteArray()

var idx = 0
while (true) {
Thread.sleep(1000)
val payload = "[${
idx.toString().padStart(4, ' ')
}] $value"
println(
"Putting Data ('$keyExpr': '$payload')..."
)
attachment?.let {
publisher.put(ZBytes.from(payload), attachment = ZBytes.from(it))
} ?: let { publisher.put(ZBytes.from(payload)) }
idx++
}
}


Expand Down
41 changes: 20 additions & 21 deletions examples/src/main/kotlin/io.zenoh/ZPubThr.kt
Original file line number Diff line number Diff line change
Expand Up @@ -47,27 +47,26 @@ class ZPubThr(private val emptyArgs: Boolean) : CliktCommand(
priority = priorityInput?.let { Priority.entries[it] } ?: Priority.DATA,
)

Zenoh.open(config).onSuccess {
it.use { session ->
session.declarePublisher("test/thr".intoKeyExpr().getOrThrow(), qos = qos).onSuccess { pub ->
println("Publisher declared on test/thr.")
var count: Long = 0
var start = System.currentTimeMillis()
val number = number.toLong()
println("Press CTRL-C to quit...")
while (true) {
pub.put(payload).getOrThrow()
if (statsPrint) {
if (count < number) {
count++
} else {
val throughput = count * 1000 / (System.currentTimeMillis() - start)
println("$throughput msgs/s")
count = 0
start = System.currentTimeMillis()
}
}
}
val session = Zenoh.open(config).getOrThrow()
val keyExpr = "test/thr".intoKeyExpr().getOrThrow()
val publisher = session.declarePublisher(keyExpr, qos = qos).getOrThrow()

println("Publisher declared on test/thr.")
var count: Long = 0
var start = System.currentTimeMillis()
val number = number.toLong()

println("Press CTRL-C to quit...")
while (true) {
publisher.put(payload).getOrThrow()
if (statsPrint) {
if (count < number) {
count++
} else {
val throughput = count * 1000 / (System.currentTimeMillis() - start)
println("$throughput msgs/s")
count = 0
start = System.currentTimeMillis()
}
}
}
Expand Down
Loading

0 comments on commit 9d8744a

Please sign in to comment.