diff --git a/examples/reconnect_subscriber.c b/examples/reconnect_subscriber.c index c122a83..fd9a0c8 100644 --- a/examples/reconnect_subscriber.c +++ b/examples/reconnect_subscriber.c @@ -35,6 +35,13 @@ struct reconnect_state_t { */ void reconnect_client(struct mqtt_client* client, void **reconnect_state_vptr); + +/** + * @brief My connect callback. This is called when we receive a CONNACK from + * the broker. We use it to subscribe to our topics. + */ +void connect_client(struct mqtt_client* client, void **reconnect_state_vptr); + /** * @brief The function will be called whenever a PUBLISH message is received. */ @@ -98,7 +105,7 @@ int main(int argc, const char *argv[]) /* setup a client */ struct mqtt_client client; - mqtt_init_reconnect(&client, + mqtt_init_reconnect(&client, connect_client, reconnect_client, &reconnect_state, publish_callback ); @@ -165,6 +172,11 @@ void reconnect_client(struct mqtt_client* client, void **reconnect_state_vptr) uint8_t connect_flags = MQTT_CONNECT_CLEAN_SESSION; /* Send connection request to the broker. */ mqtt_connect(client, client_id, NULL, NULL, 0, NULL, NULL, connect_flags, 400); +} + +void connect_client(struct mqtt_client* client, void **reconnect_state_vptr) +{ + struct reconnect_state_t *reconnect_state = *((struct reconnect_state_t**) reconnect_state_vptr); /* Subscribe to the topic. */ mqtt_subscribe(client, reconnect_state->topic, 0); diff --git a/include/mqtt.h b/include/mqtt.h index 8695ab8..5a289e2 100644 --- a/include/mqtt.h +++ b/include/mqtt.h @@ -1192,18 +1192,26 @@ struct mqtt_client { */ enum MQTTErrors (*inspector_callback)(struct mqtt_client*); + /** + * @brief A callback that is called when the connection is established. + * + * This callback is invoked when CONNACK is received, and can be used for + * things like session configurations (i.e. subscriptions). + */ + void (*connected_callback)(struct mqtt_client*, void**); + /** * @brief A callback that is called whenever the client is in an error state. * * This callback is responsible for: application level error handling, closing - * previous sockets, and reestabilishing the connection to the broker and - * session configurations (i.e. subscriptions). + * previous sockets, and reestabilishing the connection to the broker. */ void (*reconnect_callback)(struct mqtt_client*, void**); /** * @brief A pointer to some state. A pointer to this member is passed to - * \ref mqtt_client.reconnect_callback. + * \ref mqtt_client.reconnect_callback and + * \ref mqtt_client.connected_callback. */ void* reconnect_state; @@ -1379,6 +1387,8 @@ enum MQTTErrors mqtt_init(struct mqtt_client *client, * @pre None. * * @param[in,out] client The MQTT client that will be initialized. + * @param[in] connected_callback The callback that will be called when a connection has been + * established. * @param[in] reconnect_callback The callback that will be called to connect/reconnect the * client to the broker and perform application level error handling. * @param[in] reconnect_state A pointer to some state data for your \p reconnect_callback. @@ -1396,6 +1406,7 @@ enum MQTTErrors mqtt_init(struct mqtt_client *client, * */ void mqtt_init_reconnect(struct mqtt_client *client, + void (*connected_callback)(struct mqtt_client *client, void** state), void (*reconnect_callback)(struct mqtt_client *client, void** state), void *reconnect_state, void (*publish_response_callback)(void** state, struct mqtt_response_publish *publish)); diff --git a/src/mqtt.c b/src/mqtt.c index d914c1a..e55c9cd 100644 --- a/src/mqtt.c +++ b/src/mqtt.c @@ -143,6 +143,7 @@ enum MQTTErrors mqtt_init(struct mqtt_client *client, client->send_offset = 0; client->inspector_callback = NULL; + client->connected_callback = NULL; client->reconnect_callback = NULL; client->reconnect_state = NULL; @@ -150,7 +151,8 @@ enum MQTTErrors mqtt_init(struct mqtt_client *client, } void mqtt_init_reconnect(struct mqtt_client *client, - void (*reconnect)(struct mqtt_client *, void**), + void (*connected_callback)(struct mqtt_client *, void**), + void (*reconnect_callback)(struct mqtt_client *, void**), void *reconnect_state, void (*publish_response_callback)(void** state, struct mqtt_response_publish *publish)) { @@ -176,7 +178,8 @@ void mqtt_init_reconnect(struct mqtt_client *client, client->send_offset = 0; client->inspector_callback = NULL; - client->reconnect_callback = reconnect; + client->connected_callback = connected_callback; + client->reconnect_callback = reconnect_callback; client->reconnect_state = reconnect_state; } @@ -719,6 +722,7 @@ ssize_t __mqtt_recv(struct mqtt_client *client) MQTT_CONTROL_PINGRESP: -> release PINGREQ */ + int has_connected = 0; switch (response.fixed_header.control_type) { case MQTT_CONTROL_CONNACK: /* release associated CONNECT */ @@ -741,6 +745,8 @@ ssize_t __mqtt_recv(struct mqtt_client *client) mqtt_recv_ret = MQTT_ERROR_CONNECTION_REFUSED; } break; + } else { + has_connected = 1; } break; case MQTT_CONTROL_PUBLISH: @@ -890,10 +896,16 @@ ssize_t __mqtt_recv(struct mqtt_client *client) client->recv_buffer.curr -= consumed; client->recv_buffer.curr_sz += (unsigned long)consumed; } + if (has_connected && client->connected_callback != NULL) { + MQTT_PAL_MUTEX_UNLOCK(&client->mutex); + client->connected_callback(client, &client->reconnect_state); + MQTT_PAL_MUTEX_LOCK(&client->mutex); + } } /* In case there was some error handling the (well formed) message, we end up here */ MQTT_PAL_MUTEX_UNLOCK(&client->mutex); + return mqtt_recv_ret; }