From a5da5f84ebecf204960df990bfde77e4eef5a372 Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Tue, 14 Jan 2025 14:21:37 +0100 Subject: [PATCH] Matching subscribers automatic undeclaration --- include/zenoh-pico/net/matching.h | 5 +++- include/zenoh-pico/net/session.h | 2 +- include/zenoh-pico/session/matching.h | 7 +++--- src/api/api.c | 11 +++++++- src/net/matching.c | 36 ++++++++++++++++++++++++--- src/session/utils.c | 4 +-- tests/memory_leak.py | 3 +++ tests/z_collections_test.c | 21 ++++++++++++++++ 8 files changed, 77 insertions(+), 12 deletions(-) diff --git a/include/zenoh-pico/net/matching.h b/include/zenoh-pico/net/matching.h index 7c3579c5f..00e330d4a 100644 --- a/include/zenoh-pico/net/matching.h +++ b/include/zenoh-pico/net/matching.h @@ -25,11 +25,14 @@ typedef struct _z_publisher_t _z_publisher_t; #if Z_FEATURE_MATCHING == 1 typedef struct _z_matching_listener_t { + uint32_t _id; uint32_t _interest_id; _z_session_weak_t _zn; } _z_matching_listener_t; -_z_matching_listener_t _z_matching_listener_declare(const _z_publisher_t *pub, _z_closure_matching_status_t callback); +_z_matching_listener_t _z_matching_listener_declare(_z_session_rc_t *zn, const _z_keyexpr_t *key, _z_zint_t entity_id, + _z_closure_matching_status_t callback); +z_result_t _z_matching_listener_entity_undeclare(_z_session_t *zn, _z_zint_t entity_id); z_result_t _z_matching_listener_undeclare(_z_matching_listener_t *listener); // Warning: None of the sub-types require a non-0 initialization. Add a init function if it changes. static inline _z_matching_listener_t _z_matching_listener_null(void) { return (_z_matching_listener_t){0}; } diff --git a/include/zenoh-pico/net/session.h b/include/zenoh-pico/net/session.h index 4f899ccdc..1bc988e07 100644 --- a/include/zenoh-pico/net/session.h +++ b/include/zenoh-pico/net/session.h @@ -95,7 +95,7 @@ typedef struct _z_session_t { #endif #if Z_FEATURE_INTEREST == 1 - _z_matching_listener_item_intmap_t _matching_listeners; + _z_matching_listener_intmap_t _matching_listeners; #endif // Session interests diff --git a/include/zenoh-pico/session/matching.h b/include/zenoh-pico/session/matching.h index 190e26ff5..143fb13f1 100644 --- a/include/zenoh-pico/session/matching.h +++ b/include/zenoh-pico/session/matching.h @@ -42,13 +42,14 @@ typedef struct _z_matching_listener_ctx_t { } _z_matching_listener_ctx_t; typedef struct { - uint32_t _interest_id; + uint32_t interest_id; + _z_zint_t entity_id; _z_matching_listener_ctx_t *ctx; } _z_matching_listener_state_t; -_Z_ELEM_DEFINE(_z_matching_listener_item, _z_matching_listener_state_t, _z_noop_size, _z_noop_clear, _z_noop_copy, +_Z_ELEM_DEFINE(_z_matching_listener, _z_matching_listener_state_t, _z_noop_size, _z_noop_clear, _z_noop_copy, _z_noop_move) -_Z_INT_MAP_DEFINE(_z_matching_listener_item, _z_matching_listener_state_t) +_Z_INT_MAP_DEFINE(_z_matching_listener, _z_matching_listener_state_t) #endif #ifdef __cplusplus diff --git a/src/api/api.c b/src/api/api.c index adb18a94f..338d30439 100644 --- a/src/api/api.c +++ b/src/api/api.c @@ -1092,6 +1092,10 @@ z_result_t z_publisher_delete(const z_loaned_publisher_t *pub, const z_publisher session = _Z_RC_IN_VAL(&pub->_zn); #endif +#if Z_FEATURE_MATCHING == 1 + _z_matching_listener_entity_undeclare(session, pub->_id); +#endif + z_result_t ret = _z_write(session, pub_keyexpr, _z_bytes_null(), NULL, Z_SAMPLE_KIND_DELETE, pub->_congestion_control, pub->_priority, pub->_is_express, opt.timestamp, _z_bytes_null(), reliability); @@ -1119,10 +1123,15 @@ z_result_t z_publisher_declare_background_matching_listener(const z_loaned_publi z_result_t z_publisher_declare_matching_listener(const z_loaned_publisher_t *publisher, z_owned_matching_listener_t *matching_listener, z_moved_closure_matching_status_t *callback) { - _z_matching_listener_t listener = _z_matching_listener_declare(publisher, callback->_this._val); + _z_session_rc_t sess_rc = _z_session_weak_upgrade_if_open(&publisher->_zn); + _z_matching_listener_t listener = + _z_matching_listener_declare(&sess_rc, &publisher->_key, publisher->_id, callback->_this._val); + _z_session_rc_drop(&sess_rc); z_internal_closure_matching_status_null(&callback->_this); + matching_listener->_val = listener; + return _z_matching_listener_check(&listener) ? _Z_RES_OK : _Z_ERR_GENERIC; } diff --git a/src/net/matching.c b/src/net/matching.c index 012f6256c..c74811e97 100644 --- a/src/net/matching.c +++ b/src/net/matching.c @@ -19,6 +19,9 @@ #include "zenoh-pico/api/primitives.h" #include "zenoh-pico/api/types.h" #include "zenoh-pico/net/primitives.h" +#include "zenoh-pico/net/session.h" +#include "zenoh-pico/session/matching.h" +#include "zenoh-pico/session/resource.h" #include "zenoh-pico/utils/logging.h" #include "zenoh-pico/utils/result.h" @@ -48,7 +51,8 @@ static void _z_matching_listener_callback(const _z_interest_msg_t *msg, void *ar } } -_z_matching_listener_t _z_matching_listener_declare(const _z_publisher_t *pub, _z_closure_matching_status_t callback) { +_z_matching_listener_t _z_matching_listener_declare(_z_session_rc_t *zn, const _z_keyexpr_t *key, _z_zint_t entity_id, + _z_closure_matching_status_t callback) { uint8_t flags = _Z_INTEREST_FLAG_KEYEXPRS | _Z_INTEREST_FLAG_SUBSCRIBERS | _Z_INTEREST_FLAG_RESTRICTED | _Z_INTEREST_FLAG_CURRENT | _Z_INTEREST_FLAG_FUTURE | _Z_INTEREST_FLAG_AGGREGATE; _z_matching_listener_ctx_t *ctx = z_malloc(sizeof(_z_matching_listener_ctx_t)); @@ -59,18 +63,42 @@ _z_matching_listener_t _z_matching_listener_declare(const _z_publisher_t *pub, _ } ctx->decl_id = 0; ctx->callback = callback; - ret._interest_id = _z_add_interest(_Z_RC_IN_VAL(&pub->_zn), _z_keyexpr_alias_from_user_defined(pub->_key, true), + ret._interest_id = _z_add_interest(_Z_RC_IN_VAL(zn), _z_keyexpr_alias_from_user_defined(*key, true), _z_matching_listener_callback, flags, (void *)ctx); if (ret._interest_id == 0) { z_free(ctx); return ret; } - ret._zn = _z_session_weak_clone(&pub->_zn); + + ret._id = _z_get_entity_id(_Z_RC_IN_VAL(zn)); + ret._zn = _z_session_rc_clone_as_weak(zn); + + _z_matching_listener_state_t state; + state.entity_id = entity_id; + state.interest_id = ret._interest_id; + _z_matching_listener_intmap_insert(&_Z_RC_IN_VAL(zn)->_matching_listeners, ret._id, &state); + return ret; } +z_result_t _z_matching_listener_entity_undeclare(_z_session_t *zn, _z_zint_t entity_id) { + _z_matching_listener_intmap_iterator_t iter = _z_matching_listener_intmap_iterator_make(&zn->_matching_listeners); + while (_z_matching_listener_intmap_iterator_next(&iter)) { + size_t key = _z_matching_listener_intmap_iterator_key(&iter); + _z_matching_listener_state_t *listener = _z_matching_listener_intmap_iterator_value(&iter); + if (listener->entity_id == entity_id) { + _z_matching_listener_intmap_remove(&zn->_matching_listeners, key); + _z_remove_interest(zn, listener->interest_id); + } + _z_str_intmap_iterator_next(&iter); + } + return _Z_RES_OK; +} + z_result_t _z_matching_listener_undeclare(_z_matching_listener_t *listener) { - return _z_remove_interest(_Z_RC_IN_VAL(&listener->_zn), listener->_interest_id); + _z_session_t *zn = _Z_RC_IN_VAL(&listener->_zn); + _z_matching_listener_intmap_remove(&zn->_matching_listeners, listener->_id); + return _z_remove_interest(zn, listener->_interest_id); } void _z_matching_listener_clear(_z_matching_listener_t *listener) { diff --git a/src/session/utils.c b/src/session/utils.c index c40941973..5767cd410 100644 --- a/src/session/utils.c +++ b/src/session/utils.c @@ -97,7 +97,7 @@ z_result_t _z_session_init(_z_session_t *zn, const _z_id_t *zid) { #endif #if Z_FEATURE_MATCHING == 1 - zn->_matching_listeners = _z_matching_listener_item_intmap_make(); + zn->_matching_listeners = _z_matching_listener_intmap_make(); #endif _z_interest_init(zn); @@ -148,7 +148,7 @@ void _z_session_clear(_z_session_t *zn) { #endif #if Z_FEATURE_MATCHING == 1 - _z_matching_listener_item_intmap_clear(&zn->_matching_listeners); + _z_matching_listener_intmap_clear(&zn->_matching_listeners); #endif _z_flush_interest(zn); diff --git a/tests/memory_leak.py b/tests/memory_leak.py index bc37bbf2f..89e727f0a 100644 --- a/tests/memory_leak.py +++ b/tests/memory_leak.py @@ -167,6 +167,9 @@ def query_and_queryable(query_cmd, queryable_cmd): print("*** Pub & sub attachment test ***") if pub_and_sub('z_pub_attachment -n 1', 'z_sub_attachment -n 1') == 1: EXIT_STATUS = 1 + print("*** Pub & sub listener test ***") + if pub_and_sub('z_pub -n 1 -a', 'z_sub -n 1') == 1: + EXIT_STATUS = 1 # Test query and queryable examples print("*** Query & queryable test ***") if query_and_queryable('z_get', 'z_queryable -n 1') == 1: diff --git a/tests/z_collections_test.c b/tests/z_collections_test.c index 0fe9bab17..69183050b 100644 --- a/tests/z_collections_test.c +++ b/tests/z_collections_test.c @@ -351,6 +351,26 @@ void int_map_iterator_test(void) { #undef TEST_MAP } +void int_map_iterator_deletion_test(void) { + _z_str_intmap_t map; + + map = _z_str_intmap_make(); + _z_str_intmap_insert(&map, 10, _z_str_clone("A")); + _z_str_intmap_insert(&map, 20, _z_str_clone("B")); + _z_str_intmap_insert(&map, 30, _z_str_clone("C")); + _z_str_intmap_insert(&map, 40, _z_str_clone("D")); + + _z_str_intmap_iterator_t iter = _z_str_intmap_iterator_make(&map); + for (size_t s = 4; s != 0; s--) { + assert(s == _z_str_intmap_len(&map)); + _z_str_intmap_iterator_next(&iter); + size_t key = _z_str_intmap_iterator_key(&iter); + assert(strlen(_z_str_intmap_iterator_value(&iter)) == 1); + _z_str_intmap_remove(&map, key); + } + _z_str_intmap_clear(&map); +} + int main(void) { ring_test(); ring_test_init_free(); @@ -360,4 +380,5 @@ int main(void) { fifo_test_init_free(); int_map_iterator_test(); + int_map_iterator_deletion_test(); }