From 58ad2a184604bac9cb02e1ad22bbfda53017e89c Mon Sep 17 00:00:00 2001 From: Pier-Hugues Pellerin Date: Mon, 29 Aug 2016 10:56:55 -0400 Subject: [PATCH 1/8] refactoring for dynamic prefixes and new concurrency model :shared **Motivation** One of the most requested features was adding a way to add dynamic prefixes using the fieldref syntax for the files on the bucket and also the changes in the pipeline to support shared delegator. The S3 output by nature was always a single threaded writes but had multiples workers to process the upload, the code was threadsafe when used in the concurrency `:single` mode. This PR addresses a few problems and provide shorter and more structured code: - This Plugin now uses the V2 version of the SDK, this make sure we receive the latest updates and changes. - We now uses S3's `upload_file` instead of reading chunks, this method is more efficient and will uses the multipart with threads if the files is too big. - You can now use the `fieldref` syntax in the prefix to dynamically changes the target with the events it receives. - The Upload queue is now a bounded list, this options is necessary to allow back pressure to be communicated back to the pipeline but its configurable by the user. - If the queue is full the plugin will start the upload in the current thread. - The plugin now threadsafe and support the concurrency model `shared` - The rotation strategy can be selected, the recommended is `size_and_time` that will check for both the configured limits (`size` and `time` are also available) - The `restore` option will now use a separate threadpool with an unbounded queue - The `restore` option will not block the launch of logstash and will uses less resources than the real time path - The plugin now uses `multi_receive_encode`, this will optimize the writes to the files - rotate operation are now batched to reduce the number of IO calls. 
- Empty file will not be uploaded by any rotation rotation strategy - We now use Concurrent-Ruby for the implementation of the java executor - If you have finer grain permission on prefixes or want faster boot, you can disable the credentials check with `validate_credentials_on_root_bucket` - The credentials check will no longer fails if we can't delete the file - We now have a full suite of integration test for all the defined rotation Fixes: #4 #81 #44 #59 #50 --- lib/logstash/outputs/s3.rb | 493 +++++++----------- lib/logstash/outputs/s3/file_repository.rb | 96 ++++ lib/logstash/outputs/s3/path_validator.rb | 18 + .../s3/size_and_time_rotation_policy.rb | 24 + .../outputs/s3/size_rotation_policy.rb | 26 + lib/logstash/outputs/s3/temporary_file.rb | 45 ++ .../outputs/s3/temporary_file_factory.rb | 90 ++++ .../outputs/s3/time_rotation_policy.rb | 26 + lib/logstash/outputs/s3/uploader.rb | 60 +++ .../s3/writable_directory_validator.rb | 17 + .../s3/write_bucket_permission_validator.rb | 49 ++ logstash-output-s3.gemspec | 1 + spec/integration/dynamic_prefix_spec.rb | 92 ++++ spec/integration/restore_from_crash_spec.rb | 39 ++ spec/integration/s3_spec.rb | 97 ---- spec/integration/size_rotation_spec.rb | 58 +++ spec/integration/stress_test_spec.rb | 60 +++ ...based_rotation_with_constant_write_spec.rb | 60 +++ ...me_based_rotation_with_stale_write_spec.rb | 55 ++ .../upload_current_file_on_shutdown_spec.rb | 53 ++ spec/outputs/s3/file_repository_spec.rb | 123 +++++ .../s3/size_and_time_rotation_policy_spec.rb | 76 +++ spec/outputs/s3/size_rotation_policy_spec.rb | 40 ++ .../outputs/s3/temporary_file_factory_spec.rb | 85 +++ spec/outputs/s3/temporary_file_spec.rb | 46 ++ spec/outputs/s3/time_rotation_policy_spec.rb | 55 ++ spec/outputs/s3/uploader_spec.rb | 56 ++ .../s3/writable_directory_validator_spec.rb | 40 ++ .../write_bucket_permission_validator_spec.rb | 38 ++ spec/outputs/s3_spec.rb | 381 ++------------ spec/spec_helper.rb | 3 + spec/supports/helpers.rb | 42 +- 32 files changed, 1693 insertions(+), 751 deletions(-) create mode 100644 lib/logstash/outputs/s3/file_repository.rb create mode 100644 lib/logstash/outputs/s3/path_validator.rb create mode 100644 lib/logstash/outputs/s3/size_and_time_rotation_policy.rb create mode 100644 lib/logstash/outputs/s3/size_rotation_policy.rb create mode 100644 lib/logstash/outputs/s3/temporary_file.rb create mode 100644 lib/logstash/outputs/s3/temporary_file_factory.rb create mode 100644 lib/logstash/outputs/s3/time_rotation_policy.rb create mode 100644 lib/logstash/outputs/s3/uploader.rb create mode 100644 lib/logstash/outputs/s3/writable_directory_validator.rb create mode 100644 lib/logstash/outputs/s3/write_bucket_permission_validator.rb create mode 100644 spec/integration/dynamic_prefix_spec.rb create mode 100644 spec/integration/restore_from_crash_spec.rb delete mode 100644 spec/integration/s3_spec.rb create mode 100644 spec/integration/size_rotation_spec.rb create mode 100644 spec/integration/stress_test_spec.rb create mode 100644 spec/integration/time_based_rotation_with_constant_write_spec.rb create mode 100644 spec/integration/time_based_rotation_with_stale_write_spec.rb create mode 100644 spec/integration/upload_current_file_on_shutdown_spec.rb create mode 100644 spec/outputs/s3/file_repository_spec.rb create mode 100644 spec/outputs/s3/size_and_time_rotation_policy_spec.rb create mode 100644 spec/outputs/s3/size_rotation_policy_spec.rb create mode 100644 spec/outputs/s3/temporary_file_factory_spec.rb create mode 100644 
spec/outputs/s3/temporary_file_spec.rb create mode 100644 spec/outputs/s3/time_rotation_policy_spec.rb create mode 100644 spec/outputs/s3/uploader_spec.rb create mode 100644 spec/outputs/s3/writable_directory_validator_spec.rb create mode 100644 spec/outputs/s3/write_bucket_permission_validator_spec.rb create mode 100644 spec/spec_helper.rb diff --git a/lib/logstash/outputs/s3.rb b/lib/logstash/outputs/s3.rb index 257ef357..9492b6a1 100644 --- a/lib/logstash/outputs/s3.rb +++ b/lib/logstash/outputs/s3.rb @@ -4,10 +4,13 @@ require "logstash/plugin_mixins/aws_config" require "stud/temporary" require "stud/task" -require "socket" # for Socket.gethostname +require "concurrent" +require "socket" require "thread" require "tmpdir" require "fileutils" +require "set" +require "pathname" # INFORMATION: @@ -17,35 +20,34 @@ # Requirements: # * Amazon S3 Bucket and S3 Access Permissions (Typically access_key_id and secret_access_key) # * S3 PutObject permission -# * Run logstash as superuser to establish connection # -# S3 outputs create temporary files into "/opt/logstash/S3_temp/". If you want, you can change the path at the start of register method. +# S3 outputs create temporary files into the OS' temporary directory, you can specify where to save them using the `temporary_directory` option. # # S3 output files have the following format # # ls.s3.ip-10-228-27-95.2013-04-18T10.00.tag_hello.part0.txt # -# ls.s3 : indicate logstash plugin s3 # -# "ip-10-228-27-95" : indicates the ip of your machine. -# "2013-04-18T10.00" : represents the time whenever you specify time_file. -# "tag_hello" : this indicates the event's tag. -# "part0" : this means if you indicate size_file then it will generate more parts if you file.size > size_file. -# When a file is full it will be pushed to the bucket and then deleted from the temporary directory. -# If a file is empty, it is simply deleted. Empty files will not be pushed +# |======= +# | ls.s3 | indicate logstash plugin s3 | +# | ip-10-228-27-95 | indicates the ip of your machine. | +# | 2013-04-18T10.00 | represents the time whenever you specify time_file. | +# | tag_hello | this indicates the event's tag. | +# | part0 | this means if you indicate size_file then it will generate more parts if you file.size > size_file. When a file is full it will be pushed to the bucket and then deleted from the temporary directory. If a file is empty, it is simply deleted. Empty files will not be pushed | +# |======= # # Crash Recovery: -# * This plugin will recover and upload temporary log files after crash/abnormal termination +# * This plugin will recover and upload temporary log files after crash/abnormal termination when using `restore` set to true # ##[Note regarding time_file and size_file] : # -# Both time_file and size_file settings can trigger a log "file rotation" -# A log rotation pushes the current log "part" to s3 and deleted from local temporary storage. +## Both time_file and size_file settings can trigger a log "file rotation" +## A log rotation pushes the current log "part" to s3 and deleted from local temporary storage. # -## If you specify BOTH size_file and time_file then it will create file for each tag (if specified). +## If you specify BOTH size_file and time_file then it will create file for each tag (if specified). ## When EITHER time_file minutes have elapsed OR log file size > size_file, a log rotation is triggered. ## -## If you ONLY specify time_file but NOT file_size, one file for each tag (if specified) will be created.. 
+## If you ONLY specify time_file but NOT file_size, one file for each tag (if specified) will be created. ## When time_file minutes elapses, a log rotation will be triggered. # ## If you ONLY specify size_file, but NOT time_file, one files for each tag (if specified) will be created. @@ -63,37 +65,54 @@ # access_key_id => "crazy_key" (required) # secret_access_key => "monkey_access_key" (required) # region => "eu-west-1" (optional, default = "us-east-1") -# bucket => "boss_please_open_your_bucket" (required) +# bucket => "your_bucket" (required) # size_file => 2048 (optional) - Bytes # time_file => 5 (optional) - Minutes # format => "plain" (optional) -# canned_acl => "private" (optional. Options are "private", "public_read", "public_read_write", "authenticated_read", "bucket_owner_full_control". Defaults to "private" ) +# canned_acl => "private" (optional. Options are "private", "public_read", "public_read_write", "authenticated_read". Defaults to "private" ) # } # class LogStash::Outputs::S3 < LogStash::Outputs::Base - include LogStash::PluginMixins::AwsConfig + require "logstash/outputs/s3/writable_directory_validator" + require "logstash/outputs/s3/path_validator" + require "logstash/outputs/s3/write_bucket_permission_validator" + require "logstash/outputs/s3/size_rotation_policy" + require "logstash/outputs/s3/time_rotation_policy" + require "logstash/outputs/s3/size_and_time_rotation_policy" + require "logstash/outputs/s3/temporary_file" + require "logstash/outputs/s3/temporary_file_factory" + require "logstash/outputs/s3/uploader" + require "logstash/outputs/s3/file_repository" + + include LogStash::PluginMixins::AwsConfig::V2 + + PREFIX_KEY_NORMALIZE_CHARACTER = "_" + PERIODIC_CHECK_INTERVAL_IN_SECONDS = 15 + CRASH_RECOVERY_THREADPOOL = Concurrent::ThreadPoolExecutor.new({ + :min_threads => 1, + :max_threads => 2, + :fallback_policy => :caller_runs + }) - TEMPFILE_EXTENSION = "txt" - S3_INVALID_CHARACTERS = /[\^`><]/ config_name "s3" - default :codec, 'line' + default :codec, "line" - concurrency :single + concurrency :shared # S3 bucket - config :bucket, :validate => :string + config :bucket, :validate => :string, :required => true # Set the size of file in bytes, this means that files on bucket when have dimension > file_size, they are stored in two or more file. # If you have tags then it will generate a specific size file for every tags ##NOTE: define size of file is the better thing, because generate a local temporary file on disk and then put it in bucket. - config :size_file, :validate => :number, :default => 0 + config :size_file, :validate => :number, :default => 1024 * 1024 * 5 # Set the time, in MINUTES, to close the current sub_time_section of bucket. # If you define file_size you have a number of files in consideration of the section and the current tag. # 0 stay all time on listerner, beware if you specific 0 and size_file 0, because you will not put the file on bucket, # for now the only thing this plugin can do is to put the file when logstash restart. - config :time_file, :validate => :number, :default => 0 + config :time_file, :validate => :number, :default => 15 * 60 ## IMPORTANT: if you use multiple instance of s3, you should specify on one of them the "restore=> true" and on the others "restore => false". ## This is hack for not destroy the new files after restoring the initial files. 
@@ -102,7 +121,7 @@ class LogStash::Outputs::S3 < LogStash::Outputs::Base config :restore, :validate => :boolean, :default => false # The S3 canned ACL to use when putting the file. Defaults to "private". - config :canned_acl, :validate => ["private", "public_read", "public_read_write", "authenticated_read", "bucket_owner_full_control"], + config :canned_acl, :validate => ["private", "public_read", "public_read_write", "authenticated_read"], :default => "private" # Specifies wether or not to use S3's AES256 server side encryption. Defaults to false. @@ -113,10 +132,14 @@ class LogStash::Outputs::S3 < LogStash::Outputs::Base config :temporary_directory, :validate => :string, :default => File.join(Dir.tmpdir, "logstash") # Specify a prefix to the uploaded filename, this can simulate directories on S3. Prefix does not require leading slash. + # This option support string interpolation, be warned this can created a lot of temporary local files. config :prefix, :validate => :string, :default => '' # Specify how many workers to use to upload the files to S3 - config :upload_workers_count, :validate => :number, :default => 1 + config :upload_workers_count, :validate => :number, :default => (Concurrent.processor_count * 0.25).round + + # Number of items we can keep in the local queue before uploading them + config :upload_queue_size, :validate => :number, :default => 2 * (Concurrent.processor_count * 0.25).round # The version of the S3 signature hash to use. Normally uses the internal client default, can be explicitly # specified here @@ -135,348 +158,202 @@ class LogStash::Outputs::S3 < LogStash::Outputs::Base # Specify the content encoding. Supports ("gzip"). Defaults to "none" config :encoding, :validate => ["none", "gzip"], :default => "none" - # Exposed attributes for testing purpose. - attr_accessor :tempfile - attr_reader :page_counter, :upload_workers - attr_reader :s3 - - def aws_s3_config - @logger.info("Registering s3 output", :bucket => @bucket, :endpoint_region => @region) - @s3 = AWS::S3.new(full_options) - end - - def full_options - aws_options_hash.merge(signature_options) - end - - def signature_options - if @signature_version - {:s3_signature_version => @signature_version} - else - {} - end - end - - def aws_service_endpoint(region) - return { - :s3_endpoint => region == 'us-east-1' ? 's3.amazonaws.com' : "s3-#{region}.amazonaws.com" - } - end - - public - def write_on_bucket(file) - # find and use the bucket - bucket = @s3.buckets[@bucket] - - remote_filename = "#{@prefix}#{File.basename(file)}" + # Define the strategy to use to decide when we need to rotate the file and push it to S3, + # The default strategy is to check for both size and time, the first one to match will rotate the file. + config :rotation_strategy, :validate => ["size_and_time", "size", "time"], :default => "size_and_time" - @logger.debug("S3: ready to write file in bucket", :remote_filename => remote_filename, :bucket => @bucket) + # The common use case is to define permission on the root bucket and give Logstash full access to write his logs. + # In some circonstances you need finer grained permission on subfolder, this allow you to disable the check at startup. + config :validate_credentials_on_root_bucket, :validate => :boolean, :default => true - File.open(file, 'r') do |fileIO| - begin - # prepare for write the file - object = bucket.objects[remote_filename] - object.write(fileIO, - :acl => @canned_acl, - :server_side_encryption => @server_side_encryption ? 
:aes256 : nil, - :content_encoding => @encoding == "gzip" ? "gzip" : nil) - rescue AWS::Errors::Base => error - @logger.error("S3: AWS error", :error => error) - raise LogStash::Error, "AWS Configuration Error, #{error}" + def register + # I've move the validation of the items into custom classes + # to prepare for the new config validation that will be part of the core so the core can + # be moved easily. + unless @prefix.empty? + if !PathValidator.valid?(prefix) + raise LogStash::ConfigurationError, "Prefix must not contains: #{PathValidator::INVALID_CHARACTERS}" end end - @logger.debug("S3: has written remote file in bucket with canned ACL", :remote_filename => remote_filename, :bucket => @bucket, :canned_acl => @canned_acl) - end - - # This method is used for create new empty temporary files for use. Flag is needed for indicate new subsection time_file. - public - def create_temporary_file - filename = File.join(@temporary_directory, get_temporary_filename(@page_counter)) - - @logger.debug("S3: Creating a new temporary file", :filename => filename) - - @file_rotation_lock.synchronize do - unless @tempfile.nil? - @tempfile.close - end - - if @encoding == "gzip" - @tempfile = Zlib::GzipWriter.open(filename) - else - @tempfile = File.open(filename, "a") - end + if !WritableDirectoryValidator.valid?(@temporary_directory) + raise LogStash::ConfigurationError, "Logstash must have the permissions to write to the temporary directory: #{@temporary_directory}" end - end - public - def register - require "aws-sdk" - # required if using ruby version < 2.0 - # http://ruby.awsblog.com/post/Tx16QY1CI5GVBFT/Threading-with-the-AWS-SDK-for-Ruby - AWS.eager_autoload!(AWS::S3) - - @s3 = aws_s3_config - @upload_queue = Queue.new - @file_rotation_lock = Mutex.new - - if @prefix && @prefix =~ S3_INVALID_CHARACTERS - @logger.error("S3: prefix contains invalid characters", :prefix => @prefix, :contains => S3_INVALID_CHARACTERS) - raise LogStash::ConfigurationError, "S3: prefix contains invalid characters" + if @validate_credentials_on_root_bucket && !WriteBucketPermissionValidator.valid?(bucket_resource) + raise LogStash::ConfigurationError, "Logstash must have the privileges to write to root bucket `#{@bucket}`, check you credentials or your permissions." end - if !Dir.exist?(@temporary_directory) - FileUtils.mkdir_p(@temporary_directory) + if @time_file.nil? && @size_file.nil? || @size_file == 0 && @time_file == 0 + raise LogStash::ConfigurationError, "Logstash need at `time_file` or `size_file` greather than 0" end - test_s3_write - - restore_from_crashes if @restore == true - reset_page_counter - create_temporary_file - configure_periodic_rotation if time_file != 0 - configure_upload_workers - - @codec.on_event do |event, encoded_event| - handle_event(encoded_event) - end - end + @file_repository = FileRepository.new(@tags, @encoding, @temporary_directory) + @rotation = rotation_strategy + @uploader = Uploader.new(bucket_resource, @logger, Concurrent::ThreadPoolExecutor.new({ + :min_threads => 1, + :max_threads => @upload_workers_count, + :max_queue => @upload_queue_size, + :fallback_policy => :caller_runs + })) - # Use the same method that Amazon use to check - # permission on the user bucket by creating a small file - public - def test_s3_write - @logger.debug("S3: Creating a test file on S3") + # Restoring from crash will use a new threadpool to slowly recover + # New events should have more priority. 
+ restore_from_crash if @restore - test_filename = File.join(@temporary_directory, - "logstash-programmatic-access-test-object-#{Time.now.to_i}") + # If we need time based rotation we need to do periodic check on the file + # to take care of file that were not updated recently + start_periodic_check if @rotation.need_periodic? + end - File.open(test_filename, 'a') do |file| - file.write('test') - end + def multi_receive_encoded(events_and_encoded) + prefix_written_to = Set.new - begin - write_on_bucket(test_filename) + events_and_encoded.each do |event, encoded| + prefix_key = normalize_key(event.sprintf(@prefix)) + prefix_written_to << prefix_key begin - remote_filename = "#{@prefix}#{File.basename(test_filename)}" - bucket = @s3.buckets[@bucket] - bucket.objects[remote_filename].delete - rescue StandardError => e - # we actually only need `put_object`, but if we dont delete them - # we can have a lot of tests files + @file_repository.get_file(prefix_key) { |file| file.write(encoded) } + # The output should stop accepting new events coming in, since he cannot do anything with them anymore. + # Log the error and rethrow it. + rescue Errno::ENOSPC => e + @logger.error("S3: No space left in temporary directory", :temporary_directory => @temporary_directory) + raise e end - ensure - File.delete(test_filename) end + + # Groups IO calls to optimize fstat checks + rotate_if_needed(prefix_written_to) end - public - def restore_from_crashes - @logger.debug("S3: Checking for temp files from a previoius crash...") + def close + stop_periodic_check if @rotation.need_periodic? - Dir[File.join(@temporary_directory, "*.#{get_tempfile_extension}")].each do |file| - name_file = File.basename(file) - @logger.warn("S3: Found temporary file from crash. Uploading file to S3.", :filename => name_file) - move_file_to_bucket_async(file) - end - end + @logger.debug("Uploading current workspace") - public - def move_file_to_bucket(file) - if !File.zero?(file) - write_on_bucket(file) - @logger.debug("S3: File was put on the upload thread", :filename => File.basename(file), :bucket => @bucket) + # The plugin has stopped receiving new events, but we still have + # data on disk, lets make sure it get to S3. + # If Logstash get interrupted, the `restore_from_crash` (when set to true) method will pickup + # the content in the temporary directly and upload it. + # This will block the shutdown until all upload are done or the use force quit. + @file_repository.each_files do |file| + upload_file(file) end - begin - File.delete(file) - rescue Errno::ENOENT - # Something else deleted the file, logging but not raising the issue - @logger.warn("S3: Cannot delete the temporary file since it doesn't exist on disk", :filename => File.basename(file)) - rescue Errno::EACCES - @logger.error("S3: Logstash doesnt have the permission to delete the file in the temporary directory.", :filename => File.basename(file), :temporary_directory => @temporary_directory) - end - end + @file_repository.shutdown - public - def periodic_interval - @time_file * 60 + @uploader.stop # wait until all the current upload are complete + @crash_uploader.stop if @restore # we might have still work to do for recovery so wait until we are done end - private - def get_tempfile_extension - @encoding == "gzip" ? 
"#{TEMPFILE_EXTENSION}.gz" : "#{TEMPFILE_EXTENSION}" + def full_options + options = { :credentials => credentials } + options[:s3_signature_version] = @signature_version if @signature_version + options.merge(aws_options_hash) end - public - def get_temporary_filename(page_counter = 0) - current_time = Time.now - filename = "ls.s3.#{Socket.gethostname}.#{current_time.strftime("%Y-%m-%dT%H.%M")}" - - if @tags.size > 0 - return "#{filename}.tag_#{@tags.join('.')}.part#{page_counter}.#{get_tempfile_extension}" - else - return "#{filename}.part#{page_counter}.#{get_tempfile_extension}" - end + def normalize_key(prefix_key) + prefix_key.gsub(PathValidator.matches_re, PREFIX_KEY_NORMALIZE_CHARACTER) end - public - def receive(event) + private + # We start a task in the background for check for stale files and make sure we rotate them to S3 if needed. + def start_periodic_check + @logger.debug("Start periodic rotation check") - @codec.encode(event) - end + @periodic_check = Concurrent::TimerTask.new(:execution_interval => PERIODIC_CHECK_INTERVAL_IN_SECONDS) do + @logger.debug("Periodic check for stale files") - public - def rotate_events_log? - @file_rotation_lock.synchronize do - tempfile_size > @size_file + rotate_if_needed(@file_repository.keys) end - end - private - def tempfile_size - if @tempfile.instance_of? File - @tempfile.size - elsif @tempfile.instance_of? Zlib::GzipWriter - @tempfile.tell - else - raise LogStash::Error, "Unable to get size of temp file of type #{@tempfile.class}" - end + @periodic_check.execute end - public - def write_events_to_multiple_files? - @size_file > 0 + def stop_periodic_check + @periodic_check.shutdown end - public - def write_to_tempfile(event) - begin - @logger.debug("S3: put event into tempfile ", :tempfile => File.basename(@tempfile.path)) - - @file_rotation_lock.synchronize do - @tempfile.write(event) - end - rescue Errno::ENOSPC - @logger.error("S3: No space left in temporary directory", :temporary_directory => @temporary_directory) - close - end + def bucket_resource + Aws::S3::Bucket.new(@bucket, { :credentials => credentials }.merge(aws_options_hash)) end - public - def close - shutdown_upload_workers - @periodic_rotation_thread.stop! if @periodic_rotation_thread - - @file_rotation_lock.synchronize do - @tempfile.close unless @tempfile.nil? && @tempfile.closed? - end + def aws_service_endpoint(region) + { :s3_endpoint => region == 'us-east-1' ? 's3.amazonaws.com' : "s3-#{region}.amazonaws.com"} end - private - def shutdown_upload_workers - @logger.debug("S3: Gracefully shutdown the upload workers") - @upload_queue << LogStash::SHUTDOWN + def upload_options + { + :acl => @cannel_acl, + :server_side_encryption => @server_side_encryption ? :aes256 : nil, + :content_encoding => @encoding == "gzip" ? "gzip" : nil + } end - private - def handle_event(encoded_event) - if write_events_to_multiple_files? - if rotate_events_log? - @logger.debug("S3: tempfile is too large, let's bucket it and create new file", :tempfile => File.basename(@tempfile.path)) - - tempfile_path = @tempfile.path - # close and start next file before sending the previous one - next_page - create_temporary_file - - # send to s3 - move_file_to_bucket_async(tempfile_path) - else - @logger.debug("S3: tempfile file size report.", :tempfile_size => tempfile_size, :size_file => @size_file) + def rotate_if_needed(prefixes) + prefixes.each do |prefix| + # Each file access is thread safe, until the rotation is done then only + # one thread has access to the resource. 
+ @file_repository.get_factory(prefix) do |factory| + temp_file = factory.current + + if @rotation.rotate?(temp_file) + @logger.debug("Rotate file", + :strategy => @rotation.class.name, + :key => temp_file.key, + :path => temp_file.path) + + upload_file(temp_file) + factory.rotate! + end end end - - write_to_tempfile(encoded_event) end - private - def configure_periodic_rotation - @periodic_rotation_thread = Stud::Task.new do - LogStash::Util::set_thread_name(" true) do - @logger.debug("S3: time_file triggered, bucketing the file", :filename => @tempfile.path) - - tempfile_path = @tempfile.path - # close and start next file before sending the previous one - next_page - create_temporary_file + def upload_file(temp_file) + @logger.debug("Queue for upload", :path => temp_file.path) - # send to s3 - move_file_to_bucket_async(tempfile_path) - end + # if the queue is full the calling thread will be used to upload + temp_file.fsync # make sure we flush the fd before uploading it. + if temp_file.size > 0 + @uploader.upload_async(temp_file, + :on_complete => method(:clean_temporary_file), + :upload_options => upload_options ) end end - private - def configure_upload_workers - @logger.debug("S3: Configure upload workers") - - @upload_workers = @upload_workers_count.times.map do |worker_id| - Stud::Task.new do - LogStash::Util::set_thread_name(" worker_id) - - continue = upload_worker - end - end + def rotation_strategy + case @rotation_strategy + when "size" + SizeRotationPolicy.new(size_file) + when "time" + TimeRotationPolicy.new(time_file) + when "size_and_time" + SizeAndTimeRotationPolicy.new(size_file, time_file) end end - private - def upload_worker - file = nil - begin - file = @upload_queue.deq - - if file == LogStash::SHUTDOWN - @logger.debug("S3: upload worker is shutting down gracefuly") - @upload_queue.enq(LogStash::SHUTDOWN) - false - else - @logger.debug("S3: upload working is uploading a new file", :filename => File.basename(file)) - move_file_to_bucket(file) - true - end - rescue Exception => ex - @logger.error("failed to upload, will re-enqueue #{file} for upload", - :ex => ex, :backtrace => ex.backtrace) - unless file.nil? # Rare case if the first line of the begin doesn't execute - @upload_queue.enq(file) - end - true - end + def clean_temporary_file(file) + @logger.debug("Removing temporary file", :file => file.path) + file.delete! end - private - def next_page - @page_counter += 1 - end + # The upload process will use a separate uploader/threadpool with less resource allocated to it. + # but it will use an unbounded queue for the work, it may take some time before all the older files get processed. 
+ def restore_from_crash + @crash_uploader = Uploader.new(bucket_resource, @logger, CRASH_RECOVERY_THREADPOOL) - private - def reset_page_counter - @page_counter = 0 - end + temp_folder_path = Pathname.new(@temporary_directory) + Dir.glob(::File.join(@temporary_directory, "**/*")) do |file| + if ::File.file?(file) + key_parts = Pathname.new(file).relative_path_from(temp_folder_path).to_s.split(::File::SEPARATOR) + temp_file = TemporaryFile.new(key_parts.slice(1, key_parts.size).join("/"), ::File.open(file, "r")) - private - def move_file_to_bucket_async(file) - @logger.debug("S3: Sending the file to the upload queue.", :filename => File.basename(file)) - @upload_queue.enq(file) + @logger.debug("Recover from crash and uploading", :file => temp_file.path) + @crash_uploader.upload_async(temp_file, :on_complete => method(:clean_temporary_file)) + end + end end end diff --git a/lib/logstash/outputs/s3/file_repository.rb b/lib/logstash/outputs/s3/file_repository.rb new file mode 100644 index 00000000..3da7576f --- /dev/null +++ b/lib/logstash/outputs/s3/file_repository.rb @@ -0,0 +1,96 @@ +# encoding: utf-8 +require "concurrent" +require "concurrent/map" +require "concurrent/timer_task" +require "logstash/util" + +module LogStash + module Outputs + class S3 + class FileRepository + DEFAULT_STATE_SWEEPER_INTERVAL_SECS = 60 + DEFAULT_STALE_TIME_SECS = 15 * 60 + # Ensure that all access or work done + # on a factory is threadsafe + class PrefixedValue + def initialize(factory, stale_time) + @factory = factory + @lock = Mutex.new + @stale_time = stale_time + end + + def with_lock + @lock.synchronize { + yield @factory + } + end + + def stale? + with_lock { |factory| factory.current.size == 0 && (Time.now - factory.current.ctime > @stale_time) } + end + end + + def initialize(tags, encoding, temporary_directory, + stale_time = DEFAULT_STALE_TIME_SECS, + sweeper_interval = DEFAULT_STATE_SWEEPER_INTERVAL_SECS) + # The path need to contains the prefix so when we start + # logtash after a crash we keep the remote structure + @prefixed_factories = Concurrent::Map.new + + @tags = tags + @encoding = encoding + @temporary_directory = temporary_directory + + @stale_time = stale_time + @sweeper_interval = sweeper_interval + + start_stale_sweeper + end + + def keys + @prefixed_factories.keys + end + + def each_files + @prefixed_factories.each_value do |prefixed_file| + prefixed_file.with_lock { |factory| yield factory.current } + end + end + + # Return the file factory + def get_factory(prefix_key) + @prefixed_factories.compute_if_absent(prefix_key) { PrefixedValue.new(TemporaryFileFactory.new(prefix_key, @tags, @encoding, @temporary_directory), @stale_time) } + .with_lock { |factory| yield factory } + end + + def get_file(prefix_key) + get_factory(prefix_key) { |factory| yield factory.current } + end + + def shutdown + stop_stale_sweeper + end + + def size + @prefixed_factories.size + end + + def start_stale_sweeper + @stale_sweeper = Concurrent::TimerTask.new(:execution_interval => @sweeper_interval) do + LogStash::Util.set_thread_name("S3, Stale factory sweeper") + + @prefixed_factories.each_pair do |k, v| + @prefixed_factories.delete_pair(k, v) if v.stale? 
+ end + end + + @stale_sweeper.execute + end + + def stop_stale_sweeper + @stale_sweeper.shutdown + end + end + end + end +end diff --git a/lib/logstash/outputs/s3/path_validator.rb b/lib/logstash/outputs/s3/path_validator.rb new file mode 100644 index 00000000..0311e450 --- /dev/null +++ b/lib/logstash/outputs/s3/path_validator.rb @@ -0,0 +1,18 @@ +# encoding: utf-8 +module LogStash + module Outputs + class S3 + class PathValidator + INVALID_CHARACTERS = "\^`><" + + def self.valid?(name) + name.match(matches_re).nil? + end + + def self.matches_re + /[#{Regexp.escape(INVALID_CHARACTERS)}]/ + end + end + end + end +end diff --git a/lib/logstash/outputs/s3/size_and_time_rotation_policy.rb b/lib/logstash/outputs/s3/size_and_time_rotation_policy.rb new file mode 100644 index 00000000..356d86cf --- /dev/null +++ b/lib/logstash/outputs/s3/size_and_time_rotation_policy.rb @@ -0,0 +1,24 @@ +# encoding: utf-8 +require "logstash/outputs/s3/size_rotation_policy" +require "logstash/outputs/s3/time_rotation_policy" + +module LogStash + module Outputs + class S3 + class SizeAndTimeRotationPolicy + def initialize(file_size, time_file) + @size_strategy = SizeRotationPolicy.new(file_size) + @time_strategy = TimeRotationPolicy.new(time_file) + end + + def rotate?(file) + @size_strategy.rotate?(file) || @time_strategy.rotate?(file) + end + + def need_periodic? + true + end + end + end + end +end diff --git a/lib/logstash/outputs/s3/size_rotation_policy.rb b/lib/logstash/outputs/s3/size_rotation_policy.rb new file mode 100644 index 00000000..2b1dbdbc --- /dev/null +++ b/lib/logstash/outputs/s3/size_rotation_policy.rb @@ -0,0 +1,26 @@ +# encoding: utf-8 +module LogStash + module Outputs + class S3 + class SizeRotationPolicy + attr_reader :size_file + + def initialize(size_file) + if size_file <= 0 + raise LogStash::ConfigurationError, "`size_file` need to be greather than 0" + end + + @size_file = size_file + end + + def rotate?(file) + file.size >= size_file + end + + def need_periodic? + false + end + end + end + end +end diff --git a/lib/logstash/outputs/s3/temporary_file.rb b/lib/logstash/outputs/s3/temporary_file.rb new file mode 100644 index 00000000..a604e084 --- /dev/null +++ b/lib/logstash/outputs/s3/temporary_file.rb @@ -0,0 +1,45 @@ +# encoding: utf-8 +require "thread" +require "forwardable" +require "fileutils" + +module LogStash + module Outputs + class S3 + # Wrap the actual file descriptor into an utility classe + # It make it more OOP and easier to reason with the paths. + class TemporaryFile + extend Forwardable + DELEGATES_METHODS = [:path, :write, :close, :size, :fsync] + + def_delegators :@fd, *DELEGATES_METHODS + + def initialize(key, fd) + @fd = fd + @key = key + @created_at = Time.now + end + + def ctime + @created_at + end + + def key + @key.gsub(/^\//, "") + end + + # Each temporary file is made inside a directory named with an UUID, + # instead of deleting the file directly and having the risk of deleting other files + # we delete the root of the UUID, using a UUID also remove the risk of deleting unwanted file, it acts as + # a sandbox. + def delete! + ::FileUtils.rm_rf(path.gsub(/#{Regexp.escape(key)}$/, "")) + end + + def empty? 
+ size == 0 + end + end + end + end +end diff --git a/lib/logstash/outputs/s3/temporary_file_factory.rb b/lib/logstash/outputs/s3/temporary_file_factory.rb new file mode 100644 index 00000000..a93932e9 --- /dev/null +++ b/lib/logstash/outputs/s3/temporary_file_factory.rb @@ -0,0 +1,90 @@ +# encoding: utf-8 +require "socket" +require "securerandom" +require "fileutils" + +module LogStash + module Outputs + class S3 + # Since the file can contains dynamic part, we have to handle a more local structure to + # allow a nice recovery from a crash. + # + # The local structure will look like this. + # + # ///ls.s3.localhost.%Y-%m-%dT%H.%m.tag_es_fb.part1.txt.gz + # + # Since the UUID should be fairly unique I can destroy the whole path when an upload is complete. + # I do not have to mess around to check if the other directory have file in it before destroying them. + class TemporaryFileFactory + FILE_MODE = "a" + GZIP_ENCODING = "gzip" + GZIP_EXTENSION = "txt.gz" + TXT_EXTENSION = "txt" + STRFTIME = "%Y-%m-%dT%H.%M" + + attr_accessor :counter, :tags, :prefix, :encoding, :temporary_directory, :current + + def initialize(prefix, tags, encoding, temporary_directory) + @counter = 0 + @prefix = prefix + + @tags = tags + @encoding = encoding + @temporary_directory = temporary_directory + + rotate! + end + + def rotate! + @current = new_file + increment_counter + @current + end + + private + def extension + gzip? ? GZIP_EXTENSION : TXT_EXTENSION + end + + def gzip? + encoding == GZIP_ENCODING + end + + def increment_counter + @counter += 1 + end + + def current_time + Time.now.strftime(STRFTIME) + end + + def generate_name + filename = "ls.s3.#{Socket.gethostname}.#{current_time}" + + if tags.size > 0 + "#{filename}.tag_#{tags.join('.')}.part#{counter}.#{extension}" + else + "#{filename}.part#{counter}.#{extension}" + end + end + + def new_file + uuid = SecureRandom.uuid + name = generate_name + path = ::File.join(temporary_directory, uuid, prefix) + key = ::File.join(prefix, name) + + FileUtils.mkdir_p(path) + + io = if gzip? + Zlib::GzipWriter.open(::File.join(path, name)) + else + ::File.open(::File.join(path, name), FILE_MODE) + end + + TemporaryFile.new(key, io) + end + end + end + end +end diff --git a/lib/logstash/outputs/s3/time_rotation_policy.rb b/lib/logstash/outputs/s3/time_rotation_policy.rb new file mode 100644 index 00000000..e772d983 --- /dev/null +++ b/lib/logstash/outputs/s3/time_rotation_policy.rb @@ -0,0 +1,26 @@ +# encoding: utf-8 +module LogStash + module Outputs + class S3 + class TimeRotationPolicy + attr_reader :time_file + + def initialize(time_file) + if time_file <= 0 + raise LogStash::ConfigurationError, "`time_file` need to be greather than 0" + end + + @time_file = time_file + end + + def rotate?(file) + file.size > 0 && Time.now - file.ctime >= time_file + end + + def need_periodic? 
+ true + end + end + end + end +end diff --git a/lib/logstash/outputs/s3/uploader.rb b/lib/logstash/outputs/s3/uploader.rb new file mode 100644 index 00000000..3a7251a2 --- /dev/null +++ b/lib/logstash/outputs/s3/uploader.rb @@ -0,0 +1,60 @@ +# encoding: utf-8 +require "logstash/util" +require "aws-sdk-resources" + +module LogStash + module Outputs + class S3 + class Uploader + TIME_BEFORE_RETRYING_SECONDS = 1 + DEFAULT_THREADPOOL = Concurrent::ThreadPoolExecutor.new({ + :min_threads => 1, + :max_threads => 8, + :max_queue => 1, + :fallback_policy => :caller_runs + }) + + + attr_reader :bucket, :upload_options, :logger + + def initialize(bucket, logger, threadpool = DEFAULT_THREADPOOL) + @bucket = bucket + @workers_pool = threadpool + @logger = logger + end + + def upload_async(file, options = {}) + @workers_pool.post do + LogStash::Util.set_thread_name("S3 output uploader, file: #{file.path}") + upload(file, options) + end + end + + def upload(file, options = {}) + upload_options = options.fetch(:upload_options, {}) + + begin + obj = bucket.object(file.key) + obj.upload_file(file.path, upload_options) + rescue => e + # When we get here it usually mean that S3 tried to do some retry by himself (default is 3) + # When the retry limit is reached or another error happen we will wait and retry. + # + # Thread might be stuck here, but I think its better than losing anything + # its either a transient errors or something bad really happened. + sleep(TIME_BEFORE_RETRYING_SECONDS) + logger.error("Uploading failed, retrying", :exception => e, :path => file.path) + retry + end + + options[:on_complete].call(file) unless options[:on_complete].nil? + end + + def stop + @workers_pool.shutdown + @workers_pool.wait_for_termination(nil) # block until its done + end + end + end + end +end diff --git a/lib/logstash/outputs/s3/writable_directory_validator.rb b/lib/logstash/outputs/s3/writable_directory_validator.rb new file mode 100644 index 00000000..bdcf4256 --- /dev/null +++ b/lib/logstash/outputs/s3/writable_directory_validator.rb @@ -0,0 +1,17 @@ +# encoding: utf-8 +module LogStash + module Outputs + class S3 + class WritableDirectoryValidator + def self.valid?(path) + begin + FileUtils.mkdir_p(path) unless Dir.exist?(path) + ::File.writable?(path) + rescue + false + end + end + end + end + end +end diff --git a/lib/logstash/outputs/s3/write_bucket_permission_validator.rb b/lib/logstash/outputs/s3/write_bucket_permission_validator.rb new file mode 100644 index 00000000..0b52a736 --- /dev/null +++ b/lib/logstash/outputs/s3/write_bucket_permission_validator.rb @@ -0,0 +1,49 @@ +# encoding: utf-8 +require "stud/temporary" +require "socket" +require "fileutils" + +module LogStash + module Outputs + class S3 + class WriteBucketPermissionValidator + def self.valid?(bucket_resource) + begin + upload_test_file(bucket_resource) + true + rescue + false + end + end + + private + def self.upload_test_file(bucket_resource) + generated_at = Time.now + + key = "logstash-programmatic-access-test-object-#{generated_at}" + content = "Logstash permission check on #{generated_at}, by #{Socket.gethostname}" + + begin + f = Stud::Temporary.file + f.write(content) + f.fsync + f.close + + obj = bucket_resource.object(key) + obj.upload_file(f) + + begin + obj.delete + rescue + # Try to remove the files on the remote bucket, + # but don't raise any errors if that doesn't work. + # since we only really need `putobject`. 
+ end + ensure + FileUtils.rm_rf(f.path) + end + end + end + end + end +end diff --git a/logstash-output-s3.gemspec b/logstash-output-s3.gemspec index 3ed6eab6..17a7bb5f 100644 --- a/logstash-output-s3.gemspec +++ b/logstash-output-s3.gemspec @@ -22,6 +22,7 @@ Gem::Specification.new do |s| # Gem dependencies s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99" s.add_runtime_dependency 'logstash-mixin-aws' + s.add_runtime_dependency "concurrent-ruby" s.add_runtime_dependency 'stud', '~> 0.0.22' s.add_development_dependency 'logstash-devutils' s.add_development_dependency 'logstash-input-generator' diff --git a/spec/integration/dynamic_prefix_spec.rb b/spec/integration/dynamic_prefix_spec.rb new file mode 100644 index 00000000..71dec731 --- /dev/null +++ b/spec/integration/dynamic_prefix_spec.rb @@ -0,0 +1,92 @@ +# encoding: utf-8 +require_relative "../spec_helper" +require "logstash/outputs/s3" +require "logstash/codecs/line" +require "stud/temporary" + +describe "Dynamic Prefix", :integration => true do + include_context "setup plugin" + + let(:options) { main_options.merge({ "rotation_strategy" => "size" }) } + let(:sandbox) { "test" } + + before do + clean_remote_files(sandbox) + subject.register + subject.multi_receive_encoded(batch) + subject.close + end + + context "With field string" do + let(:prefix) { "/#{sandbox}/%{server}/%{language}" } + let(:batch) do + b = {} + e1 = LogStash::Event.new({ "server" => "es1", "language" => "ruby"}) + b[e1] = "es1-ruby" + e2 = LogStash::Event.new({ "server" => "es2", "language" => "java"}) + b[e2] = "es2-ruby" + b + end + + it "creates a specific quantity of files" do + expect(bucket_resource.objects(:prefix => sandbox).count).to eq(batch.size) + end + + it "creates specific keys" do + re = Regexp.union(/^es1\/ruby\/ls.s3.sashimi/, /^es2\/java\/ls.s3.sashimi/) + + bucket_resource.objects(:prefix => sandbox) do |obj| + expect(obj.key).to match(re) + end + end + + it "Persists all events" do + download_directory = Stud::Temporary.pathname + + FileUtils.rm_rf(download_directory) + FileUtils.mkdir_p(download_directory) + + counter = 0 + bucket_resource.objects(:prefix => sandbox).each do |object| + target = File.join(download_directory, "#{counter}.txt") + object.get(:response_target => target) + counter += 1 + end + expect(Dir.glob(File.join(download_directory, "**", "*.txt")).inject(0) { |sum, f| sum + IO.readlines(f).size }).to eq(batch.size) + end + end + + context "with unsupported char" do + let(:prefix) { "/#{sandbox}/%{server}/%{language}" } + let(:batch) do + b = {} + e1 = LogStash::Event.new({ "server" => "e>s1", "language" => "ruby"}) + b[e1] = "es2-ruby" + b + end + + it "convert them to underscore" do + re = Regexp.union(/^e_s1\/ruby\/ls.s3.sashimi/) + + bucket_resource.objects(:prefix => sandbox) do |obj| + expect(obj.key).to match(re) + end + end + end + + context "with dates" do + let(:prefix) { "/#{sandbox}/%{+YYYY-MM-d}" } + + let(:batch) do + b = {} + e1 = LogStash::Event.new({ "server" => "e>s1", "language" => "ruby"}) + b[e1] = "es2-ruby" + b + end + + it "creates dated path" do + re = /^#{sandbox}\/\d{4}-\d{2}-\d{2}\/ls\.s3\./ + expect(bucket_resource.objects(:prefix => sandbox).first.key).to match(re) + end + end +end diff --git a/spec/integration/restore_from_crash_spec.rb b/spec/integration/restore_from_crash_spec.rb new file mode 100644 index 00000000..a119be6f --- /dev/null +++ b/spec/integration/restore_from_crash_spec.rb @@ -0,0 +1,39 @@ +# encoding: utf-8 +require_relative "../spec_helper" +require 
"logstash/outputs/s3" +require "logstash/codecs/line" +require "stud/temporary" + +describe "Restore from crash", :integration => true do + include_context "setup plugin" + + let(:options) { main_options.merge({ "restore" => true }) } + + let(:number_of_files) { 5 } + let(:dummy_content) { "foobar\n" * 100 } + + before do + clean_remote_files(prefix) + # Use the S3 factory to create mutliples files with dummy content + factory = LogStash::Outputs::S3::TemporaryFileFactory.new(prefix, tags, "none", temporary_directory) + + # Creating a factory always create a file + factory.current.write(dummy_content) + factory.current.fsync + + (number_of_files - 1).times do + factory.current.write(dummy_content) + factory.current.fsync + factory.rotate! + end + end + + it "uploads the file to the bucket" do + subject.register + try(20) do + expect(Dir.glob(File.join(temporary_directory, "*")).size).to eq(0) + expect(bucket_resource.objects(:prefix => prefix).count).to eq(number_of_files) + end + end +end + diff --git a/spec/integration/s3_spec.rb b/spec/integration/s3_spec.rb deleted file mode 100644 index 1fe208ba..00000000 --- a/spec/integration/s3_spec.rb +++ /dev/null @@ -1,97 +0,0 @@ -# encoding: utf-8 -require "logstash/devutils/rspec/spec_helper" -require "logstash/outputs/s3" -require 'socket' -require "aws-sdk" -require "fileutils" -require "stud/temporary" -require_relative "../supports/helpers" - -describe LogStash::Outputs::S3, :integration => true, :s3 => true do - before do - Thread.abort_on_exception = true - end - - let!(:minimal_settings) { { "access_key_id" => ENV['AWS_ACCESS_KEY_ID'], - "secret_access_key" => ENV['AWS_SECRET_ACCESS_KEY'], - "bucket" => ENV['AWS_LOGSTASH_TEST_BUCKET'], - "region" => ENV["AWS_REGION"] || "us-east-1", - "temporary_directory" => Stud::Temporary.pathname('temporary_directory') }} - - let!(:s3_object) do - s3output = LogStash::Outputs::S3.new(minimal_settings) - s3output.register - s3output.s3 - end - - after(:each) do - delete_matching_keys_on_bucket('studtmp') - delete_matching_keys_on_bucket('my-prefix') - end - - describe "#register" do - it "write a file on the bucket to check permissions" do - s3 = LogStash::Outputs::S3.new(minimal_settings) - expect(s3.register).not_to raise_error - end - end - - describe "#write_on_bucket" do - after(:each) do - File.unlink(fake_data.path) - end - - let!(:fake_data) { Stud::Temporary.file } - - it "should prefix the file on the bucket if a prefix is specified" do - prefix = "my-prefix" - - config = minimal_settings.merge({ - "prefix" => prefix, - }) - - s3 = LogStash::Outputs::S3.new(config) - s3.register - s3.write_on_bucket(fake_data) - - expect(key_exists_on_bucket?("#{prefix}#{File.basename(fake_data.path)}")).to eq(true) - end - - it 'should use the same local filename if no prefix is specified' do - s3 = LogStash::Outputs::S3.new(minimal_settings) - s3.register - s3.write_on_bucket(fake_data) - - expect(key_exists_on_bucket?(File.basename(fake_data.path))).to eq(true) - end - end - - describe "#move_file_to_bucket" do - let!(:s3) { LogStash::Outputs::S3.new(minimal_settings) } - - before do - s3.register - end - - it "should upload the file if the size > 0" do - tmp = Stud::Temporary.file - allow(File).to receive(:zero?).and_return(false) - s3.move_file_to_bucket(tmp) - expect(key_exists_on_bucket?(File.basename(tmp.path))).to eq(true) - end - end - - describe "#restore_from_crashes" do - it "read the temp directory and upload the matching file to s3" do - Stud::Temporary.pathname do |temp_path| - tempfile = 
File.open(File.join(temp_path, 'A'), 'w+') { |f| f.write('test')} - - s3 = LogStash::Outputs::S3.new(minimal_settings.merge({ "temporary_directory" => temp_path })) - s3.restore_from_crashes - - expect(File.exist?(tempfile.path)).to eq(false) - expect(key_exists_on_bucket?(File.basename(tempfile.path))).to eq(true) - end - end - end -end diff --git a/spec/integration/size_rotation_spec.rb b/spec/integration/size_rotation_spec.rb new file mode 100644 index 00000000..85dced03 --- /dev/null +++ b/spec/integration/size_rotation_spec.rb @@ -0,0 +1,58 @@ +# encoding: utf-8 +require_relative "../spec_helper" +require "logstash/outputs/s3" +require "logstash/codecs/line" +require "stud/temporary" + +describe "Size rotation", :integration => true do + include_context "setup plugin" + + let(:size_file) { batch_size.times.inject(0) { |sum, i| sum + "#{event_encoded}\n".bytesize } } + let(:options) { main_options.merge({ "rotation_strategy" => "size" }) } + let(:number_of_events) { 5000 } + let(:batch_size) { 125 } + let(:event_encoded) { "Hello world" } + let(:batch) do + b = {} + number_of_events.times do + event = LogStash::Event.new({ "message" => event_encoded }) + b[event] = "#{event_encoded}\n" + end + b + end + let(:number_of_files) { number_of_events / batch_size } + + before do + clean_remote_files(prefix) + subject.register + batch.each_slice(batch_size) do |smaller_batch| + subject.multi_receive_encoded(smaller_batch) + end + subject.close + end + + it "creates a specific quantity of files" do + expect(bucket_resource.objects(:prefix => prefix).count).to eq(number_of_files) + end + + it "Rotates the files based on size" do + bucket_resource.objects(:prefix => prefix).each do |f| + expect(f.size).to eq(size_file) + end + end + + it "Persists all events" do + download_directory = Stud::Temporary.pathname + + FileUtils.rm_rf(download_directory) + FileUtils.mkdir_p(download_directory) + + counter = 0 + bucket_resource.objects(:prefix => prefix).each do |object| + target = File.join(download_directory, "#{counter}.txt") + object.get(:response_target => target) + counter += 1 + end + expect(Dir.glob(File.join(download_directory, "**", "*.txt")).inject(0) { |sum, f| sum + IO.readlines(f).size }).to eq(number_of_events) + end +end diff --git a/spec/integration/stress_test_spec.rb b/spec/integration/stress_test_spec.rb new file mode 100644 index 00000000..02e74c62 --- /dev/null +++ b/spec/integration/stress_test_spec.rb @@ -0,0 +1,60 @@ +# encoding: utf-8 +require_relative "../spec_helper" +require "logstash/outputs/s3" +require "logstash/codecs/line" +require "stud/temporary" + +describe "Upload current file on shutdown", :integration => true, :slow => true do + include_context "setup plugin" + let(:stress_time) { ENV["RUNTIME"] || 1 * 60 } + let(:options) { main_options } + + let(:time_file) { 15 } + let(:batch_size) { 125 } + let(:event_encoded) { "Hello world" } + let(:batch) do + b = {} + batch_size.times do + event = LogStash::Event.new({ "message" => event_encoded }) + b[event] = "#{event_encoded}\n" + end + b + end + let(:workers) { 3 } + + it "Persists all events" do + started_at = Time.now + events_sent = {} + + clean_remote_files(prefix) + subject.register + + workers.times do + Thread.new do + events_sent[Thread.current] = 0 + + while Time.now - started_at < stress_time + subject.multi_receive_encoded(batch) + events_sent[Thread.current] += batch_size + end + end + end + + sleep(1) while Time.now - started_at < stress_time + + subject.close + + download_directory = 
Stud::Temporary.pathname + + FileUtils.rm_rf(download_directory) + FileUtils.mkdir_p(download_directory) + + counter = 0 + bucket_resource.objects(:prefix => prefix).each do |object| + target = File.join(download_directory, "#{counter}.txt") + object.get(:response_target => target) + counter += 1 + end + expect(Dir.glob(File.join(download_directory, "**", "*.txt")).inject(0) { |sum, f| sum + IO.readlines(f).size }).to eq(events_sent.values.inject(0, :+)) + end +end diff --git a/spec/integration/time_based_rotation_with_constant_write_spec.rb b/spec/integration/time_based_rotation_with_constant_write_spec.rb new file mode 100644 index 00000000..e3efd2cf --- /dev/null +++ b/spec/integration/time_based_rotation_with_constant_write_spec.rb @@ -0,0 +1,60 @@ +# encoding: utf-8 +require_relative "../spec_helper" +require "logstash/outputs/s3" +require "logstash/codecs/line" +require "stud/temporary" + +describe "File Time rotation with constant write", :integration => true do + include_context "setup plugin" + + let(:time_file) { 0.5 } + let(:options) { main_options.merge({ "rotation_strategy" => "time" }) } + let(:number_of_events) { 5000 } + let(:batch_size) { 125 } + let(:event_encoded) { "Hello world" } + let(:batch) do + b = {} + number_of_events.times do + event = LogStash::Event.new({ "message" => event_encoded }) + b[event] = "#{event_encoded}\n" + end + b + end + let(:minimum_number_of_time_rotation) { 3 } + let(:batch_step) { (number_of_events / minimum_number_of_time_rotation).ceil } + + before do + clean_remote_files(prefix) + subject.register + + # simulate batch read/write + batch.each_slice(batch_step) do |batch_time| + batch_time.each_slice(batch_size) do |smaller_batch| + subject.multi_receive_encoded(smaller_batch) + end + sleep(time_file * 2) + end + + subject.close + end + + it "creates multiples files" do + # using close will upload the current file + expect(bucket_resource.objects(:prefix => prefix).count).to be_between(minimum_number_of_time_rotation, minimum_number_of_time_rotation + 1).inclusive + end + + it "Persists all events" do + download_directory = Stud::Temporary.pathname + + FileUtils.rm_rf(download_directory) + FileUtils.mkdir_p(download_directory) + + counter = 0 + bucket_resource.objects(:prefix => prefix).each do |object| + target = File.join(download_directory, "#{counter}.txt") + object.get(:response_target => target) + counter += 1 + end + expect(Dir.glob(File.join(download_directory, "**", "*.txt")).inject(0) { |sum, f| sum + IO.readlines(f).size }).to eq(number_of_events) + end +end diff --git a/spec/integration/time_based_rotation_with_stale_write_spec.rb b/spec/integration/time_based_rotation_with_stale_write_spec.rb new file mode 100644 index 00000000..f008e6bd --- /dev/null +++ b/spec/integration/time_based_rotation_with_stale_write_spec.rb @@ -0,0 +1,55 @@ +# encoding: utf-8 +require_relative "../spec_helper" +require "logstash/outputs/s3" +require "logstash/codecs/line" +require "stud/temporary" + +describe "File Time rotation with stale write", :integration => true do + include_context "setup plugin" + + let(:time_file) { 1 } + let(:options) { main_options.merge({ "rotation_strategy" => "time" }) } + let(:number_of_events) { 5000 } + let(:batch_size) { 125 } + let(:event_encoded) { "Hello world" } + let(:batch) do + b = {} + number_of_events.times do + event = LogStash::Event.new({ "message" => event_encoded }) + b[event] = "#{event_encoded}\n" + end + b + end + + before do + stub_const('LogStash::Outputs::S3::PERIODIC_CHECK_INTERVAL_IN_SECONDS', 
1) + clean_remote_files(prefix) + subject.register + subject.multi_receive_encoded(batch) + sleep(time_file * 5) # the periodic check should have kick int + end + + after do + subject.close + end + + it "create one file" do + # using close will upload the current file + expect(bucket_resource.objects(:prefix => prefix).count).to eq(1) + end + + it "Persists all events" do + download_directory = Stud::Temporary.pathname + + FileUtils.rm_rf(download_directory) + FileUtils.mkdir_p(download_directory) + + counter = 0 + bucket_resource.objects(:prefix => prefix).each do |object| + target = File.join(download_directory, "#{counter}.txt") + object.get(:response_target => target) + counter += 1 + end + expect(Dir.glob(File.join(download_directory, "**", "*.txt")).inject(0) { |sum, f| sum + IO.readlines(f).size }).to eq(number_of_events) + end +end diff --git a/spec/integration/upload_current_file_on_shutdown_spec.rb b/spec/integration/upload_current_file_on_shutdown_spec.rb new file mode 100644 index 00000000..efd32993 --- /dev/null +++ b/spec/integration/upload_current_file_on_shutdown_spec.rb @@ -0,0 +1,53 @@ +# encoding: utf-8 +require_relative "../spec_helper" +require "logstash/outputs/s3" +require "logstash/codecs/line" +require "stud/temporary" + +describe "Upload current file on shutdown", :integration => true do + include_context "setup plugin" + let(:options) { main_options } + + let(:size_file) { 1000000 } + let(:time_file) { 100000 } + let(:number_of_events) { 5000 } + let(:batch_size) { 125 } + let(:event_encoded) { "Hello world" } + let(:batch) do + b = {} + number_of_events.times do + event = LogStash::Event.new({ "message" => event_encoded }) + b[event] = "#{event_encoded}\n" + end + b + end + + before do + clean_remote_files(prefix) + subject.register + batch.each_slice(batch_size) do |smaller_batch| + subject.multi_receive_encoded(smaller_batch) + end + subject.close + end + + it "creates a specific quantity of files" do + # Since we have really big value of time_file and size_file + expect(bucket_resource.objects(:prefix => prefix).count).to eq(1) + end + + it "Persists all events" do + download_directory = Stud::Temporary.pathname + + FileUtils.rm_rf(download_directory) + FileUtils.mkdir_p(download_directory) + + counter = 0 + bucket_resource.objects(:prefix => prefix).each do |object| + target = File.join(download_directory, "#{counter}.txt") + object.get(:response_target => target) + counter += 1 + end + expect(Dir.glob(File.join(download_directory, "**", "*.txt")).inject(0) { |sum, f| sum + IO.readlines(f).size }).to eq(number_of_events) + end +end diff --git a/spec/outputs/s3/file_repository_spec.rb b/spec/outputs/s3/file_repository_spec.rb new file mode 100644 index 00000000..9b9614fc --- /dev/null +++ b/spec/outputs/s3/file_repository_spec.rb @@ -0,0 +1,123 @@ +# encoding: utf-8 +require "logstash/outputs/s3" +require "stud/temporary" +require "fileutils" +require_relative "../../spec_helper" + +describe LogStash::Outputs::S3::FileRepository do + let(:tags) { ["secret", "service"] } + let(:encoding) { "none" } + let(:temporary_directory) { Stud::Temporary.pathname } + let(:prefix_key) { "a-key" } + + before do + FileUtils.mkdir_p(temporary_directory) + end + + subject { described_class.new(tags, encoding, temporary_directory) } + + it "returns a temporary file" do + subject.get_file(prefix_key) do |file| + expect(file).to be_kind_of(LogStash::Outputs::S3::TemporaryFile) + end + end + + it "returns the same file for the same prefix key" do + file_path = nil + + 
subject.get_file(prefix_key) do |file| + file_path = file.path + end + + subject.get_file(prefix_key) do |file| + expect(file.path).to eq(file_path) + end + end + + it "returns different file for different prefix keys" do + file_path = nil + + subject.get_file(prefix_key) do |file| + file_path = file.path + end + + subject.get_file("another_prefix_key") do |file| + expect(file.path).not_to eq(file_path) + end + end + + it "allows to get the file factory for a specific prefix" do + subject.get_factory(prefix_key) do |factory| + expect(factory).to be_kind_of(LogStash::Outputs::S3::TemporaryFileFactory) + end + end + + it "returns a different file factory for a different prefix keys" do + factory = nil + + subject.get_factory(prefix_key) do |f| + factory = f + end + + subject.get_factory("another_prefix_key") do |f| + expect(factory).not_to eq(f) + end + end + + it "returns the number of prefix keys" do + expect(subject.size).to eq(0) + subject.get_file(prefix_key) { |file| file.write("something") } + expect(subject.size).to eq(1) + end + + it "returns all available keys" do + subject.get_file(prefix_key) { |file| file.write("something") } + expect(subject.keys).to eq([prefix_key]) + end + + it "clean stale factories" do + file_repository = described_class.new(tags, encoding, temporary_directory, 1, 1) + expect(file_repository.size).to eq(0) + file_repository.get_factory(prefix_key) do |factory| + factory.current.write("hello") + # force a rotation so we get an empty file that will get stale. + factory.rotate! + end + + file_repository.get_file("another-prefix") { |file| file.write("hello") } + expect(file_repository.size).to eq(2) + try(10) { expect(file_repository.size).to eq(1) } + end +end + + +describe LogStash::Outputs::S3::FileRepository::PrefixedValue do + let(:factory) { spy("factory", :current => file) } + subject { described_class.new(factory, 1) } + + context "#stale?" 
do + context "the file is empty and older than stale time" do + let(:file) { double("file", :size => 0, :ctime => Time.now - 5) } + + it "returns true" do + expect(subject.stale?).to be_truthy + end + end + + context "when the file has data in it" do + let(:file) { double("file", :size => 200, :ctime => Time.now - 5) } + + it "returns false" do + expect(subject.stale?).to be_falsey + end + end + + context "when the file is not old enough" do + let(:file) { double("file", :size => 0, :ctime => Time.now + 100) } + + it "returns false" do + expect(subject.stale?).to be_falsey + end + end + end +end diff --git a/spec/outputs/s3/size_and_time_rotation_policy_spec.rb b/spec/outputs/s3/size_and_time_rotation_policy_spec.rb new file mode 100644 index 00000000..41d2494c --- /dev/null +++ b/spec/outputs/s3/size_and_time_rotation_policy_spec.rb @@ -0,0 +1,76 @@ +# encoding: utf-8 +require "logstash/devutils/rspec/spec_helper" +require "logstash/outputs/s3/size_and_time_rotation_policy" +require "logstash/outputs/s3/temporary_file" + +describe LogStash::Outputs::S3::SizeAndTimeRotationPolicy do + let(:file_size) { 10 } + let(:time_file) { 1 } + subject { described_class.new(file_size, time_file) } + + let(:temporary_file) { Stud::Temporary.file } + let(:name) { "foobar" } + let(:content) { "hello" * 1000 } + let(:file) { LogStash::Outputs::S3::TemporaryFile.new(name, temporary_file) } + + it "raises an exception if the `time_file` is set to 0" do + expect { described_class.new(100, 0) }.to raise_error(LogStash::ConfigurationError, /time_file/) + end + + it "raises an exception if the `time_file` is < 0" do + expect { described_class.new(100, -100) }.to raise_error(LogStash::ConfigurationError, /time_file/) + end + + it "raises an exception if the `size_file` is 0" do + expect { described_class.new(0, 100) }.to raise_error(LogStash::ConfigurationError, /size_file/) + end + + it "raises an exception if the `size_file` is < 0" do + expect { described_class.new(-100, 100) }.to raise_error(LogStash::ConfigurationError, /size_file/) + end + + it "returns true if the size on disk is higher than the `file_size`" do + file.write(content) + file.fsync + expect(subject.rotate?(file)).to be_truthy + end + + it "returns false if the size is inferior than the `file_size`" do + expect(subject.rotate?(file)).to be_falsey + end + + context "when the size of the file is superior to 0" do + let(:file_size) { 10000 } + + before :each do + file.write(content) + file.fsync + end + + it "returns true if the file old enough" do + allow(file).to receive(:ctime).and_return(Time.now - time_file * 2) + expect(subject.rotate?(file)).to be_truthy + end + + it "returns false is not old enough" do + allow(file).to receive(:ctime).and_return(Time.now + time_file * 10) + expect(subject.rotate?(file)).to be_falsey + end + end + + context "When the size of the file is 0" do + it "returns false if the file old enough" do + expect(subject.rotate?(file)).to be_falsey + end + + it "returns false is not old enough" do + expect(subject.rotate?(file)).to be_falsey + end + end + + context "#need_periodic?" 
do + it "return true" do + expect(subject.need_periodic?).to be_truthy + end + end +end diff --git a/spec/outputs/s3/size_rotation_policy_spec.rb b/spec/outputs/s3/size_rotation_policy_spec.rb new file mode 100644 index 00000000..7700bbf9 --- /dev/null +++ b/spec/outputs/s3/size_rotation_policy_spec.rb @@ -0,0 +1,40 @@ +# encoding: utf-8 +require "logstash/devutils/rspec/spec_helper" +require "logstash/outputs/s3/size_rotation_policy" +require "logstash/outputs/s3/temporary_file" +require "fileutils" + +describe LogStash::Outputs::S3::SizeRotationPolicy do + subject { described_class.new(size_file) } + + let(:temporary_file) { Stud::Temporary.file } + let(:name) { "foobar" } + let(:content) { "hello" * 1000 } + let(:size_file) { 10 } # in bytes + let(:file) { LogStash::Outputs::S3::TemporaryFile.new(name, temporary_file) } + + it "returns true if the size on disk is higher than the `size_file`" do + file.write(content) + file.fsync + expect(subject.rotate?(file)).to be_truthy + end + + it "returns false if the size is inferior than the `size_file`" do + expect(subject.rotate?(file)).to be_falsey + end + + it "raises an exception if the `size_file` is 0" do + expect { described_class.new(0) }.to raise_error(LogStash::ConfigurationError, /need to be greather than 0/) + end + + it "raises an exception if the `size_file` is < 0" do + expect { described_class.new(-100) }.to raise_error(LogStash::ConfigurationError, /need to be greather than 0/) + end + + context "#need_periodic?" do + it "return false" do + expect(subject.need_periodic?).to be_falsey + end + end + +end diff --git a/spec/outputs/s3/temporary_file_factory_spec.rb b/spec/outputs/s3/temporary_file_factory_spec.rb new file mode 100644 index 00000000..551e7cbe --- /dev/null +++ b/spec/outputs/s3/temporary_file_factory_spec.rb @@ -0,0 +1,85 @@ +# encoding: utf-8 +require "logstash/outputs/s3/temporary_file_factory" +require "logstash/outputs/s3/temporary_file" +require "stud/temporary" +require "fileutils" + +describe LogStash::Outputs::S3::TemporaryFileFactory do + let(:prefix) { "foobar" } + let(:tags) { [] } + let(:temporary_directory) { Stud::Temporary.pathname } + + before do + FileUtils.mkdir_p(temporary_directory) + end + + subject { described_class.new(prefix, tags, encoding, temporary_directory) } + + shared_examples "file factory" do + it "creates the file on disk" do + expect(File.exist?(subject.current.path)).to be_truthy + end + + it "create a temporary file when initialized" do + expect(subject.current).to be_kind_of(LogStash::Outputs::S3::TemporaryFile) + end + + it "create a file in the right format" do + expect(subject.current.path).to match(extension) + end + + it "allow to rotate the file" do + file_path = subject.current.path + expect(subject.rotate!.path).not_to eq(file_path) + end + + it "increments the part name on rotation" do + expect(subject.current.path).to match(/part0/) + expect(subject.rotate!.path).to match(/part1/) + end + + it "includes the date" do + n = Time.now + expect(subject.current.path).to match(/ls.s3.#{Socket.gethostname}.#{n.strftime("%Y-%m-%dT")}\d+\.\d+\./) + end + + it "include the file key in the path" do + file = subject.current + expect(file.path).to match(/#{file.key}/) + end + + it "create a unique directory in the temporary directory for each file" do + uuid = "hola" + expect(SecureRandom).to receive(:uuid).and_return(uuid) + expect(subject.current.path).to include(uuid) + end + + context "with tags supplied" do + let(:tags) { ["secret", "service"] } + + it "adds tags to the 
filename" do + expect(subject.current.path).to match(/tag_#{tags.join('.')}.part/) + end + end + + context "without tags" do + it "doesn't add tags to the filename" do + expect(subject.current.path).not_to match(/tag_/) + end + end + end + + context "when gzip" do + let(:encoding) { "gzip" } + let(:extension) { /\.txt.gz$/ } + + include_examples "file factory" + end + + context "when encoding set to `none`" do + let(:encoding) { "none" } + let(:extension) { /\.txt$/ } + + include_examples "file factory" + end +end diff --git a/spec/outputs/s3/temporary_file_spec.rb b/spec/outputs/s3/temporary_file_spec.rb new file mode 100644 index 00000000..46f4fe9f --- /dev/null +++ b/spec/outputs/s3/temporary_file_spec.rb @@ -0,0 +1,46 @@ +# encoding: utf-8 +require "logstash/devutils/rspec/spec_helper" +require "logstash/outputs/s3/temporary_file" +require "stud/temporary" +require "fileutils" +require "securerandom" + +describe LogStash::Outputs::S3::TemporaryFile do + let(:content) { "hello world" } + let(:key) { "foo" } + let(:uuid) { SecureRandom.uuid } + let(:temporary_file) { ::File.open(::File.join(temporary_directory, uuid, key), "w+") } + let(:temporary_directory) { Stud::Temporary.directory } + + before :each do + FileUtils.mkdir_p(::File.join(temporary_directory, uuid)) + end + + subject { described_class.new(key, temporary_file) } + + it "returns the key of the file" do + expect(subject.key).to eq(key) + end + + it "saves content to a file" do + subject.write(content) + subject.close + expect(File.read(subject.path).strip).to eq(content) + end + + it "deletes a file" do + expect(File.exist?(subject.path)).to be_truthy + subject.delete! + expect(File.exist?(subject.path)).to be_falsey + end + + described_class::DELEGATES_METHODS.each do |method_name| + it "delegates method `#{method_name}` to file descriptor" do + expect(subject.respond_to?(method_name)).to be_truthy + end + end + + it "returns the creation time" do + expect(subject.ctime).to be < Time.now + 0.5 + end +end diff --git a/spec/outputs/s3/time_rotation_policy_spec.rb b/spec/outputs/s3/time_rotation_policy_spec.rb new file mode 100644 index 00000000..8accce72 --- /dev/null +++ b/spec/outputs/s3/time_rotation_policy_spec.rb @@ -0,0 +1,55 @@ +# encoding: utf-8 +require "logstash/devutils/rspec/spec_helper" +require "logstash/outputs/s3/time_rotation_policy" +require "logstash/outputs/s3/temporary_file" + +describe LogStash::Outputs::S3::TimeRotationPolicy do + subject { described_class.new(max_time) } + + let(:max_time) { 1 } + let(:temporary_file) { Stud::Temporary.file } + let(:name) { "foobar" } + let(:content) { "hello" * 1000 } + let(:file) { LogStash::Outputs::S3::TemporaryFile.new(name, temporary_file) } + + it "raises an exception if the `file_time` is set to 0" do + expect { described_class.new(0) }.to raise_error(LogStash::ConfigurationError, /`time_file` need to be greather than 0/) + end + + it "raises an exception if the `file_time` is < 0" do + expect { described_class.new(-100) }.to raise_error(LogStash::ConfigurationError, /`time_file` need to be greather than 0/) + end + + context "when the size of the file is superior to 0" do + before :each do + file.write(content) + file.fsync + end + + it "returns true if the file old enough" do + allow(file).to receive(:ctime).and_return(Time.now - max_time * 2) + expect(subject.rotate?(file)).to be_truthy + end + + it "returns false is not old enough" do + expect(subject.rotate?(file)).to be_falsey + end + end + + context "When the size of the file is 0" do + it "returns 
false if the file is old enough" do + allow(file).to receive(:ctime).and_return(Time.now - max_time * 2) + expect(subject.rotate?(file)).to be_falsey + end + + it "returns false if it is not old enough" do + expect(subject.rotate?(file)).to be_falsey + end + end + + context "#need_periodic?" do + it "returns true" do + expect(subject.need_periodic?).to be_truthy + end + end +end diff --git a/spec/outputs/s3/uploader_spec.rb b/spec/outputs/s3/uploader_spec.rb new file mode 100644 index 00000000..5613289a --- /dev/null +++ b/spec/outputs/s3/uploader_spec.rb @@ -0,0 +1,56 @@ +# Encoding: utf-8 +require "logstash/devutils/rspec/spec_helper" +require "logstash/outputs/s3/uploader" +require "logstash/outputs/s3/temporary_file" +require "aws-sdk" +require "stud/temporary" + +describe LogStash::Outputs::S3::Uploader do + let(:logger) { spy(:logger) } + let(:max_upload_workers) { 1 } + let(:bucket_name) { "foobar-bucket" } + let(:client) { Aws::S3::Client.new(stub_responses: true) } + let(:bucket) { Aws::S3::Bucket.new(bucket_name, :client => client) } + let(:temporary_file) { Stud::Temporary.file } + let(:key) { "foobar" } + let(:upload_options) { {} } + let(:threadpool) do + Concurrent::ThreadPoolExecutor.new({ + :min_threads => 1, + :max_threads => 8, + :max_queue => 1, + :fallback_policy => :caller_runs + }) + end + + let(:file) do + f = LogStash::Outputs::S3::TemporaryFile.new(key, temporary_file) + f.write("random content") + f.fsync + f + end + + subject { described_class.new(bucket, logger, threadpool) } + + it "uploads the file to the s3 bucket" do + subject.upload(file) + end + + it "executes a callback when the upload is complete" do + callback = proc { |f| } + + expect(callback).to receive(:call).with(file) + subject.upload(file, { :on_complete => callback }) + end + + it "retries errors indefinitely" do + s3 = double("s3").as_null_object + + expect(logger).to receive(:error).with(any_args).once + expect(bucket).to receive(:object).with(file.key).and_return(s3).twice + expect(s3).to receive(:upload_file).with(any_args).and_raise(StandardError) + expect(s3).to receive(:upload_file).with(any_args).and_return(true) + + subject.upload(file) + end +end diff --git a/spec/outputs/s3/writable_directory_validator_spec.rb b/spec/outputs/s3/writable_directory_validator_spec.rb new file mode 100644 index 00000000..96762177 --- /dev/null +++ b/spec/outputs/s3/writable_directory_validator_spec.rb @@ -0,0 +1,40 @@ +# encoding: utf-8 +require "logstash/devutils/rspec/spec_helper" +require "logstash/outputs/s3/writable_directory_validator" +require "stud/temporary" + +describe LogStash::Outputs::S3::WritableDirectoryValidator do + let(:temporary_directory) { File.join(Stud::Temporary.directory, Time.now.to_i.to_s) } + + subject { described_class } + + context "when the directory doesn't exist" do + it "creates the directory" do + expect(Dir.exist?(temporary_directory)).to be_falsey + expect(subject.valid?(temporary_directory)).to be_truthy + expect(Dir.exist?(temporary_directory)).to be_truthy + end + end + + context "when the directory exists" do + before do + FileUtils.mkdir_p(temporary_directory) + end + + it "doesn't change the directory" do + expect(Dir.exist?(temporary_directory)).to be_truthy + expect(subject.valid?(temporary_directory)).to be_truthy + expect(Dir.exist?(temporary_directory)).to be_truthy + end + end + + it "returns false if the directory is not writable" do + expect(::File).to receive(:writable?).with(temporary_directory).and_return(false) + expect(subject.valid?(temporary_directory)).to be_falsey + end + + it "returns true if the directory is writable" do + expect(::File).to receive(:writable?).with(temporary_directory).and_return(true) + expect(subject.valid?(temporary_directory)).to be_truthy + end +end diff --git a/spec/outputs/s3/write_bucket_permission_validator_spec.rb b/spec/outputs/s3/write_bucket_permission_validator_spec.rb new file mode 100644 index 00000000..7f36f39f --- /dev/null +++ b/spec/outputs/s3/write_bucket_permission_validator_spec.rb @@ -0,0 +1,38 @@ +# encoding: utf-8 +require "logstash/devutils/rspec/spec_helper" +require "logstash/outputs/s3/write_bucket_permission_validator" +require "aws-sdk" + +describe LogStash::Outputs::S3::WriteBucketPermissionValidator do + let(:bucket_name) { "foobar" } + let(:obj) { double("s3_object") } + let(:client) { Aws::S3::Client.new(stub_responses: true) } + let(:bucket) { Aws::S3::Bucket.new(bucket_name, :client => client) } + + subject { described_class } + + before do + expect(bucket).to receive(:object).with(any_args).and_return(obj) + end + + context "when permissions are sufficient" do + it "returns true" do + expect(obj).to receive(:upload_file).with(any_args).and_return(true) + expect(obj).to receive(:delete).and_return(true) + expect(subject.valid?(bucket)).to be_truthy + end + + it "hides delete errors" do + expect(obj).to receive(:upload_file).with(any_args).and_return(true) + expect(obj).to receive(:delete).and_raise(StandardError) + expect(subject.valid?(bucket)).to be_truthy + end + end + + context "when permissions aren't sufficient" do + it "returns false" do + expect(obj).to receive(:upload_file).with(any_args).and_raise(StandardError) + expect(subject.valid?(bucket)).to be_falsey + end + end +end diff --git a/spec/outputs/s3_spec.rb b/spec/outputs/s3_spec.rb index 77803174..641707d0 100644 --- a/spec/outputs/s3_spec.rb +++ b/spec/outputs/s3_spec.rb @@ -1,371 +1,78 @@ # encoding: utf-8 -require "logstash/devutils/rspec/spec_helper" require "logstash/outputs/s3" +require "logstash/event" require "logstash/codecs/line" -require "logstash/pipeline" -require "aws-sdk" -require "fileutils" -require_relative "../supports/helpers" +require "stud/temporary" describe LogStash::Outputs::S3 do + let(:prefix) { "super/%{server}" } + let(:region) { "us-east-1" } + let(:bucket_name) { "mybucket" } + let(:options) { { "region" => region, "bucket" => bucket_name, "prefix" => prefix } } + let(:client) { Aws::S3::Client.new(stub_responses: true) } + let(:mock_bucket) { Aws::S3::Bucket.new(bucket_name, :client => client) } + let(:event) { LogStash::Event.new({ "server" => "overwatch" }) } + let(:event_encoded) { "super hype" } + let(:events_and_encoded) { { event => event_encoded } } + + subject { described_class.new(options) } + before do - # We stub all the calls from S3, for more information see: - # http://ruby.awsblog.com/post/Tx2SU6TYJWQQLC3/Stubbing-AWS-Responses - AWS.stub!
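The rewritten specs above and below stub S3 at the client level, which is the v2 SDK replacement for the global `AWS.stub!` being removed here. A minimal sketch of that pattern, with an illustrative bucket name and stubbed listing (not taken from the patch):

    require "aws-sdk"

    client = Aws::S3::Client.new(stub_responses: true)
    # Stub the operation behind Bucket#objects so iteration never hits the network.
    client.stub_responses(:list_objects, :contents => [{ :key => "home/part0.txt", :size => 11 }])

    bucket = Aws::S3::Bucket.new("fake-bucket", :client => client)
    bucket.objects(:prefix => "home").each { |summary| puts summary.key }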
- Thread.abort_on_exception = true + allow(subject).to receive(:bucket_resource).and_return(mock_bucket) + allow(LogStash::Outputs::S3::WriteBucketPermissionValidator).to receive(:valid?).with(mock_bucket).and_return(true) end - let(:minimal_settings) { { "access_key_id" => "1234", - "secret_access_key" => "secret", - "bucket" => "my-bucket" } } - - describe "configuration" do - let!(:config) { { "region" => "sa-east-1" } } - + context "#register configuration validation" do describe "signature version" do it "should set the signature version if specified" do - s3 = LogStash::Outputs::S3.new(config.merge({ "signature_version" => 'v4' })) - expect(s3.full_options[:s3_signature_version]).to eql('v4') + ["v2", "v4"].each do |version| + s3 = described_class.new(options.merge({ "signature_version" => version })) + expect(s3.full_options).to include(:s3_signature_version => version) + end end it "should omit the option completely if not specified" do - s3 = LogStash::Outputs::S3.new(config) + s3 = described_class.new(options) expect(s3.full_options.has_key?(:s3_signature_version)).to eql(false) end end - end - - describe "#register" do - it "should create the tmp directory if it doesn't exist" do - temporary_directory = Stud::Temporary.pathname("temporary_directory") - - config = { - "access_key_id" => "1234", - "secret_access_key" => "secret", - "bucket" => "logstash", - "size_file" => 10, - "temporary_directory" => temporary_directory - } - - s3 = LogStash::Outputs::S3.new(config) - allow(s3).to receive(:test_s3_write) - s3.register - - expect(Dir.exist?(temporary_directory)).to eq(true) - s3.close - FileUtils.rm_r(temporary_directory) - end - - it "should raise a ConfigurationError if the prefix contains one or more '\^`><' characters" do - config = { - "prefix" => "`no\><^" - } - - s3 = LogStash::Outputs::S3.new(config) - - expect { - s3.register - }.to raise_error(LogStash::ConfigurationError) - end - end - - describe "#generate_temporary_filename" do - before do - allow(Socket).to receive(:gethostname) { "logstash.local" } - end - - it "should add tags to the filename if present" do - config = minimal_settings.merge({ "tags" => ["elasticsearch", "logstash", "kibana"], "temporary_directory" => "/tmp/logstash"}) - s3 = LogStash::Outputs::S3.new(config) - expect(s3.get_temporary_filename).to match(/^ls\.s3\.logstash\.local\.\d{4}-\d{2}\-\d{2}T\d{2}\.\d{2}\.tag_#{config["tags"].join("\.")}\.part0\.txt\Z/) - end - - it "should not add the tags to the filename" do - config = minimal_settings.merge({ "tags" => [], "temporary_directory" => "/tmp/logstash" }) - s3 = LogStash::Outputs::S3.new(config) - expect(s3.get_temporary_filename(3)).to match(/^ls\.s3\.logstash\.local\.\d{4}-\d{2}\-\d{2}T\d{2}\.\d{2}\.part3\.txt\Z/) - end - - it "normalized the temp directory to include the trailing slash if missing" do - s3 = LogStash::Outputs::S3.new(minimal_settings.merge({ "temporary_directory" => "/tmp/logstash" })) - expect(s3.get_temporary_filename).to match(/^ls\.s3\.logstash\.local\.\d{4}-\d{2}\-\d{2}T\d{2}\.\d{2}\.part0\.txt\Z/) - end - end - - describe "#write_on_bucket" do - let!(:fake_data) { Stud::Temporary.file } - - let(:fake_bucket) do - s3 = double('S3Object') - allow(s3).to receive(:write) - s3 - end - - it "should prefix the file on the bucket if a prefix is specified" do - prefix = "my-prefix" - - config = minimal_settings.merge({ - "prefix" => prefix, - "bucket" => "my-bucket" - }) - - expect_any_instance_of(AWS::S3::ObjectCollection).to 
receive(:[]).with("#{prefix}#{File.basename(fake_data)}") { fake_bucket } - - s3 = LogStash::Outputs::S3.new(config) - allow(s3).to receive(:test_s3_write) - s3.register - s3.write_on_bucket(fake_data) - end - - it 'should use the same local filename if no prefix is specified' do - config = minimal_settings.merge({ - "bucket" => "my-bucket" - }) - - expect_any_instance_of(AWS::S3::ObjectCollection).to receive(:[]).with(File.basename(fake_data)) { fake_bucket } - - s3 = LogStash::Outputs::S3.new(minimal_settings) - allow(s3).to receive(:test_s3_write) - s3.register - s3.write_on_bucket(fake_data) - end - end - - describe "#write_events_to_multiple_files?" do - it 'returns true if the size_file is != 0 ' do - s3 = LogStash::Outputs::S3.new(minimal_settings.merge({ "size_file" => 200 })) - expect(s3.write_events_to_multiple_files?).to eq(true) - end - - it 'returns false if size_file is zero or not set' do - s3 = LogStash::Outputs::S3.new(minimal_settings) - expect(s3.write_events_to_multiple_files?).to eq(false) - end - end - - describe "#write_to_tempfile" do - it "should append the event to a file" do - Stud::Temporary.file("logstash", "a+") do |tmp| - s3 = LogStash::Outputs::S3.new(minimal_settings) - allow(s3).to receive(:test_s3_write) - s3.register - s3.tempfile = tmp - s3.write_to_tempfile("test-write") - tmp.rewind - expect(tmp.read).to eq("test-write") - end - end - end - - describe "#rotate_events_log" do - - context "having a single worker" do - let(:s3) { LogStash::Outputs::S3.new(minimal_settings.merge({ "size_file" => 1024 })) } - - before(:each) do - s3.register - end - - it "returns true if the tempfile is over the file_size limit" do - Stud::Temporary.file do |tmp| - allow(tmp).to receive(:size) { 2024001 } - - s3.tempfile = tmp - expect(s3.rotate_events_log?).to be(true) - end - end - - it "returns false if the tempfile is under the file_size limit" do - Stud::Temporary.file do |tmp| - allow(tmp).to receive(:size) { 100 } - - s3.tempfile = tmp - expect(s3.rotate_events_log?).to eq(false) - end - end - end - context "having periodic rotations" do - let(:s3) { LogStash::Outputs::S3.new(minimal_settings.merge({ "size_file" => 1024, "time_file" => 6e-10 })) } - let(:tmp) { Tempfile.new('s3_rotation_temp_file') } + describe "temporary directory" do + let(:temporary_directory) { Stud::Temporary.pathname } + let(:options) { super.merge({ "temporary_directory" => temporary_directory }) } - before(:each) do - s3.tempfile = tmp - s3.register + it "creates the directory when it doesn't exist" do + expect(Dir.exist?(temporary_directory)).to be_falsey + subject.register + expect(Dir.exist?(temporary_directory)).to be_truthy end - after(:each) do - s3.close - tmp.close - tmp.unlink - end - - it "raises no error when periodic rotation happen" do - 1000.times do - expect { s3.rotate_events_log? 
}.not_to raise_error - end + it "raises an error if we cannot write to the directory" do + expect(LogStash::Outputs::S3::WritableDirectoryValidator).to receive(:valid?).with(temporary_directory).and_return(false) + expect { subject.register }.to raise_error(LogStash::ConfigurationError) end end - end - - describe "#move_file_to_bucket" do - subject { LogStash::Outputs::S3.new(minimal_settings) } - - it "should always delete the source file" do - tmp = Stud::Temporary.file - allow(File).to receive(:zero?).and_return(true) - expect(File).to receive(:delete).with(tmp) - - subject.move_file_to_bucket(tmp) + it "validates the prefix" do + s3 = described_class.new(options.merge({ "prefix" => "`no\><^" })) + expect { s3.register }.to raise_error(LogStash::ConfigurationError) end - it 'should not upload the file if the size of the file is zero' do - temp_file = Stud::Temporary.file - allow(temp_file).to receive(:zero?).and_return(true) - - expect(subject).not_to receive(:write_on_bucket) - subject.move_file_to_bucket(temp_file) - end - - it "should upload the file if the size > 0" do - tmp = Stud::Temporary.file - - allow(File).to receive(:zero?).and_return(false) - expect(subject).to receive(:write_on_bucket) - - subject.move_file_to_bucket(tmp) - end - end - - describe "#restore_from_crashes" do - it "read the temp directory and upload the matching file to s3" do - s3 = LogStash::Outputs::S3.new(minimal_settings.merge({ "temporary_directory" => "/tmp/logstash/" })) - - expect(Dir).to receive(:[]).with("/tmp/logstash/*.txt").and_return(["/tmp/logstash/01.txt"]) - expect(s3).to receive(:move_file_to_bucket_async).with("/tmp/logstash/01.txt") - - - s3.restore_from_crashes - end - end - - describe "#receive" do - it "should send the event through the codecs" do - data = {"foo" => "bar", "baz" => {"bah" => ["a","b","c"]}, "@timestamp" => "2014-05-30T02:52:17.929Z"} - event = LogStash::Event.new(data) - - expect_any_instance_of(LogStash::Codecs::Line).to receive(:encode).with(event) - - s3 = LogStash::Outputs::S3.new(minimal_settings) - allow(s3).to receive(:test_s3_write) + it "allow to not validate credentials" do + s3 = described_class.new(options.merge({"validate_credentials_on_root_bucket" => false})) + expect(LogStash::Outputs::S3::WriteBucketPermissionValidator).not_to receive(:valid?).with(any_args) s3.register - - s3.receive(event) end end - describe "when rotating the temporary file" do - before { allow(File).to receive(:delete) } - - it "doesn't skip events if using the size_file option" do - Stud::Temporary.directory do |temporary_directory| - size_file = rand(200..20000) - event_count = rand(300..15000) - - config = %Q[ - input { - generator { - count => #{event_count} - } - } - output { - s3 { - access_key_id => "1234" - secret_access_key => "secret" - size_file => #{size_file} - codec => line - temporary_directory => '#{temporary_directory}' - bucket => 'testing' - } - } - ] - - pipeline = LogStash::Pipeline.new(config) - - pipeline_thread = Thread.new { pipeline.run } - sleep 0.1 while !pipeline.ready? 
- pipeline_thread.join - - events_written_count = events_in_files(Dir[File.join(temporary_directory, 'ls.*.txt')]) - expect(events_written_count).to eq(event_count) - end - end - - describe "closing" do - let(:options) do - { - "access_key_id" => 1234, - "secret_access_key" => "secret", - "bucket" => "mahbucket" - } - end - subject do - ::LogStash::Outputs::S3.new(options) - end - - before do - subject.register - end - - it "should be clean" do - subject.do_close - end - - it "should remove all worker threads" do - subject.do_close - sleep 1 - expect(subject.upload_workers.map(&:thread).any?(&:alive?)).to be false - end + context "receiving events" do + before do + subject.register end - it "doesn't skip events if using the time_file option", :tag => :slow do - Stud::Temporary.directory do |temporary_directory| - time_file = rand(1..2) - number_of_rotation = rand(2..5) - - config = { - "time_file" => time_file, - "codec" => "line", - "temporary_directory" => temporary_directory, - "bucket" => "testing" - } - - s3 = LogStash::Outputs::S3.new(minimal_settings.merge(config)) - # Make the test run in seconds intead of minutes.. - expect(s3).to receive(:periodic_interval).and_return(time_file) - s3.register - - # Force to have a few files rotation - stop_time = Time.now + (number_of_rotation * time_file) - event_count = 0 - - event = LogStash::Event.new("message" => "Hello World") - - until Time.now > stop_time do - s3.receive(event) - event_count += 1 - end - s3.close - - generated_files = Dir[File.join(temporary_directory, 'ls.*.txt')] - - events_written_count = events_in_files(generated_files) - - # Skew times can affect the number of rotation.. - expect(generated_files.count).to be_within(number_of_rotation).of(number_of_rotation + 1) - expect(events_written_count).to eq(event_count) - end + it "uses `Event#sprintf` for the prefix" do + expect(event).to receive(:sprintf).with(prefix).and_return("super/overwatch") + subject.multi_receive_encoded(events_and_encoded) end end end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb new file mode 100644 index 00000000..36a4a794 --- /dev/null +++ b/spec/spec_helper.rb @@ -0,0 +1,3 @@ +# encoding: utf-8 +require "logstash/devutils/rspec/spec_helper" +require_relative "supports/helpers" diff --git a/spec/supports/helpers.rb b/spec/supports/helpers.rb index e34f1ff3..09aa6bc7 100644 --- a/spec/supports/helpers.rb +++ b/spec/supports/helpers.rb @@ -1,14 +1,38 @@ -def delete_matching_keys_on_bucket(prefix) - s3_object.buckets[minimal_settings["bucket"]].objects.with_prefix(prefix).each do |obj| - obj.delete +# encoding: utf-8 +shared_context "setup plugin" do + let(:temporary_directory) { Stud::Temporary.pathname } + + let(:bucket) { ENV["AWS_LOGSTASH_TEST_BUCKET"] } + let(:access_key_id) { ENV["AWS_ACCESS_KEY_ID"] } + let(:secret_access_key) { ENV["AWS_SECRET_ACCESS_KEY"] } + let(:size_file) { 100 } + let(:time_file) { 100 } + let(:tags) { [] } + let(:prefix) { "home" } + let(:region) { "us-east-1" } + + let(:main_options) do + { + "bucket" => bucket, + "prefix" => prefix, + "temporary_directory" => temporary_directory, + "access_key_id" => access_key_id, + "secret_access_key" => secret_access_key, + "size_file" => size_file, + "time_file" => time_file, + "region" => region, + "tags" => [] + } end -end -def key_exists_on_bucket?(key) - s3_object.buckets[minimal_settings["bucket"]].objects[key].exists? 
-end + let(:client_credentials) { Aws::Credentials.new(access_key_id, secret_access_key) } + let(:bucket_resource) { Aws::S3::Bucket.new(bucket, { :credentials => client_credentials, :region => region }) } -def events_in_files(files) - files.collect { |file| File.foreach(file).count }.inject(&:+) + subject { LogStash::Outputs::S3.new(options) } end +def clean_remote_files(prefix = "") + bucket_resource.objects(:prefix => prefix).each do |object| + object.delete + end +end From 527616742498c16c468049e23a32eb91852a37b8 Mon Sep 17 00:00:00 2001 From: Nevins Bartolomeo Date: Tue, 8 Nov 2016 16:08:55 -0500 Subject: [PATCH 2/8] making changes suggested in https://github.com/logstash-plugins/logstash-output-s3/pull/102 --- lib/logstash/outputs/s3.rb | 8 +++--- lib/logstash/outputs/s3/file_repository.rb | 25 +++++++++++++------ .../outputs/s3/temporary_file_factory.rb | 4 +-- .../outputs/s3/time_rotation_policy.rb | 2 +- .../s3/size_and_time_rotation_policy_spec.rb | 2 +- .../outputs/s3/temporary_file_factory_spec.rb | 4 +-- spec/outputs/s3/time_rotation_policy_spec.rb | 4 +-- 7 files changed, 29 insertions(+), 20 deletions(-) diff --git a/lib/logstash/outputs/s3.rb b/lib/logstash/outputs/s3.rb index 9492b6a1..05ec64fd 100644 --- a/lib/logstash/outputs/s3.rb +++ b/lib/logstash/outputs/s3.rb @@ -44,7 +44,7 @@ ## Both time_file and size_file settings can trigger a log "file rotation" ## A log rotation pushes the current log "part" to s3 and deleted from local temporary storage. # -## If you specify BOTH size_file and time_file then it will create file for each tag (if specified). +## If you specify BOTH size_file and time_file then it will create file for each tag (if specified). ## When EITHER time_file minutes have elapsed OR log file size > size_file, a log rotation is triggered. ## ## If you ONLY specify time_file but NOT file_size, one file for each tag (if specified) will be created. @@ -136,7 +136,7 @@ class LogStash::Outputs::S3 < LogStash::Outputs::Base config :prefix, :validate => :string, :default => '' # Specify how many workers to use to upload the files to S3 - config :upload_workers_count, :validate => :number, :default => (Concurrent.processor_count * 0.25).round + config :upload_workers_count, :validate => :number, :default => (Concurrent.processor_count * 0.5).round # Number of items we can keep in the local queue before uploading them config :upload_queue_size, :validate => :number, :default => 2 * (Concurrent.processor_count * 0.25).round @@ -171,7 +171,7 @@ def register # to prepare for the new config validation that will be part of the core so the core can # be moved easily. unless @prefix.empty? - if !PathValidator.valid?(prefix) + if !PathValidator.valid?(prefix) raise LogStash::ConfigurationError, "Prefix must not contains: #{PathValidator::INVALID_CHARACTERS}" end end @@ -180,7 +180,7 @@ def register raise LogStash::ConfigurationError, "Logstash must have the permissions to write to the temporary directory: #{@temporary_directory}" end - if @validate_credentials_on_root_bucket && !WriteBucketPermissionValidator.valid?(bucket_resource) + if @validate_credentials_on_root_bucket && !WriteBucketPermissionValidator.valid?(bucket_resource) raise LogStash::ConfigurationError, "Logstash must have the privileges to write to root bucket `#{@bucket}`, check your credentials or your permissions."
end diff --git a/lib/logstash/outputs/s3/file_repository.rb b/lib/logstash/outputs/s3/file_repository.rb index 3da7576f..bfaa71bc 100644 --- a/lib/logstash/outputs/s3/file_repository.rb +++ b/lib/logstash/outputs/s3/file_repository.rb @@ -1,9 +1,11 @@ # encoding: utf-8 +require "java" require "concurrent" -require "concurrent/map" require "concurrent/timer_task" require "logstash/util" +java_import "java.util.concurrent.ConcurrentHashMap" + module LogStash module Outputs class S3 @@ -28,6 +30,10 @@ def with_lock def stale? with_lock { |factory| factory.current.size == 0 && (Time.now - factory.current.ctime > @stale_time) } end + + def apply(prefix) + return self + end end def initialize(tags, encoding, temporary_directory, @@ -35,7 +41,7 @@ def initialize(tags, encoding, temporary_directory, sweeper_interval = DEFAULT_STATE_SWEEPER_INTERVAL_SECS) # The path need to contains the prefix so when we start # logtash after a crash we keep the remote structure - @prefixed_factories = Concurrent::Map.new + @prefixed_factories = ConcurrentHashMap.new @tags = tags @encoding = encoding @@ -48,19 +54,20 @@ def initialize(tags, encoding, temporary_directory, end def keys - @prefixed_factories.keys + arr = [] + @prefixed_factories.keySet.each {|k| arr << k} + arr end def each_files - @prefixed_factories.each_value do |prefixed_file| + @prefixed_factories.values do |prefixed_file| prefixed_file.with_lock { |factory| yield factory.current } end end # Return the file factory def get_factory(prefix_key) - @prefixed_factories.compute_if_absent(prefix_key) { PrefixedValue.new(TemporaryFileFactory.new(prefix_key, @tags, @encoding, @temporary_directory), @stale_time) } - .with_lock { |factory| yield factory } + @prefixed_factories.computeIfAbsent(prefix_key, PrefixedValue.new(TemporaryFileFactory.new(prefix_key, @tags, @encoding, @temporary_directory), @stale_time)).with_lock { |factory| yield factory } end def get_file(prefix_key) @@ -79,8 +86,10 @@ def start_stale_sweeper @stale_sweeper = Concurrent::TimerTask.new(:execution_interval => @sweeper_interval) do LogStash::Util.set_thread_name("S3, Stale factory sweeper") - @prefixed_factories.each_pair do |k, v| - @prefixed_factories.delete_pair(k, v) if v.stale? + @prefixed_factories.entrySet.each do |s| + if s.getValue.stale? + @prefixed_factories.remove(s.getKey, s.getValue) + end end end diff --git a/lib/logstash/outputs/s3/temporary_file_factory.rb b/lib/logstash/outputs/s3/temporary_file_factory.rb index a93932e9..a7d8205b 100644 --- a/lib/logstash/outputs/s3/temporary_file_factory.rb +++ b/lib/logstash/outputs/s3/temporary_file_factory.rb @@ -11,7 +11,7 @@ class S3 # # The local structure will look like this. # - # ///ls.s3.localhost.%Y-%m-%dT%H.%m.tag_es_fb.part1.txt.gz + # ///ls.s3.localhost.%Y-%m-%dT%H.%m.tag_es_fb.part1.txt.gz # # Since the UUID should be fairly unique I can destroy the whole path when an upload is complete. # I do not have to mess around to check if the other directory have file in it before destroying them. 
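The layout comment above is the invariant that makes cleanup safe: everything belonging to one in-flight file lives under a per-file UUID directory, so deletion can be a blind recursive remove that cannot touch another file's data. A hedged sketch of the idea (paths and prefix are illustrative, not from the patch):

    require "securerandom"
    require "fileutils"

    temporary_directory = "/tmp/logstash"
    uuid = SecureRandom.uuid
    # e.g. /tmp/logstash/<uuid>/dt=2016/ls.s3.localhost.2016-11-14T10.11.part0.txt
    # Keeping the prefix inside the sandbox is what lets a restart rebuild the S3 key.
    sandbox = ::File.join(temporary_directory, uuid)
    FileUtils.mkdir_p(::File.join(sandbox, "dt=2016"))
    FileUtils.rm_rf(sandbox) # safe: only this one file's data lives under the UUID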
@@ -59,7 +59,7 @@ def current_time end def generate_name - filename = "ls.s3.#{Socket.gethostname}.#{current_time}" + filename = "ls.s3.#{SecureRandom.uuid}.#{current_time}" if tags.size > 0 "#{filename}.tag_#{tags.join('.')}.part#{counter}.#{extension}" diff --git a/lib/logstash/outputs/s3/time_rotation_policy.rb b/lib/logstash/outputs/s3/time_rotation_policy.rb index e772d983..7289dcb9 100644 --- a/lib/logstash/outputs/s3/time_rotation_policy.rb +++ b/lib/logstash/outputs/s3/time_rotation_policy.rb @@ -14,7 +14,7 @@ def initialize(time_file) end def rotate?(file) - file.size > 0 && Time.now - file.ctime >= time_file + file.size > 0 && ((Time.now - file.ctime)/60).floor >= time_file end def need_periodic? diff --git a/spec/outputs/s3/size_and_time_rotation_policy_spec.rb b/spec/outputs/s3/size_and_time_rotation_policy_spec.rb index 41d2494c..e91f562d 100644 --- a/spec/outputs/s3/size_and_time_rotation_policy_spec.rb +++ b/spec/outputs/s3/size_and_time_rotation_policy_spec.rb @@ -48,7 +48,7 @@ end it "returns true if the file old enough" do - allow(file).to receive(:ctime).and_return(Time.now - time_file * 2) + allow(file).to receive(:ctime).and_return(Time.now - (time_file * 2 * 60) ) expect(subject.rotate?(file)).to be_truthy end diff --git a/spec/outputs/s3/temporary_file_factory_spec.rb b/spec/outputs/s3/temporary_file_factory_spec.rb index 551e7cbe..25b7c5a9 100644 --- a/spec/outputs/s3/temporary_file_factory_spec.rb +++ b/spec/outputs/s3/temporary_file_factory_spec.rb @@ -40,7 +40,7 @@ it "includes the date" do n = Time.now - expect(subject.current.path).to match(/ls.s3.#{Socket.gethostname}.#{n.strftime("%Y-%m-%dT")}\d+\.\d+\./) + expect(subject.current.path).to include(n.strftime("%Y-%m-%dT")) end it "include the file key in the path" do @@ -50,7 +50,7 @@ it "create a unique directory in the temporary directory for each file" do uuid = "hola" - expect(SecureRandom).to receive(:uuid).and_return(uuid) + expect(SecureRandom).to receive(:uuid).and_return(uuid).twice expect(subject.current.path).to include(uuid) end diff --git a/spec/outputs/s3/time_rotation_policy_spec.rb b/spec/outputs/s3/time_rotation_policy_spec.rb index 8accce72..a6a69c35 100644 --- a/spec/outputs/s3/time_rotation_policy_spec.rb +++ b/spec/outputs/s3/time_rotation_policy_spec.rb @@ -27,7 +27,7 @@ end it "returns true if the file old enough" do - allow(file).to receive(:ctime).and_return(Time.now - max_time * 2) + allow(file).to receive(:ctime).and_return(Time.now - (max_time * 2 * 60)) expect(subject.rotate?(file)).to be_truthy end @@ -38,7 +38,7 @@ context "When the size of the file is 0" do it "returns false if the file old enough" do - allow(file).to receive(:ctime).and_return(Time.now - max_time * 2) + allow(file).to receive(:ctime).and_return(Time.now - (max_time * 2 * 60)) expect(subject.rotate?(file)).to be_falsey end From 171fd1ef95aaf5c02a87449092b01b370595e94f Mon Sep 17 00:00:00 2001 From: Nevins Bartolomeo Date: Tue, 8 Nov 2016 16:15:59 -0500 Subject: [PATCH 3/8] minor grammar fixes --- lib/logstash/outputs/s3.rb | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/logstash/outputs/s3.rb b/lib/logstash/outputs/s3.rb index 05ec64fd..5eafedf1 100644 --- a/lib/logstash/outputs/s3.rb +++ b/lib/logstash/outputs/s3.rb @@ -162,7 +162,7 @@ class LogStash::Outputs::S3 < LogStash::Outputs::Base # The default strategy is to check for both size and time, the first one to match will rotate the file.
config :rotation_strategy, :validate => ["size_and_time", "size", "time"], :default => "size_and_time" - # The common use case is to define permission on the root bucket and give Logstash full access to write his logs. + # The common use case is to define permission on the root bucket and give Logstash full access to write its logs. # In some circonstances you need finer grained permission on subfolder, this allow you to disable the check at startup. config :validate_credentials_on_root_bucket, :validate => :boolean, :default => true @@ -185,7 +185,7 @@ def register end if @time_file.nil? && @size_file.nil? || @size_file == 0 && @time_file == 0 - raise LogStash::ConfigurationError, "Logstash need at `time_file` or `size_file` greather than 0" + raise LogStash::ConfigurationError, "The S3 plugin must have at least one of time_file or size_file set to a value greater than 0" end @file_repository = FileRepository.new(@tags, @encoding, @temporary_directory) @@ -216,7 +216,7 @@ def multi_receive_encoded(events_and_encoded) begin @file_repository.get_file(prefix_key) { |file| file.write(encoded) } - # The output should stop accepting new events coming in, since he cannot do anything with them anymore. + # The output should stop accepting new events coming in, since it cannot do anything with them anymore. # Log the error and rethrow it. rescue Errno::ENOSPC => e @logger.error("S3: No space left in temporary directory", :temporary_directory => @temporary_directory) @@ -351,7 +351,7 @@ def restore_from_crash key_parts = Pathname.new(file).relative_path_from(temp_folder_path).to_s.split(::File::SEPARATOR) temp_file = TemporaryFile.new(key_parts.slice(1, key_parts.size).join("/"), ::File.open(file, "r")) - @logger.debug("Recover from crash and uploading", :file => temp_file.path) + @logger.debug("Recovering from crash and uploading", :file => temp_file.path) @crash_uploader.upload_async(temp_file, :on_complete => method(:clean_temporary_file)) end end From ccd9e5c57ba11699e0e08bf9af6f18d353edb09f Mon Sep 17 00:00:00 2001 From: Nevins Bartolomeo Date: Thu, 10 Nov 2016 12:54:54 -0500 Subject: [PATCH 4/8] simpler solutions for correcting seconds vs minutes --- lib/logstash/outputs/s3/time_rotation_policy.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/logstash/outputs/s3/time_rotation_policy.rb b/lib/logstash/outputs/s3/time_rotation_policy.rb index 7289dcb9..9fb8adc0 100644 --- a/lib/logstash/outputs/s3/time_rotation_policy.rb +++ b/lib/logstash/outputs/s3/time_rotation_policy.rb @@ -10,11 +10,11 @@ def initialize(time_file) raise LogStash::ConfigurationError, "`time_file` need to be greather than 0" end - @time_file = time_file + @time_file = time_file * 60 end def rotate?(file) - file.size > 0 && ((Time.now - file.ctime)/60).floor >= time_file + file.size > 0 && (Time.now - file.ctime) >= time_file end def need_periodic? 
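After this change `time_file` is interpreted as minutes and converted to seconds once in the constructor, instead of being divided on every `rotate?` call. A condensed restatement of the resulting policy, as a sketch (assumes a `file` responding to `#size` and `#ctime`, like `TemporaryFile`):

    class MinuteBasedRotation
      def initialize(minutes)
        @seconds = minutes * 60 # convert once, then compare in seconds
      end

      # Empty files never rotate; non-empty files rotate once the window elapses.
      def rotate?(file)
        file.size > 0 && (Time.now - file.ctime) >= @seconds
      end
    end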
From 1c50b942cf4fb96de7df2b4a9a84c84b384edada Mon Sep 17 00:00:00 2001 From: Nevins Bartolomeo Date: Thu, 10 Nov 2016 12:56:20 -0500 Subject: [PATCH 5/8] fixing a bug where stale files were removed but never deleted --- lib/logstash/outputs/s3/file_repository.rb | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/lib/logstash/outputs/s3/file_repository.rb b/lib/logstash/outputs/s3/file_repository.rb index bfaa71bc..b49e13f1 100644 --- a/lib/logstash/outputs/s3/file_repository.rb +++ b/lib/logstash/outputs/s3/file_repository.rb @@ -55,12 +55,12 @@ def initialize(tags, encoding, temporary_directory, def keys arr = [] - @prefixed_factories.keySet.each {|k| arr << k} + @prefixed_factories.keys.each {|k| arr << k} arr end def each_files - @prefixed_factories.values do |prefixed_file| + @prefixed_factories.elements.each do |prefixed_file| prefixed_file.with_lock { |factory| yield factory.current } end end @@ -82,15 +82,18 @@ def size @prefixed_factories.size end + def remove_stale(k, v) + if v.stale? + @prefixed_factories.remove(k, v) + v.with_lock{ |factor| factor.current.delete!} + end + end + def start_stale_sweeper @stale_sweeper = Concurrent::TimerTask.new(:execution_interval => @sweeper_interval) do LogStash::Util.set_thread_name("S3, Stale factory sweeper") - @prefixed_factories.entrySet.each do |s| - if s.getValue.stale? - @prefixed_factories.remove(s.getKey, s.getValue) - end - end + @prefixed_factories.forEach{|k,v| remove_stale(k,v)} end @stale_sweeper.execute From 4025febd07fa7af2195d7aff4d7683cbdda01ef1 Mon Sep 17 00:00:00 2001 From: Nevins Bartolomeo Date: Fri, 11 Nov 2016 16:34:01 -0500 Subject: [PATCH 6/8] cleanup, also fixing a bug that was causing the creation of a large number of unused files/directories --- lib/logstash/outputs/s3.rb | 8 ++--- lib/logstash/outputs/s3/file_repository.rb | 34 ++++++++++++------ lib/logstash/outputs/s3/temporary_file.rb | 10 ++++-- .../outputs/s3/temporary_file_factory.rb | 19 +++++----- spec/outputs/s3/file_repository_spec.rb | 36 +++++++++++++++---- .../s3/size_and_time_rotation_policy_spec.rb | 3 +- spec/outputs/s3/size_rotation_policy_spec.rb | 3 +- spec/outputs/s3/temporary_file_spec.rb | 2 +- spec/outputs/s3/time_rotation_policy_spec.rb | 3 +- spec/outputs/s3/uploader_spec.rb | 3 +- 10 files changed, 84 insertions(+), 37 deletions(-) diff --git a/lib/logstash/outputs/s3.rb b/lib/logstash/outputs/s3.rb index 5eafedf1..fb4a5df0 100644 --- a/lib/logstash/outputs/s3.rb +++ b/lib/logstash/outputs/s3.rb @@ -112,7 +112,7 @@ class LogStash::Outputs::S3 < LogStash::Outputs::Base # If you define file_size you have a number of files in consideration of the section and the current tag. # 0 stay all time on listerner, beware if you specific 0 and size_file 0, because you will not put the file on bucket, # for now the only thing this plugin can do is to put the file when logstash restart. - config :time_file, :validate => :number, :default => 15 * 60 + config :time_file, :validate => :number, :default => 15 ## IMPORTANT: if you use multiple instance of s3, you should specify on one of them the "restore=> true" and on the others "restore => false". ## This is hack for not destroy the new files after restoring the initial files.
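With the default now expressed in minutes, the two rotation triggers read like this in plugin options (a hedged sketch; bucket name and values are illustrative, credentials omitted):

    require "logstash/outputs/s3"

    options = {
      "bucket"    => "my-bucket",
      "size_file" => 5 * 1024 * 1024, # bytes
      "time_file" => 15               # minutes, matching the new default
    }
    output = LogStash::Outputs::S3.new(options)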
@@ -136,10 +136,10 @@ class LogStash::Outputs::S3 < LogStash::Outputs::Base config :prefix, :validate => :string, :default => '' # Specify how many workers to use to upload the files to S3 - config :upload_workers_count, :validate => :number, :default => (Concurrent.processor_count * 0.5).round + config :upload_workers_count, :validate => :number, :default => (Concurrent.processor_count * 0.5).ceil # Number of items we can keep in the local queue before uploading them - config :upload_queue_size, :validate => :number, :default => 2 * (Concurrent.processor_count * 0.25).round + config :upload_queue_size, :validate => :number, :default => 2 * (Concurrent.processor_count * 0.25).ceil # The version of the S3 signature hash to use. Normally uses the internal client default, can be explicitly # specified here @@ -349,7 +349,7 @@ def restore_from_crash Dir.glob(::File.join(@temporary_directory, "**/*")) do |file| if ::File.file?(file) key_parts = Pathname.new(file).relative_path_from(temp_folder_path).to_s.split(::File::SEPARATOR) - temp_file = TemporaryFile.new(key_parts.slice(1, key_parts.size).join("/"), ::File.open(file, "r")) + temp_file = TemporaryFile.new(key_parts.slice(1, key_parts.size).join("/"), ::File.open(file, "r"), key_parts.slice(0, 1)) @logger.debug("Recovering from crash and uploading", :file => temp_file.path) @crash_uploader.upload_async(temp_file, :on_complete => method(:clean_temporary_file)) diff --git a/lib/logstash/outputs/s3/file_repository.rb b/lib/logstash/outputs/s3/file_repository.rb index b49e13f1..1b1cd5c0 100644 --- a/lib/logstash/outputs/s3/file_repository.rb +++ b/lib/logstash/outputs/s3/file_repository.rb @@ -4,7 +4,7 @@ require "concurrent/timer_task" require "logstash/util" -java_import "java.util.concurrent.ConcurrentHashMap" +ConcurrentHashMap = java.util.concurrent.ConcurrentHashMap module LogStash module Outputs @@ -34,6 +34,23 @@ def stale? def apply(prefix) return self end + + def delete! + with_lock{ |factory| factory.current.delete! } + end + end + + class FactoryInitializer + def initialize(tags, encoding, temporary_directory, stale_time) + @tags = tags + @encoding = encoding + @temporary_directory = temporary_directory + @stale_time = stale_time + end + + def apply(prefix_key) + PrefixedValue.new(TemporaryFileFactory.new(prefix_key, @tags, @encoding, @temporary_directory), @stale_time) + end end def initialize(tags, encoding, temporary_directory, @@ -43,20 +60,15 @@ def initialize(tags, encoding, temporary_directory, # logtash after a crash we keep the remote structure @prefixed_factories = ConcurrentHashMap.new - @tags = tags - @encoding = encoding - @temporary_directory = temporary_directory - - @stale_time = stale_time @sweeper_interval = sweeper_interval + @factory_initializer = FactoryInitializer.new(tags, encoding, temporary_directory, stale_time) + start_stale_sweeper end def keys - arr = [] - @prefixed_factories.keys.each {|k| arr << k} - arr + @prefixed_factories.keySet end def each_files @@ -67,7 +79,7 @@ def each_files # Return the file factory def get_factory(prefix_key) - @prefixed_factories.computeIfAbsent(prefix_key, PrefixedValue.new(TemporaryFileFactory.new(prefix_key, @tags, @encoding, @temporary_directory), @stale_time)).with_lock { |factory| yield factory } + @prefixed_factories.computeIfAbsent(prefix_key, @factory_initializer).with_lock { |factory| yield factory } end def get_file(prefix_key) @@ -85,7 +97,7 @@ def size def remove_stale(k, v) if v.stale? 
@prefixed_factories.remove(k, v) - v.with_lock{ |factor| factor.current.delete!} + v.delete! end end diff --git a/lib/logstash/outputs/s3/temporary_file.rb b/lib/logstash/outputs/s3/temporary_file.rb index a604e084..176db6b3 100644 --- a/lib/logstash/outputs/s3/temporary_file.rb +++ b/lib/logstash/outputs/s3/temporary_file.rb @@ -14,9 +14,10 @@ class TemporaryFile def_delegators :@fd, *DELEGATES_METHODS - def initialize(key, fd) + def initialize(key, fd, temp_path) @fd = fd @key = key + @temp_path = temp_path @created_at = Time.now end @@ -24,6 +25,10 @@ def ctime @created_at end + def temp_path + @temp_path + end + def key @key.gsub(/^\//, "") end @@ -33,7 +38,8 @@ def key # we delete the root of the UUID, using a UUID also remove the risk of deleting unwanted file, it acts as # a sandbox. def delete! - ::FileUtils.rm_rf(path.gsub(/#{Regexp.escape(key)}$/, "")) + @fd.close + ::FileUtils.rm_rf(@temp_path, :secure => true) end def empty? diff --git a/lib/logstash/outputs/s3/temporary_file_factory.rb b/lib/logstash/outputs/s3/temporary_file_factory.rb index a7d8205b..3b34eb9e 100644 --- a/lib/logstash/outputs/s3/temporary_file_factory.rb +++ b/lib/logstash/outputs/s3/temporary_file_factory.rb @@ -31,14 +31,17 @@ def initialize(prefix, tags, encoding, temporary_directory) @tags = tags @encoding = encoding @temporary_directory = temporary_directory + @lock = Mutex.new rotate! end def rotate! - @current = new_file - increment_counter - @current + @lock.synchronize { + @current = new_file + increment_counter + @current + } end private @@ -71,18 +74,18 @@ def generate_name def new_file uuid = SecureRandom.uuid name = generate_name - path = ::File.join(temporary_directory, uuid, prefix) + path = ::File.join(temporary_directory, uuid) key = ::File.join(prefix, name) - FileUtils.mkdir_p(path) + FileUtils.mkdir_p(::File.join(path, prefix)) io = if gzip? - Zlib::GzipWriter.open(::File.join(path, name)) + Zlib::GzipWriter.open(::File.join(path, key)) else - ::File.open(::File.join(path, name), FILE_MODE) + ::File.open(::File.join(path, key), FILE_MODE) end - TemporaryFile.new(key, io) + TemporaryFile.new(key, io, path) end end end diff --git a/spec/outputs/s3/file_repository_spec.rb b/spec/outputs/s3/file_repository_spec.rb index 9b9614fc..49d7c096 100644 --- a/spec/outputs/s3/file_repository_spec.rb +++ b/spec/outputs/s3/file_repository_spec.rb @@ -34,6 +34,22 @@ end end + it "returns the same file for the same dynamic prefix key" do + prefix = "%{type}/%{+YYYY}/%{+MM}/%{+dd}/" + event = LogStash::Event.new({ "type" => "syslog"}) + key = event.sprintf(prefix) + file_path = nil + + + subject.get_file(key) do |file| + file_path = file.path + end + + subject.get_file(key) do |file| + expect(file.path).to eq(file_path) + end + end + it "returns different file for different prefix keys" do file_path = nil @@ -72,21 +88,27 @@ it "returns all available keys" do subject.get_file(prefix_key) { |file| file.write("something") } - expect(subject.keys).to eq([prefix_key]) + expect(subject.keys.toArray).to eq([prefix_key]) end it "clean stale factories" do - file_repository = described_class.new(tags, encoding, temporary_directory, 1, 1) - expect(file_repository.size).to eq(0) - file_repository.get_factory(prefix_key) do |factory| + @file_repository = described_class.new(tags, encoding, temporary_directory, 1, 1) + expect(@file_repository.size).to eq(0) + path = "" + @file_repository.get_factory(prefix_key) do |factory| factory.current.write("hello") # force a rotation so we get an empty file that will get stale. 
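# A factory counts as stale once its current file is empty and older than the
# configured stale window (one second in this spec); remove_stale both drops the
# entry from the map and delete!s the file, which is why the temp_path captured
# after the rotation below must also disappear from disk.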
factory.rotate! + path = factory.current.temp_path end - file_repository.get_file("another-prefix") { |file| file.write("hello") } - expect(file_repository.size).to eq(2) - try(10) { expect(file_repository.size).to eq(1) } + @file_repository.get_file("another-prefix") { |file| file.write("hello") } + expect(@file_repository.size).to eq(2) + @file_repository.keys.each do |k| + puts k + end + try(10) { expect(@file_repository.size).to eq(1) } + expect(File.directory?(path)).to be_falsey end end diff --git a/spec/outputs/s3/size_and_time_rotation_policy_spec.rb b/spec/outputs/s3/size_and_time_rotation_policy_spec.rb index e91f562d..3e822783 100644 --- a/spec/outputs/s3/size_and_time_rotation_policy_spec.rb +++ b/spec/outputs/s3/size_and_time_rotation_policy_spec.rb @@ -8,10 +8,11 @@ let(:time_file) { 1 } subject { described_class.new(file_size, time_file) } + let(:temporary_directory) { Stud::Temporary.pathname } let(:temporary_file) { Stud::Temporary.file } let(:name) { "foobar" } let(:content) { "hello" * 1000 } - let(:file) { LogStash::Outputs::S3::TemporaryFile.new(name, temporary_file) } + let(:file) { LogStash::Outputs::S3::TemporaryFile.new(name, temporary_file, temporary_directory) } it "raises an exception if the `time_file` is set to 0" do expect { described_class.new(100, 0) }.to raise_error(LogStash::ConfigurationError, /time_file/) diff --git a/spec/outputs/s3/size_rotation_policy_spec.rb b/spec/outputs/s3/size_rotation_policy_spec.rb index 7700bbf9..e8ca74a3 100644 --- a/spec/outputs/s3/size_rotation_policy_spec.rb +++ b/spec/outputs/s3/size_rotation_policy_spec.rb @@ -7,11 +7,12 @@ describe LogStash::Outputs::S3::SizeRotationPolicy do subject { described_class.new(size_file) } + let(:temporary_directory) { Stud::Temporary.directory } let(:temporary_file) { Stud::Temporary.file } let(:name) { "foobar" } let(:content) { "hello" * 1000 } let(:size_file) { 10 } # in bytes - let(:file) { LogStash::Outputs::S3::TemporaryFile.new(name, temporary_file) } + let(:file) { LogStash::Outputs::S3::TemporaryFile.new(name, temporary_file, temporary_directory) } it "returns true if the size on disk is higher than the `size_file`" do file.write(content) diff --git a/spec/outputs/s3/temporary_file_spec.rb b/spec/outputs/s3/temporary_file_spec.rb index 46f4fe9f..fd88e1e0 100644 --- a/spec/outputs/s3/temporary_file_spec.rb +++ b/spec/outputs/s3/temporary_file_spec.rb @@ -16,7 +16,7 @@ FileUtils.mkdir_p(::File.join(temporary_directory, uuid)) end - subject { described_class.new(key, temporary_file) } + subject { described_class.new(key, temporary_file, temporary_directory) } it "returns the key of the file" do expect(subject.key).to eq(key) diff --git a/spec/outputs/s3/time_rotation_policy_spec.rb b/spec/outputs/s3/time_rotation_policy_spec.rb index a6a69c35..d6f3407a 100644 --- a/spec/outputs/s3/time_rotation_policy_spec.rb +++ b/spec/outputs/s3/time_rotation_policy_spec.rb @@ -7,10 +7,11 @@ subject { described_class.new(max_time) } let(:max_time) { 1 } + let(:temporary_directory) { Stud::Temporary.directory } let(:temporary_file) { Stud::Temporary.file } let(:name) { "foobar" } let(:content) { "hello" * 1000 } - let(:file) { LogStash::Outputs::S3::TemporaryFile.new(name, temporary_file) } + let(:file) { LogStash::Outputs::S3::TemporaryFile.new(name, temporary_file, temporary_directory) } it "raises an exception if the `file_time` is set to 0" do expect { described_class.new(0) }.to raise_error(LogStash::ConfigurationError, /`time_file` need to be greather than 0/) diff --git 
index 5613289a..16da5427 100644
--- a/spec/outputs/s3/uploader_spec.rb
+++ b/spec/outputs/s3/uploader_spec.rb
@@ -11,6 +11,7 @@
   let(:bucket_name) { "foobar-bucket" }
   let(:client) { Aws::S3::Client.new(stub_responses: true) }
   let(:bucket) { Aws::S3::Bucket.new(bucket_name, :client => client) }
+  let(:temporary_directory) { Stud::Temporary.pathname }
   let(:temporary_file) { Stud::Temporary.file }
   let(:key) { "foobar" }
   let(:upload_options) { {} }
@@ -24,7 +25,7 @@
   end
 
   let(:file) do
-    f = LogStash::Outputs::S3::TemporaryFile.new(key, temporary_file)
+    f = LogStash::Outputs::S3::TemporaryFile.new(key, temporary_file, temporary_directory)
     f.write("random content")
     f.fsync
     f

From ff77cde0248bcb27ee61f1cb3918e7e7ae19c70a Mon Sep 17 00:00:00 2001
From: Pier-Hugues Pellerin
Date: Mon, 14 Nov 2016 10:11:35 -0500
Subject: [PATCH 7/8] make sure the tests pass when using Java classes

---
 lib/logstash/outputs/s3.rb                    | 41 ++++++------
 lib/logstash/outputs/s3/file_repository.rb    |  6 +-
 lib/logstash/outputs/s3/patch.rb              | 22 +++++++
 .../s3/size_and_time_rotation_policy.rb       |  2 +-
 .../outputs/s3/size_rotation_policy.rb        |  2 +-
 lib/logstash/outputs/s3/temporary_file.rb     | 24 ++++++-
 .../outputs/s3/temporary_file_factory.rb      | 32 +++++++++-
 .../outputs/s3/time_rotation_policy.rb        |  2 +-
 lib/logstash/outputs/s3/uploader.rb           |  5 +-
 spec/integration/dynamic_prefix_spec.rb       |  2 +-
 spec/integration/gzip_file_spec.rb            | 62 ++++++++++++++++++
 spec/integration/gzip_size_rotation_spec.rb   | 63 +++++++++++++++++++
 spec/integration/restore_from_crash_spec.rb   |  4 +-
 spec/integration/size_rotation_spec.rb        |  9 +--
 ...based_rotation_with_constant_write_spec.rb |  4 +-
 ...me_based_rotation_with_stale_write_spec.rb | 13 ++--
 .../upload_current_file_on_shutdown_spec.rb   |  4 +-
 spec/outputs/s3/file_repository_spec.rb       |  3 +-
 .../s3/size_and_time_rotation_policy_spec.rb  |  4 +-
 spec/outputs/s3/size_rotation_policy_spec.rb  |  4 +-
 spec/outputs/s3/temporary_file_spec.rb        |  6 --
 spec/outputs/s3/time_rotation_policy_spec.rb  |  8 ++-
 spec/outputs/s3/uploader_spec.rb              |  2 +-
 spec/outputs/s3_spec.rb                       | 14 ++++-
 spec/spec_helper.rb                           |  3 +
 25 files changed, 278 insertions(+), 63 deletions(-)
 create mode 100644 lib/logstash/outputs/s3/patch.rb
 create mode 100644 spec/integration/gzip_file_spec.rb
 create mode 100644 spec/integration/gzip_size_rotation_spec.rb

diff --git a/lib/logstash/outputs/s3.rb b/lib/logstash/outputs/s3.rb
index fb4a5df0..23d197c1 100644
--- a/lib/logstash/outputs/s3.rb
+++ b/lib/logstash/outputs/s3.rb
@@ -11,7 +11,10 @@
 require "fileutils"
 require "set"
 require "pathname"
+require "aws-sdk"
+require "logstash/outputs/s3/patch"
 
+Aws.eager_autoload!
 
 # INFORMATION:
 #
@@ -118,7 +121,7 @@ class LogStash::Outputs::S3 < LogStash::Outputs::Base
   ## This is hack for not destroy the new files after restoring the initial files.
   ## If you do not specify "restore => true" when logstash crashes or is restarted, the files are not sent into the bucket,
   ## for example if you have single Instance.
-  config :restore, :validate => :boolean, :default => false
+  config :restore, :validate => :boolean, :default => true
 
   # The S3 canned ACL to use when putting the file. Defaults to "private".
  config :canned_acl, :validate => ["private", "public_read", "public_read_write", "authenticated_read"],
@@ -191,12 +194,13 @@ def register
     @file_repository = FileRepository.new(@tags, @encoding, @temporary_directory)
 
     @rotation = rotation_strategy
-    @uploader = Uploader.new(bucket_resource, @logger, Concurrent::ThreadPoolExecutor.new({
-      :min_threads => 1,
-      :max_threads => @upload_workers_count,
-      :max_queue => @upload_queue_size,
-      :fallback_policy => :caller_runs
-    }))
+
+    executor = Concurrent::ThreadPoolExecutor.new({ :min_threads => 1,
+                                                    :max_threads => @upload_workers_count,
+                                                    :max_queue => @upload_queue_size,
+                                                    :fallback_policy => :caller_runs })
+
+    @uploader = Uploader.new(bucket_resource, @logger, executor)
 
     # Restoring from crash will use a new threadpool to slowly recover
     # New events should have more priority.
@@ -204,7 +208,7 @@ def register
 
     # If we need time based rotation we need to do periodic check on the file
     # to take care of file that were not updated recently
-    start_periodic_check if @rotation.need_periodic?
+    start_periodic_check if @rotation.needs_periodic?
   end
 
   def multi_receive_encoded(events_and_encoded)
@@ -229,7 +233,7 @@
   end
 
   def close
-    stop_periodic_check if @rotation.need_periodic?
+    stop_periodic_check if @rotation.needs_periodic?
 
     @logger.debug("Uploading current workspace")
 
@@ -294,7 +298,8 @@ def upload_options
 
   def rotate_if_needed(prefixes)
     prefixes.each do |prefix|
-      # Each file access is thread safe, until the rotation is done then only
+      # Each file access is thread safe;
+      # while a rotation is in progress only
       # one thread has access to the resource.
       @file_repository.get_factory(prefix) do |factory|
         temp_file = factory.current
@@ -316,7 +321,7 @@ def upload_file(temp_file)
     @logger.debug("Queue for upload", :path => temp_file.path)
 
     # if the queue is full the calling thread will be used to upload
-    temp_file.fsync # make sure we flush the fd before uploading it.
+    temp_file.close # make sure the content is on disk
     if temp_file.size > 0
       @uploader.upload_async(temp_file,
                              :on_complete => method(:clean_temporary_file),
@@ -346,14 +351,12 @@ def restore_from_crash
     @crash_uploader = Uploader.new(bucket_resource, @logger, CRASH_RECOVERY_THREADPOOL)
 
     temp_folder_path = Pathname.new(@temporary_directory)
-    Dir.glob(::File.join(@temporary_directory, "**/*")) do |file|
-      if ::File.file?(file)
-        key_parts = Pathname.new(file).relative_path_from(temp_folder_path).to_s.split(::File::SEPARATOR)
-        temp_file = TemporaryFile.new(key_parts.slice(1, key_parts.size).join("/"), ::File.open(file, "r"), key_parts.slice(0, 1))
-
-        @logger.debug("Recovering from crash and uploading", :file => temp_file.path)
-        @crash_uploader.upload_async(temp_file, :on_complete => method(:clean_temporary_file))
-      end
+    Dir.glob(::File.join(@temporary_directory, "**/*"))
+      .select { |file| ::File.file?(file) }
+      .each do |file|
+      temp_file = TemporaryFile.create_from_existing_file(file, temp_folder_path)
+      @logger.debug("Recovering from crash and uploading", :file => temp_file.path)
+      @crash_uploader.upload_async(temp_file, :on_complete => method(:clean_temporary_file))
     end
   end
 end
diff --git a/lib/logstash/outputs/s3/file_repository.rb b/lib/logstash/outputs/s3/file_repository.rb
index 1b1cd5c0..442c46e0 100644
--- a/lib/logstash/outputs/s3/file_repository.rb
+++ b/lib/logstash/outputs/s3/file_repository.rb
@@ -15,15 +15,15 @@ class FileRepository
     # Ensure that all access or work done
     # on a factory is threadsafe
     class PrefixedValue
-      def initialize(factory, stale_time)
-        @factory = factory
+      def initialize(file_factory, stale_time)
+        @file_factory = file_factory
         @lock = Mutex.new
         @stale_time = stale_time
       end
 
       def with_lock
         @lock.synchronize {
-          yield @factory
+          yield @file_factory
         }
       end
diff --git a/lib/logstash/outputs/s3/patch.rb b/lib/logstash/outputs/s3/patch.rb
new file mode 100644
index 00000000..76b76a6f
--- /dev/null
+++ b/lib/logstash/outputs/s3/patch.rb
@@ -0,0 +1,22 @@
+# This is a patch related to autoloading and Ruby.
+#
+# The fix exists in JRuby 9k but not in the current JRuby; it is unclear when or if it will be backported:
+# https://github.com/jruby/jruby/issues/3645
+#
+# AWS does tricky name discovery in the module to generate the correct error class, and
+# this strategy is broken under JRuby; `eager_autoload` doesn't fix the issue.
+#
+# This will be a short-lived patch, since AWS is removing the need for it.
+# see: https://github.com/aws/aws-sdk-ruby/issues/1301#issuecomment-261115960
+old_stderr = $stderr
+
+$stderr = StringIO.new
+begin
+  module Aws
+    const_set(:S3, Aws::S3)
+  end
+ensure
+  $stderr = old_stderr
+end
+
+
diff --git a/lib/logstash/outputs/s3/size_and_time_rotation_policy.rb b/lib/logstash/outputs/s3/size_and_time_rotation_policy.rb
index 356d86cf..b1c885a8 100644
--- a/lib/logstash/outputs/s3/size_and_time_rotation_policy.rb
+++ b/lib/logstash/outputs/s3/size_and_time_rotation_policy.rb
@@ -15,7 +15,7 @@ def rotate?(file)
     @size_strategy.rotate?(file) || @time_strategy.rotate?(file)
   end
 
-  def need_periodic?
+  def needs_periodic?
     true
   end
 end
diff --git a/lib/logstash/outputs/s3/size_rotation_policy.rb b/lib/logstash/outputs/s3/size_rotation_policy.rb
index 2b1dbdbc..d47ccd17 100644
--- a/lib/logstash/outputs/s3/size_rotation_policy.rb
+++ b/lib/logstash/outputs/s3/size_rotation_policy.rb
@@ -17,7 +17,7 @@ def rotate?(file)
     file.size >= size_file
   end
 
-  def need_periodic?
+  def needs_periodic?
     false
   end
 end
diff --git a/lib/logstash/outputs/s3/temporary_file.rb b/lib/logstash/outputs/s3/temporary_file.rb
index 176db6b3..7ce86904 100644
--- a/lib/logstash/outputs/s3/temporary_file.rb
+++ b/lib/logstash/outputs/s3/temporary_file.rb
@@ -10,9 +10,10 @@ class S3
   # It make it more OOP and easier to reason with the paths.
   class TemporaryFile
     extend Forwardable
-    DELEGATES_METHODS = [:path, :write, :close, :size, :fsync]
-    def_delegators :@fd, *DELEGATES_METHODS
+    def_delegators :@fd, :path, :write, :close, :fsync
+
+    attr_reader :fd
 
     def initialize(key, fd, temp_path)
       @fd = fd
@@ -29,6 +30,17 @@ def temp_path
       @temp_path
     end
 
+    def size
+      # Use the fd size to get an accurate result,
+      # so we don't have to deal with fsync;
+      # if the file is closed we fall back to File::size
+      begin
+        @fd.size
+      rescue IOError
+        ::File.size(path)
+      end
+    end
+
    def key
      @key.gsub(/^\//, "")
    end
@@ -45,6 +57,14 @@ def delete!
    def empty?
      size == 0
    end
+
+    def self.create_from_existing_file(file_path, temporary_folder)
+      key_parts = Pathname.new(file_path).relative_path_from(temporary_folder).to_s.split(::File::SEPARATOR)
+
+      TemporaryFile.new(key_parts.slice(1, key_parts.size).join("/"),
+                        ::File.open(file_path, "r"),
+                        ::File.join(temporary_folder, key_parts.slice(0, 1)))
+    end
   end
 end
diff --git a/lib/logstash/outputs/s3/temporary_file_factory.rb b/lib/logstash/outputs/s3/temporary_file_factory.rb
index 3b34eb9e..dc4d4879 100644
--- a/lib/logstash/outputs/s3/temporary_file_factory.rb
+++ b/lib/logstash/outputs/s3/temporary_file_factory.rb
@@ -2,6 +2,8 @@
 require "socket"
 require "securerandom"
 require "fileutils"
+require "zlib"
+require "forwardable"
 
 module LogStash
   module Outputs
@@ -80,13 +82,41 @@ def new_file
       FileUtils.mkdir_p(::File.join(path, prefix))
 
       io = if gzip?
-             Zlib::GzipWriter.open(::File.join(path, key))
+             # We have to use this wrapper because we cannot access the size of the
+             # file directly on the gzip writer.
+             IOWrappedGzip.new(::File.open(::File.join(path, key), FILE_MODE))
            else
              ::File.open(::File.join(path, key), FILE_MODE)
            end
 
       TemporaryFile.new(key, io, path)
     end
+
+    class IOWrappedGzip
+      extend Forwardable
+
+      def_delegators :@gzip_writer, :write, :close
+      attr_reader :file_io, :gzip_writer
+
+      def initialize(file_io)
+        @file_io = file_io
+        @gzip_writer = Zlib::GzipWriter.open(file_io)
+      end
+
+      def path
+        @gzip_writer.to_io.path
+      end
+
+      def size
+        # flush the writer so the underlying file reflects the current size
+        @gzip_writer.flush
+        @gzip_writer.to_io.size
+      end
+
+      def fsync
+        @gzip_writer.to_io.fsync
+      end
+    end
   end
 end
end
diff --git a/lib/logstash/outputs/s3/time_rotation_policy.rb b/lib/logstash/outputs/s3/time_rotation_policy.rb
index 9fb8adc0..b3de61c4 100644
--- a/lib/logstash/outputs/s3/time_rotation_policy.rb
+++ b/lib/logstash/outputs/s3/time_rotation_policy.rb
@@ -17,7 +17,7 @@ def rotate?(file)
     file.size > 0 && (Time.now - file.ctime) >= time_file
   end
 
-  def need_periodic?
+  def needs_periodic?
     true
   end
 end
diff --git a/lib/logstash/outputs/s3/uploader.rb b/lib/logstash/outputs/s3/uploader.rb
index 3a7251a2..c3949c91 100644
--- a/lib/logstash/outputs/s3/uploader.rb
+++ b/lib/logstash/outputs/s3/uploader.rb
@@ -1,6 +1,6 @@
 # encoding: utf-8
 require "logstash/util"
-require "aws-sdk-resources"
+require "aws-sdk"
 
 module LogStash
   module Outputs
@@ -42,8 +42,7 @@ def upload(file, options = {})
       #
       # Thread might be stuck here, but I think its better than losing anything
       # its either a transient errors or something bad really happened.
-      sleep(TIME_BEFORE_RETRYING_SECONDS)
-      logger.error("Uploading failed, retrying", :exception => e, :path => file.path)
+      logger.error("Uploading failed, retrying", :exception => e, :path => file.path, :backtrace => e.backtrace)
       retry
     end
diff --git a/spec/integration/dynamic_prefix_spec.rb b/spec/integration/dynamic_prefix_spec.rb
index 71dec731..39a9e779 100644
--- a/spec/integration/dynamic_prefix_spec.rb
+++ b/spec/integration/dynamic_prefix_spec.rb
@@ -85,7 +85,7 @@
     end
 
     it "creates dated path" do
-      re = /^#{sandbox}\/\d{4}-\d{2}-\d{2}\/ls\.s3\./
+      re = /^#{sandbox}\/\d{4}-\d{2}-\d{1,2}\/ls\.s3\./
      expect(bucket_resource.objects(:prefix => sandbox).first.key).to match(re)
    end
  end
diff --git a/spec/integration/gzip_file_spec.rb b/spec/integration/gzip_file_spec.rb
new file mode 100644
index 00000000..7bd15097
--- /dev/null
+++ b/spec/integration/gzip_file_spec.rb
@@ -0,0 +1,62 @@
+# encoding: utf-8
+require_relative "../spec_helper"
+require "logstash/outputs/s3"
+require "logstash/codecs/line"
+require "stud/temporary"
+
+describe "Gzip File Time rotation with constant write", :integration => true do
+  include_context "setup plugin"
+
+  let(:time_file) { 0.004 }
+  let(:options) { main_options.merge({ "encoding" => "gzip",
+                                       "rotation_strategy" => "time" }) }
+  let(:number_of_events) { 5000 }
+  let(:batch_size) { 125 }
+  let(:event_encoded) { "Hello world" }
+  let(:batch) do
+    b = {}
+    number_of_events.times do
+      event = LogStash::Event.new({ "message" => event_encoded })
+      b[event] = "#{event_encoded}\n"
+    end
+    b
+  end
+  let(:minimum_number_of_time_rotation) { 3 }
+  let(:batch_step) { (number_of_events / minimum_number_of_time_rotation).ceil }
+
+  before do
+    clean_remote_files(prefix)
+    subject.register
+
+    # simulate batch read/write
+    batch.each_slice(batch_step) do |batch_time|
+      batch_time.each_slice(batch_size) do |smaller_batch|
+        subject.multi_receive_encoded(smaller_batch)
+      end
+      sleep(1)
+    end
+
+    subject.close
+  end
+
+  it "creates multiple files" do
+    # using close will upload the current file
+    expect(bucket_resource.objects(:prefix => prefix).count).to be_between(minimum_number_of_time_rotation, minimum_number_of_time_rotation + 1).inclusive
+  end
+
+  it "Persists all events" do
+    download_directory = Stud::Temporary.pathname
+
+    FileUtils.rm_rf(download_directory)
+    FileUtils.mkdir_p(download_directory)
+
+    counter = 0
+    bucket_resource.objects(:prefix => prefix).each do |object|
+      target = File.join(download_directory, "#{counter}.gz")
+      object.get(:response_target => target)
+      counter += 1
+    end
+
+    expect(Dir.glob(File.join(download_directory, "**", "*.gz")).inject(0) { |sum, f| sum + Zlib::GzipReader.new(File.open(f)).readlines.size }).to eq(number_of_events)
+  end
+end
diff --git a/spec/integration/gzip_size_rotation_spec.rb b/spec/integration/gzip_size_rotation_spec.rb
new file mode 100644
index 00000000..110f732a
--- /dev/null
+++ b/spec/integration/gzip_size_rotation_spec.rb
@@ -0,0 +1,63 @@
+# encoding: utf-8
+require_relative "../spec_helper"
+require "logstash/outputs/s3"
+require "logstash/codecs/line"
+require "stud/temporary"
+
+describe "Gzip Size rotation", :integration => true do
+  include_context "setup plugin"
+
+  let(:document_size) { 20 * 1024 } # in bytes
+
+  let(:options) do
+    main_options.merge({
+      "encoding" => "gzip",
+      "size_file" => document_size,
+      "rotation_strategy" => "size" })
+  end
+
+  let(:number_of_events) { 1_000_000 }
+  let(:batch_size) { 125 }
+  let(:event_encoded) { "Hello world" * 20 }
+  let(:batch) do
+    b = {}
+    batch_size.times do
+      event = LogStash::Event.new({ "message" => event_encoded })
+      b[event] = "#{event_encoded}\n"
+    end
+    b
+  end
+  let(:number_of_files) { number_of_events / 50000 }
+
+  before do
+    clean_remote_files(prefix)
+    subject.register
+    (number_of_events/batch_size).times do
+      subject.multi_receive_encoded(batch)
+    end
+    subject.close
+  end
+
+  it "Rotates the files based on size" do
+    f = bucket_resource.objects(:prefix => prefix).first
+    expect(f.size).to be_between(document_size, document_size * 2).inclusive
+  end
+
+  it "Persists all events" do
+    download_directory = Stud::Temporary.pathname
+
+    FileUtils.rm_rf(download_directory)
+    FileUtils.mkdir_p(download_directory)
+
+    counter = 0
+    bucket_resource.objects(:prefix => prefix).each do |object|
+      target = File.join(download_directory, "#{counter}.txt.gz")
+      object.get(:response_target => target)
+      counter += 1
+    end
+
+    expect(Dir.glob(File.join(download_directory, "**", "*.gz")).inject(0) do |sum, f|
+      sum + Zlib::GzipReader.new(File.open(f)).readlines.size
+    end).to eq(number_of_events)
+  end
+end
diff --git a/spec/integration/restore_from_crash_spec.rb b/spec/integration/restore_from_crash_spec.rb
index a119be6f..2e883aa1 100644
--- a/spec/integration/restore_from_crash_spec.rb
+++ b/spec/integration/restore_from_crash_spec.rb
@@ -22,17 +22,17 @@
       factory.current.fsync
 
       (number_of_files - 1).times do
+        factory.rotate!
         factory.current.write(dummy_content)
        factory.current.fsync
-        factory.rotate!
      end
    end
 
  it "uploads the file to the bucket" do
    subject.register
    try(20) do
-      expect(Dir.glob(File.join(temporary_directory, "*")).size).to eq(0)
      expect(bucket_resource.objects(:prefix => prefix).count).to eq(number_of_files)
+      expect(Dir.glob(File.join(temporary_directory, "*")).size).to eq(0)
    end
  end
end
diff --git a/spec/integration/size_rotation_spec.rb b/spec/integration/size_rotation_spec.rb
index 85dced03..f2b4107a 100644
--- a/spec/integration/size_rotation_spec.rb
+++ b/spec/integration/size_rotation_spec.rb
@@ -7,8 +7,9 @@
 describe "Size rotation", :integration => true do
   include_context "setup plugin"
 
-  let(:size_file) { batch_size.times.inject(0) { |sum, i| sum + "#{event_encoded}\n".bytesize } }
-  let(:options) { main_options.merge({ "rotation_strategy" => "size" }) }
+  let(:event_size) { "Hello world".bytesize }
+  let(:size_file) { batch_size * event_size * 2 }
+  let(:options) { main_options.merge({ "rotation_strategy" => "size", "size_file" => size_file }) }
   let(:number_of_events) { 5000 }
   let(:batch_size) { 125 }
   let(:event_encoded) { "Hello world" }
@@ -20,7 +21,7 @@
     end
     b
   end
-  let(:number_of_files) { number_of_events / batch_size }
+  let(:number_of_files) { number_of_events * event_size / size_file }
 
   before do
     clean_remote_files(prefix)
@@ -37,7 +38,7 @@
 
   it "Rotates the files based on size" do
     bucket_resource.objects(:prefix => prefix).each do |f|
-      expect(f.size).to eq(size_file)
+      expect(f.size).to be_between(size_file, size_file * 2).inclusive
     end
   end
 
diff --git a/spec/integration/time_based_rotation_with_constant_write_spec.rb b/spec/integration/time_based_rotation_with_constant_write_spec.rb
index e3efd2cf..31bb435d 100644
--- a/spec/integration/time_based_rotation_with_constant_write_spec.rb
+++ b/spec/integration/time_based_rotation_with_constant_write_spec.rb
@@ -7,7 +7,7 @@
 describe "File Time rotation with constant write", :integration => true do
   include_context "setup plugin"
 
-  let(:time_file) { 0.5 }
+  let(:time_file) { 0.004 }
   let(:options) { main_options.merge({ "rotation_strategy" => "time" }) }
  let(:number_of_events) { 5000 }
  let(:batch_size) { 125 }
@@ -32,7 +32,7 @@
       batch_time.each_slice(batch_size) do |smaller_batch|
         subject.multi_receive_encoded(smaller_batch)
       end
-      sleep(time_file * 2)
+      sleep(1)
     end
 
     subject.close
diff --git a/spec/integration/time_based_rotation_with_stale_write_spec.rb b/spec/integration/time_based_rotation_with_stale_write_spec.rb
index f008e6bd..a76c85c4 100644
--- a/spec/integration/time_based_rotation_with_stale_write_spec.rb
+++ b/spec/integration/time_based_rotation_with_stale_write_spec.rb
@@ -7,7 +7,7 @@
 describe "File Time rotation with stale write", :integration => true do
   include_context "setup plugin"
 
-  let(:time_file) { 1 }
+  let(:time_file) { 0.0004 }
   let(:options) { main_options.merge({ "rotation_strategy" => "time" }) }
   let(:number_of_events) { 5000 }
   let(:batch_size) { 125 }
@@ -26,7 +26,7 @@
     clean_remote_files(prefix)
     subject.register
     subject.multi_receive_encoded(batch)
-    sleep(time_file * 5) # the periodic check should have kick int
+    sleep(1) # the periodic check should have kicked in
   end
 
   after do
@@ -35,7 +35,9 @@
 
   it "create one file" do
     # using close will upload the current file
-    expect(bucket_resource.objects(:prefix => prefix).count).to eq(1)
+    try(20) do
+      expect(bucket_resource.objects(:prefix => prefix).count).to eq(1)
+    end
   end
 
   it "Persists all events" do
@@ -50,6 +52,9 @@
       object.get(:response_target => target)
       counter += 1
     end
-    expect(Dir.glob(File.join(download_directory, "**", "*.txt")).inject(0) { |sum, f| sum + IO.readlines(f).size }).to eq(number_of_events)
+
+    try(20) do
+      expect(Dir.glob(File.join(download_directory, "**", "*.txt")).inject(0) { |sum, f| sum + IO.readlines(f).size }).to eq(number_of_events)
+    end
   end
 end
diff --git a/spec/integration/upload_current_file_on_shutdown_spec.rb b/spec/integration/upload_current_file_on_shutdown_spec.rb
index efd32993..d52e651d 100644
--- a/spec/integration/upload_current_file_on_shutdown_spec.rb
+++ b/spec/integration/upload_current_file_on_shutdown_spec.rb
@@ -25,9 +25,7 @@
   before do
     clean_remote_files(prefix)
     subject.register
-    batch.each_slice(batch_size) do |smaller_batch|
-      subject.multi_receive_encoded(smaller_batch)
-    end
+    subject.multi_receive_encoded(batch)
     subject.close
   end
diff --git a/spec/outputs/s3/file_repository_spec.rb b/spec/outputs/s3/file_repository_spec.rb
index 49d7c096..befa3bb9 100644
--- a/spec/outputs/s3/file_repository_spec.rb
+++ b/spec/outputs/s3/file_repository_spec.rb
@@ -88,7 +88,8 @@
   it "returns all available keys" do
     subject.get_file(prefix_key) { |file| file.write("something") }
-    expect(subject.keys.toArray).to eq([prefix_key])
+    expect(subject.keys.toArray).to include(prefix_key)
+    expect(subject.keys.toArray.size).to eq(1)
   end
 
   it "clean stale factories" do
diff --git a/spec/outputs/s3/size_and_time_rotation_policy_spec.rb b/spec/outputs/s3/size_and_time_rotation_policy_spec.rb
index 3e822783..a7329b01 100644
--- a/spec/outputs/s3/size_and_time_rotation_policy_spec.rb
+++ b/spec/outputs/s3/size_and_time_rotation_policy_spec.rb
@@ -69,9 +69,9 @@
     end
   end
 
-  context "#need_periodic?" do
+  context "#needs_periodic?" do
    it "return true" do
-      expect(subject.need_periodic?).to be_truthy
+      expect(subject.needs_periodic?).to be_truthy
    end
  end
end
diff --git a/spec/outputs/s3/size_rotation_policy_spec.rb b/spec/outputs/s3/size_rotation_policy_spec.rb
index e8ca74a3..79e23f02 100644
--- a/spec/outputs/s3/size_rotation_policy_spec.rb
+++ b/spec/outputs/s3/size_rotation_policy_spec.rb
@@ -32,9 +32,9 @@
     expect { described_class.new(-100) }.to raise_error(LogStash::ConfigurationError, /need to be greather than 0/)
   end
 
-  context "#need_periodic?" do
+  context "#needs_periodic?" do
     it "return false" do
-      expect(subject.need_periodic?).to be_falsey
+      expect(subject.needs_periodic?).to be_falsey
     end
   end
 
diff --git a/spec/outputs/s3/temporary_file_spec.rb b/spec/outputs/s3/temporary_file_spec.rb
index fd88e1e0..40fb65e9 100644
--- a/spec/outputs/s3/temporary_file_spec.rb
+++ b/spec/outputs/s3/temporary_file_spec.rb
@@ -34,12 +34,6 @@
     expect(File.exist?(subject.path)).to be_falsey
   end
 
-  described_class::DELEGATES_METHODS.each do |method_name|
-    it "delegates method `#{method_name}` to file descriptor" do
-      expect(subject.respond_to?(method_name)).to be_truthy
-    end
-  end
-
   it "returns the creation time" do
     expect(subject.ctime).to be < Time.now + 0.5
   end
diff --git a/spec/outputs/s3/time_rotation_policy_spec.rb b/spec/outputs/s3/time_rotation_policy_spec.rb
index d6f3407a..53aabcdd 100644
--- a/spec/outputs/s3/time_rotation_policy_spec.rb
+++ b/spec/outputs/s3/time_rotation_policy_spec.rb
@@ -48,9 +48,13 @@
     end
   end
 
-  context "#need_periodic?" do
+  context "#needs_periodic?" do
     it "return false" do
-      expect(subject.need_periodic?).to be_truthy
+      expect(subject.needs_periodic?).to be_truthy
     end
   end
+
+  it "converts minutes into seconds" do
+    expect(subject.time_file).to eq(60)
+  end
 end
diff --git a/spec/outputs/s3/uploader_spec.rb b/spec/outputs/s3/uploader_spec.rb
index 16da5427..c7925d8b 100644
--- a/spec/outputs/s3/uploader_spec.rb
+++ b/spec/outputs/s3/uploader_spec.rb
@@ -34,7 +34,7 @@
   subject { described_class.new(bucket, logger, threadpool) }
 
   it "upload file to the s3 bucket" do
-    subject.upload(file)
+    expect { subject.upload(file) }.not_to raise_error
   end
 
   it "execute a callback when the upload is complete" do
diff --git a/spec/outputs/s3_spec.rb b/spec/outputs/s3_spec.rb
index 641707d0..6a1f3dbb 100644
--- a/spec/outputs/s3_spec.rb
+++ b/spec/outputs/s3_spec.rb
@@ -8,9 +8,15 @@
   let(:prefix) { "super/%{server}" }
   let(:region) { "us-east-1" }
   let(:bucket_name) { "mybucket" }
-  let(:options) { { "region" => region, "bucket" => bucket_name, "prefix" => prefix } }
+  let(:options) { { "region" => region,
+                    "bucket" => bucket_name,
+                    "prefix" => prefix,
+                    "restore" => false,
+                    "access_key_id" => "access_key_id",
+                    "secret_access_key" => "secret_access_key"
+                  } }
   let(:client) { Aws::S3::Client.new(stub_responses: true) }
-  let(:mock_bucket) { Aws::S3::Bucket.new(bucket_name, :client => client) }
+  let(:mock_bucket) { Aws::S3::Bucket.new(:name => bucket_name, :stub_responses => true, :client => client) }
   let(:event) { LogStash::Event.new({ "server" => "overwatch" }) }
   let(:event_encoded) { "super hype" }
   let(:events_and_encoded) { { event => event_encoded } }
@@ -70,6 +76,10 @@
     subject.register
   end
 
+  after do
+    subject.close
+  end
+
   it "uses `Event#sprintf` for the prefix" do
     expect(event).to receive(:sprintf).with(prefix).and_return("super/overwatch")
     subject.multi_receive_encoded(events_and_encoded)
diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb
index 36a4a794..af1891c4 100644
--- a/spec/spec_helper.rb
+++ b/spec/spec_helper.rb
@@ -1,3 +1,6 @@
 # encoding: utf-8
 require "logstash/devutils/rspec/spec_helper"
 require_relative "supports/helpers"
+require "logstash/logging/logger"
+
+LogStash::Logging::Logger::configure_logging("debug") if ENV["DEBUG"]

From 458de2818eb54fc9f61b4939d2603133d537d72f Mon Sep 17 00:00:00 2001
From: Pier-Hugues Pellerin
Date: Wed, 14 Dec 2016 20:46:29 -0500
Subject: [PATCH 8/8] bump to 4.0.0

---
 CHANGELOG.md               | 21 +++++++++++++++++++++
 logstash-output-s3.gemspec |  3 +--
 2 files changed, 22 insertions(+), 2 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 093937d1..9a1a7641 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,24 @@
+## 4.0.0
+ - This version is a complete rewrite of version 3.0.0. See #103.
+ - This plugin now uses the V2 version of the AWS SDK, which ensures we receive the latest updates and fixes.
+ - We now use S3's `upload_file` instead of reading chunks; this method is more efficient and will use threaded multipart uploads when a file is big enough.
+ - You can now use the `fieldref` syntax in the prefix to dynamically change the target based on the events received.
+ - The upload queue is now a bounded list; this is necessary to communicate back pressure to the pipeline, and its size is configurable by the user.
+ - If the queue is full, the plugin will start the upload in the current thread.
+ - The plugin is now threadsafe and supports the `shared` concurrency model.
+ - The rotation strategy can be selected; the recommended `size_and_time` checks both configured limits (`size` and `time` are also available).
+ - The `restore` option will now use a separate threadpool with an unbounded queue.
+ - The `restore` option will not block the launch of Logstash and uses fewer resources than the real-time path.
+ - The plugin now uses `multi_receive_encoded`, which optimizes writes to the files.
+ - Rotate operations are now batched to reduce the number of IO calls.
+ - Empty files will not be uploaded by any rotation strategy.
+ - We now use Concurrent Ruby for the implementation of the Java executor.
+ - If you have finer-grained permissions on prefixes or want a faster boot, you can disable the credentials check with `validate_credentials_on_root_bucket`.
+ - The credentials check will no longer fail if we can't delete the file.
+ - We now have a full suite of integration tests for all the defined rotation strategies.
+
+Fixes: #4 #81 #44 #59 #50
+
 ## 3.2.0
  - Move to the new concurrency model `:single`
  - use correct license identifier #99
diff --git a/logstash-output-s3.gemspec b/logstash-output-s3.gemspec
index 17a7bb5f..22a9d7df 100644
--- a/logstash-output-s3.gemspec
+++ b/logstash-output-s3.gemspec
@@ -1,7 +1,6 @@
 Gem::Specification.new do |s|
-  s.name = 'logstash-output-s3'
-  s.version = '3.2.0'
+  s.version = '4.0.0'
   s.licenses = ['Apache-2.0']
   s.summary = "This plugin was created for store the logstash's events into Amazon Simple Storage Service (Amazon S3)"
   s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
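The bounded upload queue configured in `register` (PATCH 7/8) is what turns a saturated S3 connection into back pressure on the pipeline. A minimal standalone sketch of that behavior follows; the pool sizes, task body, and output are illustrative assumptions, only the `Concurrent::ThreadPoolExecutor` options mirror the patch:

    require "concurrent"

    # With a bounded :max_queue and :fallback_policy => :caller_runs, a full
    # queue makes `post` run the task on the calling thread, blocking the
    # producer instead of buffering work without limit.
    executor = Concurrent::ThreadPoolExecutor.new(:min_threads => 1,
                                                  :max_threads => 2,
                                                  :max_queue => 4,
                                                  :fallback_policy => :caller_runs)

    20.times do |i|
      executor.post do
        sleep(0.1) # stand-in for an S3 upload
        puts "upload #{i} ran on #{Thread.current}"
      end
    end

    executor.shutdown
    executor.wait_for_termination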
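The dynamic prefix feature resolves the `prefix` option once per event, as exercised in dynamic_prefix_spec.rb and file_repository_spec.rb above. A short sketch, assuming a logstash-core environment is on the load path (the field values here are made up):

    require "logstash/event"

    # `Event#sprintf` expands field references and date patterns; every
    # distinct resolved prefix gets its own temporary file and S3 key.
    prefix = "%{type}/%{+YYYY}/%{+MM}/%{+dd}/"
    event  = LogStash::Event.new("type" => "syslog")

    key = event.sprintf(prefix) # e.g. "syslog/2016/12/14/", depending on @timestamp
    puts key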
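`IOWrappedGzip#size` works because flushing a `Zlib::GzipWriter` forces the buffered compressed bytes down to the wrapped IO, whose size can then be read. A sketch of the same trick under the assumption that a throwaway scratch path is acceptable:

    require "zlib"

    path = "/tmp/gzip_size_demo.gz" # illustrative scratch file
    gzip = Zlib::GzipWriter.new(File.open(path, "w"))
    gzip.write("hello " * 1_000)

    # The writer buffers internally, so the on-disk size lags behind what was
    # written; flushing first is what makes the size usable for rotation checks.
    gzip.flush
    puts gzip.to_io.size # compressed bytes actually on disk

    gzip.close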
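`TemporaryFile.create_from_existing_file` recovers the S3 key from a leftover file by stripping the UUID sandbox segment off its path. The same arithmetic in isolation, with made-up paths:

    require "pathname"

    # On-disk layout: <temporary_directory>/<uuid>/<prefix...>/<name>;
    # dropping the first segment (the uuid) yields the S3 key, while the
    # first segment itself locates the sandbox to delete after upload.
    temporary_folder = Pathname.new("/tmp/logstash")
    file_path        = "/tmp/logstash/0f9e31ad-uuid/super/overwatch/ls.s3.log"

    key_parts = Pathname.new(file_path).relative_path_from(temporary_folder).to_s.split(File::SEPARATOR)

    key       = key_parts.slice(1, key_parts.size).join("/")      # => "super/overwatch/ls.s3.log"
    temp_path = File.join(temporary_folder.to_s, key_parts.first) # => "/tmp/logstash/0f9e31ad-uuid"

    puts key
    puts temp_path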