Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

espidf added flush, clean task stop , commented out crashing debug log #362

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions include/zenoh-pico/api/macros.h
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,10 @@ template<> inline int8_t z_drop(z_owned_publisher_t* v) { return z_undeclare_pub
template<> inline void z_drop(z_owned_keyexpr_t* v) { z_keyexpr_drop(v); }
template<> inline void z_drop(z_owned_config_t* v) { z_config_drop(v); }
template<> inline void z_drop(z_owned_scouting_config_t* v) { z_scouting_config_drop(v); }
#if Z_FEATURE_SUBSCRIPTION==1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one, I'd say we should either have all the feature token in the file or none.

Did you need this for a reason?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I seem to remember that otherwise it didn´t compile with Z_FEATURE_SUBSCRIPTION=0

lib/zenoh-pico/include/zenoh-pico/api/macros.h:282:72: error: 'z_undeclare_pull_subscriber' was not declared in this scope

template<> inline int8_t z_drop(z_owned_pull_subscriber_t* v) { return z_undeclare_pull_subscriber(v); }
template<> inline int8_t z_drop(z_owned_subscriber_t* v) { return z_undeclare_subscriber(v); }
#endif
template<> inline int8_t z_drop(z_owned_queryable_t* v) { return z_undeclare_queryable(v); }
template<> inline void z_drop(z_owned_reply_t* v) { z_reply_drop(v); }
template<> inline void z_drop(z_owned_hello_t* v) { z_hello_drop(v); }
Expand Down
6 changes: 3 additions & 3 deletions include/zenoh-pico/collections/refcount.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@

// c11 atomic variant
#define _ZP_RC_CNT_TYPE _z_atomic(unsigned int)
#define _ZP_RC_OP_INIT_CNT _z_atomic_store_explicit(&p.in->_cnt, 1, _z_memory_order_relaxed);
#define _ZP_RC_OP_INCR_CNT _z_atomic_fetch_add_explicit(&p->in->_cnt, 1, _z_memory_order_relaxed);
#define _ZP_RC_OP_DECR_AND_CMP _z_atomic_fetch_sub_explicit(&p->in->_cnt, 1, _z_memory_order_release) > 1
#define _ZP_RC_OP_INIT_CNT _z_atomic_store_explicit(&p.in->_cnt, (unsigned int)1, _z_memory_order_relaxed);
#define _ZP_RC_OP_INCR_CNT _z_atomic_fetch_add_explicit(&p->in->_cnt, (unsigned int)1, _z_memory_order_relaxed);
#define _ZP_RC_OP_DECR_AND_CMP _z_atomic_fetch_sub_explicit(&p->in->_cnt, (unsigned int)1, _z_memory_order_release) > 1
#define _ZP_RC_OP_SYNC atomic_thread_fence(_z_memory_order_acquire);

#else // ZENOH_C_STANDARD == 99
Expand Down
23 changes: 20 additions & 3 deletions include/zenoh-pico/system/platform/espidf.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,28 @@

#include <driver/uart.h>
#include <freertos/FreeRTOS.h>
#include <freertos/event_groups.h>
#include <freertos/task.h>

#include "zenoh-pico/config.h"

#if Z_FEATURE_MULTI_THREAD == 1
#include <pthread.h>

typedef TaskHandle_t zp_task_t;
typedef void *zp_task_attr_t; // Not used in ESP32
typedef struct {
const char *name;
UBaseType_t priority;
size_t stack_depth;
#if (configSUPPORT_STATIC_ALLOCATION == 1)
_Bool static_allocation;
StackType_t *stack_buffer;
StaticTask_t *task_buffer;
#endif /* SUPPORT_STATIC_ALLOCATION */
} zp_task_attr_t;
typedef struct {
TaskHandle_t handle;
EventGroupHandle_t join_event;
} zp_task_t;
typedef pthread_mutex_t zp_mutex_t;
typedef pthread_cond_t zp_condvar_t;
#endif // Z_FEATURE_MULTI_THREAD == 1
Expand All @@ -39,7 +52,11 @@ typedef struct {
int _fd;
#endif
#if Z_FEATURE_LINK_SERIAL == 1
uart_port_t _serial;
struct {
uart_port_t _serial;
uint8_t *before_cobs;
uint8_t *after_cobs;
};
#endif
};
} _z_sys_net_socket_t;
Expand Down
7 changes: 4 additions & 3 deletions src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -647,6 +647,7 @@ int8_t z_put(z_session_t zs, z_keyexpr_t keyexpr, const uint8_t *payload, z_zint
#endif
);

