Skip to content

Commit

Permalink
Put API rework (#436)
Browse files Browse the repository at this point in the history
* feat: rework put api

* feat: update tests

* feat: update examples
  • Loading branch information
jean-roland authored Jun 21, 2024
1 parent 60ff708 commit 448695d
Show file tree
Hide file tree
Showing 31 changed files with 242 additions and 86 deletions.
7 changes: 6 additions & 1 deletion examples/arduino/z_pub.ino
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,18 @@ void loop() {
delay(1000);
char buf[256];
sprintf(buf, "[%4d] %s", idx++, VALUE);

Serial.print("Writing Data ('");
Serial.print(KEYEXPR);
Serial.print("': '");
Serial.print(buf);
Serial.println("')");

if (z_publisher_put(z_publisher_loan(&pub), (const uint8_t *)buf, strlen(buf), NULL) < 0) {
// Create payload
z_owned_bytes_t payload;
z_bytes_serialize_from_string(&payload, buf);

if (z_publisher_put(z_publisher_loan(&pub), z_bytes_move(&payload), NULL) < 0) {
Serial.println("Error while publishing data");
}
}
Expand Down
7 changes: 6 additions & 1 deletion examples/espidf/z_pub.c
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,12 @@ void app_main() {
sleep(1);
sprintf(buf, "[%4d] %s", idx, VALUE);
printf("Putting Data ('%s': '%s')...\n", KEYEXPR, buf);
z_publisher_put(z_loan(pub), (const uint8_t*)buf, strlen(buf), NULL);

// Create payload
z_owned_bytes_t payload;
z_bytes_serialize_from_string(&payload, buf);

z_publisher_put(z_loan(pub), z_move(payload), NULL);
}

printf("Closing Zenoh Session...");
Expand Down
6 changes: 5 additions & 1 deletion examples/freertos_plus_tcp/z_pub.c
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,13 @@ void app_main(void) {
snprintf(buf, 256, "[%4d] %s", idx, VALUE);
printf("Putting Data ('%s': '%s')...\n", KEYEXPR, buf);

// Create payload
z_owned_bytes_t payload;
z_bytes_serialize_from_string(&payload, buf);

z_publisher_put_options_t options;
z_publisher_put_options_default(&options);
z_publisher_put(z_loan(pub), (const uint8_t *)buf, strlen(buf), &options);
z_publisher_put(z_loan(pub), z_move(payload), &options);
}

// Clean-up
Expand Down
7 changes: 6 additions & 1 deletion examples/freertos_plus_tcp/z_pub_st.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,12 @@ void app_main(void) {
if (z_clock_elapsed_ms(&now) > 1000) {
snprintf(buf, 256, "[%4d] %s", idx, VALUE);
printf("Putting Data ('%s': '%s')...\n", KEYEXPR, buf);
z_publisher_put(z_loan(pub), (const uint8_t *)buf, strlen(buf), NULL);

// Create payload
z_owned_bytes_t payload;
z_bytes_serialize_from_string(&payload, buf);

z_publisher_put(z_loan(pub), z_move(payload), NULL);
++idx;

now = z_clock_now();
Expand Down
7 changes: 6 additions & 1 deletion examples/freertos_plus_tcp/z_put.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,12 @@ void app_main(void) {
printf("Putting Data ('%s': '%s')...\n", KEYEXPR, VALUE);
z_put_options_t options;
z_put_options_default(&options);
if (z_put(z_loan(s), z_loan(ke), (const uint8_t *)VALUE, strlen(VALUE), &options) < 0) {

// Create payload
z_owned_bytes_t payload;
z_bytes_serialize_from_string(&payload, VALUE);

if (z_put(z_loan(s), z_loan(ke), z_move(payload), &options) < 0) {
printf("Oh no! Put has failed...\n");
}

Expand Down
7 changes: 6 additions & 1 deletion examples/mbed/z_pub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,12 @@ int main(int argc, char **argv) {
z_sleep_s(1);
sprintf(buf, "[%4d] %s", idx, VALUE);
printf("Putting Data ('%s': '%s')...\n", KEYEXPR, buf);
z_publisher_put(z_publisher_loan(&pub), (const uint8_t *)buf, strlen(buf), NULL);

// Create payload
z_owned_bytes_t payload;
z_bytes_serialize_from_string(&payload, buf);

z_publisher_put(z_publisher_loan(&pub), z_bytes_move(&payload), NULL);
}

printf("Closing Zenoh Session...");
Expand Down
12 changes: 10 additions & 2 deletions examples/unix/c11/z_ping.c
Original file line number Diff line number Diff line change
Expand Up @@ -106,15 +106,23 @@ int main(int argc, char** argv) {
z_clock_t warmup_start = z_clock_now();
unsigned long elapsed_us = 0;
while (elapsed_us < args.warmup_ms * 1000) {
z_publisher_put(z_loan(pub), data, args.size, NULL);
// Create payload
z_owned_bytes_t payload;
z_bytes_serialize_from_slice(&payload, data, args.size);

z_publisher_put(z_loan(pub), z_move(payload), NULL);
z_condvar_wait(&cond, &mutex);
elapsed_us = z_clock_elapsed_us(&warmup_start);
}
}
unsigned long* results = z_malloc(sizeof(unsigned long) * args.number_of_pings);
for (unsigned int i = 0; i < args.number_of_pings; i++) {
z_clock_t measure_start = z_clock_now();
z_publisher_put(z_loan(pub), data, args.size, NULL);
// Create payload
z_owned_bytes_t payload;
z_bytes_serialize_from_slice(&payload, data, args.size);

z_publisher_put(z_loan(pub), z_move(payload), NULL);
z_condvar_wait(&cond, &mutex);
results[i] = z_clock_elapsed_us(&measure_start);
}
Expand Down
4 changes: 3 additions & 1 deletion examples/unix/c11/z_pong.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ void callback(const z_loaned_sample_t* sample, void* context) {
const z_loaned_publisher_t* pub = z_loan(*(z_owned_publisher_t*)context);
z_owned_slice_t value;
z_bytes_deserialize_into_slice(z_sample_payload(sample), &value);
z_publisher_put(pub, z_slice_data(z_loan(value)), z_slice_len(z_loan(value)), NULL);
z_owned_bytes_t payload;
z_bytes_serialize_from_slice(&payload, z_slice_data(z_loan(value)), z_slice_len(z_loan(value)));
z_publisher_put(pub, z_move(payload), NULL);
z_drop(z_move(value));
}
void drop(void* context) {
Expand Down
6 changes: 5 additions & 1 deletion examples/unix/c11/z_pub.c
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,18 @@ int main(int argc, char **argv) {
sprintf(buf, "[%4d] %s", idx, value);
printf("Putting Data ('%s': '%s')...\n", keyexpr, buf);

// Create payload
z_owned_bytes_t payload;
z_bytes_serialize_from_string(&payload, buf);

// Create encoding
z_owned_encoding_t encoding;
zp_encoding_make(&encoding, Z_ENCODING_ID_TEXT_PLAIN, "utf8");
z_publisher_put_options_t options;
z_publisher_put_options_default(&options);
options.encoding = z_move(encoding);

z_publisher_put(z_loan(pub), (const uint8_t *)buf, strlen(buf), &options);
z_publisher_put(z_loan(pub), z_move(payload), &options);
}
// Clean up
z_undeclare_publisher(z_move(pub));
Expand Down
12 changes: 8 additions & 4 deletions examples/unix/c11/z_pub_attachment.c
Original file line number Diff line number Diff line change
Expand Up @@ -151,13 +151,19 @@ int main(int argc, char **argv) {
z_owned_bytes_t attachment;

// Allocate buffer
char buf[256];
char buf_ind[16];

// Publish data
printf("Press CTRL-C to quit...\n");
char buf[256];
for (int idx = 0; idx < n; ++idx) {
z_sleep_s(1);
sprintf(buf, "[%4d] %s", idx, value);
printf("Putting Data ('%s': '%s')...\n", keyexpr, buf);

// Create payload
z_owned_bytes_t payload;
z_bytes_serialize_from_string(&payload, buf);

// Add attachment value
sprintf(buf_ind, "%d", idx);
Expand All @@ -166,9 +172,7 @@ int main(int argc, char **argv) {
zp_bytes_serialize_from_iter(&attachment, create_attachment_iter, (void *)&ctx, kv_pairs_size(&ctx));
options.attachment = z_move(attachment);

sprintf(buf, "[%4d] %s", idx, value);
printf("Putting Data ('%s': '%s')...\n", keyexpr, buf);
z_publisher_put(z_loan(pub), (const uint8_t *)buf, strlen(buf), &options);
z_publisher_put(z_loan(pub), z_move(payload), &options);
}
// Clean up
z_undeclare_publisher(z_move(pub));
Expand Down
7 changes: 6 additions & 1 deletion examples/unix/c11/z_pub_st.c
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,12 @@ int main(int argc, char **argv) {
sleep(1);
sprintf(buf, "[%4d] %s", idx, value);
printf("Putting Data ('%s': '%s')...\n", keyexpr, buf);
z_publisher_put(z_loan(pub), (const uint8_t *)buf, strlen(buf), NULL);

// Create payload
z_owned_bytes_t payload;
z_bytes_serialize_from_string(&payload, buf);

z_publisher_put(z_loan(pub), z_move(payload), NULL);

zp_read(z_loan(s), NULL);
zp_send_keep_alive(z_loan(s), NULL);
Expand Down
10 changes: 7 additions & 3 deletions examples/unix/c11/z_pub_thr.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ int main(int argc, char **argv) {
}
char *keyexpr = "test/thr";
size_t len = (size_t)atoi(argv[1]);
uint8_t *value = (uint8_t *)malloc(len);
uint8_t *value = (uint8_t *)z_malloc(len);
memset(value, 1, len);

// Set config
Expand Down Expand Up @@ -61,14 +61,18 @@ int main(int argc, char **argv) {

// Send packets
while (1) {
z_publisher_put(z_loan(pub), (const uint8_t *)value, len, NULL);
// Create payload
z_owned_bytes_t payload;
z_bytes_serialize_from_string(&payload, (char *)value);

z_publisher_put(z_loan(pub), z_move(payload), NULL);
}
// Clean up
z_undeclare_publisher(z_move(pub));
zp_stop_read_task(z_loan_mut(s));
zp_stop_lease_task(z_loan_mut(s));
z_close(z_move(s));
free(value);
z_free(value);
exit(0);
}
#else
Expand Down
6 changes: 5 additions & 1 deletion examples/unix/c11/z_put.c
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ int main(int argc, char **argv) {
return -1;
}

// Create payload
z_owned_bytes_t payload;
z_bytes_serialize_from_string(&payload, value);

// Create encoding
z_owned_encoding_t encoding;
zp_encoding_make(&encoding, Z_ENCODING_ID_TEXT_PLAIN, NULL);
Expand All @@ -105,7 +109,7 @@ int main(int argc, char **argv) {
z_put_options_t options;
z_put_options_default(&options);
options.encoding = z_move(encoding);
if (z_put(z_loan(s), z_loan(ke), (const uint8_t *)value, strlen(value), &options) < 0) {
if (z_put(z_loan(s), z_loan(ke), z_move(payload), &options) < 0) {
printf("Oh no! Put has failed...\n");
}
// Clean up
Expand Down
13 changes: 11 additions & 2 deletions examples/unix/c99/z_ping.c
Original file line number Diff line number Diff line change
Expand Up @@ -109,15 +109,24 @@ int main(int argc, char** argv) {
z_clock_t warmup_start = z_clock_now();
unsigned long elapsed_us = 0;
while (elapsed_us < args.warmup_ms * 1000) {
z_publisher_put(z_publisher_loan(&pub), data, args.size, NULL);
// Create payload
z_owned_bytes_t payload;
z_bytes_serialize_from_slice(&payload, data, args.size);

z_publisher_put(z_publisher_loan(&pub), z_bytes_move(&payload), NULL);
z_condvar_wait(&cond, &mutex);
elapsed_us = z_clock_elapsed_us(&warmup_start);
}
}
unsigned long* results = z_malloc(sizeof(unsigned long) * args.number_of_pings);
for (unsigned int i = 0; i < args.number_of_pings; i++) {
z_clock_t measure_start = z_clock_now();
z_publisher_put(z_publisher_loan(&pub), data, args.size, NULL);

// Create payload
z_owned_bytes_t payload;
z_bytes_serialize_from_slice(&payload, data, args.size);

z_publisher_put(z_publisher_loan(&pub), z_bytes_move(&payload), NULL);
z_condvar_wait(&cond, &mutex);
results[i] = z_clock_elapsed_us(&measure_start);
}
Expand Down
7 changes: 6 additions & 1 deletion examples/unix/c99/z_pong.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,12 @@ void callback(const z_loaned_sample_t* sample, void* context) {
const z_loaned_publisher_t* pub = z_publisher_loan((z_owned_publisher_t*)context);
z_owned_slice_t value;
z_bytes_deserialize_into_slice(z_sample_payload(sample), &value);
z_publisher_put(pub, z_slice_data(z_slice_loan(&value)), z_slice_len(z_slice_loan(&value)), NULL);

// Create payload
z_owned_bytes_t payload;
z_bytes_serialize_from_slice(&payload, z_slice_data(z_slice_loan(&value)), z_slice_len(z_slice_loan(&value)));

z_publisher_put(pub, z_bytes_move(&payload), NULL);
z_slice_drop(z_slice_move(&value));
}
void drop(void* context) {
Expand Down
6 changes: 5 additions & 1 deletion examples/unix/c99/z_pub.c
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,18 @@ int main(int argc, char **argv) {
snprintf(buf, 256, "[%4d] %s", idx, value);
printf("Putting Data ('%s': '%s')...\n", keyexpr, buf);

// Create payload
z_owned_bytes_t payload;
z_bytes_serialize_from_string(&payload, buf);

// Create encoding
z_owned_encoding_t encoding;
zp_encoding_make(&encoding, Z_ENCODING_ID_TEXT_PLAIN, NULL);
z_publisher_put_options_t options;
z_publisher_put_options_default(&options);
options.encoding = z_encoding_move(&encoding);

z_publisher_put(z_publisher_loan(&pub), (const uint8_t *)buf, strlen(buf), &options);
z_publisher_put(z_publisher_loan(&pub), z_bytes_move(&payload), &options);
}
// Clean up
z_undeclare_publisher(z_publisher_move(&pub));
Expand Down
7 changes: 6 additions & 1 deletion examples/unix/c99/z_pub_st.c
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,12 @@ int main(int argc, char **argv) {
if (z_clock_elapsed_ms(&now) > 1000) {
snprintf(buf, 256, "[%4d] %s", idx, value);
printf("Putting Data ('%s': '%s')...\n", keyexpr, buf);
z_publisher_put(z_publisher_loan(&pub), (const uint8_t *)buf, strlen(buf), NULL);

// Create payload
z_owned_bytes_t payload;
z_bytes_serialize_from_string(&payload, buf);

z_publisher_put(z_publisher_loan(&pub), z_bytes_move(&payload), NULL);
++idx;

now = z_clock_now();
Expand Down
6 changes: 5 additions & 1 deletion examples/unix/c99/z_put.c
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ int main(int argc, char **argv) {
return -1;
}

// Create payload
z_owned_bytes_t payload;
z_bytes_serialize_from_string(&payload, value);

// Create encoding
z_owned_encoding_t encoding;
zp_encoding_make(&encoding, Z_ENCODING_ID_TEXT_PLAIN, NULL);
Expand All @@ -101,7 +105,7 @@ int main(int argc, char **argv) {
z_put_options_t options;
z_put_options_default(&options);
options.encoding = z_encoding_move(&encoding);
if (z_put(z_session_loan(&s), z_keyexpr_loan(&ke), (const uint8_t *)value, strlen(value), &options) < 0) {
if (z_put(z_session_loan(&s), z_keyexpr_loan(&ke), z_bytes_move(&payload), &options) < 0) {
printf("Oh no! Put has failed...\n");
}

Expand Down
13 changes: 11 additions & 2 deletions examples/windows/z_ping.c
Original file line number Diff line number Diff line change
Expand Up @@ -105,15 +105,24 @@ int main(int argc, char** argv) {
z_clock_t warmup_start = z_clock_now();
unsigned long elapsed_us = 0;
while (elapsed_us < args.warmup_ms * 1000) {
z_publisher_put(z_loan(pub), data, args.size, NULL);
// Create payload
z_owned_bytes_t payload;
z_bytes_serialize_from_slice(&payload, data, args.size);

z_publisher_put(z_loan(pub), z_move(payload), NULL);
z_condvar_wait(&cond, &mutex);
elapsed_us = z_clock_elapsed_us(&warmup_start);
}
}
unsigned long* results = z_malloc(sizeof(unsigned long) * args.number_of_pings);
for (unsigned int i = 0; i < args.number_of_pings; i++) {
z_clock_t measure_start = z_clock_now();
z_publisher_put(z_loan(pub), data, args.size, NULL);

// Create payload
z_owned_bytes_t payload;
z_bytes_serialize_from_slice(&payload, data, args.size);

z_publisher_put(z_loan(pub), z_move(payload), NULL);
z_condvar_wait(&cond, &mutex);
results[i] = z_clock_elapsed_us(&measure_start);
}
Expand Down
7 changes: 6 additions & 1 deletion examples/windows/z_pong.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@ void callback(const z_loaned_sample_t* sample, void* context) {
const z_loaned_publisher_t* pub = z_loan(*(z_owned_publisher_t*)context);
z_owned_slice_t value;
z_bytes_deserialize_into_slice(z_sample_payload(sample), &value);
z_publisher_put(pub, z_slice_data(z_loan(value)), z_slice_len(z_loan(value)), NULL);

// Create payload
z_owned_bytes_t payload;
z_bytes_serialize_from_slice(&payload, z_slice_data(z_loan(value)), z_slice_len(z_loan(value)));

z_publisher_put(pub, z_move(payload), NULL);
z_drop(z_move(value));
}
void drop(void* context) {
Expand Down
6 changes: 5 additions & 1 deletion examples/windows/z_pub.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,18 @@ int main(int argc, char **argv) {
snprintf(buf, 256, "[%4d] %s", idx, value);
printf("Putting Data ('%s': '%s')...\n", keyexpr, buf);

// Create payload
z_owned_bytes_t payload;
z_bytes_serialize_from_string(&payload, buf);

// Create encoding
z_owned_encoding_t encoding;
zp_encoding_make(&encoding, Z_ENCODING_ID_TEXT_PLAIN, NULL);
z_publisher_put_options_t options;
z_publisher_put_options_default(&options);
options.encoding = z_move(encoding);

z_publisher_put(z_loan(pub), (const uint8_t *)buf, strlen(buf), &options);
z_publisher_put(z_loan(pub), z_move(payload), &options);
}

// Clean-up
Expand Down
Loading

0 comments on commit 448695d

Please sign in to comment.