Skip to content

Commit

Permalink
Matching subscribers automatic undeclaration
Browse files Browse the repository at this point in the history
  • Loading branch information
sashacmc committed Jan 14, 2025
1 parent 744fa1b commit a5da5f8
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 12 deletions.
5 changes: 4 additions & 1 deletion include/zenoh-pico/net/matching.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,14 @@ typedef struct _z_publisher_t _z_publisher_t;

#if Z_FEATURE_MATCHING == 1
typedef struct _z_matching_listener_t {
uint32_t _id;
uint32_t _interest_id;
_z_session_weak_t _zn;
} _z_matching_listener_t;

_z_matching_listener_t _z_matching_listener_declare(const _z_publisher_t *pub, _z_closure_matching_status_t callback);
_z_matching_listener_t _z_matching_listener_declare(_z_session_rc_t *zn, const _z_keyexpr_t *key, _z_zint_t entity_id,
_z_closure_matching_status_t callback);
z_result_t _z_matching_listener_entity_undeclare(_z_session_t *zn, _z_zint_t entity_id);
z_result_t _z_matching_listener_undeclare(_z_matching_listener_t *listener);
// Warning: None of the sub-types require a non-0 initialization. Add a init function if it changes.
static inline _z_matching_listener_t _z_matching_listener_null(void) { return (_z_matching_listener_t){0}; }
Expand Down
2 changes: 1 addition & 1 deletion include/zenoh-pico/net/session.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ typedef struct _z_session_t {
#endif

#if Z_FEATURE_INTEREST == 1
_z_matching_listener_item_intmap_t _matching_listeners;
_z_matching_listener_intmap_t _matching_listeners;
#endif

// Session interests
Expand Down
7 changes: 4 additions & 3 deletions include/zenoh-pico/session/matching.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,14 @@ typedef struct _z_matching_listener_ctx_t {
} _z_matching_listener_ctx_t;

typedef struct {
uint32_t _interest_id;
uint32_t interest_id;
_z_zint_t entity_id;
_z_matching_listener_ctx_t *ctx;
} _z_matching_listener_state_t;

_Z_ELEM_DEFINE(_z_matching_listener_item, _z_matching_listener_state_t, _z_noop_size, _z_noop_clear, _z_noop_copy,
_Z_ELEM_DEFINE(_z_matching_listener, _z_matching_listener_state_t, _z_noop_size, _z_noop_clear, _z_noop_copy,
_z_noop_move)
_Z_INT_MAP_DEFINE(_z_matching_listener_item, _z_matching_listener_state_t)
_Z_INT_MAP_DEFINE(_z_matching_listener, _z_matching_listener_state_t)
#endif

#ifdef __cplusplus
Expand Down
11 changes: 10 additions & 1 deletion src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -1092,6 +1092,10 @@ z_result_t z_publisher_delete(const z_loaned_publisher_t *pub, const z_publisher
session = _Z_RC_IN_VAL(&pub->_zn);
#endif

#if Z_FEATURE_MATCHING == 1
_z_matching_listener_entity_undeclare(session, pub->_id);
#endif

z_result_t ret =
_z_write(session, pub_keyexpr, _z_bytes_null(), NULL, Z_SAMPLE_KIND_DELETE, pub->_congestion_control,
pub->_priority, pub->_is_express, opt.timestamp, _z_bytes_null(), reliability);
Expand Down Expand Up @@ -1119,10 +1123,15 @@ z_result_t z_publisher_declare_background_matching_listener(const z_loaned_publi
z_result_t z_publisher_declare_matching_listener(const z_loaned_publisher_t *publisher,
z_owned_matching_listener_t *matching_listener,
z_moved_closure_matching_status_t *callback) {
_z_matching_listener_t listener = _z_matching_listener_declare(publisher, callback->_this._val);
_z_session_rc_t sess_rc = _z_session_weak_upgrade_if_open(&publisher->_zn);
_z_matching_listener_t listener =
_z_matching_listener_declare(&sess_rc, &publisher->_key, publisher->_id, callback->_this._val);
_z_session_rc_drop(&sess_rc);

z_internal_closure_matching_status_null(&callback->_this);

matching_listener->_val = listener;

return _z_matching_listener_check(&listener) ? _Z_RES_OK : _Z_ERR_GENERIC;
}

Expand Down
36 changes: 32 additions & 4 deletions src/net/matching.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
#include "zenoh-pico/api/primitives.h"
#include "zenoh-pico/api/types.h"
#include "zenoh-pico/net/primitives.h"
#include "zenoh-pico/net/session.h"
#include "zenoh-pico/session/matching.h"
#include "zenoh-pico/session/resource.h"
#include "zenoh-pico/utils/logging.h"
#include "zenoh-pico/utils/result.h"

Expand Down Expand Up @@ -48,7 +51,8 @@ static void _z_matching_listener_callback(const _z_interest_msg_t *msg, void *ar
}
}

_z_matching_listener_t _z_matching_listener_declare(const _z_publisher_t *pub, _z_closure_matching_status_t callback) {
_z_matching_listener_t _z_matching_listener_declare(_z_session_rc_t *zn, const _z_keyexpr_t *key, _z_zint_t entity_id,
_z_closure_matching_status_t callback) {
uint8_t flags = _Z_INTEREST_FLAG_KEYEXPRS | _Z_INTEREST_FLAG_SUBSCRIBERS | _Z_INTEREST_FLAG_RESTRICTED |
_Z_INTEREST_FLAG_CURRENT | _Z_INTEREST_FLAG_FUTURE | _Z_INTEREST_FLAG_AGGREGATE;
_z_matching_listener_ctx_t *ctx = z_malloc(sizeof(_z_matching_listener_ctx_t));
Expand All @@ -59,18 +63,42 @@ _z_matching_listener_t _z_matching_listener_declare(const _z_publisher_t *pub, _
}
ctx->decl_id = 0;
ctx->callback = callback;
ret._interest_id = _z_add_interest(_Z_RC_IN_VAL(&pub->_zn), _z_keyexpr_alias_from_user_defined(pub->_key, true),
ret._interest_id = _z_add_interest(_Z_RC_IN_VAL(zn), _z_keyexpr_alias_from_user_defined(*key, true),
_z_matching_listener_callback, flags, (void *)ctx);
if (ret._interest_id == 0) {
z_free(ctx);
return ret;
}
ret._zn = _z_session_weak_clone(&pub->_zn);

ret._id = _z_get_entity_id(_Z_RC_IN_VAL(zn));
ret._zn = _z_session_rc_clone_as_weak(zn);

_z_matching_listener_state_t state;
state.entity_id = entity_id;
state.interest_id = ret._interest_id;
_z_matching_listener_intmap_insert(&_Z_RC_IN_VAL(zn)->_matching_listeners, ret._id, &state);

return ret;
}

z_result_t _z_matching_listener_entity_undeclare(_z_session_t *zn, _z_zint_t entity_id) {
_z_matching_listener_intmap_iterator_t iter = _z_matching_listener_intmap_iterator_make(&zn->_matching_listeners);
while (_z_matching_listener_intmap_iterator_next(&iter)) {
size_t key = _z_matching_listener_intmap_iterator_key(&iter);
_z_matching_listener_state_t *listener = _z_matching_listener_intmap_iterator_value(&iter);
if (listener->entity_id == entity_id) {
_z_matching_listener_intmap_remove(&zn->_matching_listeners, key);
_z_remove_interest(zn, listener->interest_id);
}
_z_str_intmap_iterator_next(&iter);
}
return _Z_RES_OK;
}

z_result_t _z_matching_listener_undeclare(_z_matching_listener_t *listener) {
return _z_remove_interest(_Z_RC_IN_VAL(&listener->_zn), listener->_interest_id);
_z_session_t *zn = _Z_RC_IN_VAL(&listener->_zn);
_z_matching_listener_intmap_remove(&zn->_matching_listeners, listener->_id);
return _z_remove_interest(zn, listener->_interest_id);
}

void _z_matching_listener_clear(_z_matching_listener_t *listener) {
Expand Down
4 changes: 2 additions & 2 deletions src/session/utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ z_result_t _z_session_init(_z_session_t *zn, const _z_id_t *zid) {
#endif

#if Z_FEATURE_MATCHING == 1
zn->_matching_listeners = _z_matching_listener_item_intmap_make();
zn->_matching_listeners = _z_matching_listener_intmap_make();
#endif

_z_interest_init(zn);
Expand Down Expand Up @@ -148,7 +148,7 @@ void _z_session_clear(_z_session_t *zn) {
#endif

#if Z_FEATURE_MATCHING == 1
_z_matching_listener_item_intmap_clear(&zn->_matching_listeners);
_z_matching_listener_intmap_clear(&zn->_matching_listeners);
#endif

_z_flush_interest(zn);
Expand Down
3 changes: 3 additions & 0 deletions tests/memory_leak.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,9 @@ def query_and_queryable(query_cmd, queryable_cmd):
print("*** Pub & sub attachment test ***")
if pub_and_sub('z_pub_attachment -n 1', 'z_sub_attachment -n 1') == 1:
EXIT_STATUS = 1
print("*** Pub & sub listener test ***")
if pub_and_sub('z_pub -n 1 -a', 'z_sub -n 1') == 1:
EXIT_STATUS = 1
# Test query and queryable examples
print("*** Query & queryable test ***")
if query_and_queryable('z_get', 'z_queryable -n 1') == 1:
Expand Down
21 changes: 21 additions & 0 deletions tests/z_collections_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,26 @@ void int_map_iterator_test(void) {
#undef TEST_MAP
}

void int_map_iterator_deletion_test(void) {
_z_str_intmap_t map;

map = _z_str_intmap_make();
_z_str_intmap_insert(&map, 10, _z_str_clone("A"));
_z_str_intmap_insert(&map, 20, _z_str_clone("B"));
_z_str_intmap_insert(&map, 30, _z_str_clone("C"));
_z_str_intmap_insert(&map, 40, _z_str_clone("D"));

_z_str_intmap_iterator_t iter = _z_str_intmap_iterator_make(&map);
for (size_t s = 4; s != 0; s--) {
assert(s == _z_str_intmap_len(&map));
_z_str_intmap_iterator_next(&iter);
size_t key = _z_str_intmap_iterator_key(&iter);
assert(strlen(_z_str_intmap_iterator_value(&iter)) == 1);
_z_str_intmap_remove(&map, key);
}
_z_str_intmap_clear(&map);
}

int main(void) {
ring_test();
ring_test_init_free();
Expand All @@ -360,4 +380,5 @@ int main(void) {
fifo_test_init_free();

int_map_iterator_test();
int_map_iterator_deletion_test();
}

0 comments on commit a5da5f8

Please sign in to comment.