#if Z_FEATURE_SUBSCRIPTION == 1
// Trigger local subscriptions
_z_trigger_local_subscriptions(&zs._val.in->val, keyexpr, payload, payload_len,
_z_n_qos_make(0, opt.congestion_control == Z_CONGESTION_CONTROL_BLOCK, opt.priority)
Expand All @@ -655,7 +656,7 @@ int8_t z_put(z_session_t zs, z_keyexpr_t keyexpr, const uint8_t *payload, z_zint
opt.attachment
#endif
);

#endif
return ret;
}

Expand Down Expand Up @@ -746,15 +747,15 @@ int8_t z_publisher_put(const z_publisher_t pub, const uint8_t *payload, size_t l
opt.attachment
#endif
);

#if Z_FEATURE_SUBSCRIPTION == 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You shouldn't need this because _z_trigger_local_subscriptions is replaced by a dummy function if subscription function is off.... Suposedly

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see comment above, it doesn´t compile. Feels like code was tested with this feature off.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No it's definitely tested with both in CI, I recall you use C++ to compile, so maybe this is the source of our compilation diverging behavior.

// Trigger local subscriptions
_z_trigger_local_subscriptions(&pub._val->_zn.in->val, pub._val->_key, payload, len, _Z_N_QOS_DEFAULT
#if Z_FEATURE_ATTACHMENT == 1
,
opt.attachment
#endif
);

#endif
return ret;
}

Expand Down
5 changes: 4 additions & 1 deletion src/session/subscription.c
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,11 @@ int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, co
#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_lock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
#if Z_FEATURE_SUBSCRIPTION == 1

