Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch sending of cached qos msg. #1000

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
138 changes: 98 additions & 40 deletions src/mqtt/protocol/mqtt/mqtt_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ static void mqtt_sock_recv(void *arg, nni_aio *aio);
static void mqtt_send_cb(void *arg);
static void mqtt_recv_cb(void *arg);
static void mqtt_timer_cb(void *arg);
static void mqtt_batch_cb(void *arg);

static int mqtt_pipe_init(void *arg, nni_pipe *pipe, void *s);
static void mqtt_pipe_fini(void *arg);
Expand Down Expand Up @@ -77,7 +78,8 @@ struct mqtt_pipe_s {
nni_id_map recv_unack; // recv messages unacknowledged
nni_aio send_aio; // send aio to the underlying transport
nni_aio recv_aio; // recv aio to the underlying transport
nni_aio time_aio; // timer aio to resend unack msg
nni_aio time_aio; // timer aio to trigger batch_aio and ping
nni_aio batch_aio; // batch aio to resend unack msg
nni_lmq recv_messages; // recv messages queue
nni_lmq send_messages; // send messages queue
uint16_t rid; // index of resending packet id
Expand All @@ -97,6 +99,10 @@ struct mqtt_sock_s {
nni_duration retry;
nni_duration keepalive; // mqtt keepalive
nni_duration timeleft; // left time to send next ping
uint16_t batchsz; // resend qos in batchs
uint16_t batchcnt;
nni_duration batchtmo; // interval of batch sending
uint16_t lastpid;
nni_mtx mtx; // more fine grained mutual exclusion
mqtt_ctx_t master; // to which we delegate send/recv calls
mqtt_pipe_t *mqtt_pipe;
Expand Down Expand Up @@ -134,6 +140,9 @@ mqtt_sock_init(void *arg, nni_sock *sock)
s->retry = NNI_SECOND * 5;
s->keepalive = NNI_SECOND * 10; // default mqtt keepalive
s->timeleft = NNI_SECOND * 10;
s->batchcnt = 0;
s->batchsz = 8;
s->batchtmo = 10; // Interval of batch sending (ms)

nni_mtx_init(&s->mtx);
mqtt_ctx_init(&s->master, s);
Expand Down Expand Up @@ -287,7 +296,7 @@ mqtt_sock_get_next_packet_id(mqtt_sock_t *s)
do {
packet_id = nni_atomic_get(&s->next_packet_id);
/* PROTOCOL ERROR: when packet_id == 0 */
while (packet_id & 0xFFFF == 0) {
while ((packet_id & 0xFFFF) == 0) {
JaylinYu marked this conversation as resolved.
Show resolved Hide resolved
while(!nni_atomic_cas(&s->next_packet_id,
packet_id,
packet_id + 1)) {
Expand Down Expand Up @@ -325,6 +334,7 @@ mqtt_pipe_init(void *arg, nni_pipe *pipe, void *s)
nni_aio_init(&p->send_aio, mqtt_send_cb, p);
nni_aio_init(&p->recv_aio, mqtt_recv_cb, p);
nni_aio_init(&p->time_aio, mqtt_timer_cb, p);
nni_aio_init(&p->batch_aio, mqtt_batch_cb, p);
// Packet IDs are 16 bits
// We start at a random point, to minimize likelihood of
// accidental collision across restarts.
Expand Down Expand Up @@ -367,6 +377,7 @@ mqtt_pipe_fini(void *arg)
nni_aio_fini(&p->send_aio);
nni_aio_fini(&p->recv_aio);
nni_aio_fini(&p->time_aio);
nni_aio_fini(&p->batch_aio);

nni_id_map_fini(&p->sent_unack);
nni_id_map_fini(&p->recv_unack);
Expand Down Expand Up @@ -450,15 +461,20 @@ mqtt_send_msg(nni_aio *aio, mqtt_ctx_t *arg)
packet_id = nni_mqtt_msg_get_packet_id(msg);
taio = nni_id_get(&p->sent_unack, packet_id);
if (taio != NULL) {
nni_plat_printf("Warning : msg %d lost due to "
"packetID duplicated!",
packet_id);
log_warn("msg %d lost due to packetID duplicated!", packet_id);
nni_aio_finish_error(taio, NNG_ECANCELED);
if ((tmsg = nni_aio_get_msg(taio)) != NULL)
wanghaEMQ marked this conversation as resolved.
Show resolved Hide resolved
nni_msg_free(tmsg);
nni_aio_set_msg(taio, NULL);
nni_id_remove(&p->sent_unack, packet_id);
}
nni_msg_set_timestamp(msg, nni_timestamp());
nni_msg_clone(msg); // clone for resend, will free when ack received
wanghaEMQ marked this conversation as resolved.
Show resolved Hide resolved
if (0 != nni_id_set(&p->sent_unack, packet_id, aio)) {
nni_plat_printf("Warning : aio caching failed");
log_warn("aio caching failed");
nni_aio_finish_error(aio, NNG_ECANCELED);
nni_aio_set_msg(aio, NULL);
nni_msg_free(msg);
wanghaEMQ marked this conversation as resolved.
Show resolved Hide resolved
}
break;

Expand All @@ -483,7 +499,7 @@ mqtt_send_msg(nni_aio *aio, mqtt_ctx_t *arg)
return;
}
if (nni_lmq_full(&p->send_messages)) {
log_error("rhack: pipe is busy and lmq is full\n");
log_warn("pipe is busy and lmq is full, drop a msg\n");
(void) nni_lmq_get(&p->send_messages, &tmsg);
nni_msg_free(tmsg);
}
Expand Down Expand Up @@ -535,6 +551,7 @@ mqtt_pipe_stop(void *arg)
nni_aio_stop(&p->send_aio);
nni_aio_stop(&p->recv_aio);
nni_aio_stop(&p->time_aio);
nni_aio_stop(&p->batch_aio);
}

static int
Expand All @@ -549,6 +566,7 @@ mqtt_pipe_close(void *arg)
nni_aio_close(&p->send_aio);
nni_aio_close(&p->recv_aio);
nni_aio_close(&p->time_aio);
nni_aio_close(&p->batch_aio);

#if defined(NNG_SUPP_SQLITE)
// flush to disk
Expand Down Expand Up @@ -607,6 +625,70 @@ mqtt_pipe_close(void *arg)
return 0;
}

static void
mqtt_batch_cb(void *arg)
{
mqtt_pipe_t *p = arg;
mqtt_sock_t *s = p->mqtt_sock;
nni_msg *msg = NULL;
uint16_t pid;
uint16_t ptype;
nni_aio *aio = NULL;

if (nng_aio_result(&p->batch_aio) != 0) {
s->batchcnt = 0;
log_error("Batch aio error!");
return;
}
nni_mtx_lock(&s->mtx);
if (NULL == p || nni_atomic_get_bool(&p->closed)) {
s->batchcnt = 0;
nni_mtx_unlock(&s->mtx);
return;
}

// If batchcnt > 0. Batch sending was started.
if (s->batchcnt > 0) {
if (s->batchcnt < s->batchsz) {
pid = ++ s->lastpid;
aio = nni_id_get_min(&p->sent_unack, &pid);
} else {
// This batch sending finished
s->batchcnt = 0;
nni_mtx_unlock(&s->mtx);
return;
}
} else {
// The first run of this batch
aio = nni_id_get_min(&p->sent_unack, &pid);
}
s->batchcnt ++;
if (aio != NULL && (msg = nni_aio_get_msg(aio)) != NULL) {
s->lastpid = pid;
if (nni_timestamp() - nni_msg_get_timestamp(msg) <= 10 * NNI_SECOND) {
log_debug("NO.%d id%d Time short than 10s %ld", s->batchcnt, pid, nni_timestamp() - nni_msg_get_timestamp(msg));
goto next;
}
nni_msg_clone(msg);
log_debug("NO.%d Batch sending id%d msg%p", s->batchcnt, pid, msg);
ptype = nni_mqtt_msg_get_packet_type(msg);
if (ptype == NNG_MQTT_PUBLISH)
nni_mqtt_msg_set_publish_dup(msg, true);
if (!p->busy) {
p->busy = true;
nni_aio_set_msg(&p->send_aio, msg);
nni_pipe_send(p->pipe, &p->send_aio);
} else {
nni_lmq_put(&p->send_messages, msg);
}
} else {
log_debug("NO.%d Batch sending missing", s->batchcnt);
}
next:
nni_mtx_unlock(&s->mtx);
nni_sleep_aio(s->batchtmo, &p->batch_aio);
}

// Timer callback, we use it for retransmitting.
static void
mqtt_timer_cb(void *arg)
Expand All @@ -615,7 +697,7 @@ mqtt_timer_cb(void *arg)
mqtt_sock_t *s = p->mqtt_sock;

if (nng_aio_result(&p->time_aio) != 0) {
log_info("Timer aio error!");
log_error("Timer aio error!");
return;
}
nni_mtx_lock(&s->mtx);
Expand Down Expand Up @@ -649,38 +731,12 @@ mqtt_timer_cb(void *arg)
return;
}

// start message resending
// msg = nni_id_get_min(&p->sent_unack, &pid);
// if (msg != NULL) {
// uint16_t ptype;
// ptype = nni_mqtt_msg_get_packet_type(msg);
// if (ptype == NNG_MQTT_PUBLISH) {
// nni_mqtt_msg_set_publish_dup(msg, true);
// }
// if (!p->busy) {
// p->busy = true;
// nni_msg_clone(msg);
// aio = nni_mqtt_msg_get_aio(msg);
// if (aio) {
// nni_aio_bump_count(aio,
// nni_msg_header_len(msg) +
// nni_msg_len(msg));
// nni_aio_set_msg(aio, NULL);
// }
// nni_aio_set_msg(&p->send_aio, msg);
// nni_pipe_send(p->pipe, &p->send_aio);

// nni_mtx_unlock(&s->mtx);
// nni_sleep_aio(s->retry, &p->time_aio);
// return;
// } else {
// nni_msg_clone(msg);
// nni_lmq_put(&p->send_messages, msg);
// }
// }
if (s->batchcnt == 0)
nni_aio_finish(&p->batch_aio, 0, 0); // start batch resend

#if defined(NNG_SUPP_SQLITE)
nni_msg *msg = NULL;
if (!p->busy) {
nni_msg *msg = NULL;
nni_mqtt_sqlite_option *sqlite =
mqtt_sock_get_sqlite_option(s);
if (sqlite_is_enabled(sqlite)) {
Expand Down Expand Up @@ -904,7 +960,7 @@ mqtt_recv_cb(void *arg)
}

if (rv != MQTT_SUCCESS) {
nni_plat_printf("Error in encoding CONNACK.\n");
log_error("Error in encoding CONNACK.\n");
}
conn_param_clone(p->cparam);
if ((ctx = nni_list_first(&s->recv_queue)) == NULL) {
Expand Down Expand Up @@ -939,10 +995,12 @@ mqtt_recv_cb(void *arg)
// FALLTHROUGH
case NNG_MQTT_UNSUBACK:
// we have received a UNSUBACK, successful unsubscription
packet_id = nni_mqtt_msg_get_packet_id(msg);
packet_id = nni_mqtt_msg_get_packet_id(msg);
p->rid ++;
user_aio = nni_id_get(&p->sent_unack, packet_id);
if (user_aio != NULL) {
if (nni_aio_get_msg(user_aio) != NULL)
nni_msg_free(nni_aio_get_msg(user_aio));
nni_id_remove(&p->sent_unack, packet_id);
if (packet_type == NNG_MQTT_SUBACK ||
packet_type == NNG_MQTT_UNSUBACK) {
Expand Down
Loading