Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Examples updated due to new event-driven design #3

Open
wants to merge 1 commit into
base: callbacks
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 20 additions & 11 deletions examples/bearssl_publisher.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,10 @@ typedef struct {
int testCerts(br_x509_trust_anchor *anch);

/**
* @brief The function that would be called whenever a PUBLISH is received.
*
* @note This function is not used in this example.
* @brief The function will be called on each enabled client-event (like message receiving or successful connection).
*/
static void publish_callback(void** unused, struct mqtt_response_publish *published);
static void event_callback(struct mqtt_client* client, enum MQTTCallbackEvent event, union MQTTCallbackData* data, void** user_state);


/**
* @brief Safely closes the socket in \p ctx before \c exit.
Expand Down Expand Up @@ -163,7 +162,7 @@ int main(int argc, const char *argv[])
struct mqtt_client client;
uint8_t sendbuf[2048]; /* sendbuf should be large enough to hold multiple whole mqtt messages */
uint8_t recvbuf[1024]; /* recvbuf should be large enough any whole mqtt message expected to be received */
mqtt_init(&client, &ctx, sendbuf, sizeof(sendbuf), recvbuf, sizeof(recvbuf), publish_callback);
mqtt_init(&client, &ctx, sendbuf, sizeof(sendbuf), recvbuf, sizeof(recvbuf), MQTT_EVENT_RECEIVED, NULL, event_callback);
mqtt_connect(&client, "publishing_client", NULL, NULL, 0, NULL, NULL, 0, 400);
mqtt_subscribe(&client, subscribe, 0);

Expand Down Expand Up @@ -276,13 +275,23 @@ static void exit_example(int status, bearssl_context *ctx)
exit(status);
}

static void publish_callback(void** unused, struct mqtt_response_publish *published)
static void event_callback(struct mqtt_client* client, enum MQTTCallbackEvent event, union MQTTCallbackData* data, void** user_state)
{
static const char *prelim = "Received publish('";
/* note that published->topic_name is NOT null-terminated (here we'll change it to a c-string) */
printf("%s", prelim);
fwrite(published->topic_name, 1, published->topic_name_size, stdout);
printf("'): %s\n", (const char*)published->application_message);
switch (event) {
case MQTT_EVENT_RECEIVED:
{
struct mqtt_response_publish* received_msg = data->received_msg;

static const char *prelim = "Received publish('";
/* note that published->topic_name is NOT null-terminated (here we'll change it to a c-string) */
printf("%s", prelim);
fwrite(received_msg->topic_name, 1, received_msg->topic_name_size, stdout);
printf("'): %s\n", (const char*)received_msg->application_message);

} break;

default: break;
}
}

static void vblob_append(void *cc, const void *data, size_t len)
Expand Down
10 changes: 5 additions & 5 deletions examples/bio_publisher.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@


/**
* @brief The function that would be called whenever a PUBLISH is received.
* @brief The function will be called on each enabled client-event (like message receiving or successful connection).
*
* @note This function is not used in this example.
*/
void publish_callback(void** unused, struct mqtt_response_publish *published);
void event_callback(struct mqtt_client* client, enum MQTTCallbackEvent event, union MQTTCallbackData* data, void** user_state);

/**
* @brief The client's refresher. This function triggers back-end routines to
Expand Down Expand Up @@ -80,7 +80,7 @@ int main(int argc, const char *argv[])
struct mqtt_client client;
uint8_t sendbuf[2048]; /* sendbuf should be large enough to hold multiple whole mqtt messages */
uint8_t recvbuf[1024]; /* recvbuf should be large enough any whole mqtt message expected to be received */
mqtt_init(&client, sockfd, sendbuf, sizeof(sendbuf), recvbuf, sizeof(recvbuf), publish_callback);
mqtt_init(&client, sockfd, sendbuf, sizeof(sendbuf), recvbuf, sizeof(recvbuf), 0, NULL, event_callback);
mqtt_connect(&client, "publishing_client", NULL, NULL, 0, NULL, NULL, 0, 400);

/* check that we don't have any errors */
Expand Down Expand Up @@ -141,7 +141,7 @@ void exit_example(int status, BIO* sockfd, pthread_t *client_daemon)



void publish_callback(void** unused, struct mqtt_response_publish *published)
void event_callback(struct mqtt_client* client, enum MQTTCallbackEvent event, union MQTTCallbackData* data, void** user_state)
{
/* not used in this example */
}
Expand All @@ -154,4 +154,4 @@ void* client_refresher(void* client)
usleep(100000U);
}
return NULL;
}
}
10 changes: 5 additions & 5 deletions examples/bio_publisher_win.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@


