Skip to content

Commit

Permalink
Improve test coverage for matching subscribers, bugfix
Browse files Browse the repository at this point in the history
  • Loading branch information
sashacmc committed Jan 15, 2025
1 parent 713f6fe commit ef5b7a6
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 40 deletions.
4 changes: 0 additions & 4 deletions src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -1092,10 +1092,6 @@ z_result_t z_publisher_delete(const z_loaned_publisher_t *pub, const z_publisher
session = _Z_RC_IN_VAL(&pub->_zn);
#endif

#if Z_FEATURE_MATCHING == 1
_z_matching_listener_entity_undeclare(session, pub->_id);
#endif

z_result_t ret =
_z_write(session, pub_keyexpr, _z_bytes_null(), NULL, Z_SAMPLE_KIND_DELETE, pub->_congestion_control,
pub->_priority, pub->_is_express, opt.timestamp, _z_bytes_null(), reliability);
Expand Down
2 changes: 2 additions & 0 deletions src/net/matching.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "zenoh-pico/net/session.h"
#include "zenoh-pico/session/matching.h"
#include "zenoh-pico/session/resource.h"
#include "zenoh-pico/utils/logging.h"
#include "zenoh-pico/utils/result.h"

#if Z_FEATURE_MATCHING == 1
Expand Down Expand Up @@ -83,6 +84,7 @@ z_result_t _z_matching_listener_entity_undeclare(_z_session_t *zn, _z_zint_t ent
size_t key = _z_matching_listener_intmap_iterator_key(&iter);
_z_matching_listener_state_t *listener = _z_matching_listener_intmap_iterator_value(&iter);
if (listener->entity_id == entity_id) {
_Z_DEBUG("_z_matching_listener_entity_undeclare: entity=%i, listener=%i", (int)entity_id, (int)key);
_z_matching_listener_intmap_remove(&zn->_matching_listeners, key);
_z_remove_interest(zn, listener->interest_id);
}
Expand Down
4 changes: 4 additions & 0 deletions src/net/primitives.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "zenoh-pico/config.h"
#include "zenoh-pico/net/filtering.h"
#include "zenoh-pico/net/logger.h"
#include "zenoh-pico/net/matching.h"
#include "zenoh-pico/net/sample.h"
#include "zenoh-pico/net/session.h"
#include "zenoh-pico/protocol/core.h"
Expand Down Expand Up @@ -180,6 +181,9 @@ z_result_t _z_undeclare_publisher(_z_publisher_t *pub) {
if (pub == NULL || _Z_RC_IS_NULL(&pub->_zn)) {
return _Z_ERR_ENTITY_UNKNOWN;
}
#if Z_FEATURE_MATCHING == 1
_z_matching_listener_entity_undeclare(_Z_RC_IN_VAL(&pub->_zn), pub->_id);
#endif
// Clear publisher
_z_write_filter_destroy(pub);
_z_undeclare_resource(_Z_RC_IN_VAL(&pub->_zn), pub->_key._id);
Expand Down
159 changes: 123 additions & 36 deletions tests/z_api_matching_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,34 +24,86 @@

#undef NDEBUG
#include <assert.h>

typedef enum { NONE, MATCH, UNMATCH, DROP } context_state_t;

typedef struct context_t {
bool match_put;
bool match_drop;
z_owned_condvar_t cv;
z_owned_mutex_t m;
context_state_t state;
} context_t;

#define assert_ok(x) \
{ \
int ret = (int)x; \
if (ret != Z_OK) { \
printf("%s failed: %d\n", #x, ret); \
assert(false); \
} \
static void _context_init(context_t* c) {
z_condvar_init(&c->cv);
z_mutex_init(&c->m);
c->state = NONE;
}

static void _context_drop(context_t* c) {
z_condvar_drop(z_condvar_move(&c->cv));
z_mutex_drop(z_mutex_move(&c->m));
}

static void _context_wait(context_t* c, context_state_t state, int timeout_s) {
z_mutex_lock(z_mutex_loan_mut(&c->m));
if (c->state != state) {
z_clock_t clock = z_clock_now();
z_clock_advance_s(&clock, timeout_s);
printf("Waiting for state %d...\n", state);
z_result_t res = z_condvar_wait_until(z_condvar_loan_mut(&c->cv), z_mutex_loan_mut(&c->m), &clock);
if (res == Z_ETIMEDOUT) {
fprintf(stderr, "Timeout waiting for state %d\n", state);
assert(false);
}
if (c->state != state) {
fprintf(stderr, "Expected state %d, got %d\n", state, c->state);
assert(false);
}
}
c->state = NONE;
z_mutex_unlock(z_mutex_loan_mut(&c->m));
}

static void _context_notify(context_t* c, context_state_t state) {
z_mutex_lock(z_mutex_loan_mut(&c->m));
if (c->state != NONE) {
fprintf(stderr, "State already set %d\n", c->state);
assert(false);
}
c->state = state;
fprintf(stderr, "State recieved %d\n", state);
z_condvar_signal(z_condvar_loan_mut(&c->cv));
z_mutex_unlock(z_mutex_loan_mut(&c->m));
}

#define assert_ok(x) \
{ \
int ret = (int)x; \
if (ret != Z_OK) { \
fprintf(stderr, "%s failed: %d\n", #x, ret); \
assert(false); \
} \
}

const char* pub_expr = "zenoh-pico/matching/test/val";
const char* sub_expr = "zenoh-pico/matching/test/*";
const char* sub_expr_wrong = "zenoh-pico/matching/test_wrong/*";

void on_receive(const z_matching_status_t* s, void* context) {
context_t* c = (context_t*)context;
if (s->matching) {
c->match_put = true;
} else {
c->match_drop = true;
}
_context_notify(c, s->matching ? MATCH : UNMATCH);
}

void on_drop(void* context) {
context_t* c = (context_t*)context;
_context_notify(c, DROP);
}

void test_matching_sub(void) {
printf("test_matching_sub\n");
void test_matching_sub(bool background) {
printf("test_matching_sub: background=%d\n", background);

context_t context = {0};
_context_init(&context);

z_owned_session_t s1, s2;
z_owned_config_t c1, c2;
Expand All @@ -73,31 +125,36 @@ void test_matching_sub(void) {
assert_ok(z_declare_publisher(z_session_loan(&s1), &pub, z_view_keyexpr_loan(&k_pub), NULL));

z_owned_closure_matching_status_t closure;
context_t context = {false, false};
z_closure_matching_status(&closure, on_receive, NULL, (void*)(&context));
z_closure_matching_status(&closure, on_receive, on_drop, (void*)(&context));

z_owned_matching_listener_t matching_listener;
assert_ok(z_publisher_declare_matching_listener(z_publisher_loan(&pub), &matching_listener,
z_closure_matching_status_move(&closure)));
z_sleep_s(1);

assert(!context.match_put);
assert(!context.match_drop);
if (background) {
assert_ok(z_publisher_declare_background_matching_listener(z_publisher_loan(&pub),
z_closure_matching_status_move(&closure)));
} else {
assert_ok(z_publisher_declare_matching_listener(z_publisher_loan(&pub), &matching_listener,
z_closure_matching_status_move(&closure)));
}

z_owned_subscriber_t sub;
z_owned_closure_sample_t callback;
z_closure_sample(&callback, NULL, NULL, NULL);
assert_ok(z_declare_subscriber(z_session_loan(&s2), &sub, z_view_keyexpr_loan(&k_sub),
z_closure_sample_move(&callback), NULL));
z_sleep_s(5);

assert(context.match_put);
_context_wait(&context, MATCH, 10);

z_subscriber_drop(z_subscriber_move(&sub));

z_sleep_s(5);
_context_wait(&context, UNMATCH, 10);

z_publisher_drop(z_publisher_move(&pub));

assert(context.match_drop);
_context_wait(&context, DROP, 10);

if (!background) {
z_matching_listener_drop(z_matching_listener_move(&matching_listener));
}

assert_ok(zp_stop_read_task(z_loan_mut(s1)));
assert_ok(zp_stop_read_task(z_loan_mut(s2)));
Expand All @@ -106,16 +163,33 @@ void test_matching_sub(void) {

z_session_drop(z_session_move(&s1));
z_session_drop(z_session_move(&s2));

_context_drop(&context);
}

static void _check_status(z_owned_publisher_t* pub, bool expected) {
z_matching_status_t status;
status.matching = !expected;
z_clock_t clock = z_clock_now();
while (status.matching != expected && z_clock_elapsed_s(&clock) < 10) {
assert_ok(z_publisher_get_matching_status(z_publisher_loan(pub), &status));
z_sleep_ms(100);
}
if (status.matching != expected) {
fprintf(stderr, "Expected matching status %d, got %d\n", expected, status.matching);
assert(false);
}
}

void test_matching_get(void) {
z_owned_session_t s1, s2;
z_owned_config_t c1, c2;
z_config_default(&c1);
z_config_default(&c2);
z_view_keyexpr_t k_sub, k_pub;
z_view_keyexpr_t k_sub, k_pub, k_sub_wrong;
z_view_keyexpr_from_str(&k_sub, sub_expr);
z_view_keyexpr_from_str(&k_pub, pub_expr);
z_view_keyexpr_from_str(&k_sub_wrong, sub_expr_wrong);

assert_ok(z_open(&s1, z_config_move(&c1), NULL));
assert_ok(z_open(&s2, z_config_move(&c2), NULL));
Expand All @@ -129,19 +203,31 @@ void test_matching_get(void) {
assert_ok(z_declare_publisher(z_session_loan(&s1), &pub, z_view_keyexpr_loan(&k_pub), NULL));
z_sleep_s(1);

z_matching_status_t status;
assert_ok(z_publisher_get_matching_status(z_publisher_loan(&pub), &status));
assert(!status.matching);
_check_status(&pub, false);

z_owned_subscriber_t sub_wrong;
z_owned_closure_sample_t callback_wrong;
z_closure_sample(&callback_wrong, NULL, NULL, NULL);
assert_ok(z_declare_subscriber(z_session_loan(&s2), &sub_wrong, z_view_keyexpr_loan(&k_sub_wrong),
z_closure_sample_move(&callback_wrong), NULL));
z_sleep_s(1);

_check_status(&pub, false);

z_owned_subscriber_t sub;
z_owned_closure_sample_t callback;
z_closure_sample(&callback, NULL, NULL, NULL);
assert_ok(z_declare_subscriber(z_session_loan(&s2), &sub, z_view_keyexpr_loan(&k_sub),
z_closure_sample_move(&callback), NULL));
z_sleep_s(3);

assert_ok(z_publisher_get_matching_status(z_publisher_loan(&pub), &status));
assert(status.matching);
_check_status(&pub, true);

z_subscriber_drop(z_subscriber_move(&sub));

_check_status(&pub, false);

z_publisher_drop(z_publisher_move(&pub));
z_subscriber_drop(z_subscriber_move(&sub_wrong));

assert_ok(zp_stop_read_task(z_loan_mut(s1)));
assert_ok(zp_stop_read_task(z_loan_mut(s2)));
Expand All @@ -155,7 +241,8 @@ void test_matching_get(void) {
int main(int argc, char** argv) {
(void)argc;
(void)argv;
test_matching_sub();
test_matching_sub(true);
test_matching_sub(false);
test_matching_get();
}

Expand Down

0 comments on commit ef5b7a6

Please sign in to comment.