Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support placeholder for s3_bucket #360

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 17 additions & 5 deletions lib/fluent/plugin/out_s3.rb
Original file line number Diff line number Diff line change
Expand Up @@ -249,11 +249,17 @@ def start

s3_client = Aws::S3::Client.new(options)
@s3 = Aws::S3::Resource.new(client: s3_client)
@bucket = @s3.bucket(@s3_bucket)

check_apikeys if @check_apikey_on_start
ensure_bucket if @check_bucket
ensure_bucket_lifecycle
@s3_bucket_template = @s3_bucket

unless @s3_bucket_template.match?(CHUNK_TAG_PLACEHOLDER_PATTERN)
@bucket = @s3.bucket(@s3_bucket)
check_apikeys if @check_apikey_on_start
ensure_bucket if @check_bucket
ensure_bucket_lifecycle
@dynamic_bucket = false
else
@dynamic_bucket = true
end

super
end
Expand All @@ -264,6 +270,12 @@ def format(tag, time, record)
end

def write(chunk)
if @dynamic_bucket
@s3_bucket = extract_placeholders(@s3_bucket_template, chunk)
@bucket = @s3.bucket(@s3_bucket)
ensure_bucket if @check_bucket
ensure_bucket_lifecycle
end
i = 0
metadata = chunk.metadata
previous_path = nil
Expand Down
40 changes: 40 additions & 0 deletions test/test_out_s3.rb
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,22 @@ def test_configure_with_grant
assert_equal "id='3456789012'", d.instance.grant_write_acp
end

def test_configure_with_s3_bucket_tag
conf = config_element(
'ROOT', '', {
'@type' => 's3',
's3_bucket' => 's3-${tag}'
}, [
config_element(
'buffer', 'tag,time', {
'@type' => 'file',
'path' => 'tmp'
}
)])
d = create_driver(conf)
assert_equal "s3-\${tag}", d.instance.s3_bucket
end

def test_format
d = create_driver

Expand Down Expand Up @@ -411,6 +427,30 @@ def test_write_with_custom_s3_object_key_format_containing_hex_random_placeholde
FileUtils.rm_f(s3_local_file_path)
end

def test_write_with_custom_s3_bucket_placeholder
conf = config_element(
'ROOT', '', {
'@type' => 's3',
's3_bucket' => 's3-${tag}',
}, [
config_element(
'buffer', 'tag,time', {
'@type' => 'file',
'path' => 'tmp'
}
)])
setup_mocks(true)

d = create_time_sliced_driver(conf)
time = event_time("2011-01-02 13:14:15 UTC")
d.run(default_tag: "test") do
d.feed(time, {"a"=>1})
d.feed(time, {"a"=>2})
end

assert_equal "s3-test", d.instance.s3_bucket
end

class MockResponse
attr_reader :data

Expand Down