diff --git a/src/api/api.c b/src/api/api.c index 338d30439..124d43579 100644 --- a/src/api/api.c +++ b/src/api/api.c @@ -1092,10 +1092,6 @@ z_result_t z_publisher_delete(const z_loaned_publisher_t *pub, const z_publisher session = _Z_RC_IN_VAL(&pub->_zn); #endif -#if Z_FEATURE_MATCHING == 1 - _z_matching_listener_entity_undeclare(session, pub->_id); -#endif - z_result_t ret = _z_write(session, pub_keyexpr, _z_bytes_null(), NULL, Z_SAMPLE_KIND_DELETE, pub->_congestion_control, pub->_priority, pub->_is_express, opt.timestamp, _z_bytes_null(), reliability); diff --git a/src/net/matching.c b/src/net/matching.c index d7cd27e25..401d2f3c6 100644 --- a/src/net/matching.c +++ b/src/net/matching.c @@ -22,6 +22,7 @@ #include "zenoh-pico/net/session.h" #include "zenoh-pico/session/matching.h" #include "zenoh-pico/session/resource.h" +#include "zenoh-pico/utils/logging.h" #include "zenoh-pico/utils/result.h" #if Z_FEATURE_MATCHING == 1 @@ -83,6 +84,7 @@ z_result_t _z_matching_listener_entity_undeclare(_z_session_t *zn, _z_zint_t ent size_t key = _z_matching_listener_intmap_iterator_key(&iter); _z_matching_listener_state_t *listener = _z_matching_listener_intmap_iterator_value(&iter); if (listener->entity_id == entity_id) { + _Z_DEBUG("_z_matching_listener_entity_undeclare: entity=%i, listener=%i", (int)entity_id, (int)key); _z_matching_listener_intmap_remove(&zn->_matching_listeners, key); _z_remove_interest(zn, listener->interest_id); } diff --git a/src/net/primitives.c b/src/net/primitives.c index dcad5d6b9..c81b37d1b 100644 --- a/src/net/primitives.c +++ b/src/net/primitives.c @@ -22,6 +22,7 @@ #include "zenoh-pico/config.h" #include "zenoh-pico/net/filtering.h" #include "zenoh-pico/net/logger.h" +#include "zenoh-pico/net/matching.h" #include "zenoh-pico/net/sample.h" #include "zenoh-pico/net/session.h" #include "zenoh-pico/protocol/core.h" @@ -180,6 +181,9 @@ z_result_t _z_undeclare_publisher(_z_publisher_t *pub) { if (pub == NULL || _Z_RC_IS_NULL(&pub->_zn)) { return _Z_ERR_ENTITY_UNKNOWN; } +#if Z_FEATURE_MATCHING == 1 + _z_matching_listener_entity_undeclare(_Z_RC_IN_VAL(&pub->_zn), pub->_id); +#endif // Clear publisher _z_write_filter_destroy(pub); _z_undeclare_resource(_Z_RC_IN_VAL(&pub->_zn), pub->_key._id); diff --git a/tests/z_api_matching_test.c b/tests/z_api_matching_test.c index ad925854f..6080530d2 100644 --- a/tests/z_api_matching_test.c +++ b/tests/z_api_matching_test.c @@ -24,11 +24,58 @@ #undef NDEBUG #include + +typedef enum { NONE, MATCH, UNMATCH, DROP } context_state_t; + typedef struct context_t { - bool match_put; - bool match_drop; + z_owned_condvar_t cv; + z_owned_mutex_t m; + context_state_t state; } context_t; +static void _context_init(context_t* c) { + z_condvar_init(&c->cv); + z_mutex_init(&c->m); + c->state = NONE; +} + +static void _context_drop(context_t* c) { + z_condvar_drop(z_condvar_move(&c->cv)); + z_mutex_drop(z_mutex_move(&c->m)); +} + +static void _context_wait(context_t* c, context_state_t state, int timeout_s) { + z_mutex_lock(z_mutex_loan_mut(&c->m)); + if (c->state != state) { + z_clock_t clock = z_clock_now(); + z_clock_advance_s(&clock, timeout_s); + printf("Waiting for state %d...\n", state); + z_result_t res = z_condvar_wait_until(z_condvar_loan_mut(&c->cv), z_mutex_loan_mut(&c->m), &clock); + if (res == Z_ETIMEDOUT) { + printf("Timeout waiting for state %d\n", state); + assert(false); + } + if (c->state != state) { + printf("Expected state %d, got %d\n", state, c->state); + assert(false); + } + } + c->state = NONE; + z_mutex_unlock(z_mutex_loan_mut(&c->m)); +} + +static void _context_notify(context_t* c, context_state_t state) { + z_mutex_lock(z_mutex_loan_mut(&c->m)); + if (c->state != NONE) { + printf("State already set %d\n", c->state); + assert(false); + } + c->state = state; + printf("State recieved %d\n", state); + z_condvar_signal(z_condvar_loan_mut(&c->cv)); + z_mutex_unlock(z_mutex_loan_mut(&c->m)); +} + #define assert_ok(x) \ { \ int ret = (int)x; \ @@ -40,18 +87,24 @@ typedef struct context_t { const char* pub_expr = "zenoh-pico/matching/test/val"; const char* sub_expr = "zenoh-pico/matching/test/*"; +// const char* sub_expr_wrong = "zenoh-pico/matching/**"; +const char* sub_expr_wrong = "zenoh-pico/matching/test_wrong/*"; void on_receive(const z_matching_status_t* s, void* context) { context_t* c = (context_t*)context; - if (s->matching) { - c->match_put = true; - } else { - c->match_drop = true; - } + _context_notify(c, s->matching ? MATCH : UNMATCH); +} + +void on_drop(void* context) { + context_t* c = (context_t*)context; + _context_notify(c, DROP); } -void test_matching_sub(void) { - printf("test_matching_sub\n"); +void test_matching_sub(bool background) { + printf("test_matching_sub: background=%d\n", background); + + context_t context = {0}; + _context_init(&context); z_owned_session_t s1, s2; z_owned_config_t c1, c2; @@ -73,31 +126,36 @@ void test_matching_sub(void) { assert_ok(z_declare_publisher(z_session_loan(&s1), &pub, z_view_keyexpr_loan(&k_pub), NULL)); z_owned_closure_matching_status_t closure; - context_t context = {false, false}; - z_closure_matching_status(&closure, on_receive, NULL, (void*)(&context)); + z_closure_matching_status(&closure, on_receive, on_drop, (void*)(&context)); z_owned_matching_listener_t matching_listener; - assert_ok(z_publisher_declare_matching_listener(z_publisher_loan(&pub), &matching_listener, - z_closure_matching_status_move(&closure))); - z_sleep_s(1); - - assert(!context.match_put); - assert(!context.match_drop); + if (background) { + assert_ok(z_publisher_declare_background_matching_listener(z_publisher_loan(&pub), + z_closure_matching_status_move(&closure))); + } else { + assert_ok(z_publisher_declare_matching_listener(z_publisher_loan(&pub), &matching_listener, + z_closure_matching_status_move(&closure))); + } z_owned_subscriber_t sub; z_owned_closure_sample_t callback; z_closure_sample(&callback, NULL, NULL, NULL); assert_ok(z_declare_subscriber(z_session_loan(&s2), &sub, z_view_keyexpr_loan(&k_sub), z_closure_sample_move(&callback), NULL)); - z_sleep_s(5); - assert(context.match_put); + _context_wait(&context, MATCH, 10); z_subscriber_drop(z_subscriber_move(&sub)); - z_sleep_s(5); + _context_wait(&context, UNMATCH, 10); + + z_publisher_drop(z_publisher_move(&pub)); - assert(context.match_drop); + _context_wait(&context, DROP, 10); + + if (!background) { + z_matching_listener_drop(z_matching_listener_move(&matching_listener)); + } assert_ok(zp_stop_read_task(z_loan_mut(s1))); assert_ok(zp_stop_read_task(z_loan_mut(s2))); @@ -106,6 +164,22 @@ void test_matching_sub(void) { z_session_drop(z_session_move(&s1)); z_session_drop(z_session_move(&s2)); + + _context_drop(&context); +} + +static void _check_status(z_owned_publisher_t* pub, bool expected) { + z_matching_status_t status; + status.matching = !expected; + z_clock_t clock = z_clock_now(); + while (status.matching != expected && z_clock_elapsed_s(&clock) < 10) { + assert_ok(z_publisher_get_matching_status(z_publisher_loan(pub), &status)); + z_sleep_ms(100); + } + if (status.matching != expected) { + printf("Expected matching status %d, got %d\n", expected, status.matching); + assert(false); + } } void test_matching_get(void) { @@ -113,9 +187,10 @@ void test_matching_get(void) { z_owned_config_t c1, c2; z_config_default(&c1); z_config_default(&c2); - z_view_keyexpr_t k_sub, k_pub; + z_view_keyexpr_t k_sub, k_pub, k_sub_wrong; z_view_keyexpr_from_str(&k_sub, sub_expr); z_view_keyexpr_from_str(&k_pub, pub_expr); + z_view_keyexpr_from_str(&k_sub_wrong, sub_expr_wrong); assert_ok(z_open(&s1, z_config_move(&c1), NULL)); assert_ok(z_open(&s2, z_config_move(&c2), NULL)); @@ -129,19 +204,31 @@ void test_matching_get(void) { assert_ok(z_declare_publisher(z_session_loan(&s1), &pub, z_view_keyexpr_loan(&k_pub), NULL)); z_sleep_s(1); - z_matching_status_t status; - assert_ok(z_publisher_get_matching_status(z_publisher_loan(&pub), &status)); - assert(!status.matching); + _check_status(&pub, false); + + z_owned_subscriber_t sub_wrong; + z_owned_closure_sample_t callback_wrong; + z_closure_sample(&callback_wrong, NULL, NULL, NULL); + assert_ok(z_declare_subscriber(z_session_loan(&s2), &sub_wrong, z_view_keyexpr_loan(&k_sub_wrong), + z_closure_sample_move(&callback_wrong), NULL)); + z_sleep_s(1); + + _check_status(&pub, false); z_owned_subscriber_t sub; z_owned_closure_sample_t callback; z_closure_sample(&callback, NULL, NULL, NULL); assert_ok(z_declare_subscriber(z_session_loan(&s2), &sub, z_view_keyexpr_loan(&k_sub), z_closure_sample_move(&callback), NULL)); - z_sleep_s(3); - assert_ok(z_publisher_get_matching_status(z_publisher_loan(&pub), &status)); - assert(status.matching); + _check_status(&pub, true); + + z_subscriber_drop(z_subscriber_move(&sub)); + + _check_status(&pub, false); + + z_publisher_drop(z_publisher_move(&pub)); + z_subscriber_drop(z_subscriber_move(&sub_wrong)); assert_ok(zp_stop_read_task(z_loan_mut(s1))); assert_ok(zp_stop_read_task(z_loan_mut(s2))); @@ -155,7 +242,8 @@ void test_matching_get(void) { int main(int argc, char** argv) { (void)argc; (void)argv; - test_matching_sub(); + test_matching_sub(true); + test_matching_sub(false); test_matching_get(); }