Skip to content

Commit

Permalink
out_s3: daemon coroutine for uploads
Browse files Browse the repository at this point in the history
S3 is complicated, uploads happen potentially after
a timeout. And the plugin is not concurrent safe at all.
But we need to use async IO because sync IO stack is not
stable. Solution: Daemon coroutine.

A single flush coroutine, the first, will never yield
and instead will become a daemon thread that handles
all uploads with async IO.

I'd like to thank Matthew Fala for his work on
the design of this solution.

Signed-off-by: Wesley Pettit <[email protected]>
  • Loading branch information
PettitWesley committed Mar 27, 2023
1 parent 4c189fc commit b311a0d
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 32 deletions.
72 changes: 40 additions & 32 deletions plugins/out_s3/s3.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <fluent-bit/flb_utils.h>
#include <fluent-bit/flb_slist.h>
#include <fluent-bit/flb_time.h>
#include <fluent-bit/flb_time_utils.h>
#include <fluent-bit/flb_pack.h>
#include <fluent-bit/flb_config_map.h>
#include <fluent-bit/flb_aws_util.h>
Expand Down Expand Up @@ -955,14 +956,6 @@ static int cb_s3_init(struct flb_output_instance *ins,
ctx->timer_ms = UPLOAD_TIMER_MIN_WAIT;
}

/*
* S3 must ALWAYS use sync mode
* In the timer thread we do a mk_list_foreach_safe on the queue of uploads and chunks
* Iterating over those lists is not concurrent safe. If a flush call ran at the same time
* And deleted an item from the list, this could cause a crash/corruption.
*/
ctx->s3_client->upstream->flags &= ~(FLB_IO_ASYNC);

/* clean up any old buffers found on startup */
if (ctx->has_old_buffers == FLB_TRUE) {
flb_plg_info(ctx->ins,
Expand Down Expand Up @@ -1098,10 +1091,6 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,

put_object:

/*
* remove chunk from buffer list- needed for async http so that the
* same chunk won't be sent more than once
*/
ret = s3_put_object(ctx, tag, file_first_log_time, body, body_size);
if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
flb_free(payload_buf);
Expand Down Expand Up @@ -1643,7 +1632,7 @@ static void cb_s3_upload(struct flb_config *config, void *data)
int complete;
int ret;

flb_plg_debug(ctx->ins, "Running upload timer callback (cb_s3_upload)..");
flb_plg_debug(ctx->ins, "Running upload daemon coro uploader (cb_s3_upload)..");

/* send any chunks that are ready */
mk_list_foreach_safe(head, tmp, &ctx->upload_queue) {
Expand Down Expand Up @@ -1903,7 +1892,6 @@ static void flush_init(void *out_context)
{
int ret;
struct flb_s3 *ctx = out_context;
struct flb_sched *sched;

/* clean up any old buffers found on startup */
if (ctx->has_old_buffers == FLB_TRUE) {
Expand All @@ -1923,26 +1911,41 @@ static void flush_init(void *out_context)
}
}

/*
* create a timer that will run periodically and check if uploads
* are ready for completion
* this is created once on the first flush
*/
if (ctx->timer_created == FLB_FALSE) {
flb_plg_debug(ctx->ins,
"Creating upload timer with frequency %ds",
ctx->timer_ms / 1000);
}

/*
* We need to use async IO for S3 because it is more stable
* However, S3 has unique needs. The chunk list, multipart code, etc
all are not concurrent safe.
* Additionally, timer callbacks don't run in coroutines and
can't use async IO.
* Solution: daemon coroutine
The first coroutine that flushes to S3 never ends,
* and just uploads and sleeps.
* FLB_OUTPUT_RETURN_NO_YIELD is used to send the task
* FLB_OK status.
*/
static void daemon_coroutine(struct flb_config *config, struct flb_s3 *ctx)
{
flb_plg_info(ctx->ins, "daemon coroutine starting...");

sched = flb_sched_ctx_get();
ctx->daemon_coro_started = FLB_TRUE;

ret = flb_sched_timer_cb_create(sched, FLB_SCHED_TIMER_CB_PERM,
ctx->timer_ms, cb_s3_upload, ctx, NULL);
/* tell engine that this task did complete successfully */
FLB_OUTPUT_RETURN_NO_YIELD(FLB_OK);

if (ret == -1) {
flb_plg_error(ctx->ins, "Failed to create upload timer");
FLB_OUTPUT_RETURN(FLB_RETRY);
}
ctx->timer_created = FLB_TRUE;
while (FLB_TRUE) {
/* upload any ready chunks */
cb_s3_upload(config, ctx);

/*
* special coroutine sleep
* Doesn't block any thread
* Puts an event on the event
* loop which will wake this coro back up
*/
flb_time_sleep(ctx->timer_ms);
flb_plg_info(ctx->ins, "daemon coroutine resumed...");
}
}

Expand Down Expand Up @@ -2059,10 +2062,15 @@ static void cb_s3_flush(struct flb_event_chunk *event_chunk,
/* lock chunk file so no new appends can occur. It's ready to be sent. */
if ((upload_file != NULL) && (upload_timeout_check == FLB_TRUE || total_file_size_check == FLB_TRUE)) {
s3_store_file_lock(upload_file);
/* sends only happen from upload timer callback which iterates over queue */
/* sends only happen from upload daemon coroutine which iterates over queue */
mk_list_add(&upload_file->_head, &ctx->upload_queue);
}

if (ctx->daemon_coro_started == FLB_FALSE) {
daemon_coroutine(config, ctx);
}

flb_plg_info(ctx->ins, "normal coro flush ending...");
FLB_OUTPUT_RETURN(FLB_OK);
}

Expand Down
2 changes: 2 additions & 0 deletions plugins/out_s3/s3.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ struct flb_s3 {
flb_sds_t metadata_dir;
flb_sds_t seq_index_file;

int daemon_coro_started;

struct flb_output_instance *ins;
};

Expand Down

0 comments on commit b311a0d

Please sign in to comment.