Skip to content

Commit

Permalink
feat: re-add rc to queries
Browse files Browse the repository at this point in the history
  • Loading branch information
jean-roland committed Jul 2, 2024
1 parent 0032525 commit 00efa81
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 18 deletions.
95 changes: 93 additions & 2 deletions include/zenoh-pico/api/handlers.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,97 @@
/* elem_copy_f */ _z_##item_name##_copy, \
/* elem_drop_f */ z_##item_name##_drop)

#define _Z_CHANNEL_RC_DEFINE_IMPL(handler_type, handler_name, handler_new_f_name, callback_type, callback_new_f, \
collection_type, collection_new_f, collection_free_f, collection_push_f, \
collection_pull_f, collection_try_pull_f, elem_owned_type, elem_loaned_type, \
elem_copy_f, elem_drop_f) \
typedef struct { \
collection_type *collection; \
} handler_type; \
\
_Z_OWNED_TYPE_PTR(handler_type, handler_name) \
_Z_LOANED_TYPE(handler_type, handler_name) \
\
static inline void _z_##handler_name##_elem_free(void **elem) { \
elem_drop_f((elem_owned_type *)*elem); \
z_free(*elem); \
*elem = NULL; \
} \
static inline void _z_##handler_name##_elem_move(void *dst, void *src) { \
memcpy(dst, src, sizeof(elem_owned_type)); \
z_free(src); \
} \
static inline void _z_##handler_name##_send(const elem_loaned_type *elem, void *context) { \
elem_owned_type *internal_elem = (elem_owned_type *)z_malloc(sizeof(elem_owned_type)); \
if (internal_elem == NULL) { \
_Z_ERROR("Out of memory"); \
return; \
} \
if (elem == NULL) { \
internal_elem->_rc.in = NULL; \
} else { \
elem_copy_f(&internal_elem->_rc, elem); \
} \
int8_t ret = collection_push_f(internal_elem, context, _z_##handler_name##_elem_free); \
if (ret != _Z_RES_OK) { \
_Z_ERROR("%s failed: %i", #collection_push_f, ret); \
} \
} \
static inline void z_##handler_name##_recv(const z_loaned_##handler_name##_t *handler, elem_owned_type *elem) { \
int8_t ret = collection_pull_f(elem, (collection_type *)handler->collection, _z_##handler_name##_elem_move); \
if (ret != _Z_RES_OK) { \
_Z_ERROR("%s failed: %i", #collection_pull_f, ret); \
} \
} \
static inline void z_##handler_name##_try_recv(const z_loaned_##handler_name##_t *handler, \
elem_owned_type *elem) { \
int8_t ret = \
collection_try_pull_f(elem, (collection_type *)handler->collection, _z_##handler_name##_elem_move); \
if (ret != _Z_RES_OK) { \
_Z_ERROR("%s failed: %i", #collection_try_pull_f, ret); \
} \
} \
\
static inline void _z_##handler_name##_free(handler_type **handler) { \
handler_type *ptr = *handler; \
if (ptr != NULL) { \
collection_free_f(ptr->collection, _z_##handler_name##_elem_free); \
z_free(ptr); \
*handler = NULL; \
} \
} \
static inline void _z_##handler_name##_copy(void *dst, const void *src) { \
(void)(dst); \
(void)(src); \
} \
\
_Z_OWNED_FUNCTIONS_PTR_IMPL(handler_type, handler_name, _z_##handler_name##_copy, _z_##handler_name##_free) \
\
static inline int8_t handler_new_f_name(callback_type *callback, z_owned_##handler_name##_t *handler, \
size_t capacity) { \
handler->_val = (handler_type *)z_malloc(sizeof(handler_type)); \
handler->_val->collection = collection_new_f(capacity); \
callback_new_f(callback, _z_##handler_name##_send, NULL, handler->_val->collection); \
return _Z_RES_OK; \
}

#define _Z_CHANNEL_RC_DEFINE(item_name, kind_name) \
_Z_CHANNEL_RC_DEFINE_IMPL(/* handler_type */ _z_##kind_name##_handler_##item_name##_t, \
/* handler_name */ kind_name##_handler_##item_name, \
/* handler_new_f_name */ z_##kind_name##_channel_##item_name##_new, \
/* callback_type */ z_owned_closure_##item_name##_t, \
/* callback_new_f */ z_closure_##item_name, \
/* collection_type */ _z_##kind_name##_mt_t, \
/* collection_new_f */ _z_##kind_name##_mt_new, \
/* collection_free_f */ _z_##kind_name##_mt_free, \
/* collection_push_f */ _z_##kind_name##_mt_push, \
/* collection_pull_f */ _z_##kind_name##_mt_pull, \
/* collection_try_pull_f */ _z_##kind_name##_mt_try_pull, \
/* elem_owned_type */ z_owned_##item_name##_t, \
/* elem_loaned_type */ z_loaned_##item_name##_t, \
/* elem_copy_f */ _z_##item_name##_rc_copy, \
/* elem_drop_f */ z_##item_name##_drop)

#define _Z_CHANNEL_DEFINE_DUMMY(item_name, kind_name) \
typedef struct { \
uint8_t _foo; \
Expand Down Expand Up @@ -148,12 +239,12 @@ _Z_CHANNEL_DEFINE(sample, fifo)
// This macro defines:
// z_ring_channel_query_new()
// z_owned_ring_handler_query_t/z_loaned_ring_handler_query_t
_Z_CHANNEL_DEFINE(query, ring)
_Z_CHANNEL_RC_DEFINE(query, ring)

// This macro defines:
// z_fifo_channel_query_new()
// z_owned_fifo_handler_query_t/z_loaned_fifo_handler_query_t
_Z_CHANNEL_DEFINE(query, fifo)
_Z_CHANNEL_RC_DEFINE(query, fifo)
#else // Z_FEATURE_QUERYABLE
_Z_CHANNEL_DEFINE_DUMMY(query, ring)
_Z_CHANNEL_DEFINE_DUMMY(query, fifo)
Expand Down
4 changes: 2 additions & 2 deletions include/zenoh-pico/api/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,8 @@ _Z_LOANED_TYPE(_z_queryable_t, queryable)
* Represents a Zenoh Query entity, received by Zenoh Queryable entities.
*
*/
_Z_OWNED_TYPE_PTR(_z_query_t, query)
_Z_LOANED_TYPE(_z_query_t, query)
_Z_OWNED_TYPE_RC(_z_query_rc_t, query)
_Z_LOANED_TYPE(_z_query_rc_t, query)

/**
* Represents the encoding of a payload, in a MIME-like format.
Expand Down
4 changes: 3 additions & 1 deletion include/zenoh-pico/net/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ typedef struct _z_query_t {
_z_value_t _value;
_z_keyexpr_t _key;
uint32_t _request_id;
_z_session_t *_zn; // FIXME: Switch to session rc, Issue #476
_z_session_t *_zn; // FIXME: Potential UB source, Issue #476
_z_bytes_t attachment;
char *_parameters;
_Bool _anyke;
Expand All @@ -39,6 +39,8 @@ void _z_query_clear(_z_query_t *q);
void _z_query_copy(_z_query_t *dst, const _z_query_t *src);
void _z_query_free(_z_query_t **query);

_Z_REFCOUNT_DEFINE(_z_query, _z_query)

/**
* Return type when declaring a queryable.
*/
Expand Down
4 changes: 2 additions & 2 deletions include/zenoh-pico/session/session.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,12 @@ typedef struct {
} _z_publication_t;

// Forward type declaration to avoid cyclical include
typedef struct _z_query_t _z_query_t;
typedef struct _z_query_rc_t _z_query_rc_t;

/**
* The callback signature of the functions handling query messages.
*/
typedef void (*_z_queryable_handler_t)(const _z_query_t *query, void *arg);
typedef void (*_z_queryable_handler_t)(const _z_query_rc_t *query, void *arg);

typedef struct {
_z_keyexpr_t _key;
Expand Down
17 changes: 9 additions & 8 deletions src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -631,16 +631,16 @@ z_query_consolidation_t z_query_consolidation_none(void) {
z_query_consolidation_t z_query_consolidation_default(void) { return z_query_consolidation_auto(); }

void z_query_parameters(const z_loaned_query_t *query, z_view_string_t *parameters) {
parameters->_val.val = query->_parameters;
parameters->_val.len = strlen(query->_parameters);
parameters->_val.val = query->in->val._parameters;
parameters->_val.len = strlen(query->in->val._parameters);
}

const z_loaned_bytes_t *z_query_attachment(const z_loaned_query_t *query) { return &query->attachment; }
const z_loaned_bytes_t *z_query_attachment(const z_loaned_query_t *query) { return &query->in->val.attachment; }

const z_loaned_keyexpr_t *z_query_keyexpr(const z_loaned_query_t *query) { return &query->_key; }
const z_loaned_keyexpr_t *z_query_keyexpr(const z_loaned_query_t *query) { return &query->in->val._key; }

const z_loaned_bytes_t *z_query_payload(const z_loaned_query_t *query) { return &query->_value.payload; }
const z_loaned_encoding_t *z_query_encoding(const z_loaned_query_t *query) { return &query->_value.encoding; }
const z_loaned_bytes_t *z_query_payload(const z_loaned_query_t *query) { return &query->in->val._value.payload; }
const z_loaned_encoding_t *z_query_encoding(const z_loaned_query_t *query) { return &query->in->val._value.encoding; }

void z_closure_sample_call(const z_owned_closure_sample_t *closure, const z_loaned_sample_t *sample) {
if (closure->call != NULL) {
Expand Down Expand Up @@ -1130,7 +1130,7 @@ int8_t _z_queryable_drop(_z_queryable_t **queryable) {
return ret;
}

_Z_OWNED_FUNCTIONS_PTR_IMPL(_z_query_t, query, _z_query_copy, _z_query_free)
_Z_OWNED_FUNCTIONS_RC_IMPL(query)
_Z_OWNED_FUNCTIONS_PTR_IMPL(_z_queryable_t, queryable, _z_owner_noop_copy, _z_queryable_drop)

void z_queryable_options_default(z_queryable_options_t *options) { options->complete = _Z_QUERYABLE_COMPLETE_DEFAULT; }
Expand Down Expand Up @@ -1184,7 +1184,8 @@ int8_t z_query_reply(const z_loaned_query_t *query, const z_loaned_keyexpr_t *ke
_z_value_t value = {.payload = _z_bytes_from_owned_bytes(payload),
.encoding = _z_encoding_from_owned(opts.encoding)};

int8_t ret = _z_send_reply(query, *keyexpr, value, Z_SAMPLE_KIND_PUT, _z_bytes_from_owned_bytes(opts.attachment));
int8_t ret =
_z_send_reply(&query->in->val, *keyexpr, value, Z_SAMPLE_KIND_PUT, _z_bytes_from_owned_bytes(opts.attachment));
if (payload != NULL) {
z_bytes_drop(payload);
}
Expand Down
6 changes: 3 additions & 3 deletions src/session/queryable.c
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,8 @@ int8_t _z_trigger_queryables(_z_session_t *zn, const _z_msg_query_t *msgq, const
_zp_session_unlock_mutex(zn);

// Build the z_query
_z_query_t query = _z_query_null();
query = _z_query_create(&msgq->_ext_value, &key, &msgq->_parameters, zn, qid, attachment);
_z_query_rc_t query = _z_query_rc_new();
query.in->val = _z_query_create(&msgq->_ext_value, &key, &msgq->_parameters, zn, qid, attachment);
// Parse session_queryable list
_z_session_queryable_rc_list_t *xs = qles;
while (xs != NULL) {
Expand All @@ -155,7 +155,7 @@ int8_t _z_trigger_queryables(_z_session_t *zn, const _z_msg_query_t *msgq, const
xs = _z_session_queryable_rc_list_tail(xs);
}
// Clean up
_z_query_clear(&query);
_z_query_rc_drop(&query);
_z_session_queryable_rc_list_free(&qles);
} else {
_zp_session_unlock_mutex(zn);
Expand Down

0 comments on commit 00efa81

Please sign in to comment.