-
Notifications
You must be signed in to change notification settings - Fork 21
/
Copy pathdemo_mqtt.cpp
104 lines (90 loc) · 3.19 KB
/
demo_mqtt.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
/*
* Copyright (c) 2020-2021 Arm Limited
* SPDX-License-Identifier: Apache-2.0
*/
#if !MBED_CONF_AWS_CLIENT_SHADOW
#include "mbed.h"
#include "mbed-trace/mbed_trace.h"
#include "rtos/ThisThread.h"
#include "AWSClient/AWSClient.h"
extern "C" {
#include "core_json.h"
}
#define TRACE_GROUP "Main"
static bool reply_received = false;
// Callback when a MQTT message has been added to the topic
void on_message_callback(
const char *topic,
uint16_t topic_length,
const void *payload,
size_t payload_length)
{
char *json_value;
size_t value_length;
auto ret = JSON_Search((char *)payload, payload_length, "sender", strlen("sender"), &json_value, &value_length);
if (ret == JSONSuccess && (strncmp(json_value, "device", strlen("device")) == 0)) {
tr_info("Message sent successfully");
} else {
ret = JSON_Search((char *)payload, payload_length, "message", strlen("message"), &json_value, &value_length);
if (ret == JSONSuccess) {
reply_received = true;
tr_info("Message received from the cloud: \"%.*s\"", value_length, json_value);
} else {
tr_error("Failed to extract message from the payload: \"%.*s\"", payload_length, (const char *) payload);
}
}
}
void demo()
{
AWSClient &client = AWSClient::getInstance();
// Subscribe to the topic
const char topic[] = MBED_CONF_APP_AWS_MQTT_TOPIC;
int ret = client.subscribe(topic, strlen(topic));
if (ret != MBED_SUCCESS) {
tr_error("AWSClient::subscribe() failed");
return;
}
// Send ten message to the cloud (one per second)
// Stop when we receive a cloud-to-device message
char payload[128];
for (int i = 0; i < 10; i++) {
if (reply_received) {
// If we have received a message from the cloud, don't send more messeges
break;
}
// The MQTT protocol does not distinguish between senders,
// so we add a "sender" attribute to the payload
const char base_message[] = "messages left to send, or until we receive a reply";
sprintf(payload, "{\n"
" \"sender\": \"device\",\n"
" \"message\": \"%d %s\"\n"
"}",
10 - i, base_message);
tr_info("Publishing \"%d %s\" to topic \"%s\"", 10 - i, base_message, topic);
ret = client.publish(
topic,
strlen(topic),
payload,
strlen(payload)
);
if (ret != MBED_SUCCESS) {
tr_error("AWSClient::publish() failed");
goto unsubscribe;
}
rtos::ThisThread::sleep_for(1s);
}
// If the user didn't manage to send a cloud-to-device message earlier,
// let's wait until we receive one
while (!reply_received) {
// Continue to receive messages in the communication thread
// which is internally created and maintained by the Azure SDK.
sleep();
}
unsubscribe:
// Unsubscribe from the topic
ret = client.unsubscribe(topic, strlen(topic));
if (ret != MBED_SUCCESS) {
tr_error("AWSClient::unsubscribe() failed");
}
}
#endif // !MBED_CONF_AWS_CLIENT_SHADOW