
out_s3: major refactor to only upload in a single "daemon" routine #6919

Draft · PettitWesley wants to merge 17 commits into 1.9 from s3-fix-data-ordering-1_9

Conversation

PettitWesley
Contributor


Enter [N/A] in the box, if an item is not applicable to your change.

Testing
Before we can approve your change, please submit the following in a comment:

  • Example configuration file for the change
  • Debug log output from testing the change
  • Attached Valgrind output that shows no leaks or memory corruption was found

If this is a change to packaging of containers or native binaries then please confirm it works for all targets.

  • Run local packaging test showing all targets (including any new ones) build.
  • Set ok-package-test label to test for all targets (requires maintainer to do).

Documentation

  • Documentation required for this feature

Backporting

  • Backport to latest stable release.

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

EC2 Default User and others added 10 commits February 26, 2023 22:08
… update s3 warn output messages with function s3_retry_warn()

Signed-off-by: Clay Cheng <[email protected]>
S3 now always uses sync IO. In the future, we will
switch back to async with the concept of a daemon
coroutine that is the only thing that can send data.
Either way, we don't need to mark that chunks are
actively being sent with the locked bool

A subsequent commit will refactor this code to
use locked for a similar but slightly different
purpose. Locked will mean a chunk is ready/done
and there should be no new appends to it.
Locked chunks will be added to a queue for sending.
Unlocked chunks are ready for new flushes to buffer
new data.

Signed-off-by: Wesley Pettit <[email protected]>
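
As a rough illustration of the locked/queue semantics described above (not the exact patch; the helper name is hypothetical and it assumes the plugin context's upload_queue list and the existing s3_file fields):

/* Sketch: once a chunk is ready/done, lock it so flushes stop appending to it
 * and hand it to the upload queue that the daemon coroutine drains. */
static void s3_mark_chunk_ready(struct flb_s3 *ctx, struct s3_file *chunk)
{
    chunk->locked = FLB_TRUE;                        /* no new appends to this chunk */
    mk_list_add(&chunk->_head, &ctx->upload_queue);  /* queue it for sending */
}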
@PettitWesley PettitWesley marked this pull request as draft February 27, 2023 06:09
@kailunshi

@PettitWesley hey there, will this PR help with enabling concurrency, i.e. letting us enable multiple workers?

@PettitWesley
Contributor Author

@kailunshi No... this PR is incomplete, but once complete it simply aims to fix instabilities in the plugin.

Adding support for multiple workers is a much longer-term thing. Unfortunately, since S3 has its own internal buffer, there needs to be synchronization if multiple threads are trying to send. While that could be implemented, it's not a high priority right now IMO. S3 is fairly performant: see our load test results here: https://github.com/aws/aws-for-fluent-bit/releases

@kailunshi

@PettitWesley I agree it's pretty performant, but we hit a bottleneck of 15K per second without gzip and less than 10K rps with compression enabled.

@PettitWesley
Contributor Author

@kailunshi is that 15K per second figure correct? Is it bytes or records? If it's bytes, that doesn't match up with our load test results: https://github.com/aws/aws-for-fluent-bit/releases

If it's records, how large are your records? Our load tests use a 1KB log size, so our 30 MB/s is actually 30,000 records per second...

@kailunshi

@PettitWesley sorry, by 15K I mean 15,000 log records (all JSON format) per second. I randomly checked a few records, and they are around 1.5KB to 2KB.

I think the bottleneck is hit with compression turned on, since I think it's running on the same CPU.

In your load test, do you enable compression?

@PettitWesley
Contributor Author

@kailunshi we do not enable compression, it's just a basic upload-to-S3 test: https://github.com/aws/aws-for-fluent-bit/blob/mainline/load_tests/logger/stdout_logger/log_configuration/s3.json

@kailunshi

@PettitWesley yeah, with compression one CPU quickly gets saturated, which causes the limitation for us.
BTW, I believe Fluentd supports multiple threads, so I'm trying Fluent Bit + Fluentd. I can share the results.

@PettitWesley PettitWesley force-pushed the s3-fix-data-ordering-1_9 branch 2 times, most recently from 27f898b to b311a0d Compare March 27, 2023 06:00
@PettitWesley PettitWesley force-pushed the s3-fix-data-ordering-1_9 branch from b311a0d to 78b0e2f Compare March 28, 2023 04:10
@PettitWesley PettitWesley changed the title out_s3: major refactor to only upload in the timer callback out_s3: major refactor to only upload in a single "daemon" routine Mar 28, 2023
*/
cb_s3_upload(config, ctx);
}
/* this would use sync IO which we want to avoid */
Contributor

I suppose this means old data will not be sent until the first chunk is received. Maybe it's fine to use sync networking here since it's only done once; we just don't want to consistently use sync networking.

If so, cb_s3_upload may need a new parameter called is_async
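
A rough sketch of how that could look (the signature and call sites here are illustrative, not the plugin's current code):

/* Hypothetical: let the caller pick sync networking for the one-time startup
 * pass and async networking for the steady-state daemon coroutine. */
static void cb_s3_upload(struct flb_config *config, struct flb_s3 *ctx,
                         int is_async);

cb_s3_upload(config, ctx, FLB_FALSE);  /* startup: flush old buffers once, sync is tolerable */
cb_s3_upload(config, ctx, FLB_TRUE);   /* normal operation inside the daemon coroutine */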

Contributor Author

Yeah, I'm still debating this part myself, and testing. This is one bit which can block shutdown from happening properly.

Contributor Author

This is just temporary while I'm testing. I need to get the flush startup chunks routine working and tested in the daemon coroutine first, then I'll add this back.

Currently the flush startup chunks routine is flushing all chunks, since it just iterates over the fstore queue...

We have to have this because otherwise S3 won't upload any old data until it has sent new data, which is silly.

if (chunk) {
chunk->failures += 1;
if (chunk->failures > ctx->ins->retry_limit){
s3_retry_warn(ctx, tag, chunk->input_name, create_time, FLB_FALSE);
s3_store_file_delete(ctx, chunk);
s3_retry_warn(ctx, tag, chunk->input_name, file_first_log_time, FLB_FALSE);
Contributor

good catch here

Contributor Author

what is?

Contributor Author

Leonardo is the one who found the create_time issue

Contributor

Ah, I see.

if (chunk) {
chunk->failures += 1;
if (chunk->failures > ctx->ins->retry_limit){
s3_retry_warn(ctx, tag, chunk->input_name, create_time, FLB_FALSE);
s3_store_file_delete(ctx, chunk);
Contributor

After failures is greater than the retry limit, why is the chunk not deleted?

Contributor

I see. You're unifying all the chunk maintenance logic into the send path. This makes things simpler.

Contributor Author

Also, we have to remove the chunk from the upload_queue list before deleting it, or else we segfault, IIRC.

Contributor Author

I just realized we need to support the unlimited retries option here: https://docs.fluentbit.io/manual/administration/scheduling-and-retries
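
A hedged sketch of the guard that would be needed, assuming (as for Retry_Limit no_limits) that unlimited retries show up as a negative retry_limit:

if (chunk) {
    chunk->failures += 1;
    /* Skip the give-up path entirely when retries are unlimited
     * (assumed here to be represented by a negative retry_limit value). */
    if (ctx->ins->retry_limit >= 0 && chunk->failures > ctx->ins->retry_limit) {
        s3_retry_warn(ctx, tag, chunk->input_name, file_first_log_time, FLB_FALSE);
        s3_store_file_delete(ctx, chunk);
        chunk = NULL;
    }
}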

… coro on shutdown, send timed out chunks in daemon coro

Signed-off-by: Wesley Pettit <[email protected]>
Contributor

@matthewfala matthewfala left a comment

Still need to check this more...

chunk->create_time, FLB_FALSE);
if (chunk->locked == FLB_TRUE) {
/* remove from upload_queue */
if (chunk->_head.next != NULL && chunk->_head.prev != NULL) {
Contributor

This check is valid because calloc is used to create s3_file, so an unqueued chunk's list pointers start out NULL.
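
Putting that together with the earlier note about detaching before deleting, a minimal sketch of the safe teardown might look like this (built from the existing s3_file fields; not the exact patch):

/* Detach the chunk from upload_queue before deleting its backing file,
 * otherwise the queue keeps a dangling pointer. The NULL checks are safe
 * because s3_file is calloc'd, so an unqueued chunk has zeroed list links. */
if (chunk->_head.next != NULL && chunk->_head.prev != NULL) {
    mk_list_del(&chunk->_head);
}
s3_store_file_delete(ctx, chunk);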

}
ctx->timer_created = FLB_TRUE;
/* Notify the event loop about our return status */
n = flb_pipe_w(pipe_fd, (void *) &val, sizeof(val));
Contributor

Nothing wrong here.
The following line doesn't affect this coroutine because flb_output_flush_prepare_destroy(out_flush); was deleted

flb_output_flush_finished(config, out_id);

Contributor

It's not exactly clear how the current code works after the coroutine for it is destroyed:

  • handle_output_event
  • flb_output_flush_finished
  • flb_output_flush_destroy
  • flb_coro_destroy
  • co_delete

Contributor Author

The coroutine is not destroyed at all; the task is basically just marked as completed.

Contributor

@matthewfala matthewfala left a comment

The code revision is looking promising! I've added some concerns about the new stopping condition.

Comment on lines +2039 to 2053
/*
* FLB engine uses a graceful cooperative shutdown model.
* If coroutines never end, the system won't stop.
* So the daemon coroutine must exit itself when the engine is in shutdown mode.
*/
while (config->is_running == FLB_TRUE) {
/* Cleanup old buffers found on startup */
flush_startup_chunks(ctx);

/* upload any ready chunks */
cb_s3_upload(config, ctx);

if (config->is_running == FLB_FALSE) {
break;
}
Contributor

This looks good. Do you know whether retries stop being processed after the grace timeout has expired?

I see ->is_running is set in the flb_engine_shutdown function after the grace timeout is triggered (as mentioned in the comment):
https://github.com/fluent/fluent-bit/blob/1.9/src/flb_engine.c

If retries can still be triggered, however, then we may have an edge case where a new chunk could be ingested.

Contributor

Concern 2

If there are failures in cb_s3_upload, then we may want this daemon thread to continue running. In that case, the while loop condition could be:

while (config->is_running == FLB_TRUE || <there is an item in the chunk buffer>)

Contributor

Concern 3

What if there are a bunch of coroutines scheduled to run and deliver data to S3 but they haven't run yet? That data will be lost.

Is there a way to identify the number of pending chunks on a plugin? I see we have it for input plugins.

Maybe you could wait for the backlog size to drop to 0, indicating that all the data has been successfully enqueued into the S3 buffer, and then also check that the buffer is empty.
fs_backlog_chunks_size:

size_t fs_backlog_chunks_size;

So we have 3 places where logs can exist:
1. Input - ingestion seems to stop here:

fluent-bit/src/flb_engine.c

Lines 474 to 477 in 52fe1bd

/* flb_engine_shutdown was already initiated */
if (config->is_running == FLB_FALSE) {
return 0;
}

2. As a flush coro that has or hasn't started yet - most likely this is indicated by the fs_backlog_chunks_size field; 0 means that this data has been handed on to (3).
3. The S3 buffer - once processed, the data is sent to the S3 buffer. The buffer's linked list being empty would indicate that this data has been sent or dropped.

Let's make sure all data from 2 and 3 is processed (empty) before stopping the daemon coroutine, as it is needed to send out the last bits of data.

Contributor

So the revised while condition would be:

while (config->is_running == FLB_TRUE || x->fs_backlog_chunks_size != 0 || <there is an item in the chunk buffer>)
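
Spelled out as code, the proposal might look roughly like this (the backlog accessor is a placeholder, since fs_backlog_chunks_size lives on the input side, and upload_queue stands in for "the chunk buffer"):

/* Rough sketch of the proposed stopping condition, not the actual patch. */
while (config->is_running == FLB_TRUE ||
       s3_backlog_chunks_size(ctx) != 0 ||       /* hypothetical helper */
       mk_list_size(&ctx->upload_queue) > 0) {   /* chunks still buffered */
    flush_startup_chunks(ctx);   /* clean up old buffers found on startup */
    cb_s3_upload(config, ctx);   /* upload any ready chunks */
}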

Contributor Author

I want to preserve the existing behavior, which is that S3 never blocks engine shutdown. Since S3 always buffers locally, any data it has been sent can be recovered. Adding some sort of wait would introduce new shutdown-blocking behavior.

Contributor Author

If retries can still be triggered, however, then we may have an edge case where a new chunk could be ingested.

It's not.

Once all flush coros are done, the engine will invoke cb_s3_exit, which will try once to send all pending chunks. So the data still has a chance to be sent.

Contributor

Got it. If retries aren't processed and we don't want the engine to be blocked on shutdown, then terminating the daemon coroutine seems like a good idea.

Not sure if cb_s3_exit can use the async network stack.
Also, it looks like flb_output_exit, which invokes the cb_exit callback, is called before the remaining coroutines finish. I suppose this is acceptable, because after the grace timeout all logs that should be processed have already had their chance to do so.

flb_output_exit(config);

Contributor Author

Not sure if cb_s3_exit can use the async network stack.

I think it's fine if it uses sync?

Also it looks like flb_output_exit which invokes the cb_exit callback is called before the remaining coroutines finish. I suppose this is acceptable, because after the grace timeout, all logs that should be processed already had their chance to do so.

That's not what I see in the code... I see that asking the coros to stop happens first, but there's no wait/guarantee that they have finished before cb_exit...

Though in my testing before, when the daemon coro didn't end, I didn't see cb_s3_exit being run... so I think we might be misreading the code here.

If the exit callback and the daemon coro attempted to send data at the same time, that could cause problems.

I'll look more into this.
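
For reference, forcing sync networking for the exit path could look roughly like this (assuming the 1.9-style upstream flags on the S3 client; a sketch, not the PR's code):

/* Before the final send in cb_s3_exit, drop the async flag so the HTTP
 * calls run synchronously outside of any coroutine context. */
if (ctx->s3_client != NULL && ctx->s3_client->upstream != NULL) {
    ctx->s3_client->upstream->flags &= ~(FLB_IO_ASYNC);
}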

Contributor

This is a good point. Let me know what you find.

Contributor

These comments make sense to me. Shutting down when the grace period expires, even if some remaining coroutines are queued to add data to the buffer, seems reasonable. We could say that at that point, after the grace period has expired, the remaining data should have already been sent out.

I think the current way of doing things keeps things simple and is effective.

cb_s3_upload(config, ctx);
}
/* this would use sync IO which we want to avoid */
// /* clean up any old buffers found on startup */
Contributor Author

@PettitWesley I keep forgetting to add this back...

@@ -1202,17 +1172,12 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
else {
s3_retry_warn(ctx, (char *) chunk->fsf->meta_buf, m_upload->input_name,
chunk->create_time, FLB_TRUE);
s3_store_file_unlock(chunk);
Contributor Author

@PettitWesley PettitWesley May 2, 2023

A few lines above here we call s3_decrement_index(ctx); after an UploadPart failure. But it's only incremented on CreateUpload, not for each part. So this is a bug.

I think I'll fix that in a separate bug fix PR.

@matthewfala

Contributor Author

Oh, oops. I read this wrong. We only do it if we are removing the upload, so this is correct.

@@ -1041,6 +1029,7 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
*/
if (chunk != NULL) {
file_first_log_time = chunk->first_log_time;
input_name = chunk->input_name;
Contributor Author

need to actually use this below

@PettitWesley
Contributor Author

As the 1.9 branch is no longer actively maintained, this PR won't be merged. For the AWS distro, I have this PR here: https://github.com/PettitWesley/fluent-bit/pull/24/files

It is slightly different, since the AWS distro has diverged slightly from the 1.9 branch.

/* skip metadata stream */
if (fs_stream == ctx->stream_metadata) {
continue;
}

/* on startup, we only send old chunks in this routine */
if (is_startup == FLB_TRUE && fs_stream == ctx->stream_active) {
Contributor Author

TODO: need to double check that I didn't mess up the async flags for startup vs normal execution

@@ -24,20 +24,22 @@
#include <fluent-bit/flb_fstore.h>

struct s3_file {
int locked; /* locked chunk is busy, cannot write to it */
int locked; /* locked = no appends to this chunk */
Contributor Author

We should just remove the unlock function to prevent accidental bugs; it's not needed anymore.

@PettitWesley
Contributor Author

This PR will not be updated and is missing the async_flags setting. Instead, I am using PettitWesley#24.

@github-actions
Contributor

This PR is stale because it has been open 45 days with no activity. Remove stale label or comment or this will be closed in 10 days.

@github-actions github-actions bot added the Stale label Aug 13, 2023