out_s3: daemon thread with code changes only in S3 plugin
Signed-off-by: Wesley Pettit <[email protected]>
PettitWesley committed Mar 28, 2023
1 parent a68f484 commit 78b0e2f
Showing 2 changed files with 111 additions and 36 deletions.
145 changes: 109 additions & 36 deletions plugins/out_s3/s3.c
@@ -18,9 +18,12 @@
*/

#include <fluent-bit/flb_output_plugin.h>
#include <fluent-bit/flb_metrics.h>
#include <fluent-bit/flb_task.h>
#include <fluent-bit/flb_utils.h>
#include <fluent-bit/flb_slist.h>
#include <fluent-bit/flb_time.h>
#include <fluent-bit/flb_time_utils.h>
#include <fluent-bit/flb_pack.h>
#include <fluent-bit/flb_config_map.h>
#include <fluent-bit/flb_aws_util.h>
@@ -955,14 +958,6 @@ static int cb_s3_init(struct flb_output_instance *ins,
ctx->timer_ms = UPLOAD_TIMER_MIN_WAIT;
}

/*
 * S3 must ALWAYS use sync mode.
 * In the timer thread we do a mk_list_foreach_safe on the queue of uploads and chunks.
 * Iterating over those lists is not concurrency safe: if a flush call ran at the same time
 * and deleted an item from the list, this could cause a crash/corruption.
 */
ctx->s3_client->upstream->flags &= ~(FLB_IO_ASYNC);

/* clean up any old buffers found on startup */
if (ctx->has_old_buffers == FLB_TRUE) {
flb_plg_info(ctx->ins,
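Aside (not part of the diff): the rationale removed above deserves a concrete illustration. mk_list_foreach_safe() caches the next pointer before the loop body runs, so it only tolerates the iterator deleting the node it currently stands on; a deletion performed by anyone else invalidates that cached pointer. A minimal compilable sketch of the hazard — the list macros below are simplified re-creations of monkey's mk_list.h, not the real definitions:

#include <stdio.h>

struct mk_list { struct mk_list *prev, *next; };

static void mk_list_init(struct mk_list *h) { h->prev = h->next = h; }

static void mk_list_add(struct mk_list *n, struct mk_list *h)
{
    n->prev = h->prev; n->next = h;
    h->prev->next = n; h->prev = n;
}

static void mk_list_del(struct mk_list *n)
{
    n->prev->next = n->next; n->next->prev = n->prev;
}

#define mk_list_foreach_safe(curr, n, head) \
    for (curr = (head)->next, n = curr->next; \
         curr != (head); curr = n, n = curr->next)

int main(void)
{
    struct mk_list head, a, b, c;
    struct mk_list *curr, *n;

    mk_list_init(&head);
    mk_list_add(&a, &head); mk_list_add(&b, &head); mk_list_add(&c, &head);

    mk_list_foreach_safe(curr, n, &head) {
        if (curr == &a) {
            mk_list_del(&a);   /* safe: deleting the node we stand on */
        }
        if (curr == &b) {
            mk_list_del(&c);   /* simulates a concurrent flush deleting
                                * the cached next node; with flb_free()
                                * the next step is a use-after-free */
            printf("iterator will still visit deleted node %p\n", (void *) n);
        }
    }
    return 0;
}

Forcing sync IO serialized all flushes and the timer callback onto one thread; the daemon-coroutine rework below keeps that serialization while restoring async IO.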
@@ -1098,10 +1093,6 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,

put_object:

/*
 * remove chunk from buffer list; needed for async http so that the
 * same chunk won't be sent more than once
 */
ret = s3_put_object(ctx, tag, file_first_log_time, body, body_size);
if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
flb_free(payload_buf);
@@ -1643,7 +1634,7 @@ static void cb_s3_upload(struct flb_config *config, void *data)
int complete;
int ret;

flb_plg_debug(ctx->ins, "Running upload timer callback (cb_s3_upload)..");
flb_plg_debug(ctx->ins, "Running upload daemon coro uploader (cb_s3_upload)..");

/* send any chunks that are ready */
mk_list_foreach_safe(head, tmp, &ctx->upload_queue) {
@@ -1899,11 +1890,9 @@ static void unit_test_flush(void *out_context, struct s3_file *upload_file,
FLB_OUTPUT_RETURN(ret);
}

static void flush_init(void *out_context)
static void flush_startup_chunks(struct flb_s3 *ctx)
{
int ret;
struct flb_s3 *ctx = out_context;
struct flb_sched *sched;

/* clean up any old buffers found on startup */
if (ctx->has_old_buffers == FLB_TRUE) {
@@ -1919,30 +1908,112 @@ static void flush_init(void *out_context)
"Failed to send locally buffered data left over "
"from previous executions; will retry. Buffer=%s",
ctx->fs->root_path);
FLB_OUTPUT_RETURN(FLB_RETRY);
}
}

}


/*
 * Same as flb_output_return_do() but with no coro prepare/destroy
 * and no coro yield.
 * Used by the S3 daemon coroutine so we can clean up the task
 * but keep the coroutine alive.
 * This is the best way to do it because task cleanup is
 * handled by the control thread in the engine (AFAICT),
 * while writing to the output pipe is done from the coro.
 */
static inline void flb_output_return_no_destroy(int ret)
{
struct flb_coro *coro;
int n;
int pipe_fd;
uint32_t set;
uint64_t val;
struct flb_task *task;
struct flb_output_flush *out_flush;
struct flb_output_instance *o_ins;
struct flb_out_thread_instance *th_ins = NULL;

coro = flb_coro_get();

out_flush = (struct flb_output_flush *) coro->data;
o_ins = out_flush->o_ins;
task = out_flush->task;

/*
 * create a timer that will run periodically and check if uploads
 * are ready for completion
 * this is created once on the first flush
 */
/*
 * To compose the signal event the relevant info is:
 *
 * - Unique Task events id: 2 in this case
 * - Return value: FLB_OK (0), FLB_ERROR (1) or FLB_RETRY (2)
 * - Task ID
 * - Output Instance ID (struct flb_output_instance)->id
 *
 * We pack the return value together with the task_id into the low 32 bits
 */
if (ctx->timer_created == FLB_FALSE) {
flb_plg_debug(ctx->ins,
"Creating upload timer with frequency %ds",
ctx->timer_ms / 1000);
set = FLB_TASK_SET(ret, task->id, o_ins->id);
val = FLB_BITS_U64_SET(2 /* FLB_ENGINE_TASK */, set);

sched = flb_sched_ctx_get();
/*
* Set the target pipe channel: if this return code is running inside a
* thread pool worker, use the specific worker pipe/event loop to handle
* the return status, otherwise use the channel connected to the parent
* event loop.
*/
if (flb_output_is_threaded(o_ins) == FLB_TRUE) {
/* Retrieve the thread instance and prepare pipe channel */
th_ins = flb_output_thread_instance_get();
pipe_fd = th_ins->ch_thread_events[1];
}
else {
pipe_fd = out_flush->o_ins->ch_events[1];
}

ret = flb_sched_timer_cb_create(sched, FLB_SCHED_TIMER_CB_PERM,
ctx->timer_ms, cb_s3_upload, ctx, NULL);
/* Notify the event loop about our return status */
n = flb_pipe_w(pipe_fd, (void *) &val, sizeof(val));
if (n == -1) {
flb_errno();
}
}

if (ret == -1) {
flb_plg_error(ctx->ins, "Failed to create upload timer");
FLB_OUTPUT_RETURN(FLB_RETRY);
}
ctx->timer_created = FLB_TRUE;
/*
 * We need to use async IO for S3 because it is more stable.
 * However, S3 has unique needs: the chunk list, multipart code, etc.
 * are not concurrency safe.
 * Additionally, timer callbacks don't run in coroutines and
 * can't use async IO.
 * Solution: a daemon coroutine.
 * The first coroutine that flushes to S3 never ends;
 * it just uploads and sleeps.
 *
 * We increment the metrics counters for the chunk originally
 * associated with the coroutine and decrement the task users
 * in release_chunk_upstream()
 */
static void daemon_coroutine(struct flb_config *config, struct flb_s3 *ctx)
{
flb_plg_info(ctx->ins, "daemon coroutine starting...");

ctx->daemon_coro_started = FLB_TRUE;

/* tell engine that this task did complete successfully */
flb_output_return_no_destroy(FLB_OK);

while (FLB_TRUE) {
/* Cleanup old buffers found on startup */
flush_startup_chunks(ctx);

/* upload any ready chunks */
cb_s3_upload(config, ctx);

/*
 * special coroutine sleep:
 * doesn't block any thread;
 * puts an event on the event loop
 * which will wake this coro back up
 */
flb_time_sleep(ctx->timer_ms);
flb_plg_info(ctx->ins, "daemon coroutine resumed...");
}
}
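For readers new to the engine internals: the FLB_TASK_SET / FLB_BITS_U64_SET pair used in flb_output_return_no_destroy() packs everything the event loop needs into a single 64-bit pipe write. A small round-trip sketch of the idea — the shift widths below are illustrative stand-ins, not the real macros from flb_task.h and flb_bits.h:

#include <stdint.h>
#include <stdio.h>

/* Illustrative re-creations; the real definitions live in
 * flb_bits.h / flb_task.h and may use different field widths. */
#define BITS_U64_SET(hi, lo)  (((uint64_t) (hi) << 32) | (uint32_t) (lo))
#define BITS_U64_HIGH(v)      ((uint32_t) ((v) >> 32))
#define BITS_U64_LOW(v)       ((uint32_t) ((v) & 0xffffffff))

#define TASK_SET(ret, task_id, out_id) \
    ((uint32_t) (((ret) << 24) | ((task_id) << 14) | (out_id)))
#define TASK_RET(set)         ((set) >> 24)
#define TASK_ID(set)          (((set) >> 14) & 0x3ff)
#define TASK_OUT(set)         ((set) & 0x3fff)

int main(void)
{
    /* producer side: what flb_output_return_no_destroy() writes to the pipe */
    uint32_t set = TASK_SET(0 /* FLB_OK */, 7 /* task->id */, 3 /* o_ins->id */);
    uint64_t val = BITS_U64_SET(2 /* FLB_ENGINE_TASK */, set);

    /* consumer side: what the engine event loop decodes after reading it */
    uint32_t lo = BITS_U64_LOW(val);
    printf("type=%u ret=%u task=%u out=%u\n",
           BITS_U64_HIGH(val), TASK_RET(lo), TASK_ID(lo), TASK_OUT(lo));
    return 0;
}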

Expand All @@ -1966,9 +2037,6 @@ static void cb_s3_flush(struct flb_event_chunk *event_chunk,
struct flb_time tms;
time_t file_first_log_time = 0;

/* Cleanup old buffers and initialize upload timer */
flush_init(ctx);

/* Process chunk */
if (ctx->log_key) {
chunk = flb_pack_msgpack_extract_log_key(ctx,
@@ -2059,10 +2127,15 @@ static void cb_s3_flush(struct flb_event_chunk *event_chunk,
/* lock chunk file so no new appends; it's ready to be sent. */
if ((upload_file != NULL) && (upload_timeout_check == FLB_TRUE || total_file_size_check == FLB_TRUE)) {
s3_store_file_lock(upload_file);
/* sends only happen from upload timer callback which iterates over queue */
/* sends only happen from upload daemon coroutine which iterates over queue */
mk_list_add(&upload_file->_head, &ctx->upload_queue);
}

if (ctx->daemon_coro_started == FLB_FALSE) {
daemon_coroutine(config, ctx);
}

flb_plg_info(ctx->ins, "normal coro flush ending...");
FLB_OUTPUT_RETURN(FLB_OK);
}
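The non-blocking sleep is what makes the endless loop in daemon_coroutine() tolerable: the coroutine yields to the event loop and is resumed when a timer fires, so the worker thread stays free for other flushes. A toy model of that hand-off using POSIX ucontext in place of Fluent Bit's flb_coro — everything here is illustrative, and the busy-wait stands in for an epoll timeout:

#include <stdio.h>
#include <time.h>
#include <ucontext.h>

static ucontext_t loop_ctx, coro_ctx;
static struct timespec wake_at;
static int coro_done = 0;

/* Arm a deadline and yield back to the event loop; the loop resumes
 * this coroutine once the deadline passes (flb_time_sleep's role). */
static void coro_sleep(int ms)
{
    clock_gettime(CLOCK_MONOTONIC, &wake_at);
    wake_at.tv_sec  += ms / 1000;
    wake_at.tv_nsec += (long) (ms % 1000) * 1000000L;
    if (wake_at.tv_nsec >= 1000000000L) {
        wake_at.tv_sec  += 1;
        wake_at.tv_nsec -= 1000000000L;
    }
    swapcontext(&coro_ctx, &loop_ctx);
}

/* Stand-in for daemon_coroutine(): upload, sleep, repeat. */
static void daemon_body(void)
{
    for (int pass = 0; pass < 3; pass++) {
        printf("upload pass %d\n", pass);
        coro_sleep(200);
    }
    coro_done = 1;
}

int main(void)
{
    static char stack[64 * 1024];
    struct timespec now;

    getcontext(&coro_ctx);
    coro_ctx.uc_stack.ss_sp   = stack;
    coro_ctx.uc_stack.ss_size = sizeof(stack);
    coro_ctx.uc_link          = &loop_ctx;
    makecontext(&coro_ctx, daemon_body, 0);

    swapcontext(&loop_ctx, &coro_ctx);     /* first flush starts the daemon */
    while (!coro_done) {
        /* toy event loop: a real one would epoll_wait() until the timer */
        clock_gettime(CLOCK_MONOTONIC, &now);
        if (now.tv_sec > wake_at.tv_sec ||
            (now.tv_sec == wake_at.tv_sec && now.tv_nsec >= wake_at.tv_nsec)) {
            swapcontext(&loop_ctx, &coro_ctx);   /* timer fired: resume */
        }
    }
    return 0;
}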

2 changes: 2 additions & 0 deletions plugins/out_s3/s3.h
@@ -147,6 +147,8 @@ struct flb_s3 {
flb_sds_t metadata_dir;
flb_sds_t seq_index_file;

int daemon_coro_started;

struct flb_output_instance *ins;
};