/**
* @brief The function that would be called whenever a PUBLISH is received.
* @brief The function will be called on each enabled client-event (like message receiving or successful connection).
*
* @note This function is not used in this example.
*/
void publish_callback(void** unused, struct mqtt_response_publish *published);
void event_callback(struct mqtt_client* client, enum MQTTCallbackEvent event, union MQTTCallbackData* data, void** user_state);

/**
* @brief The client's refresher. This function triggers back-end routines to
Expand Down Expand Up @@ -79,7 +79,7 @@ int main(int argc, const char *argv[])
struct mqtt_client client;
uint8_t sendbuf[2048]; /* sendbuf should be large enough to hold multiple whole mqtt messages */
uint8_t recvbuf[1024]; /* recvbuf should be large enough any whole mqtt message expected to be received */
mqtt_init(&client, sockfd, sendbuf, sizeof(sendbuf), recvbuf, sizeof(recvbuf), publish_callback);
mqtt_init(&client, sockfd, sendbuf, sizeof(sendbuf), recvbuf, sizeof(recvbuf), 0, NULL, event_callback);
mqtt_connect(&client, "publishing_client", NULL, NULL, 0, NULL, NULL, 0, 400);

/* check that we don't have any errors */
Expand Down Expand Up @@ -138,7 +138,7 @@ void exit_example(int status, BIO* sockfd)



void publish_callback(void** unused, struct mqtt_response_publish *published)
void event_callback(struct mqtt_client* client, enum MQTTCallbackEvent event, union MQTTCallbackData* data, void** user_state)
{
/* not used in this example */
}
Expand All @@ -150,4 +150,4 @@ void client_refresher(void* client)
mqtt_sync((struct mqtt_client*) client);
Sleep(100);
}
}
}
8 changes: 4 additions & 4 deletions examples/mbedtls_publisher.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@


/**
* @brief The function that would be called whenever a PUBLISH is received.
* @brief The function will be called on each enabled client-event (like message receiving or successful connection).
*
* @note This function is not used in this example.
*/
void publish_callback(void** unused, struct mqtt_response_publish *published);
void event_callback(struct mqtt_client* client, enum MQTTCallbackEvent event, union MQTTCallbackData* data, void** user_state);

/**
* @brief The client's refresher. This function triggers back-end routines to
Expand Down Expand Up @@ -85,7 +85,7 @@ int main(int argc, const char *argv[])
struct mqtt_client client;
uint8_t sendbuf[2048]; /* sendbuf should be large enough to hold multiple whole mqtt messages */
uint8_t recvbuf[1024]; /* recvbuf should be large enough any whole mqtt message expected to be received */
mqtt_init(&client, sockfd, sendbuf, sizeof(sendbuf), recvbuf, sizeof(recvbuf), publish_callback);
mqtt_init(&client, sockfd, sendbuf, sizeof(sendbuf), recvbuf, sizeof(recvbuf), 0, NULL, event_callback);
mqtt_connect(&client, "publishing_client", NULL, NULL, 0, NULL, NULL, 0, 400);

