diff --git a/lib/fluent/plugin/out_s3.rb b/lib/fluent/plugin/out_s3.rb
index 5fb653db..c6d67ff0 100644
--- a/lib/fluent/plugin/out_s3.rb
+++ b/lib/fluent/plugin/out_s3.rb
@@ -2,6 +2,7 @@
 require 'fluent/log-ext'
 require 'fluent/timezone'
 require 'aws-sdk-s3'
+require 'concurrent'
 require 'zlib'
 require 'time'
 require 'tempfile'
@@ -223,7 +224,7 @@ def configure(conf)
       # For backward compatibility
       # TODO: Remove time_slice_format when end of support compat_parameters
       @configured_time_slice_format = conf['time_slice_format']
-      @values_for_s3_object_chunk = {}
+      @values_for_s3_object_chunk = Concurrent::Hash.new
       @time_slice_with_tz = Fluent::Timezone.formatter(@timekey_zone,
                                                        @configured_time_slice_format || timekey_to_timeformat(@buffer_config['timekey']))
     end
@@ -251,6 +252,9 @@ def start
       @s3 = Aws::S3::Resource.new(client: s3_client)
       @bucket = @s3.bucket(@s3_bucket)
 
+      @index = Concurrent::AtomicFixnum.new(-1)
+      @time_slice = Concurrent::AtomicReference.new
+
       check_apikeys if @check_apikey_on_start
       ensure_bucket if @check_bucket
       ensure_bucket_lifecycle
@@ -273,8 +277,18 @@ def write(chunk)
                      @time_slice_with_tz.call(metadata.timekey)
                    end
 
+      # If we set a new time slice, then reset our index.
+      # There is a small race here, where a new time slice can have an old index set.
+      # This shouldn't be a problem if @check_object is enabled, but could cause
+      # overwrites otherwise when the old index is reached on the new time slice.
+      if @time_slice.get_and_set(time_slice) != time_slice
+        @index.value = -1
+      end
+
       if @check_object
         begin
+          i = @index.increment
+
           @values_for_s3_object_chunk[chunk.unique_id] ||= {
             "%{hex_random}" => hex_random(chunk),
           }
@@ -284,7 +298,7 @@
           }
           values_for_s3_object_key_post = {
             "%{time_slice}" => time_slice,
-            "%{index}" => sprintf(@index_format,i),
+            "%{index}" => sprintf(@index_format, i),
           }.merge!(@values_for_s3_object_chunk[chunk.unique_id])
 
           values_for_s3_object_key_post["%{uuid_flush}".freeze] = uuid_random if @uuid_flush_enabled
@@ -302,7 +316,6 @@
           end
         end
 
-        i += 1
         previous_path = s3path
       end while @bucket.object(s3path).exists?
     else
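For reviewers who want to poke at the concurrency behaviour in isolation, here is a minimal standalone sketch of the reset-on-new-time-slice pattern the patch introduces. It assumes only the concurrent-ruby gem; `index` and `time_slice` stand in for the plugin's `@index` and `@time_slice`, and the slice values and thread loop are illustrative only, not part of the plugin.

```ruby
require 'concurrent'

# Shared state, mirroring @index / @time_slice in the patch.
index = Concurrent::AtomicFixnum.new(-1)
time_slice = Concurrent::AtomicReference.new

# Returns the next index for the given slice, resetting when the slice changes.
next_index = lambda do |current_slice|
  # get_and_set atomically installs the new slice and returns the old one,
  # so only the thread that actually changed the slice performs the reset.
  # A concurrent increment can still slip in between the swap and the reset:
  # that is the small race the patch comment calls out.
  index.value = -1 if time_slice.get_and_set(current_slice) != current_slice
  index.increment
end

threads = %w[20240101 20240101 20240102].map do |slice|
  Thread.new { puts "#{slice} -> index #{next_index.call(slice)}" }
end
threads.each(&:join)
```

The race is visible here too: a thread can call `increment` after another thread has swapped in a new slice but before the reset lands, yielding a stale (high) index on the new slice. With `@check_object` enabled, the `end while @bucket.object(s3path).exists?` loop in the patch retries past any key that already exists, which is presumably why the comment treats the race as harmless in that mode.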