Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## Upcoming
- Added `filename` as an optional configuration setting to specify a custom format for file names. [#134](https://github.com/logstash-plugins/logstash-output-s3/issues/134)

## 4.2.0
- Added ability to specify [ONEZONE_IA](https://aws.amazon.com/s3/storage-classes/#__) as storage_class

Expand All @@ -21,7 +24,7 @@
- Fixed bucket validation failures when bucket policy requires encryption [#191](https://github.com/logstash-plugins/logstash-output-s3/pull/191)

## 4.1.4
- [#185](https://github.com/logstash-plugins/logstash-output-s3/pull/184) Internal: Revert rake pinning to fix upstream builds
- [#185](https://github.com/logstash-plugins/logstash-output-s3/pull/185) Internal: Revert rake pinning to fix upstream builds

## 4.1.3
- [#181](https://github.com/logstash-plugins/logstash-output-s3/pull/181) Docs: Fix incorrect characterization of parameters as `required` in example configuration.
Expand Down
11 changes: 11 additions & 0 deletions docs/index.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ This plugin supports the following configuration options plus the <<plugins-{typ
| <<plugins-{type}s-{plugin}-encoding>> |<<string,string>>, one of `["none", "gzip"]`|No
| <<plugins-{type}s-{plugin}-endpoint>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-prefix>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-filename>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-proxy_uri>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-region>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-restore>> |<<boolean,boolean>>|No
Expand Down Expand Up @@ -216,6 +217,16 @@ This option supports logstash interpolation: https://www.elastic.co/guide/en/log
for example, files can be prefixed with the event date using `prefix = "%{+YYYY}/%{+MM}/%{+dd}"`.
Be warned this can create a lot of temporary local files.

[id="plugins-{type}s-{plugin}-filename"]
===== `filename`

* Value type is <<string,string>>
* Default value is `""`

Specify a filename format to use for uploaded files. If not defined, a unique S3 output file will be generated as described above.
This option supports logstash interpolation: https://www.elastic.co/guide/en/logstash/current/event-dependent-configuration.html#sprintf;
If you do not configure a unique filename using interpolation, the plugin may overwrite the same file each time an S3 upload takes place.

[id="plugins-{type}s-{plugin}-proxy_uri"]
===== `proxy_uri`

Expand Down
14 changes: 12 additions & 2 deletions lib/logstash/outputs/s3.rb
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,13 @@ class LogStash::Outputs::S3 < LogStash::Outputs::Base
config :temporary_directory, :validate => :string, :default => File.join(Dir.tmpdir, "logstash")

# Specify a prefix to the uploaded filename, this can simulate directories on S3. Prefix does not require leading slash.
# This option support string interpolation, be warned this can created a lot of temporary local files.
# This option supports logstash string interpolation with sprintf. Be warned this can create a lot of temporary local files.
config :prefix, :validate => :string, :default => ''

# Specify a filename format to use for uploaded files. If not defined, a unique filename is generated. Invalid characters are replaced with underscores (_).
# This option supports logstash string interpolation with sprintf. If you do not configure a unique filename using interpolation, the plugin may overwrite the same file each time an S3 upload takes place.
config :filename, :validate => :string, :default => ''

# Specify how many workers to use to upload the files to S3
config :upload_workers_count, :validate => :number, :default => (Concurrent.processor_count * 0.5).ceil

Expand Down Expand Up @@ -197,6 +201,11 @@ def register
raise LogStash::ConfigurationError, "Prefix must not contains: #{PathValidator::INVALID_CHARACTERS}"
end
end
unless filename.empty?
if !PathValidator.valid?(filename)
raise LogStash::ConfigurationError, "Filename must not contains: #{PathValidator::INVALID_CHARACTERS}"
end
end

if !WritableDirectoryValidator.valid?(@temporary_directory)
raise LogStash::ConfigurationError, "Logstash must have the permissions to write to the temporary directory: #{@temporary_directory}"
Expand Down Expand Up @@ -236,9 +245,10 @@ def multi_receive_encoded(events_and_encoded)
events_and_encoded.each do |event, encoded|
prefix_key = normalize_key(event.sprintf(@prefix))
prefix_written_to << prefix_key
filename_key = normalize_key(event.sprintf(@filename))

begin
@file_repository.get_file(prefix_key) { |file| file.write(encoded) }
@file_repository.get_file(prefix_key, filename_key: filename_key) { |file| file.write(encoded) }
# The output should stop accepting new events coming in, since it cannot do anything with them anymore.
# Log the error and rethrow it.
rescue Errno::ENOSPC => e
Expand Down
21 changes: 14 additions & 7 deletions lib/logstash/outputs/s3/file_repository.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def stale?
with_lock { |factory| factory.current.size == 0 && (Time.now - factory.current.ctime > @stale_time) }
end

def apply(prefix)
def apply(prefix, filename)
return self
end

Expand All @@ -49,8 +49,8 @@ def initialize(tags, encoding, temporary_directory, stale_time)
@stale_time = stale_time
end

def apply(prefix_key)
PrefixedValue.new(TemporaryFileFactory.new(prefix_key, @tags, @encoding, @temporary_directory), @stale_time)
def apply(prefix_key, filename_key: "")
PrefixedValue.new(TemporaryFileFactory.new(prefix_key, @tags, @encoding, @temporary_directory, filename: filename_key), @stale_time)
end
end

Expand Down Expand Up @@ -79,12 +79,19 @@ def each_files
end

# Return the file factory
def get_factory(prefix_key)
@prefixed_factories.computeIfAbsent(prefix_key, @factory_initializer).with_lock { |factory| yield factory }
def get_factory(prefix_key, filename_key: "")
key = prefix_key + (filename_key == "" ? "" : ("/" + filename_key))
if @prefixed_factories.key?(key)
factory = @prefixed_factories[key]
else
factory = @factory_initializer.apply(prefix_key, filename_key: filename_key)
@prefixed_factories[key] = factory
end
factory.with_lock { |factory| yield factory }
end

def get_file(prefix_key)
get_factory(prefix_key) { |factory| yield factory.current }
def get_file(prefix_key, filename_key: "")
get_factory(prefix_key, filename_key: filename_key) { |factory| yield factory.current }
end

def shutdown
Expand Down
7 changes: 4 additions & 3 deletions lib/logstash/outputs/s3/temporary_file_factory.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@ class TemporaryFileFactory
TXT_EXTENSION = "txt"
STRFTIME = "%Y-%m-%dT%H.%M"

attr_accessor :counter, :tags, :prefix, :encoding, :temporary_directory, :current
attr_accessor :counter, :tags, :prefix, :encoding, :temporary_directory, :current, :filename

def initialize(prefix, tags, encoding, temporary_directory)
def initialize(prefix, tags, encoding, temporary_directory, filename: "")
@counter = 0
@prefix = prefix
@filename = filename

@tags = tags
@encoding = encoding
Expand Down Expand Up @@ -75,7 +76,7 @@ def generate_name

def new_file
uuid = SecureRandom.uuid
name = generate_name
name = filename == "" ? generate_name : filename
path = ::File.join(temporary_directory, uuid)
key = ::File.join(prefix, name)

Expand Down
87 changes: 75 additions & 12 deletions spec/outputs/s3/file_repository_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
let(:encoding) { "none" }
let(:temporary_directory) { Stud::Temporary.pathname }
let(:prefix_key) { "a-key" }
let(:filename_key) { "a-filename-key" }

before do
FileUtils.mkdir_p(temporary_directory)
Expand All @@ -20,64 +21,97 @@
subject.get_file(prefix_key) do |file|
expect(file).to be_kind_of(LogStash::Outputs::S3::TemporaryFile)
end

subject.get_file(prefix_key, filename_key: filename_key) do |file|
expect(file).to be_kind_of(LogStash::Outputs::S3::TemporaryFile)
end
end

it "returns the same file for the same prefix key" do
it "returns the same file for the same prefix key and filename key" do
file_path = nil

subject.get_file(prefix_key) do |file|
file_path = file.path
end

subject.get_file(prefix_key) do |file|
expect(file.path).to eq(file_path)
end

subject.get_file(prefix_key, filename_key: filename_key) do |file|
file_path = file.path
end
subject.get_file(prefix_key, filename_key: filename_key) do |file|
expect(file.path).to eq(file_path)
end
end

it "returns the same file for the same dynamic prefix key" do
it "returns the same file for the same dynamic prefix key and filename key" do
prefix = "%{type}/%{+YYYY}/%{+MM}/%{+dd}/"
name = "${type}.txt"
event = LogStash::Event.new({ "type" => "syslog"})
key = event.sprintf(prefix)
name = event.sprintf(name)
file_path = nil


subject.get_file(key) do |file|
file_path = file.path
end

subject.get_file(key) do |file|
expect(file.path).to eq(file_path)
end

subject.get_file(key, filename_key: name) do |file|
file_path = file.path
end
subject.get_file(key, filename_key: name) do |file|
expect(file.path).to eq(file_path)
end
end

it "returns different file for different prefix keys" do
it "returns different file for different prefix keys and filename keys" do
file_path = nil

subject.get_file(prefix_key) do |file|
file_path = file.path
end

subject.get_file("another_prefix_key") do |file|
expect(file.path).not_to eq(file_path)
end

subject.get_file(prefix_key, filename_key: filename_key) do |file|
file_path = file.path
end
subject.get_file(prefix_key, filename_key: "another_filename_key") do |file|
expect(file.path).not_to eq(file_path)
end
end

it "allows to get the file factory for a specific prefix" do
it "allows to get the file factory for a specific prefix or filename" do
subject.get_factory(prefix_key) do |factory|
expect(factory).to be_kind_of(LogStash::Outputs::S3::TemporaryFileFactory)
end

subject.get_factory(prefix_key, filename_key: filename_key) do |factory|
expect(factory).to be_kind_of(LogStash::Outputs::S3::TemporaryFileFactory)
end
end

it "returns a different file factory for a different prefix keys" do
it "returns a different file factory for different prefix keys and filename keys" do
factory = nil

subject.get_factory(prefix_key) do |f|
factory = f
end

subject.get_factory("another_prefix_key") do |f|
expect(factory).not_to eq(f)
end

subject.get_factory(prefix_key, filename_key: filename_key) do |f|
factory = f
end
subject.get_factory(prefix_key, filename_key: "another_filename)key") do |f|
expect(factory).not_to eq(f)
end
end

it "returns the number of prefix keys" do
Expand All @@ -86,13 +120,25 @@
expect(subject.size).to eq(1)
end

it "returns all available keys" do
it "returns the number of prefix with filename keys" do
expect(subject.size).to eq(0)
subject.get_file(prefix_key, filename_key: filename_key) { |file| file.write("something else") }
expect(subject.size).to eq(1)
end

it "returns all available prefix keys" do
subject.get_file(prefix_key) { |file| file.write("something") }
expect(subject.keys.toArray).to include(prefix_key)
expect(subject.keys.toArray.size).to eq(1)
end

it "clean stale factories" do
it "returns all available prefix with filename keys" do
subject.get_file(prefix_key, filename_key: filename_key) { |file| file.write("something else") }
expect(subject.keys.toArray).to include(prefix_key + "/" + filename_key)
expect(subject.keys.toArray.size).to eq(1)
end

it "cleans stale prefix factories" do
@file_repository = described_class.new(tags, encoding, temporary_directory, 1, 1)
expect(@file_repository.size).to eq(0)
path = ""
Expand All @@ -108,6 +154,23 @@
try(10) { expect(@file_repository.size).to eq(1) }
expect(File.directory?(path)).to be_falsey
end

it "cleans stale prefix with filename factories" do
@file_repository = described_class.new(tags, encoding, temporary_directory, 1, 1)
expect(@file_repository.size).to eq(0)
path = ""
@file_repository.get_factory(prefix_key, filename_key: filename_key) do |factory|
factory.current.write("hello again")
# force a rotation so we get an empty file that will get stale.
factory.rotate!
path = factory.current.temp_path
end

@file_repository.get_file(prefix_key, filename_key: "another-filename") { |file| file.write("hello again") }
expect(@file_repository.size).to eq(2)
try(10) { expect(@file_repository.size).to eq(1) }
expect(File.directory?(path)).to be_falsey
end
end


Expand Down
12 changes: 11 additions & 1 deletion spec/outputs/s3_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@

describe LogStash::Outputs::S3 do
let(:prefix) { "super/%{server}" }
let(:filename) { "%{server}.txt" }
let(:region) { "us-east-1" }
let(:bucket_name) { "mybucket" }
let(:options) { { "region" => region,
"bucket" => bucket_name,
"prefix" => prefix,
"filename" => "",
"restore" => false,
"access_key_id" => "access_key_id",
"secret_access_key" => "secret_access_key"
Expand Down Expand Up @@ -158,6 +160,11 @@
s3 = described_class.new(options.merge({ "prefix" => "`no\><^" }))
expect { s3.register }.to raise_error(LogStash::ConfigurationError)
end

it "validates the filename" do
s3 = described_class.new(options.merge({ "filename" => "`no\><^" }))
expect { s3.register }.to raise_error(LogStash::ConfigurationError)
end

describe "additional_settings" do
context "when enabling force_path_style" do
Expand Down Expand Up @@ -199,8 +206,11 @@
subject.close
end

it "uses `Event#sprintf` for the prefix" do
let(:options) { super.merge({ "filename" => filename }) }

it "uses `Event#sprintf` for the prefix and filename" do
expect(event).to receive(:sprintf).with(prefix).and_return("super/overwatch")
expect(event).to receive(:sprintf).with(filename).and_return("overwatch.txt")
subject.multi_receive_encoded(events_and_encoded)
end
end
Expand Down