/* check that we don't have any errors */
Expand Down Expand Up @@ -147,7 +147,7 @@ void exit_example(int status, mqtt_pal_socket_handle sockfd, pthread_t *client_d



void publish_callback(void** unused, struct mqtt_response_publish *published)
void event_callback(struct mqtt_client* client, enum MQTTCallbackEvent event, union MQTTCallbackData* data, void** user_state)
{
/* not used in this example */
}
Expand Down
9 changes: 5 additions & 4 deletions examples/openssl_publisher.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@


/**
* @brief The function that would be called whenever a PUBLISH is received.
* @brief The function will be called on each enabled client-event (like message receiving or successful connection).
*
* @note This function is not used in this example.
*/
void publish_callback(void** unused, struct mqtt_response_publish *published);
void event_callback(struct mqtt_client* client, enum MQTTCallbackEvent event, union MQTTCallbackData* data, void** user_state);


/**
* @brief The client's refresher. This function triggers back-end routines to
Expand Down Expand Up @@ -107,7 +108,7 @@ int main(int argc, const char *argv[])
struct mqtt_client client;
uint8_t sendbuf[2048]; /* sendbuf should be large enough to hold multiple whole mqtt messages */
uint8_t recvbuf[1024]; /* recvbuf should be large enough any whole mqtt message expected to be received */
mqtt_init(&client, sockfd, sendbuf, sizeof(sendbuf), recvbuf, sizeof(recvbuf), publish_callback);
mqtt_init(&client, sockfd, sendbuf, sizeof(sendbuf), recvbuf, sizeof(recvbuf), 0, NULL, event_callback);
mqtt_connect(&client, "publishing_client", NULL, NULL, 0, NULL, NULL, 0, 400);

/* check that we don't have any errors */
Expand Down Expand Up @@ -168,7 +169,7 @@ void exit_example(int status, BIO* sockfd, pthread_t *client_daemon)



void publish_callback(void** unused, struct mqtt_response_publish *published)
void event_callback(struct mqtt_client* client, enum MQTTCallbackEvent event, union MQTTCallbackData* data, void** user_state)
{
/* not used in this example */
}
Expand Down
10 changes: 5 additions & 5 deletions examples/openssl_publisher_win.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@


/**
* @brief The function that would be called whenever a PUBLISH is received.
* @brief The function will be called on each enabled client-event (like message receiving or successful connection).
*
* @note This function is not used in this example.
*/
void publish_callback(void** unused, struct mqtt_response_publish *published);
void event_callback(struct mqtt_client* client, enum MQTTCallbackEvent event, union MQTTCallbackData* data, void** user_state);

/**
* @brief The client's refresher. This function triggers back-end routines to
Expand Down Expand Up @@ -90,7 +90,7 @@ int main(int argc, const char *argv[])
struct mqtt_client client;
uint8_t sendbuf[2048]; /* sendbuf should be large enough to hold multiple whole mqtt messages */
uint8_t recvbuf[1024]; /* recvbuf should be large enough any whole mqtt message expected to be received */
mqtt_init(&client, sockfd, sendbuf, sizeof(sendbuf), recvbuf, sizeof(recvbuf), publish_callback);
mqtt_init(&client, sockfd, sendbuf, sizeof(sendbuf), recvbuf, sizeof(recvbuf), 0, NULL, event_callback);
mqtt_connect(&client, "publishing_client", NULL, NULL, 0, NULL, NULL, 0, 400);

/* check that we don't have any errors */
Expand Down Expand Up @@ -149,7 +149,7 @@ void exit_example(int status, BIO* sockfd)



void publish_callback(void** unused, struct mqtt_response_publish *published)
void event_callback(struct mqtt_client* client, enum MQTTCallbackEvent event, union MQTTCallbackData* data, void** user_state)
{
/* not used in this example */
}
Expand All @@ -161,4 +161,4 @@ void client_refresher(void* client)
mqtt_sync((struct mqtt_client*) client);
Sleep(100);
}
}
}
115 changes: 61 additions & 54 deletions examples/reconnect_subscriber.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* to setup the connection to the broker.
*
* An instance of this struct will be created in my \c main(). Then, whenever
* \ref reconnect_client is called, this instance will be passed.
* \ref event_callback is called on MQTT_EVENT_RECONNECT, this instance will be used.
*/
struct reconnect_state_t {
const char* hostname;
Expand All @@ -30,15 +30,9 @@ struct reconnect_state_t {


/**
* @brief My reconnect callback. It will reestablish the connection whenever
* an error occurs.
* @brief The function will be called on each enabled client-event (like message receiving or successful connection).
*/
void reconnect_client(struct mqtt_client* client, void **reconnect_state_vptr);

/**
* @brief The function will be called whenever a PUBLISH message is received.
*/
void publish_callback(void** unused, struct mqtt_response_publish *published);
void event_callback(struct mqtt_client* client, enum MQTTCallbackEvent event, union MQTTCallbackData* data, void** user_state);

/**
* @brief The client's refresher. This function triggers back-end routines to
Expand Down Expand Up @@ -99,8 +93,8 @@ int main(int argc, const char *argv[])
struct mqtt_client client;

mqtt_init_reconnect(&client,
reconnect_client, &reconnect_state,
publish_callback
MQTT_EVENT_RECONNECT | MQTT_EVENT_RECEIVED,
&reconnect_state, event_callback
);

/* start a thread to refresh the client (handle egress and ingree client traffic) */
Expand Down Expand Up @@ -130,44 +124,69 @@ int main(int argc, const char *argv[])
exit_example(EXIT_SUCCESS, client.socketfd, &client_daemon);
}

void reconnect_client(struct mqtt_client* client, void **reconnect_state_vptr)
void event_callback(struct mqtt_client* client, enum MQTTCallbackEvent event, union MQTTCallbackData* data, void** user_state)
{
struct reconnect_state_t *reconnect_state = *((struct reconnect_state_t**) reconnect_state_vptr);
switch (event) {
case MQTT_EVENT_RECONNECT:
{
/* Reestablish the connection whenever an error occurs. */

struct reconnect_state_t *reconnect_state = *((struct reconnect_state_t**) user_state);

/* Close the clients socket if this isn't the initial reconnect call */
if (client->error != MQTT_ERROR_INITIAL_RECONNECT) {
close(client->socketfd);
}

/* Perform error handling here. */
if (client->error != MQTT_ERROR_INITIAL_RECONNECT) {
printf("event_callback: called on MQTT_EVENT_RECONNECT, while client was in error state \"%s\"\n",
mqtt_error_str(client->error)
);
}

/* Open a new socket. */
int sockfd = open_nb_socket(reconnect_state->hostname, reconnect_state->port);
if (sockfd == -1) {
perror("Failed to open socket: ");
exit_example(EXIT_FAILURE, sockfd, NULL);
}

/* Reinitialize the client. */
mqtt_reinit(client, sockfd,
reconnect_state->sendbuf, reconnect_state->sendbufsz,
reconnect_state->recvbuf, reconnect_state->recvbufsz
);

/* Close the clients socket if this isn't the initial reconnect call */
if (client->error != MQTT_ERROR_INITIAL_RECONNECT) {
close(client->socketfd);
}
/* Create an anonymous session */
const char* client_id = NULL;
/* Ensure we have a clean session */
uint8_t connect_flags = MQTT_CONNECT_CLEAN_SESSION;
/* Send connection request to the broker. */
mqtt_connect(client, client_id, NULL, NULL, 0, NULL, NULL, connect_flags, 400);

/* Perform error handling here. */
if (client->error != MQTT_ERROR_INITIAL_RECONNECT) {
printf("reconnect_client: called while client was in error state \"%s\"\n",
mqtt_error_str(client->error)
);
}
/* Subscribe to the topic. */
mqtt_subscribe(client, reconnect_state->topic, 0);

/* Open a new socket. */
int sockfd = open_nb_socket(reconnect_state->hostname, reconnect_state->port);
if (sockfd == -1) {
perror("Failed to open socket: ");
exit_example(EXIT_FAILURE, sockfd, NULL);
}
} break;

/* Reinitialize the client. */
mqtt_reinit(client, sockfd,
reconnect_state->sendbuf, reconnect_state->sendbufsz,
reconnect_state->recvbuf, reconnect_state->recvbufsz
);
case MQTT_EVENT_RECEIVED:
{
struct mqtt_response_publish* received_msg = data->received_msg;

/* note that published->topic_name is NOT null-terminated (here we'll change it to a c-string) */
char* topic_name = (char*) malloc(received_msg->topic_name_size + 1);
memcpy(topic_name, received_msg->topic_name, received_msg->topic_name_size);
topic_name[received_msg->topic_name_size] = '\0';

printf("Received publish('%s'): %s\n", topic_name, (const char*) received_msg->application_message);

/* Create an anonymous session */
const char* client_id = NULL;
/* Ensure we have a clean session */
uint8_t connect_flags = MQTT_CONNECT_CLEAN_SESSION;
/* Send connection request to the broker. */
mqtt_connect(client, client_id, NULL, NULL, 0, NULL, NULL, connect_flags, 400);
free(topic_name);

/* Subscribe to the topic. */
mqtt_subscribe(client, reconnect_state->topic, 0);
} break;

default: break;
}
}

void exit_example(int status, int sockfd, pthread_t *client_daemon)
Expand All @@ -177,18 +196,6 @@ void exit_example(int status, int sockfd, pthread_t *client_daemon)
exit(status);
}

void publish_callback(void** unused, struct mqtt_response_publish *published)
{
/* note that published->topic_name is NOT null-terminated (here we'll change it to a c-string) */
char* topic_name = (char*) malloc(published->topic_name_size + 1);
memcpy(topic_name, published->topic_name, published->topic_name_size);
topic_name[published->topic_name_size] = '\0';

printf("Received publish('%s'): %s\n", topic_name, (const char*) published->application_message);

free(topic_name);
}

void* client_refresher(void* client)
{
while(1)
Expand Down
Loading