_Z_DEBUG("Resolving %d - %s on mapping 0x%x", keyexpr._id, keyexpr._suffix, _z_keyexpr_mapping_id(&keyexpr));
// _Z_DEBUG(" %x - %s", keyexpr._id, keyexpr._suffix);
// _Z_DEBUG("Resolving %d - %s on mapping 0x%x", keyexpr._id, keyexpr._suffix, _z_keyexpr_mapping_id(&keyexpr));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't comment the debug lines.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand these pose issues when receiving something without a suffix, then I suggest you something like:

    _Z_DEBUG("Resolving %d on mapping 0x%x", keyexpr._id, _z_keyexpr_mapping_id(&keyexpr));
    _z_keyexpr_t key = __unsafe_z_get_expanded_key_from_key(zn, &keyexpr);
    if (key._suffix != NULL) {
        _Z_DEBUG("Triggering subs for %d - %s", key._id, key._suffix);
        _z_subscription_rc_list_t *subs = __unsafe_z_get_subscriptions_by_key(zn, _Z_RESOURCE_IS_LOCAL, key);

#endif
_z_keyexpr_t key = __unsafe_z_get_expanded_key_from_key(zn, &keyexpr);
_Z_DEBUG("Triggering subs for %d - %s", key._id, key._suffix);
if (key._suffix != NULL) {
Expand Down
93 changes: 52 additions & 41 deletions src/system/espidf/network.c
Original file line number Diff line number Diff line change
Expand Up @@ -591,9 +591,10 @@ int8_t _z_open_serial_from_dev(_z_sys_net_socket_t *sock, char *dev, uint32_t ba
uart_set_pin(sock->_serial, txpin, rxpin, UART_PIN_NO_CHANGE, UART_PIN_NO_CHANGE);

const int uart_buffer_size = (1024 * 2);
QueueHandle_t uart_queue;
uart_driver_install(sock->_serial, uart_buffer_size, 0, 100, &uart_queue, 0);

uart_driver_install(sock->_serial, uart_buffer_size, 0, 100, NULL, 0);
uart_flush_input(sock->_serial);
sock->after_cobs = (uint8_t *)zp_malloc(_Z_SERIAL_MFS_SIZE);
sock->before_cobs = (uint8_t *)zp_malloc(_Z_SERIAL_MAX_COBS_BUF_SIZE);
return ret;
}

Expand Down Expand Up @@ -622,65 +623,75 @@ int8_t _z_listen_serial_from_dev(_z_sys_net_socket_t *sock, char *dev, uint32_t
return ret;
}

void _z_close_serial(_z_sys_net_socket_t *sock) { uart_driver_delete(sock->_serial); }
void _z_close_serial(_z_sys_net_socket_t *sock) {
printf("Closing serial %d\n", sock->_serial);
uart_wait_tx_done(sock->_serial, 1000);
uart_flush(sock->_serial);
uart_driver_delete(sock->_serial);
zp_free(sock->after_cobs);
zp_free(sock->before_cobs);
}

size_t _z_read_serial(const _z_sys_net_socket_t sock, uint8_t *ptr, size_t len) {
int8_t ret = _Z_RES_OK;

uint8_t *before_cobs = (uint8_t *)zp_malloc(_Z_SERIAL_MAX_COBS_BUF_SIZE);
size_t rb = 0;
for (size_t i = 0; i < _Z_SERIAL_MAX_COBS_BUF_SIZE; i++) {
size_t len = 0;
do {
uart_get_buffered_data_len(sock._serial, &len);
if (len < 1) {
zp_sleep_ms(10); // FIXME: Yield by sleeping.
} else {
break;
while (rb < _Z_SERIAL_MAX_COBS_BUF_SIZE) {
int r = uart_read_bytes(sock._serial, &sock.before_cobs[rb], 1, 1000);
if (r == 0) {
_Z_DEBUG("Timeout reading from serial");
if ( rb == 0 ) {
ret = _Z_ERR_GENERIC;
}
} while (1);
uart_read_bytes(sock._serial, &before_cobs[i], 1, 100);
rb = rb + (size_t)1;
if (before_cobs[i] == (uint8_t)0x00) {
break;
} else if (r == 1) {
rb = rb + (size_t)1;
if (sock.before_cobs[rb-1] == (uint8_t)0x00) {
break;
}
} else {
_Z_ERROR("Error reading from serial");
ret = _Z_ERR_GENERIC;
}
}

uint8_t *after_cobs = (uint8_t *)zp_malloc(_Z_SERIAL_MFS_SIZE);
size_t trb = _z_cobs_decode(before_cobs, rb, after_cobs);

size_t i = 0;
uint16_t payload_len = 0;
for (i = 0; i < sizeof(payload_len); i++) {
payload_len |= (after_cobs[i] << ((uint8_t)i * (uint8_t)8));
}

if (trb == (size_t)(payload_len + (uint16_t)6)) {
(void)memcpy(ptr, &after_cobs[i], payload_len);
i = i + (size_t)payload_len;

uint32_t crc = 0;
for (uint8_t j = 0; j < sizeof(crc); j++) {
crc |= (uint32_t)(after_cobs[i] << (j * (uint8_t)8));
i = i + (size_t)1;
if (ret == _Z_RES_OK) {
_Z_DEBUG("Read %u bytes from serial", rb);
size_t trb = _z_cobs_decode(sock.before_cobs, rb, sock.after_cobs);
_Z_DEBUG("Decoded %u bytes from serial", trb);
size_t i = 0;
for (i = 0; i < sizeof(payload_len); i++) {
payload_len |= (sock.after_cobs[i] << ((uint8_t)i * (uint8_t)8));
}
_Z_DEBUG("payload_len = %u <= %X %X", payload_len, sock.after_cobs[1], sock.after_cobs[0]);

if (trb == (size_t)(payload_len + (uint16_t)6)) {
(void)memcpy(ptr, &sock.after_cobs[i], payload_len);
i = i + (size_t)payload_len;

uint32_t crc = 0;
for (uint8_t j = 0; j < sizeof(crc); j++) {
crc |= (uint32_t)(sock.after_cobs[i] << (j * (uint8_t)8));
i = i + (size_t)1;
}

uint32_t c_crc = _z_crc32(ptr, payload_len);
if (c_crc != crc) {
uint32_t c_crc = _z_crc32(ptr, payload_len);
if (c_crc != crc) {
_Z_ERROR("CRC mismatch: %d != %d ", c_crc, crc);
ret = _Z_ERR_GENERIC;
}
} else {
_Z_ERROR("length mismatch => %d <> %d ", trb, payload_len + (uint16_t)6);
ret = _Z_ERR_GENERIC;
}
} else {
ret = _Z_ERR_GENERIC;
}

zp_free(before_cobs);
zp_free(after_cobs);

rb = payload_len;
if (ret != _Z_RES_OK) {
rb = SIZE_MAX;
}

_Z_DEBUG("return _z_read_serial() = %d ", rb);
return rb;
}

Expand Down
74 changes: 52 additions & 22 deletions src/system/espidf/system.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,48 +50,77 @@ void zp_free(void *ptr) { heap_caps_free(ptr); }
// In FreeRTOS, tasks created using xTaskCreate must end with vTaskDelete.
// A task function should __not__ simply return.
typedef struct {
void *(*_fun)(void *);
void *_arg;
void *(*fun)(void *);
void *arg;
EventGroupHandle_t join_event;
} z_task_arg;

void z_task_wrapper(z_task_arg *targ) {
targ->_fun(targ->_arg);
static void z_task_wrapper(void *arg) {
z_task_arg *targ = (z_task_arg *)arg;
targ->fun(targ->arg);
xEventGroupSetBits(targ->join_event, 1);
vTaskDelete(NULL);
zp_free(targ);
}

/*------------------ Task ------------------*/
int8_t zp_task_init(zp_task_t *task, zp_task_attr_t *attr, void *(*fun)(void *), void *arg) {
int ret = 0;
static zp_task_attr_t z_default_task_attr = {
.name = "",
.priority = configMAX_PRIORITIES / 2,
.stack_depth = 5120,
#if (configSUPPORT_STATIC_ALLOCATION == 1)
.static_allocation = false,
.stack_buffer = NULL,
.task_buffer = NULL,
#endif /* SUPPORT_STATIC_ALLOCATION */
};

/*------------------ Thread ------------------*/
int8_t zp_task_init(zp_task_t *task, zp_task_attr_t *attr, void *(*fun)(void *), void *arg) {
z_task_arg *z_arg = (z_task_arg *)zp_malloc(sizeof(z_task_arg));
if (z_arg != NULL) {
z_arg->_fun = fun;
z_arg->_arg = arg;
if (xTaskCreate((void *)z_task_wrapper, "", 5120, z_arg, configMAX_PRIORITIES / 2, task) != pdPASS) {
ret = -1;
if (z_arg == NULL) {
return -1;
}

z_arg->fun = fun;
z_arg->arg = arg;
z_arg->join_event = task->join_event = xEventGroupCreate();

if (attr == NULL) {
attr = &z_default_task_attr;
}

#if (configSUPPORT_STATIC_ALLOCATION == 1)
if (attr->static_allocation) {
task->handle = xTaskCreateStatic(z_task_wrapper, attr->name, attr->stack_depth, z_arg, attr->priority,
attr->stack_buffer, attr->task_buffer);
if (task->handle == NULL) {
return -1;
}
} else {
ret = -1;
#endif /* SUPPORT_STATIC_ALLOCATION */
if (xTaskCreate(z_task_wrapper, attr->name, attr->stack_depth, z_arg, attr->priority, &task->handle) !=
pdPASS) {
return -1;
}
#if (configSUPPORT_STATIC_ALLOCATION == 1)
}
#endif /* SUPPORT_STATIC_ALLOCATION */

return ret;
return 0;
}

int8_t zp_task_join(zp_task_t *task) {
// Note: task/thread join not supported on FreeRTOS API, so we force its deletion instead.
return zp_task_cancel(task);
xEventGroupWaitBits(task->join_event, 1, pdFALSE, pdFALSE, portMAX_DELAY);
return 0;
}

int8_t zp_task_cancel(zp_task_t *task) {
vTaskDelete(*task);
vTaskDelete(task->handle);
return 0;
}

void zp_task_free(zp_task_t **task) {
zp_task_t *ptr = *task;
zp_free(ptr);
*task = NULL;
zp_free((*task)->join_event);
zp_free(*task);
}

/*------------------ Mutex ------------------*/
Expand Down Expand Up @@ -125,7 +154,8 @@ int zp_sleep_ms(size_t time) {
// This may compound, so this approach may make sleeps longer than expected.
// This extra check tries to minimize the amount of extra time it might sleep.
while (zp_time_elapsed_ms(&start) < time) {
zp_sleep_us(1000);
//zp_sleep_us(1000);
vTaskDelay(1/portTICK_PERIOD_MS);
}

return 0;
Expand Down
2 changes: 1 addition & 1 deletion src/transport/unicast/read.c
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ void *_zp_unicast_read_task(void *ztu_arg) {
}
// Wrap the main buffer for to_read bytes
_z_zbuf_t zbuf = _z_zbuf_view(&ztu->_zbuf, to_read);

// Mark the session that we have received data
ztu->_received = true;

Expand Down