diff --git a/.gitignore b/.gitignore index e3608ef5..065886d4 100644 --- a/.gitignore +++ b/.gitignore @@ -59,9 +59,12 @@ gradle/wrapper/gradle-wrapper.properties .vscode/settings.json rspec.xml e2e/output_file.txt +e2e/input_file.txt logs.txt docker-e2e/.env local-run.sh logs2.txt **/.vscode/*.* **/settings.json +Run.sh +vendor diff --git a/README.md b/README.md index b256a266..c3e2f1e0 100755 --- a/README.md +++ b/README.md @@ -35,19 +35,21 @@ Perform configuration before sending events from Logstash to Azure Data Explorer ```ruby output { - kusto { - path => "/tmp/kusto/%{+YYYY-MM-dd-HH-mm}.txt" - ingest_url => "https://ingest-.kusto.windows.net/" - app_id => "" - app_key => "" - app_tenant => "" - database => "" - table => "" - json_mapping => "" - proxy_host => "" - proxy_port => - proxy_protocol => <"http"|"https"> - } + kusto { + ingest_url => "https://ingest-.kusto.windows.net/" + app_id => "" + app_key => "" + app_tenant => "" + database => "" + table => "" + json_mapping => "" + proxy_host => "" + proxy_port => + proxy_protocol => <"http"|"https"> + max_size => 10 + max_interval => 10 + latch_timeout => 60 + } } ``` More information about configuring Logstash can be found in the [logstash configuration guide](https://www.elastic.co/guide/en/logstash/current/configuration.html) @@ -56,22 +58,22 @@ More information about configuring Logstash can be found in the [logstash config | Parameter Name | Description | Notes | | --- | --- | --- | -| **path** | The plugin writes events to temporary files before sending them to ADX. This parameter includes a path where files should be written and a time expression for file rotation to trigger an upload to the ADX service. The example above shows how to rotate the files every minute and check the Logstash docs for more information on time expressions. | Required -| **ingest_url** | The Kusto endpoint for ingestion-related communication. See it on the Azure Portal.| Required| -| **app_id, app_key, app_tenant**| Credentials required to connect to the ADX service. Be sure to use an application with 'ingest' privileges. | Optional| -| **managed_identity**| Managed Identity to authenticate. For user-based managed ID, use the Client ID GUID. For system-based, use the value `system`. The ID needs to have 'ingest' privileges on the cluster. | Optional| -| **database**| Database name to place events | Required | -| **table** | Target table name to place events | Required +| **ingest_url** | The Kusto endpoint for ingestion-related communication. See it on the Azure Portal. | Required | +| **app_id, app_key, app_tenant** | Credentials required to connect to the ADX service. Be sure to use an application with 'ingest' privileges. | Optional | +| **managed_identity** | Managed Identity to authenticate. For user-based managed ID, use the Client ID GUID. For system-based, use the value `system`. The ID needs to have 'ingest' privileges on the cluster. | Optional | +| **database** | Database name to place events | Required | +| **table** | Target table name to place events | Required | | **json_mapping** | Maps each attribute from incoming event JSON strings to the appropriate column in the table. Note that this must be in JSON format, as this is the interface between Logstash and Kusto | Optional | -| **recovery** | If set to true (default), plugin will attempt to resend pre-existing temp files found in the path upon startup | | -| **delete_temp_files** | Determines if temp files will be deleted after a successful upload (true is default; set false for debug purposes only)| | -| **flush_interval** | The time (in seconds) for flushing writes to temporary files. Default is 2 seconds, 0 will flush on every event. Increase this value to reduce IO calls but keep in mind that events in the buffer will be lost in case of abrupt failure.| | -| **proxy_host** | The proxy hostname for redirecting traffic to Kusto.| | -| **proxy_port** | The proxy port for the proxy. Defaults to 80.| | -| **proxy_protocol** | The proxy server protocol , is one of http or https.| | +| **proxy_host** | The proxy hostname for redirecting traffic to Kusto. | Optional | +| **proxy_port** | The proxy port for the proxy. Defaults to 80. | Optional | +| **proxy_protocol** | The proxy server protocol, is one of http or https. | Optional | +| **max_size** | Maximum size of the buffer before it gets flushed, defaults to 10MB. | Optional | +| **latch_timeout** | Latch timeout in seconds, defaults to 60. This is the maximum wait time after which the flushing attempt is timed out and the network is considered to be down. The system waits for the network to be back to retry flushing the same batch. | Optional | > Note : LS_JAVA_OPTS can be used to set proxy parameters as well (using export or SET options) +> Note: **path** config parameter is no longer used in the latest release (3.0.0) and will be deprecated in future releases + ```bash export LS_JAVA_OPTS="-Dhttp.proxyHost=1.2.34 -Dhttp.proxyPort=8989 -Dhttps.proxyHost=1.2.3.4 -Dhttps.proxyPort=8989" ``` @@ -81,12 +83,13 @@ export LS_JAVA_OPTS="-Dhttp.proxyHost=1.2.34 -Dhttp.proxyPort=8989 -Dhttps.prox | Version | Release Date | Notes | | --- | --- | --- | -| 2.0.8 | 2024-10-23 | - Fix library deprecations, fix issues in the Azure Identity library | -| 2.0.7 | 2024-01-01 | - Update Kusto JAVA SDK | -| 2.0.3 | 2023-12-12 | - Make JSON mapping field optional. If not provided logstash output JSON attribute names will be used for column resolution | -| 2.0.2 | 2023-11-28 | - Bugfix for the scenario where the plugin uses managed identity. Instead of providing the managed identity name as empty in the config,it can completely be skipped | -| 2.0.0 | 2023-09-19 | - Upgrade to the latest Java SDK version [5.0.2](https://github.com/Azure/azure-kusto-java/releases/tag/v5.0.2). Tests have been performed on **__Logstash 8.5__** and up (Does not work with 6.x or 7.x versions of Logstash - For these versions use 1.x.x versions of logstash-output-kusto gem) - Fixes CVE's in common-text & outdated Jackson libraries | -| 1.0.6 | 2022-11-29 | - Upgrade to the latest Java SDK [3.2.1](https://github.com/Azure/azure-kusto-java/releases/tag/v3.2.1) version. Tests have been performed on Logstash 6.x and up.| +| 3.0.0 | 2024-11-01 | Updated configuration options | +| 2.0.8 | 2024-10-23 | Fix library deprecations, fix issues in the Azure Identity library | +| 2.0.7 | 2024-01-01 | Update Kusto JAVA SDK | +| 2.0.3 | 2023-12-12 | Make JSON mapping field optional. If not provided logstash output JSON attribute names will be used for column resolution | +| 2.0.2 | 2023-11-28 | Bugfix for the scenario where the plugin uses managed identity. Instead of providing the managed identity name as empty in the config,it can completely be skipped | +| 2.0.0 | 2023-09-19 | Upgrade to the latest Java SDK version [5.0.2](https://github.com/Azure/azure-kusto-java/releases/tag/v5.0.2). Tests have been performed on **__Logstash 8.5__** and up (Does not work with 6.x or 7.x versions of Logstash - For these versions use 1.x.x versions of logstash-output-kusto gem) - Fixes CVE's in common-text & outdated Jackson libraries | +| 1.0.6 | 2022-11-29 | Upgrade to the latest Java SDK [3.2.1](https://github.com/Azure/azure-kusto-java/releases/tag/v3.2.1) version. Tests have been performed on Logstash 6.x and up.| ## Development Requirements diff --git a/build.gradle b/build.gradle index f14b1e31..4b4fc702 100644 --- a/build.gradle +++ b/build.gradle @@ -29,8 +29,8 @@ repositories { // update dependencies to bom azure-sdk-bom/1.2.24 dependencies { - implementation 'com.microsoft.azure.kusto:kusto-data:5.2.0' - implementation 'com.microsoft.azure.kusto:kusto-ingest:5.2.0' + implementation 'com.microsoft.azure.kusto:kusto-data:6.0.0' + implementation 'com.microsoft.azure.kusto:kusto-ingest:6.0.0' implementation 'com.azure:azure-core-http-netty:1.15.1' implementation 'com.azure:azure-core:1.49.1' implementation 'com.azure:azure-data-tables:12.4.2' @@ -52,7 +52,7 @@ dependencies { implementation 'com.nimbusds:nimbus-jose-jwt:9.40' implementation 'com.nimbusds:oauth2-oidc-sdk:11.13' implementation 'com.univocity:univocity-parsers:2.9.1' - implementation 'commons-codec:commons-codec:1.16.1' + implementation 'commons-codec:commons-codec:1.17.1' implementation 'commons-logging:commons-logging:1.3.1' implementation 'io.github.resilience4j:resilience4j-core:1.7.1' implementation 'io.github.resilience4j:resilience4j-retry:1.7.1' diff --git a/build.sh b/build.sh new file mode 100755 index 00000000..65a0f823 --- /dev/null +++ b/build.sh @@ -0,0 +1,11 @@ +export LOGSTASH_SOURCE=1 +export LOGSTASH_PATH=/softwares/logstash +export JRUBY_HOME=$LOGSTASH_PATH/vendor/jruby +export JAVA_HOME=$LOGSTASH_PATH/jdk +export PATH=$PATH:/softwares/logstash/vendor/jruby/bin:/softwares/logstash/bin +jruby -S gem install bundler -v 2.4.19 +jruby -S bundle install +gem build *.gemspec +rm Gemfile.lock +/softwares/logstash/bin/logstash-plugin uninstall logstash-output-kusto +/softwares/logstash/bin/logstash-plugin install logstash-output-kusto-3.0.0-java.gem \ No newline at end of file diff --git a/e2e/e2e.rb b/e2e/e2e.rb index cabc1f45..f8a24b11 100755 --- a/e2e/e2e.rb +++ b/e2e/e2e.rb @@ -7,8 +7,8 @@ class E2E def initialize super - @input_file = "/tmp/input_file.txt" - @output_file = "output_file.txt" + @input_file = File.expand_path("input_file.txt", __dir__) + @output_file = File.expand_path("output_file.txt", __dir__) @columns = "(rownumber:int, rowguid:string, xdouble:real, xfloat:real, xbool:bool, xint16:int, xint32:int, xint64:long, xuint8:long, xuint16:long, xuint32:long, xuint64:long, xdate:datetime, xsmalltext:string, xtext:string, xnumberAsText:string, xtime:timespan, xtextWithNulls:string, xdynamicWithNulls:dynamic)" @csv_columns = '"rownumber", "rowguid", "xdouble", "xfloat", "xbool", "xint16", "xint32", "xint64", "xuint8", "xuint16", "xuint32", "xuint64", "xdate", "xsmalltext", "xtext", "xnumberAsText", "xtime", "xtextWithNulls", "xdynamicWithNulls"' @column_count = 19 @@ -35,7 +35,6 @@ def initialize file { path => "#{@output_file}"} stdout { codec => rubydebug } kusto { - path => "tmp%{+YYYY-MM-dd-HH-mm}.txt" ingest_url => "#{@ingest_url}" cli_auth => true database => "#{@database}" @@ -43,7 +42,6 @@ def initialize json_mapping => "#{@mapping_name}" } kusto { - path => "nomaptmp%{+YYYY-MM-dd-HH-mm}.txt" cli_auth => true ingest_url => "#{@ingest_url}" database => "#{@database}" @@ -56,20 +54,20 @@ def initialize def create_table_and_mapping Array[@table_with_mapping, @table_without_mapping].each { |tableop| puts "Creating table #{tableop}" - @query_client.execute(@database, ".drop table #{tableop} ifexists") + @query_client.executeMgmt(@database, ".drop table #{tableop} ifexists") sleep(1) - @query_client.execute(@database, ".create table #{tableop} #{@columns}") - @query_client.execute(@database, ".alter table #{tableop} policy ingestionbatching @'{\"MaximumBatchingTimeSpan\":\"00:00:10\", \"MaximumNumberOfItems\": 1, \"MaximumRawDataSizeMB\": 100}'") + @query_client.executeMgmt(@database, ".create table #{tableop} #{@columns}") + @query_client.executeMgmt(@database, ".alter table #{tableop} policy ingestionbatching @'{\"MaximumBatchingTimeSpan\":\"00:00:10\", \"MaximumNumberOfItems\": 1, \"MaximumRawDataSizeMB\": 100}'") } # Mapping only for one table - @query_client.execute(@database, ".create table #{@table_with_mapping} ingestion json mapping '#{@mapping_name}' '#{File.read("dataset_mapping.json")}'") + @query_client.executeMgmt(@database, ".create table #{@table_with_mapping} ingestion json mapping '#{@mapping_name}' '#{File.read("dataset_mapping.json")}'") end def drop_and_cleanup Array[@table_with_mapping, @table_without_mapping].each { |tableop| puts "Dropping table #{tableop}" - @query_client.execute(@database, ".drop table #{tableop} ifexists") + @query_client.executeMgmt(@database, ".drop table #{tableop} ifexists") sleep(1) } end @@ -99,7 +97,7 @@ def assert_data (0...max_timeout).each do |_| begin sleep(5) - query = @query_client.execute(@database, "#{tableop} | sort by rownumber asc") + query = @query_client.executeQuery(@database, "#{tableop} | sort by rownumber asc") result = query.getPrimaryResults() raise "Wrong count - expected #{csv_data.length}, got #{result.count()} in table #{tableop}" unless result.count() == csv_data.length rescue Exception => e diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index ffed3a25..48c0a02c 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,5 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-7.2-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-8.7-bin.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/lib/logstash/outputs/kusto.rb b/lib/logstash/outputs/kusto.rb index cd61d5d2..bc2ad7ee 100755 --- a/lib/logstash/outputs/kusto.rb +++ b/lib/logstash/outputs/kusto.rb @@ -4,424 +4,127 @@ require 'logstash/namespace' require 'logstash/errors' -require 'logstash/outputs/kusto/ingestor' -require 'logstash/outputs/kusto/interval' +require 'logstash/outputs/kusto/customSizeBasedBuffer' +require 'logstash/outputs/kusto/kustoLogstashConfiguration' +require 'logstash/outputs/kusto/logStashFlushBuffer' +require 'logstash/outputs/kusto/filePersistence' ## # This plugin sends messages to Azure Kusto in batches. # class LogStash::Outputs::Kusto < LogStash::Outputs::Base - config_name 'kusto' - concurrency :shared - - FIELD_REF = /%\{[^}]+\}/ - - attr_reader :failure_path - - # The path to the file to write. Event fields can be used here, - # like `/var/log/logstash/%{host}/%{application}` - # One may also utilize the path option for date-based log - # rotation via the joda time format. This will use the event - # timestamp. - # E.g.: `path => "./test-%{+YYYY-MM-dd}.txt"` to create - # `./test-2013-05-29.txt` - # - # If you use an absolute path you cannot start with a dynamic string. - # E.g: `/%{myfield}/`, `/test-%{myfield}/` are not valid paths - config :path, validate: :string, required: true - - # Flush interval (in seconds) for flushing writes to files. - # 0 will flush on every message. Increase this value to recude IO calls but keep - # in mind that events buffered before flush can be lost in case of abrupt failure. - config :flush_interval, validate: :number, default: 2 - - # If the generated path is invalid, the events will be saved - # into this file and inside the defined path. - config :filename_failure, validate: :string, default: '_filepath_failures' - - # If the configured file is deleted, but an event is handled by the plugin, - # the plugin will recreate the file. Default => true - config :create_if_deleted, validate: :boolean, default: true - - # Dir access mode to use. Note that due to the bug in jruby system umask - # is ignored on linux: https://github.com/jruby/jruby/issues/3426 - # Setting it to -1 uses default OS value. - # Example: `"dir_mode" => 0750` - config :dir_mode, validate: :number, default: -1 - - # File access mode to use. Note that due to the bug in jruby system umask - # is ignored on linux: https://github.com/jruby/jruby/issues/3426 - # Setting it to -1 uses default OS value. - # Example: `"file_mode" => 0640` - config :file_mode, validate: :number, default: -1 - - # TODO: fix the interval type... - config :stale_cleanup_interval, validate: :number, default: 10 - config :stale_cleanup_type, validate: %w[events interval], default: 'events' - - # Should the plugin recover from failure? - # - # If `true`, the plugin will look for temp files from past runs within the - # path (before any dynamic pattern is added) and try to process them - # - # If `false`, the plugin will disregard temp files found - config :recovery, validate: :boolean, default: true - - - # The Kusto endpoint for ingestion related communication. You can see it on the Azure Portal. - config :ingest_url, validate: :string, required: true - - # The following are the credentails used to connect to the Kusto service - # application id - config :app_id, validate: :string, required: false - # application key (secret) - config :app_key, validate: :password, required: false - # aad tenant id - config :app_tenant, validate: :string, default: nil - # managed identity id - config :managed_identity, validate: :string, default: nil - # CLI credentials for dev-test - config :cli_auth, validate: :boolean, default: false - # The following are the data settings that impact where events are written to - # Database name - config :database, validate: :string, required: true - # Target table name - config :table, validate: :string, required: true - # Mapping name - Used by Kusto to map each attribute from incoming event JSON strings to the appropriate column in the table. - # Note that this must be in JSON format, as this is the interface between Logstash and Kusto - # Make this optional as name resolution in the JSON mapping can be done based on attribute names in the incoming event JSON strings - config :json_mapping, validate: :string, default: nil - - # Mapping name - deprecated, use json_mapping - config :mapping, validate: :string, deprecated: true - - - # Determines if local files used for temporary storage will be deleted - # after upload is successful - config :delete_temp_files, validate: :boolean, default: true - - # TODO: will be used to route events to many tables according to event properties - config :dynamic_event_routing, validate: :boolean, default: false - - # Specify how many files can be uploaded concurrently - config :upload_concurrent_count, validate: :number, default: 3 - - # Specify how many files can be kept in the upload queue before the main process - # starts processing them in the main thread (not healthy) - config :upload_queue_size, validate: :number, default: 30 - - # Host of the proxy , is an optional field. Can connect directly - config :proxy_host, validate: :string, required: false - - # Port where the proxy runs , defaults to 80. Usually a value like 3128 - config :proxy_port, validate: :number, required: false , default: 80 - - # Check Proxy URL can be over http or https. Dowe need it this way or ignore this & remove this - config :proxy_protocol, validate: :string, required: false , default: 'http' - - default :codec, 'json_lines' - - def register - require 'fileutils' # For mkdir_p - - @files = {} - @io_mutex = Mutex.new - - final_mapping = json_mapping - if final_mapping.nil? || final_mapping.empty? - final_mapping = mapping - end - - # TODO: add id to the tmp path to support multiple outputs of the same type. - # TODO: Fix final_mapping when dynamic routing is supported - # add fields from the meta that will note the destination of the events in the file - @path = if dynamic_event_routing - File.expand_path("#{path}.%{[@metadata][database]}.%{[@metadata][table]}.%{[@metadata][final_mapping]}") - else - File.expand_path("#{path}.#{database}.#{table}") - end - - validate_path - - @file_root = if path_with_field_ref? - extract_file_root - else - File.dirname(path) - end - @failure_path = File.join(@file_root, @filename_failure) - - executor = Concurrent::ThreadPoolExecutor.new(min_threads: 1, - max_threads: upload_concurrent_count, - max_queue: upload_queue_size, - fallback_policy: :caller_runs) - - @ingestor = Ingestor.new(ingest_url, app_id, app_key, app_tenant, managed_identity, cli_auth, database, table, final_mapping, delete_temp_files, proxy_host, proxy_port,proxy_protocol, @logger, executor) - - # send existing files - recover_past_files if recovery - - @last_stale_cleanup_cycle = Time.now - - @flush_interval = @flush_interval.to_i - if @flush_interval > 0 - @flusher = Interval.start(@flush_interval, -> { flush_pending_files }) - end - - if (@stale_cleanup_type == 'interval') && (@stale_cleanup_interval > 0) - @cleaner = Interval.start(stale_cleanup_interval, -> { close_stale_files }) - end - end - - private - def validate_path - if (root_directory =~ FIELD_REF) != nil - @logger.error('The starting part of the path should not be dynamic.', path: @path) - raise LogStash::ConfigurationError.new('The starting part of the path should not be dynamic.') - end - - if !path_with_field_ref? - @logger.error('Path should include some time related fields to allow for file rotation.', path: @path) - raise LogStash::ConfigurationError.new('Path should include some time related fields to allow for file rotation.') - end - end - - private - def root_directory - parts = @path.split(File::SEPARATOR).reject(&:empty?) - if Gem.win_platform? - # First part is the drive letter - parts[1] - else - parts.first - end - end - - public - def multi_receive_encoded(events_and_encoded) - encoded_by_path = Hash.new { |h, k| h[k] = [] } - - events_and_encoded.each do |event, encoded| - file_output_path = event_path(event) - encoded_by_path[file_output_path] << encoded - end - - @io_mutex.synchronize do - encoded_by_path.each do |path, chunks| - fd = open(path) - # append to the file - chunks.each { |chunk| fd.write(chunk) } - fd.flush unless @flusher && @flusher.alive? - end - - close_stale_files if @stale_cleanup_type == 'events' - end - end - - def close - @flusher.stop unless @flusher.nil? - @cleaner.stop unless @cleaner.nil? - @io_mutex.synchronize do - @logger.debug('Close: closing files') - - @files.each do |path, fd| - begin - fd.close - @logger.debug("Closed file #{path}", fd: fd) - - kusto_send_file(path) - rescue Exception => e - @logger.error('Exception while flushing and closing files.', exception: e) - end - end - end - - @ingestor.stop unless @ingestor.nil? - end - - private - def inside_file_root?(log_path) - target_file = File.expand_path(log_path) - return target_file.start_with?("#{@file_root}/") - end - - private - def event_path(event) - file_output_path = generate_filepath(event) - if path_with_field_ref? && !inside_file_root?(file_output_path) - @logger.warn('The event tried to write outside the files root, writing the event to the failure file', event: event, filename: @failure_path) - file_output_path = @failure_path - elsif !@create_if_deleted && deleted?(file_output_path) - file_output_path = @failure_path - end - @logger.debug('Writing event to tmp file.', filename: file_output_path) - - file_output_path - end - - private - def generate_filepath(event) - event.sprintf(@path) - end - - private - def path_with_field_ref? - path =~ FIELD_REF - end - - private - def extract_file_root - parts = File.expand_path(path).split(File::SEPARATOR) - parts.take_while { |part| part !~ FIELD_REF }.join(File::SEPARATOR) - end - - # the back-bone of @flusher, our periodic-flushing interval. - private - def flush_pending_files - @io_mutex.synchronize do - @logger.debug('Starting flush cycle') - - @files.each do |path, fd| - @logger.debug('Flushing file', path: path, fd: fd) - fd.flush - end - end - rescue Exception => e - # squash exceptions caught while flushing after logging them - @logger.error('Exception flushing files', exception: e.message, backtrace: e.backtrace) - end - - # every 10 seconds or so (triggered by events, but if there are no events there's no point closing files anyway) - private - def close_stale_files - now = Time.now - return unless now - @last_stale_cleanup_cycle >= @stale_cleanup_interval - - @logger.debug('Starting stale files cleanup cycle', files: @files) - inactive_files = @files.select { |path, fd| not fd.active } - @logger.debug("#{inactive_files.count} stale files found", inactive_files: inactive_files) - inactive_files.each do |path, fd| - @logger.info("Closing file #{path}") - fd.close - @files.delete(path) - - kusto_send_file(path) - end - # mark all files as inactive, a call to write will mark them as active again - @files.each { |path, fd| fd.active = false } - @last_stale_cleanup_cycle = now - end - - private - def cached?(path) - @files.include?(path) && !@files[path].nil? - end - - private - def deleted?(path) - !File.exist?(path) - end - - private - def open(path) - return @files[path] if !deleted?(path) && cached?(path) - - if deleted?(path) - if @create_if_deleted - @logger.debug('Required file does not exist, creating it.', path: path) - @files.delete(path) - else - return @files[path] if cached?(path) - end - end - - @logger.info('Opening file', path: path) - - dir = File.dirname(path) - if !Dir.exist?(dir) - @logger.info('Creating directory', directory: dir) - if @dir_mode != -1 - FileUtils.mkdir_p(dir, mode: @dir_mode) - else - FileUtils.mkdir_p(dir) - end - end - - # work around a bug opening fifos (bug JRUBY-6280) - stat = begin - File.stat(path) - rescue - nil - end - fd = if stat && stat.ftype == 'fifo' && LogStash::Environment.jruby? - java.io.FileWriter.new(java.io.File.new(path)) - elsif @file_mode != -1 - File.new(path, 'a+', @file_mode) - else - File.new(path, 'a+') - end - # fd = if @file_mode != -1 - # File.new(path, 'a+', @file_mode) - # else - # File.new(path, 'a+') - # end - # end - @files[path] = IOWriter.new(fd) - end - - private - def kusto_send_file(file_path) - @ingestor.upload_async(file_path, delete_temp_files) - end - - private - def recover_past_files - require 'find' - - # we need to find the last "regular" part in the path before any dynamic vars - path_last_char = @path.length - 1 - - pattern_start = @path.index('%') || path_last_char - last_folder_before_pattern = @path.rindex('/', pattern_start) || path_last_char - new_path = path[0..last_folder_before_pattern] - - begin - return unless Dir.exist?(new_path) - @logger.info("Going to recover old files in path #{@new_path}") - - old_files = Find.find(new_path).select { |p| /.*\.#{database}\.#{table}$/ =~ p } - @logger.info("Found #{old_files.length} old file(s), sending them now...") - - old_files.each do |file| - kusto_send_file(file) - end - rescue Errno::ENOENT => e - @logger.warn('No such file or directory', exception: e.class, message: e.message, path: new_path, backtrace: e.backtrace) - end - end -end - -# wrapper class -class IOWriter - def initialize(io) - @io = io - end - - def write(*args) - @io.write(*args) - @active = true - end - - def flush - @io.flush - end - - def method_missing(method_name, *args, &block) - if @io.respond_to?(method_name) - - @io.send(method_name, *args, &block) - else - super - end - end - attr_accessor :active + config_name 'kusto' + concurrency :shared + + FIELD_REF = /%\{[^}]+\}/ + + attr_reader :failure_path + + # The Kusto endpoint for ingestion related communication. You can see it on the Azure Portal. + config :ingest_url, validate: :string, required: true + # The following are the credentials used to connect to the Kusto service + # application id + config :app_id, validate: :string, required: false + # application key (secret) + config :app_key, validate: :password, required: false + # aad tenant id + config :app_tenant, validate: :string, default: nil + # managed identity id + config :managed_identity, validate: :string, default: nil + # CLI credentials for dev-test + config :cli_auth, validate: :boolean, default: false + # The following are the data settings that impact where events are written to + # Database name + config :database, validate: :string, required: true + # Target table name + config :table, validate: :string, required: true + # Mapping name - Used by Kusto to map each attribute from incoming event JSON strings to the appropriate column in the table. + # Note that this must be in JSON format, as this is the interface between Logstash and Kusto + # Make this optional as name resolution in the JSON mapping can be done based on attribute names in the incoming event JSON strings + config :json_mapping, validate: :string, default: nil + + # Mapping name - deprecated, use json_mapping + config :mapping, validate: :string, deprecated: true + + + # TODO: will be used to route events to many tables according to event properties + config :dynamic_event_routing, validate: :boolean, default: false + + # Specify how many files can be uploaded concurrently + config :upload_concurrent_count, validate: :number, default: 3 + + # Specify how many files can be kept in the upload queue before the main process + # starts processing them in the main thread (not healthy) + config :upload_queue_size, validate: :number, default: 30 + + # Host of the proxy , is an optional field. Can connect directly + config :proxy_host, validate: :string, required: false + + # Port where the proxy runs , defaults to 80. Usually a value like 3128 + config :proxy_port, validate: :number, required: false , default: 80 + + # Check Proxy URL can be over http or https. Do we need it this way or ignore this & remove this + config :proxy_protocol, validate: :string, required: false , default: 'https' + # Maximum size of the buffer before it gets flushed, in MB. Defaults to 10. + config :max_batch_size, validate: :number, required: false, default: 10 + + # Interval (in seconds) before the buffer gets flushed, regardless of size. Defaults to 10. + config :plugin_flush_interval, validate: :number, required: false, default: 10 + + # Maximum number of items in the buffer before it gets flushed. Defaults to 1000. + config :max_items, validate: :number, required: false, default: 1000 + + # Process failed batches on startup. Defaults to false. + config :process_failed_batches_on_startup, validate: :boolean, required: false, default: false + + # Directory to store the failed batches that were not uploaded to Kusto. If the directory does not exist, it will be created, defaults to a temporary directory called "logstash_backout" in Dir.tmpdir + config :failed_dir_name, validate: :string, required: false, default: nil + + default :codec, 'json_lines' + + def register + dir = failed_dir_name.nil? || failed_dir_name.empty? ? ::File.join(Dir.tmpdir, "logstash_backout") : failed_dir_name + @file_persistence = LogStash::Outputs::KustoOutputInternal::FilePersistence.new(dir, @logger) + + kusto_ingest_base = LogStash::Outputs::KustoInternal::KustoIngestConfiguration.new(ingest_url, database, table, json_mapping) + kusto_auth_base = LogStash::Outputs::KustoInternal::KustoAuthConfiguration.new(app_id, app_key, app_tenant, managed_identity, cli_auth) + kusto_proxy_base = LogStash::Outputs::KustoInternal::KustoProxyConfiguration.new(proxy_host , proxy_port , proxy_protocol, false) + kusto_flush_config = LogStash::Outputs::KustoInternal::KustoFlushConfiguration.new(max_items, plugin_flush_interval, max_batch_size, process_failed_batches_on_startup) + kusto_upload_config = LogStash::Outputs::KustoInternal::KustoUploadConfiguration.new(upload_concurrent_count, upload_queue_size) + kusto_logstash_configuration = LogStash::Outputs::KustoInternal::KustoLogstashConfiguration.new(kusto_ingest_base, kusto_auth_base , kusto_proxy_base, kusto_flush_config, kusto_upload_config, @logger, @file_persistence) + kusto_logstash_configuration.validate_config + # Initialize the custom buffer with size and interval + @buffer = LogStash::Outputs::KustoOutputInternal::LogStashEventsBatcher.new(kusto_logstash_configuration, @logger) + end + + + public + def multi_receive_encoded(events_and_encoded) + events_and_encoded.each do |event, encoded| + begin + @buffer.batch_event(event.to_hash) + rescue => e + @logger.error("Error processing event: #{e.message}") + end + end + end + + def close + @logger.info("Closing Kusto output plugin") + begin + @buffer.shutdown unless @buffer.nil? + @logger.info("Buffer shutdown") unless @buffer.nil? + rescue => e + @logger.error("Error shutting down buffer: #{e.message}") + @logger.error(e.backtrace.join("\n")) + end + begin + @ingestor.stop unless @ingestor.nil? + @logger.info("Ingestor stopped") unless @ingestor.nil? + rescue => e + @logger.error("Error stopping ingestor: #{e.message}") + @logger.error(e.backtrace.join("\n")) + end + @logger.info("Kusto output plugin Closed") + end end diff --git a/lib/logstash/outputs/kusto/customSizeBasedBuffer.rb b/lib/logstash/outputs/kusto/customSizeBasedBuffer.rb new file mode 100644 index 00000000..3a3e4dad --- /dev/null +++ b/lib/logstash/outputs/kusto/customSizeBasedBuffer.rb @@ -0,0 +1,379 @@ + # This code is from a PR for the official repo of ruby-stud + # with a small change to calculating the event size in the var_size function + # https://github.com/jordansissel/ruby-stud/pull/19 + # + # @author {Alex Dean}[http://github.com/alexdean] + # + # Implements a generic framework for accepting events which are later flushed + # in batches. Flushing occurs whenever +:max_items+ or +:max_interval+ (seconds) + # has been reached or if the event size outgrows +:flush_each+ (bytes) + # + # Including class must implement +flush+, which will be called with all + # accumulated items either when the output buffer fills (+:max_items+ or + # +:flush_each+) or when a fixed amount of time (+:max_interval+) passes. + # + # == batch_receive and flush + # General receive/flush can be implemented in one of two ways. + # + # === batch_receive(event) / flush(events) + # +flush+ will receive an array of events which were passed to +buffer_receive+. + # + # batch_receive('one') + # batch_receive('two') + # + # will cause a flush invocation like + # + # flush(['one', 'two']) + # + # === batch_receive(event, group) / flush(events, group) + # flush() will receive an array of events, plus a grouping key. + # + # batch_receive('one', :server => 'a') + # batch_receive('two', :server => 'b') + # batch_receive('three', :server => 'a') + # batch_receive('four', :server => 'b') + # + # will result in the following flush calls + # + # flush(['one', 'three'], {:server => 'a'}) + # flush(['two', 'four'], {:server => 'b'}) + # + # Grouping keys can be anything which are valid Hash keys. (They don't have to + # be hashes themselves.) Strings or Fixnums work fine. Use anything which you'd + # like to receive in your +flush+ method to help enable different handling for + # various groups of events. + # + # == on_flush_error + # Including class may implement +on_flush_error+, which will be called with an + # Exception instance whenever buffer_flush encounters an error. + # + # * +buffer_flush+ will automatically re-try failed flushes, so +on_flush_error+ + # should not try to implement retry behavior. + # * Exceptions occurring within +on_flush_error+ are not handled by + # +buffer_flush+. + # + # == on_full_buffer_receive + # Including class may implement +on_full_buffer_receive+, which will be called + # whenever +buffer_receive+ is called while the buffer is full. + # + # +on_full_buffer_receive+ will receive a Hash like {:pending => 30, + # :outgoing => 20} which describes the internal state of the module at + # the moment. + # + # == final flush + # Including class should call buffer_flush(:final => true) + # during a teardown/shutdown routine (after the last call to buffer_receive) + # to ensure that all accumulated messages are flushed. + module LogStash; module Outputs; class KustoOutputInternal + module CustomSizeBasedBuffer + + public + # Initialize the buffer. + # + # Call directly from your constructor if you wish to set some non-default + # options. Otherwise buffer_initialize will be called automatically during the + # first buffer_receive call. + # + # Options: + # * :max_items, Max number of items to buffer before flushing. Default 50. + # * :flush_each, Flush each bytes of buffer. Default 0 (no flushing fired by + # a buffer size). + # * :max_interval, Max number of seconds to wait between flushes. Default 5. + # * :logger, A logger to write log messages to. No default. Optional. + # + # @param [Hash] options + def buffer_initialize(options={}) + if ! self.class.method_defined?(:flush) + raise ArgumentError, "Any class including Stud::Buffer must define a flush() method." + end + @file_persistence = options[:file_persistence] + + @buffer_config = { + :max_items => options[:max_items] || 50, + :flush_each => options[:flush_each].to_i || 0, + :max_interval => options[:max_interval] || 5, + :logger => options[:logger] || Logger.new(STDOUT), + :process_failed_batches_on_startup => options[:process_failed_batches_on_startup] || false, + :has_on_flush_error => self.class.method_defined?(:on_flush_error), + :has_on_full_buffer_receive => self.class.method_defined?(:on_full_buffer_receive) + } + + @shutdown = false + + @buffer_state = { + # items accepted from including class + :pending_items => {}, + :pending_count => 0, + :pending_size => 0, + + # guard access to pending_items & pending_count & pending_size + :pending_mutex => Mutex.new, + + # items which are currently being flushed + :outgoing_items => {}, + :outgoing_count => 0, + :outgoing_size => 0, + + # ensure only 1 flush is operating at once + :flush_mutex => Mutex.new, + + # data for timed flushes + :last_flush => Time.now.to_i, + :timer => Thread.new do + loop do + break if @shutdown + sleep(@buffer_config[:max_interval]) + buffer_flush(:force => true) + end + end + } + + # events we've accumulated + buffer_clear_pending + process_failed_batches if options[:process_failed_batches_on_startup] + + end + + # Determine if +:max_items+ or +:flush_each+ has been reached. + # + # buffer_receive calls will block while buffer_full? == true. + # + # @return [bool] Is the buffer full? + def buffer_full? + + c1 = (@buffer_state[:pending_count] + @buffer_state[:outgoing_count] >= @buffer_config[:max_items]) + c2 = (@buffer_config[:flush_each] != 0 && @buffer_state[:pending_size] + @buffer_state[:outgoing_size] >= @buffer_config[:flush_each]) + + if c1 || c2 + @buffer_config[:logger].debug("---------------Entering buffer_full?-----------------") + end + + + if c1 + @buffer_config[:logger].debug("Buffer is full: max_items reached") + @buffer_config[:logger].debug("Pending count: #{@buffer_state[:pending_count]}") + @buffer_config[:logger].debug("Outgoing count: #{@buffer_state[:outgoing_count]}") + @buffer_config[:logger].debug("Pending count: #{@buffer_config[:max_items]}") + end + if c2 + @buffer_config[:logger].debug("Buffer is full: flush_each reached") + @buffer_config[:logger].debug("Pending size: #{@buffer_state[:pending_size]}") + @buffer_config[:logger].debug("Outgoing size: #{@buffer_state[:outgoing_size]}") + @buffer_config[:logger].debug("Flush each: #{@buffer_config[:flush_each]}") + @buffer_config[:logger].debug("Max items: #{@buffer_config[:max_items]}") + end + + if c1 || c2 + @buffer_config[:logger].debug("---------------Exiting buffer_full?-----------------") + end + + (@buffer_state[:pending_count] + @buffer_state[:outgoing_count] >= @buffer_config[:max_items]) || \ + (@buffer_config[:flush_each] != 0 && @buffer_state[:pending_size] + @buffer_state[:outgoing_size] >= @buffer_config[:flush_each]) + end + + # Save an event for later delivery + # + # Events are grouped by the (optional) group parameter you provide. + # Groups of events, plus the group name, are later passed to +flush+. + # + # This call will block if +:max_items+ or +:flush_each+ has been reached. + # + # @see Stud::Buffer The overview has more information on grouping and flushing. + # + # @param event An item to buffer for flushing later. + # @param group Optional grouping key. All events with the same key will be + # passed to +flush+ together, along with the grouping key itself. + def buffer_receive(event, group=nil) + buffer_initialize if ! @buffer_state + # block if we've accumulated too many events + while buffer_full? do + on_full_buffer_receive( + :pending => @buffer_state[:pending_count], + :outgoing => @buffer_state[:outgoing_count] + ) if @buffer_config[:has_on_full_buffer_receive] + sleep 0.1 + end + @buffer_state[:pending_mutex].synchronize do + @buffer_state[:pending_items][group] << event + @buffer_state[:pending_count] += 1 + @buffer_state[:pending_size] += var_size(event) if @buffer_config[:flush_each] != 0 + end + buffer_flush + end + + # Try to flush events. + # + # Returns immediately if flushing is not necessary/possible at the moment: + # * :max_items or :flush_each have not been accumulated + # * :max_interval seconds have not elapased since the last flush + # * another flush is in progress + # + # buffer_flush(:force => true) will cause a flush to occur even + # if +:max_items+ or +:flush_each+ or +:max_interval+ have not been reached. A forced flush + # will still return immediately (without flushing) if another flush is + # currently in progress. + # + # buffer_flush(:final => true) is identical to buffer_flush(:force => true), + # except that if another flush is already in progress, buffer_flush(:final => true) + # will block/wait for the other flush to finish before proceeding. + # + # @param [Hash] options Optional. May be {:force => true} or {:final => true}. + # @return [Fixnum] The number of items successfully passed to +flush+. + def buffer_flush(options={}) + force = options[:force] || options[:final] + final = options[:final] + + # final flush will wait for lock, so we are sure to flush out all buffered events + if options[:final] + @buffer_state[:flush_mutex].lock + elsif ! @buffer_state[:flush_mutex].try_lock # failed to get lock, another flush already in progress + return 0 + end + + items_flushed = 0 + + begin + return 0 if @buffer_state[:pending_count] == 0 + + # compute time_since_last_flush only when some item is pending + time_since_last_flush = get_time_since_last_flush + + return 0 if (!force) && + (@buffer_state[:pending_count] < @buffer_config[:max_items]) && + (@buffer_config[:flush_each] == 0 || @buffer_state[:pending_size] < @buffer_config[:flush_each]) && + (time_since_last_flush < @buffer_config[:max_interval]) + + @buffer_state[:pending_mutex].synchronize do + @buffer_state[:outgoing_items] = @buffer_state[:pending_items] + @buffer_state[:outgoing_count] = @buffer_state[:pending_count] + @buffer_state[:outgoing_size] = @buffer_state[:pending_size] + buffer_clear_pending + end + @buffer_config[:logger].info("Flushing output", + :outgoing_count => @buffer_state[:outgoing_count], + :time_since_last_flush => time_since_last_flush, + :outgoing_events_count => @buffer_state[:outgoing_items].length, + :batch_timeout => @buffer_config[:max_interval], + :force => force, + :final => final + ) if @buffer_config[:logger] + + @buffer_state[:outgoing_items].each do |group, events| + begin + + if group.nil? + flush(events,final) + else + flush(events, group, final) + end + + @buffer_state[:outgoing_items].delete(group) + events_size = events.size + @buffer_state[:outgoing_count] -= events_size + if @buffer_config[:flush_each] != 0 + events_volume = 0 + events.each do |event| + events_volume += var_size(event) + end + @buffer_state[:outgoing_size] -= events_volume + end + items_flushed += events_size + @buffer_state[:last_flush] = Time.now.to_i + rescue => e + @buffer_config[:logger].warn("Unexpected error during flush: #{e.class}: #{e.message}\n#{e.backtrace.join("\n")}") + next + end + end + + ensure + @buffer_state[:flush_mutex].unlock + end + + return items_flushed + end + + def process_failed_batches + process_orphaned_processing_files + process_new_json_files + end + + def process_orphaned_processing_files + Dir.glob(::File.join(@file_persistence.failed_dir, "failed_batch_*.json.processing")).each do |processing_file| + process_failed_batch_file(processing_file) + sleep(0.1) + end + end + + def process_new_json_files + Dir.glob(::File.join(@file_persistence.failed_dir, "failed_batch_*.json")).each do |file| + processing_file = file + ".processing" + begin + ::File.rename(file, processing_file) + process_failed_batch_file(processing_file) + rescue Errno::ENOENT, Errno::EACCES + # File already claimed or deleted, skip + next + end + sleep(0.1) + end + end + + def process_failed_batch_file(processing_file) + begin + batch = JSON.load(::File.read(processing_file)) + @buffer_state[:flush_mutex].lock + begin + flush(batch, true) + @file_persistence.delete_batch(processing_file) + @buffer_config[:logger].info("Successfully flushed and deleted failed batch file: #{processing_file}") if @buffer_config[:logger] + rescue => e + @buffer_config[:logger].warn("Failed to flush persisted batch: #{e.message}") if @buffer_config[:logger] + ensure + @buffer_state[:flush_mutex].unlock + end + rescue Errno::ENOENT + @buffer_config[:logger].warn("Batch file #{processing_file} was not found when attempting to read. It may have been deleted by another process.") if @buffer_config[:logger] + rescue => e + @buffer_config[:logger].warn("Failed to load batch file #{processing_file}: #{e.message}. Moving to quarantine.") if @buffer_config[:logger] + begin + quarantine_dir = File.join(@file_persistence.failed_dir, "quarantine") + FileUtils.mkdir_p(quarantine_dir) unless Dir.exist?(quarantine_dir) + FileUtils.mv(processing_file, quarantine_dir) + rescue => del_err + @buffer_config[:logger].warn("Failed to move corrupted batch file #{processing_file} to quarantine: #{del_err.message}") if @buffer_config[:logger] + end + end + end + + def shutdown + # Graceful shutdown of timer thread + if @buffer_state && @buffer_state[:timer] + @shutdown = true + @buffer_state[:timer].join + @buffer_state[:timer] = nil + end + # Final flush of any remaining in-memory events + buffer_flush(:final => true) if @buffer_state + end + + private + def buffer_clear_pending + @buffer_state[:pending_items] = Hash.new { |h, k| h[k] = [] } + @buffer_state[:pending_count] = 0 + @buffer_state[:pending_size] = 0 + end + + private + def var_size(var) + # Calculate event size as a json. + # assuming event is a hash + return var.to_json.bytesize + 2 + end + + protected + def get_time_since_last_flush + Time.now.to_i - @buffer_state[:last_flush] + end + + end +end ;end ;end \ No newline at end of file diff --git a/lib/logstash/outputs/kusto/filePersistence.rb b/lib/logstash/outputs/kusto/filePersistence.rb new file mode 100644 index 00000000..05c20178 --- /dev/null +++ b/lib/logstash/outputs/kusto/filePersistence.rb @@ -0,0 +1,64 @@ +require 'securerandom' +require 'json' +require 'fileutils' +require 'tmpdir' +require 'thread' + +module LogStash; module Outputs; class KustoOutputInternal + class FilePersistence + attr_reader :failed_dir + + def initialize(dir = nil, logger = nil) + @failed_dir = dir || ::File.join(Dir.tmpdir, "logstash_backout") + begin + ::FileUtils.mkdir_p(@failed_dir) unless Dir.exist?(@failed_dir) + rescue => e + logger&.fatal("Failed to create backup directory #{@failed_dir}: #{e.message}") + raise + end + @logger = logger + @write_mutex = Mutex.new + @logger&.info("Backup file directory for failed batches: #{::File.expand_path(@failed_dir)}") + end + + def persist_batch(batch, max_retries = 3) + attempts = 0 + begin + @write_mutex.synchronize do + tmpfile = ::File.join(@failed_dir, "tmp_#{SecureRandom.uuid}.json") + filename = ::File.join(@failed_dir, "failed_batch_#{Time.now.to_i}_#{SecureRandom.uuid}.json") + begin + ::File.write(tmpfile, JSON.dump(batch)) + ::File.rename(tmpfile, filename) + @logger&.info("Persisted failed batch to #{filename}") + return # Success! + rescue => e + @logger&.error("Failed to persist batch to #{filename}: #{e.message}") + begin + ::File.delete(tmpfile) if ::File.exist?(tmpfile) + rescue + # Ignore cleanup errors + end + raise + end + end + rescue => e + attempts += 1 + if attempts < max_retries + sleep 0.1 * (2 ** (attempts - 1)) # Exponential backoff + retry + else + @logger&.fatal("Failed to persist batch after #{attempts} attempts. Data loss may occur: #{e.message}") + end + end + end + + def delete_batch(file) + begin + ::File.delete(file) if ::File.exist?(file) + rescue => e + @logger&.warn("Failed to delete batch file #{file}: #{e.message}") + end + end + end +end; end; end \ No newline at end of file diff --git a/lib/logstash/outputs/kusto/ingestor.rb b/lib/logstash/outputs/kusto/ingestor.rb index d9b5d836..74411b4b 100755 --- a/lib/logstash/outputs/kusto/ingestor.rb +++ b/lib/logstash/outputs/kusto/ingestor.rb @@ -3,58 +3,57 @@ require 'logstash/outputs/base' require 'logstash/namespace' require 'logstash/errors' +require 'concurrent' +require 'json' -class LogStash::Outputs::Kusto < LogStash::Outputs::Base +module LogStash; module Outputs; class KustoOutputInternal ## # This handles the overall logic and communication with Kusto # class Ingestor require 'logstash-output-kusto_jars' RETRY_DELAY_SECONDS = 3 - DEFAULT_THREADPOOL = Concurrent::ThreadPoolExecutor.new( - min_threads: 1, - max_threads: 8, - max_queue: 1, - fallback_policy: :caller_runs - ) - LOW_QUEUE_LENGTH = 3 + FIELD_REF = /%\{[^}]+\}/ - def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, cli_auth, database, table, json_mapping, delete_local, proxy_host , proxy_port , proxy_protocol,logger, threadpool = DEFAULT_THREADPOOL) - @workers_pool = threadpool + def initialize(kusto_logstash_configuration, logger) + @kusto_logstash_configuration = kusto_logstash_configuration @logger = logger - validate_config(database, table, json_mapping,proxy_protocol,app_id, app_key, managed_identity_id,cli_auth) + @file_persistence = kusto_logstash_configuration.file_persistence + @workers_pool = Concurrent::ThreadPoolExecutor.new(min_threads: 1, + max_threads: kusto_logstash_configuration.kusto_upload_config.upload_concurrent_count, + max_queue: kusto_logstash_configuration.kusto_upload_config.upload_queue_size, + fallback_policy: :caller_runs + ) + #Validate and assign @logger.info('Preparing Kusto resources.') kusto_java = Java::com.microsoft.azure.kusto apache_http = Java::org.apache.http - # kusto_connection_string = kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(ingest_url, app_id, app_key.value, app_tenant) - # If there is managed identity, use it. This means the AppId and AppKey are empty/nil - # If there is CLI Auth, use that instead of managed identity - is_managed_identity = (app_id.nil? && app_key.nil? && !cli_auth) + + is_managed_identity = @kusto_logstash_configuration.kusto_auth.is_managed_identity # If it is system managed identity, propagate the system identity - is_system_assigned_managed_identity = is_managed_identity && 0 == "system".casecmp(managed_identity_id) + is_system_assigned_managed_identity = @kusto_logstash_configuration.kusto_auth.is_system_assigned_managed_identity # Is it direct connection - is_direct_conn = (proxy_host.nil? || proxy_host.empty?) + is_direct_conn = @kusto_logstash_configuration.kusto_proxy.is_direct_conn # Create a connection string kusto_connection_string = if is_managed_identity if is_system_assigned_managed_identity @logger.info('Using system managed identity.') - kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(ingest_url) + kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(@kusto_logstash_configuration.kusto_ingest.ingest_url) else @logger.info('Using user managed identity.') - kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(ingest_url, managed_identity_id) + kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(@kusto_logstash_configuration.kusto_ingest.ingest_url, @kusto_logstash_configuration.kusto_ingest.managed_identity_id) end else - if cli_auth + if @kusto_logstash_configuration.kusto_auth.cli_auth @logger.warn('*Use of CLI Auth is only for dev-test scenarios. This is ***NOT RECOMMENDED*** for production*') - kusto_java.data.auth.ConnectionStringBuilder.createWithAzureCli(ingest_url) + kusto_java.data.auth.ConnectionStringBuilder.createWithAzureCli(@kusto_logstash_configuration.kusto_ingest.ingest_url) else @logger.info('Using app id and app key.') - kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(ingest_url, app_id, app_key.value, app_tenant) + kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(@kusto_logstash_configuration.kusto_ingest.ingest_url, @kusto_logstash_configuration.kusto_auth.app_id, @kusto_logstash_configuration.kusto_auth.app_key.value, @kusto_logstash_configuration.kusto_auth.app_tenant) end end - # @logger.debug(Gem.loaded_specs.to_s) # Unfortunately there's no way to avoid using the gem/plugin name directly... name_for_tracing = "logstash-output-kusto:#{Gem.loaded_specs['logstash-output-kusto']&.version || "unknown"}" @@ -63,120 +62,66 @@ def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, cli tuple_utils = Java::org.apache.commons.lang3.tuple # kusto_connection_string.setClientVersionForTracing(name_for_tracing) version_for_tracing=Gem.loaded_specs['logstash-output-kusto']&.version || "unknown" - kusto_connection_string.setConnectorDetails("Logstash",version_for_tracing.to_s,"","",false,"", tuple_utils.Pair.emptyArray()); + kusto_connection_string.setConnectorDetails("Logstash",version_for_tracing.to_s,name_for_tracing.to_s,version_for_tracing.to_s,false,"", tuple_utils.Pair.emptyArray()); @kusto_client = begin if is_direct_conn kusto_java.ingest.IngestClientFactory.createClient(kusto_connection_string) else - kusto_http_client_properties = kusto_java.data.HttpClientProperties.builder().proxy(apache_http.HttpHost.new(proxy_host,proxy_port,proxy_protocol)).build() + kusto_http_client_properties = kusto_java.data.HttpClientProperties.builder().proxy(apache_http.HttpHost.new(@kusto_logstash_configuration.kusto_proxy.proxy_host,@kusto_logstash_configuration.kusto_proxy.proxy_port,@kusto_logstash_configuration.kusto_proxy.proxy_protocol)).build() kusto_java.ingest.IngestClientFactory.createClient(kusto_connection_string, kusto_http_client_properties) end end - @ingestion_properties = kusto_java.ingest.IngestionProperties.new(database, table) - is_mapping_ref_provided = !(json_mapping.nil? || json_mapping.empty?) - if is_mapping_ref_provided - @logger.debug('Using mapping reference.', json_mapping) - @ingestion_properties.setIngestionMapping(json_mapping, kusto_java.ingest.IngestionMapping::IngestionMappingKind::JSON) - @ingestion_properties.setDataFormat(kusto_java.ingest.IngestionProperties::DataFormat::JSON) + @ingestion_properties = kusto_java.ingest.IngestionProperties.new(@kusto_logstash_configuration.kusto_ingest.database, @kusto_logstash_configuration.kusto_ingest.table) + if @kusto_logstash_configuration.kusto_ingest.is_mapping_ref_provided + @logger.debug('Using mapping reference.', @kusto_logstash_configuration.kusto_ingest.json_mapping) + @ingestion_properties.setIngestionMapping(@kusto_logstash_configuration.kusto_ingest.json_mapping, kusto_java.ingest.IngestionMapping::IngestionMappingKind::JSON) + @ingestion_properties.setDataFormat(kusto_java.ingest.IngestionProperties::DataFormat::MULTIJSON) else @logger.debug('No mapping reference provided. Columns will be mapped by names in the logstash output') - @ingestion_properties.setDataFormat(kusto_java.ingest.IngestionProperties::DataFormat::JSON) + @ingestion_properties.setDataFormat(kusto_java.ingest.IngestionProperties::DataFormat::MULTIJSON) end - @delete_local = delete_local @logger.debug('Kusto resources are ready.') end - def validate_config(database, table, json_mapping, proxy_protocol, app_id, app_key, managed_identity_id,cli_auth) - # Add an additional validation and fail this upfront - if app_id.nil? && app_key.nil? && managed_identity_id.nil? - if cli_auth - @logger.info('Using CLI Auth, this is only for dev-test scenarios. This is ***NOT RECOMMENDED*** for production') - else - @logger.error('managed_identity_id is not provided and app_id/app_key is empty.') - raise LogStash::ConfigurationError.new('managed_identity_id is not provided and app_id/app_key is empty.') + def upload(data) + data_size = data.size + @logger.info("Ingesting #{data_size} rows to database: #{@ingestion_properties.getDatabaseName} table: #{@ingestion_properties.getTableName}") + if data_size > 0 + #ingestion_status_futures = Concurrent::Future.execute(executor: @workers_pool) do + exceptions = Concurrent::Array.new + promise = Concurrent::Promises.future { + in_bytes = java.io.ByteArrayInputStream.new(data.to_json.to_java_bytes) + data_source_info = Java::com.microsoft.azure.kusto.ingest.source.StreamSourceInfo.new(in_bytes) + ingest_result = @kusto_client.ingestFromStream(data_source_info, @ingestion_properties) + #@logger.info("Ingestion result: #{ingest_result}") + } + .rescue{ |e| + @logger.error("Ingestion failed: #{e.message}") + @logger.error("Ingestion failed: #{e.backtrace.join("\n")}") + @file_persistence.persist_batch(data) + } + .on_resolution do |fulfilled, value, reason, *args| + @logger.debug("Future fulfilled: #{fulfilled}, value: #{value}, reason: #{reason}, args: #{args}, class: #{value.class}") + if value.class == Java::ComMicrosoftAzureKustoIngestResult::IngestionStatusResult + isc = value.getIngestionStatusCollection()&.get(0)&.getStatus() + @logger.info("Ingestion status: #{isc}") + else + @logger.warn("Ingestion status is non success status: #{value.class} - #{value}") + end + if exceptions.size > 0 + @logger.error("Ingestion failed with exceptions: #{exceptions.map(&:message).join(', ')}") + end end - end - if database =~ FIELD_REF - @logger.error('database config value should not be dynamic.', database) - raise LogStash::ConfigurationError.new('database config value should not be dynamic.') - end - - if table =~ FIELD_REF - @logger.error('table config value should not be dynamic.', table) - raise LogStash::ConfigurationError.new('table config value should not be dynamic.') - end - - if json_mapping =~ FIELD_REF - @logger.error('json_mapping config value should not be dynamic.', json_mapping) - raise LogStash::ConfigurationError.new('json_mapping config value should not be dynamic.') - end - - if not(["https", "http"].include? proxy_protocol) - @logger.error('proxy_protocol has to be http or https.', proxy_protocol) - raise LogStash::ConfigurationError.new('proxy_protocol has to be http or https.') - end - - end - - def upload_async(path, delete_on_success) - if @workers_pool.remaining_capacity <= LOW_QUEUE_LENGTH - @logger.warn("Ingestor queue capacity is running low with #{@workers_pool.remaining_capacity} free slots.") - end - - @workers_pool.post do - LogStash::Util.set_thread_name("Kusto to ingest file: #{path}") - upload(path, delete_on_success) - end - rescue Exception => e - @logger.error('StandardError.', exception: e.class, message: e.message, path: path, backtrace: e.backtrace) - raise e - end - - def upload(path, delete_on_success) - file_size = File.size(path) - @logger.debug("Sending file to kusto: #{path}. size: #{file_size}") - - # TODO: dynamic routing - # file_metadata = path.partition('.kusto.').last - # file_metadata_parts = file_metadata.split('.') - - # if file_metadata_parts.length == 3 - # # this is the number we expect - database, table, json_mapping - # database = file_metadata_parts[0] - # table = file_metadata_parts[1] - # json_mapping = file_metadata_parts[2] - - # local_ingestion_properties = Java::KustoIngestionProperties.new(database, table) - # local_ingestion_properties.addJsonMappingName(json_mapping) - # end - - if file_size > 0 - file_source_info = Java::com.microsoft.azure.kusto.ingest.source.FileSourceInfo.new(path, 0); # 0 - let the sdk figure out the size of the file - @kusto_client.ingestFromFile(file_source_info, @ingestion_properties) else - @logger.warn("File #{path} is an empty file and is not ingested.") - end - File.delete(path) if delete_on_success - @logger.debug("File #{path} sent to kusto.") - rescue Errno::ENOENT => e - @logger.error("File doesn't exist! Unrecoverable error.", exception: e.class, message: e.message, path: path, backtrace: e.backtrace) - rescue Java::JavaNioFile::NoSuchFileException => e - @logger.error("File doesn't exist! Unrecoverable error.", exception: e.class, message: e.message, path: path, backtrace: e.backtrace) - rescue => e - # When the retry limit is reached or another error happen we will wait and retry. - # - # Thread might be stuck here, but I think its better than losing anything - # its either a transient errors or something bad really happened. - @logger.error('Uploading failed, retrying.', exception: e.class, message: e.message, path: path, backtrace: e.backtrace) - sleep RETRY_DELAY_SECONDS - retry - end + @logger.warn("Data is empty and is not ingested.") + end # if data.size > 0 + end # def upload def stop @workers_pool.shutdown @workers_pool.wait_for_termination(nil) # block until its done - end - end -end + end # def stop + end # class Ingestor +end; end; end # module LogStash::Outputs::KustoOutputInternal diff --git a/lib/logstash/outputs/kusto/interval.rb b/lib/logstash/outputs/kusto/interval.rb deleted file mode 100755 index 33046309..00000000 --- a/lib/logstash/outputs/kusto/interval.rb +++ /dev/null @@ -1,81 +0,0 @@ -# encoding: utf-8 - -require 'logstash/outputs/base' -require 'logstash/namespace' -require 'logstash/errors' - -class LogStash::Outputs::Kusto < LogStash::Outputs::Base - ## - # Bare-bones utility for running a block of code at an interval. - # - class Interval - ## - # Initializes a new Interval with the given arguments and starts it - # before returning it. - # - # @param interval [Integer] (see: Interval#initialize) - # @param procsy [#call] (see: Interval#initialize) - # - # @return [Interval] - # - def self.start(interval, procsy) - new(interval, procsy).tap(&:start) - end - - ## - # @param interval [Integer]: time in seconds to wait between calling the given proc - # @param procsy [#call]: proc or lambda to call periodically; must not raise exceptions. - def initialize(interval, procsy) - @interval = interval - @procsy = procsy - - # Mutex, ConditionVariable, etc. - @mutex = Mutex.new - @sleeper = ConditionVariable.new - end - - ## - # Starts the interval, or returns if it has already been started. - # - # @return [void] - def start - @mutex.synchronize do - return if @thread && @thread.alive? - - @thread = Thread.new { run } - end - end - - ## - # Stop the interval. - # Does not interrupt if execution is in-progress. - def stop - @mutex.synchronize do - @stopped = true - end - - @thread && @thread.join - end - - ## - # @return [Boolean] - def alive? - @thread && @thread.alive? - end - - private - - def run - @mutex.synchronize do - loop do - @sleeper.wait(@mutex, @interval) - break if @stopped - - @procsy.call - end - end - ensure - @sleeper.broadcast - end - end -end diff --git a/lib/logstash/outputs/kusto/kustoLogstashConfiguration.rb b/lib/logstash/outputs/kusto/kustoLogstashConfiguration.rb new file mode 100644 index 00000000..72eac768 --- /dev/null +++ b/lib/logstash/outputs/kusto/kustoLogstashConfiguration.rb @@ -0,0 +1,204 @@ +# encoding: utf-8 +# A class just having all the configurations wrapped into a seperate object +module LogStash + module Outputs + module KustoInternal + class KustoLogstashConfiguration + FIELD_REF = /%\{[^}]+\}/ + def initialize(kusto_ingest, kusto_auth, kusto_proxy, kusto_flush_config, kusto_upload_config, logger, file_persistence) + @logger = logger + @kusto_ingest = kusto_ingest + @kusto_auth = kusto_auth + @kusto_proxy = kusto_proxy + @kusto_flush_config = kusto_flush_config + @kusto_upload_config = kusto_upload_config + @file_persistence = file_persistence + @logger.info("Kusto configuration initialized.") + end # def initialize + + # Configuration + def kusto_ingest + @kusto_ingest + end + def kusto_auth + @kusto_auth + end + def kusto_proxy + @kusto_proxy + end + def kusto_flush_config + @kusto_flush_config + end + def kusto_upload_config + @kusto_upload_config + end + def file_persistence + @file_persistence + end + def validate_config() + # Add an additional validation and fail this upfront + if @kusto_auth.app_id.to_s.empty? && @kusto_auth.managed_identity_id.to_s.empty? && !@kusto_auth.cli_auth + @logger.error('managed_identity_id is not provided, cli_auth is false and app_id/app_key is empty.') + raise LogStash::ConfigurationError.new('managed_identity_id is not provided and app_id/app_key is empty.') + end + # If proxy AAD is required and the proxy configuration is not provided - fail + if @kusto_proxy.proxy_aad_only && @kusto_proxy.is_direct_conn + @logger.error('proxy_aad_only can be used only when proxy is configured.', @kusto_proxy.proxy_aad_only) + raise LogStash::ConfigurationError.new('proxy_aad_only can be used only when proxy is configured.') + end + + if @kusto_ingest.database =~ FIELD_REF + @logger.error('database config value should not be dynamic.', @kusto_ingest.database) + raise LogStash::ConfigurationError.new('database config value should not be dynamic.') + end + if @kusto_ingest.table =~ FIELD_REF + @logger.error('table config value should not be dynamic.', @kusto_ingest.table) + raise LogStash::ConfigurationError.new('table config value should not be dynamic.') + end + if @kusto_ingest.json_mapping =~ FIELD_REF + @logger.error('json_mapping config value should not be dynamic.', @kusto_ingest.json_mapping) + raise LogStash::ConfigurationError.new('json_mapping config value should not be dynamic.') + end + if not(["https", "http"].include? @kusto_proxy.proxy_protocol) + @logger.error('proxy_protocol has to be http or https.', @kusto_proxy.proxy_protocol) + raise LogStash::ConfigurationError.new('proxy_protocol has to be http or https.') + end + + if @kusto_proxy.proxy_aad_only && @kusto_proxy.is_direct_conn + @logger.error('proxy_aad_only is true, but proxy parameters (Host,Port,Protocol) are missing.',@kusto_proxy.proxy_host,@kusto_proxy.proxy_port,@kusto_proxy.proxy_protocol) + raise LogStash::ConfigurationError.new('proxy_aad_only is true, but proxy parameters (Host,Port,Protocol) are missing.') + end + # If all validation pass then configuration is valid + return true + end #validate_config() + + end # class KustoLogstashConfiguration + class KustoAuthConfiguration + def initialize(app_id, app_key, app_tenant, managed_identity_id, cli_auth) + @app_id = app_id + @app_key = app_key + @app_tenant = app_tenant + @managed_identity_id = managed_identity_id + @cli_auth = cli_auth + @is_managed_identity = app_id.to_s.empty? && app_key.to_s.empty? && !cli_auth + @is_system_assigned_managed_identity = is_managed_identity && 0 == "system".casecmp(kusto_auth.managed_identity_id) + end + # Authentication configuration + def app_id + @app_id + end + def app_key + @app_key + end + def app_tenant + @app_tenant + end + def managed_identity_id + @managed_identity_id + end + def is_managed_identity + @is_managed_identity + end + def cli_auth + @cli_auth + end + def is_system_assigned_managed_identity + @is_system_assigned_managed_identity + end + end # class KustoAuthConfiguration + class KustoProxyConfiguration + def initialize(proxy_host , proxy_port , proxy_protocol, proxy_aad_only) + @proxy_host = proxy_host + @proxy_port = proxy_port + @proxy_protocol = proxy_protocol + @proxy_aad_only = proxy_aad_only + # Is it direct connection + @is_direct_conn = (proxy_host.nil? || proxy_host.empty?) + end + # proxy configuration + def proxy_host + @proxy_host + end + + def proxy_port + @proxy_port + end + + def proxy_protocol + @proxy_protocol + end + + def proxy_aad_only + @proxy_aad_only + end + + def is_direct_conn + @is_direct_conn + end + end # class KustoProxyConfiguration + class KustoIngestConfiguration + def initialize(ingest_url, database, table, json_mapping) + @ingest_url = ingest_url + @database = database + @table = table + @json_mapping = json_mapping + @is_mapping_ref_provided = !(json_mapping.nil? || json_mapping.empty?) + end + # For ingestion + def ingest_url + @ingest_url + end + def database + @database + end + def table + @table + end + def json_mapping + @json_mapping + end + def is_mapping_ref_provided + @is_mapping_ref_provided + end + end # class KustoIngestionConfiguration + class KustoFlushConfiguration + def initialize(max_items, plugin_flush_interval, max_batch_size, process_failed_batches_on_startup) + @max_items = max_items + @plugin_flush_interval = plugin_flush_interval + @max_batch_size = max_batch_size + @flush_each = flush_each + @process_failed_batches_on_startup = process_failed_batches_on_startup + end + # Flush configuration + def max_items + @max_items + end + def plugin_flush_interval + @plugin_flush_interval + end + def max_batch_size + @max_batch_size + end + def flush_each + @flush_each + end + def process_failed_batches_on_startup + @process_failed_batches_on_startup + end + end # class KustoFlushConfiguration + class KustoUploadConfiguration + def initialize(upload_concurrent_count, upload_queue_size) + @upload_concurrent_count = upload_concurrent_count + @upload_queue_size = upload_queue_size + end + # Upload configuration + def upload_concurrent_count + @upload_concurrent_count + end + def upload_queue_size + @upload_queue_size + end + end # class KustoUploadConfiguration + end # module KustoInternal + end # module Outputs +end # module LogStash diff --git a/lib/logstash/outputs/kusto/logStashFlushBuffer.rb b/lib/logstash/outputs/kusto/logStashFlushBuffer.rb new file mode 100644 index 00000000..0857b80e --- /dev/null +++ b/lib/logstash/outputs/kusto/logStashFlushBuffer.rb @@ -0,0 +1,62 @@ +# encoding: utf-8 + +require "logstash/outputs/kusto/kustoLogstashConfiguration" +require "logstash/outputs/kusto/customSizeBasedBuffer" +require "logstash/outputs/kusto/ingestor" +require "logger" + +module LogStash; module Outputs; class KustoOutputInternal +class LogStashEventsBatcher + include CustomSizeBasedBuffer + def initialize(kusto_logstash_configuration, logger) + logger.info("Initializing LogStashEventsBatcher") + # Initialize the buffer with the configuration + # The buffer is a custom buffer that extends the LogStash::Outputs::Base#buffer_initialize + # It is used to buffer the events before sending them to Kusto + @kusto_logstash_configuration = kusto_logstash_configuration + @logger = logger + @ingestor = LogStash::Outputs::KustoOutputInternal::Ingestor.new(@kusto_logstash_configuration, logger) + + logger.info("Initializing buffer with max_items: #{kusto_logstash_configuration.kusto_flush_config.max_items}, max_interval: #{kusto_logstash_configuration.kusto_flush_config.plugin_flush_interval}, flush_each: #{kusto_logstash_configuration.kusto_flush_config.max_batch_size}") + buffer_initialize( + :max_items => kusto_logstash_configuration.kusto_flush_config.max_items, + :max_interval => kusto_logstash_configuration.kusto_flush_config.plugin_flush_interval, + :logger => logger, + #todo: There is a small discrepancy between the total size of the documents and the message body + :flush_each => kusto_logstash_configuration.kusto_flush_config.max_batch_size, + :process_failed_batches_on_startup => kusto_logstash_configuration.kusto_flush_config.process_failed_batches_on_startup, + :file_persistence => kusto_logstash_configuration.file_persistence + ) + end # initialize + + # Public methods + public + + # Adding an event document into the buffer + def batch_event(event_document) + buffer_receive(event_document) + end # def batch_event + + # Flushing all buffer content to Kusto. + # Called from Stud::Buffer#buffer_flush when there are events to flush + def flush (documents, close=false) + # Skip in case there are no candidate documents to deliver + if documents.length < 1 + @logger.warn("No documents in batch in the batch. Skipping") + return + end + @logger.info("Uploading batch of documents to Kusto #{documents.length} documents") + begin + @ingestor.upload(documents) + rescue => e + @logger.error("Error uploading batch to Kusto: #{e.message}") + end + end # def flush + + def close + @logger.info("Closing LogStashEventsBatcher...") + shutdown + end + +end # LogStashAutoResizeBuffer +end ;end ;end \ No newline at end of file diff --git a/logstash-output-kusto.gemspec b/logstash-output-kusto.gemspec index dd698322..d383f291 100755 --- a/logstash-output-kusto.gemspec +++ b/logstash-output-kusto.gemspec @@ -20,7 +20,7 @@ Gem::Specification.new do |s| s.metadata = { "logstash_plugin" => "true", "logstash_group" => "output" } # Gem dependencies - s.add_runtime_dependency 'logstash-core', '>= 8.7.0' + s.add_runtime_dependency 'logstash-core', '~> 8.7', '>= 8.7.0' s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99" s.add_runtime_dependency 'logstash-codec-json_lines' s.add_runtime_dependency 'logstash-codec-line' diff --git a/random-data.sh b/random-data.sh new file mode 100755 index 00000000..3ea64222 --- /dev/null +++ b/random-data.sh @@ -0,0 +1,10 @@ +#!/usr/bin/env bash + +while true +do + random_ip=$(dd if=/dev/urandom bs=4 count=1 2>/dev/null | od -An -tu1 | sed -e 's/^ *//' -e 's/ */./g') + random_size=$(( (RANDOM % 65535) + 1 )) + current_date_time=$(date '+%d/%b/%Y:%H:%M:%S %z') + echo "$random_ip - - [$current_date_time] \"GET /data.php HTTP/1.1\" 200 $random_size" | tee -a '/tmp/curllogs.txt' + sleep 0.0000001s +done \ No newline at end of file diff --git a/spec/outputs/kusto/filePersistence_spec.rb b/spec/outputs/kusto/filePersistence_spec.rb new file mode 100644 index 00000000..9ac9ac8a --- /dev/null +++ b/spec/outputs/kusto/filePersistence_spec.rb @@ -0,0 +1,50 @@ +# encoding: utf-8 +require_relative "../../spec_helpers.rb" +require 'logstash/outputs/kusto/filePersistence' +require 'fileutils' + +describe LogStash::Outputs::KustoOutputInternal::FilePersistence do + let(:tmp_dir) { File.expand_path("../../../../tmp/test_buffer_storage", __FILE__) } + let(:batch) { [{ "foo" => "bar" }, { "baz" => "qux" }] } + let(:logger) { double("Logger", info: nil) } + let(:file_persistence) { described_class.new(tmp_dir, logger) } + + before(:each) do + FileUtils.rm_rf(tmp_dir) + FileUtils.mkdir_p(tmp_dir) + end + + after(:each) do + FileUtils.rm_rf(tmp_dir) + end + + it 'persists a batch to a file and loads it back' do + file_persistence.persist_batch(batch) + files = Dir.glob(File.join(tmp_dir, 'failed_batch_*.json')) + expect(files.size).to eq(1) + loaded_batch = JSON.load(File.read(files.first)) + expect(loaded_batch).to eq(batch) + end + + it 'deletes a batch file' do + file_persistence.persist_batch(batch) + file = Dir.glob(File.join(tmp_dir, 'failed_batch_*.json')).first + expect(File.exist?(file)).to be true + file_persistence.delete_batch(file) + expect(File.exist?(file)).to be false + end + + it 'does not fail if directory does not exist' do + FileUtils.rm_rf(tmp_dir) + expect { file_persistence.persist_batch(batch) }.not_to raise_error + files = Dir.glob(File.join(tmp_dir, 'failed_batch_*.json')) + expect(files.size).to eq(1) + end + + it 'returns empty array if directory does not exist' do + file_persistence = described_class.new(tmp_dir, logger) # Re-instantiate! + FileUtils.rm_rf(tmp_dir) + files = Dir.glob(File.join(tmp_dir, 'failed_batch_*.json')) + expect(files).to eq([]) + end +end \ No newline at end of file diff --git a/spec/outputs/kusto/ingestor_spec.rb b/spec/outputs/kusto/ingestor_spec.rb deleted file mode 100755 index 515e879d..00000000 --- a/spec/outputs/kusto/ingestor_spec.rb +++ /dev/null @@ -1,132 +0,0 @@ -# encoding: utf-8 -require_relative "../../spec_helpers.rb" -require 'logstash/outputs/kusto' -require 'logstash/outputs/kusto/ingestor' - -describe LogStash::Outputs::Kusto::Ingestor do - - let(:ingest_url) { "https://ingest-sdkse2etest.eastus.kusto.windows.net/" } - let(:app_id) { "myid" } - let(:app_key) { LogStash::Util::Password.new("mykey") } - let(:app_tenant) { "mytenant" } - let(:managed_identity) { "managed_identity" } - let(:database) { "mydatabase" } - let(:cliauth) { false } - let(:table) { "mytable" } - let(:proxy_host) { "localhost" } - let(:proxy_port) { 80 } - let(:proxy_protocol) { "http" } - let(:json_mapping) { "mymapping" } - let(:delete_local) { false } - let(:logger) { spy('logger') } - - describe '#initialize' do - - it 'does not throw an error when initializing' do - # note that this will cause an internal error since connection is being tried. - # however we still want to test that all the java stuff is working as expected - expect { - ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, managed_identity, cliauth, database, table, json_mapping, delete_local, proxy_host, proxy_port,proxy_protocol, logger) - ingestor.stop - }.not_to raise_error - end - - dynamic_name_array = ['/a%{name}/', '/a %{name}/', '/a- %{name}/', '/a- %{name}'] - - context 'doesnt allow database to have some dynamic part' do - dynamic_name_array.each do |test_database| - it "with database: #{test_database}" do - expect { - ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, managed_identity, cliauth, test_database, table, json_mapping, delete_local, proxy_host, proxy_port,proxy_protocol,logger) - ingestor.stop - }.to raise_error(LogStash::ConfigurationError) - end - end - end - - context 'doesnt allow table to have some dynamic part' do - dynamic_name_array.each do |test_table| - it "with database: #{test_table}" do - expect { - ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, managed_identity, cliauth, database, test_table, json_mapping, delete_local, proxy_host, proxy_port,proxy_protocol,logger) - ingestor.stop - }.to raise_error(LogStash::ConfigurationError) - end - end - end - - context 'doesnt allow mapping to have some dynamic part' do - dynamic_name_array.each do |json_mapping| - it "with database: #{json_mapping}" do - expect { - ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, managed_identity, cliauth, database, table, json_mapping, delete_local, proxy_host, proxy_port,proxy_protocol,logger) - ingestor.stop - }.to raise_error(LogStash::ConfigurationError) - end - end - end - - context 'proxy protocol has to be http or https' do - it "with proxy protocol: socks" do - expect { - ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, managed_identity, cliauth, database, table, json_mapping, delete_local, proxy_host, proxy_port,'socks',logger) - ingestor.stop - }.to raise_error(LogStash::ConfigurationError) - end - end - - context 'one of appid or managedid has to be provided' do - it "with empty managed identity and appid" do - expect { - ingestor = described_class.new(ingest_url, "", app_key, app_tenant, "", cliauth, database, table, json_mapping, delete_local, proxy_host, proxy_port,'socks',logger) - ingestor.stop - }.to raise_error(LogStash::ConfigurationError) - end - end - - end - - # describe 'receiving events' do - - # context 'with non-zero flush interval' do - # let(:temporary_output_file) { Stud::Temporary.pathname } - - # let(:event_count) { 100 } - # let(:flush_interval) { 5 } - - # let(:events) do - # event_count.times.map do |idx| - # LogStash::Event.new('subject' => idx) - # end - # end - - # let(:output) { described_class.new(options.merge( {'path' => temporary_output_file, 'flush_interval' => flush_interval, 'delete_temp_files' => false } )) } - - # before(:each) { output.register } - - # after(:each) do - # output.close - # File.exist?(temporary_output_file) && File.unlink(temporary_output_file) - # File.exist?(temporary_output_file + '.kusto') && File.unlink(temporary_output_file + '.kusto') - # end - - # it 'eventually flushes without receiving additional events' do - # output.multi_receive_encoded(events) - - # # events should not all be flushed just yet... - # expect(File.read(temporary_output_file)).to satisfy("have less than #{event_count} lines") do |contents| - # contents && contents.lines.count < event_count - # end - - # # wait for the flusher to run... - # sleep(flush_interval + 1) - - # # events should all be flushed - # expect(File.read(temporary_output_file)).to satisfy("have exactly #{event_count} lines") do |contents| - # contents && contents.lines.count == event_count - # end - # end - # end - - # end -end diff --git a/spec/outputs/kusto/kustoLogstashConfiguration_spec.rb b/spec/outputs/kusto/kustoLogstashConfiguration_spec.rb new file mode 100755 index 00000000..c6c8b9c9 --- /dev/null +++ b/spec/outputs/kusto/kustoLogstashConfiguration_spec.rb @@ -0,0 +1,111 @@ +# encoding: utf-8 +require_relative "../../spec_helpers.rb" +require 'logstash/outputs/kusto' +require 'logstash/outputs/kusto/kustoLogstashConfiguration' + +describe LogStash::Outputs::KustoInternal::KustoLogstashConfiguration do + + let(:ingest_url) { "https://ingest-sdkse2etest.eastus.kusto.windows.net/" } + let(:app_id) { "myid" } + let(:app_key) { LogStash::Util::Password.new("mykey") } + let(:app_tenant) { "mytenant" } + let(:managed_identity) { "managed_identity" } + let(:database) { "mydatabase" } + let(:cliauth) { false } + let(:table) { "mytable" } + let(:proxy_host) { "localhost" } + let(:proxy_port) { 80 } + let(:proxy_protocol) { "http" } + let(:json_mapping) { "mymapping" } + let(:delete_local) { false } + let(:logger) { spy(:logger) } + let(:proxy_aad_only) { false } + let(:max_items) { 100 } + let(:plugin_flush_interval) { 10 } + let(:max_batch_size) { 10 } + let(:process_failed_batches_on_startup) { false } + let(:upload_concurrent_count) { 3 } + let(:upload_queue_size) { 30 } + + let(:kusto_ingest_base) { LogStash::Outputs::KustoInternal::KustoIngestConfiguration.new(ingest_url, database, table, json_mapping) } + let(:kusto_auth_base) { LogStash::Outputs::KustoInternal::KustoAuthConfiguration.new(app_id, app_key, app_tenant, managed_identity, cliauth) } + let(:kusto_proxy_base) { LogStash::Outputs::KustoInternal::KustoProxyConfiguration.new(proxy_host , proxy_port , proxy_protocol, proxy_aad_only) } + let(:kusto_flush_config) { LogStash::Outputs::KustoInternal::KustoFlushConfiguration.new(max_items, plugin_flush_interval, max_batch_size, process_failed_batches_on_startup) } + let(:kusto_upload_config) { LogStash::Outputs::KustoInternal::KustoUploadConfiguration.new(upload_concurrent_count, upload_queue_size) } + let(:file_persistence) { double("FilePersistence") } + + describe '#initialize' do + it 'does not throw an error when initializing' do + expect { + kustoLogstashOutputConfiguration = described_class.new( + kusto_ingest_base, kusto_auth_base, kusto_proxy_base, kusto_flush_config, kusto_upload_config, logger, file_persistence + ) + kustoLogstashOutputConfiguration.validate_config() + }.not_to raise_error + end + + it 'exposes all configuration accessors' do + config = described_class.new( + kusto_ingest_base, kusto_auth_base, kusto_proxy_base, kusto_flush_config, kusto_upload_config, logger, file_persistence + ) + expect(config.kusto_ingest).to eq(kusto_ingest_base) + expect(config.kusto_auth).to eq(kusto_auth_base) + expect(config.kusto_proxy).to eq(kusto_proxy_base) + expect(config.kusto_flush_config).to eq(kusto_flush_config) + expect(config.kusto_upload_config).to eq(kusto_upload_config) + expect(config.file_persistence).to eq(file_persistence) + end + end + + + describe LogStash::Outputs::KustoInternal::KustoAuthConfiguration do + it 'returns correct values for auth config' do + auth = described_class.new(app_id, app_key, app_tenant, managed_identity, cliauth) + expect(auth.app_id).to eq(app_id) + expect(auth.app_key).to eq(app_key) + expect(auth.app_tenant).to eq(app_tenant) + expect(auth.managed_identity_id).to eq(managed_identity) + expect(auth.cli_auth).to eq(cliauth) + end + end + + describe LogStash::Outputs::KustoInternal::KustoProxyConfiguration do + it 'returns correct values for proxy config' do + proxy = described_class.new(proxy_host, proxy_port, proxy_protocol, proxy_aad_only) + expect(proxy.proxy_host).to eq(proxy_host) + expect(proxy.proxy_port).to eq(proxy_port) + expect(proxy.proxy_protocol).to eq(proxy_protocol) + expect(proxy.proxy_aad_only).to eq(proxy_aad_only) + expect(proxy.is_direct_conn).to eq(false) + end + end + + describe LogStash::Outputs::KustoInternal::KustoIngestConfiguration do + it 'returns correct values for ingest config' do + ingest = described_class.new(ingest_url, database, table, json_mapping) + expect(ingest.ingest_url).to eq(ingest_url) + expect(ingest.database).to eq(database) + expect(ingest.table).to eq(table) + expect(ingest.json_mapping).to eq(json_mapping) + expect(ingest.is_mapping_ref_provided).to eq(true) + end + end + + describe LogStash::Outputs::KustoInternal::KustoFlushConfiguration do + it 'returns correct values for flush config' do + flush = described_class.new(max_items, plugin_flush_interval, max_batch_size, process_failed_batches_on_startup) + expect(flush.max_items).to eq(max_items) + expect(flush.plugin_flush_interval).to eq(plugin_flush_interval) + expect(flush.max_batch_size).to eq(max_batch_size) + expect(flush.process_failed_batches_on_startup).to eq(process_failed_batches_on_startup) + end + end + + describe LogStash::Outputs::KustoInternal::KustoUploadConfiguration do + it 'returns correct values for upload config' do + upload = described_class.new(upload_concurrent_count, upload_queue_size) + expect(upload.upload_concurrent_count).to eq(upload_concurrent_count) + expect(upload.upload_queue_size).to eq(upload_queue_size) + end + end +end \ No newline at end of file diff --git a/spec/outputs/kusto_spec.rb b/spec/outputs/kusto_spec.rb deleted file mode 100755 index 366c406a..00000000 --- a/spec/outputs/kusto_spec.rb +++ /dev/null @@ -1,56 +0,0 @@ -# encoding: utf-8 -require 'logstash/outputs/kusto' -require 'logstash/codecs/plain' -require 'logstash/event' - -describe LogStash::Outputs::Kusto do - - let(:options) { { "path" => "./kusto_tst/%{+YYYY-MM-dd-HH-mm}", - "ingest_url" => "https://ingest-sdkse2etest.eastus.kusto.windows.net/", - "app_id" => "myid", - "app_key" => "mykey", - "app_tenant" => "mytenant", - "database" => "mydatabase", - "table" => "mytable", - "json_mapping" => "mymapping", - "proxy_host" => "localhost", - "proxy_port" => 3128, - "proxy_protocol" => "https" - } } - - describe '#register' do - - it 'doesnt allow the path to start with a dynamic string' do - kusto = described_class.new(options.merge( {'path' => '/%{name}'} )) - expect { kusto.register }.to raise_error(LogStash::ConfigurationError) - kusto.close - end - - it 'path must include a dynamic string to allow file rotation' do - kusto = described_class.new(options.merge( {'path' => '/{name}'} )) - expect { kusto.register }.to raise_error(LogStash::ConfigurationError) - kusto.close - end - - - dynamic_name_array = ['/a%{name}/', '/a %{name}/', '/a- %{name}/', '/a- %{name}'] - - context 'doesnt allow the root directory to have some dynamic part' do - dynamic_name_array.each do |test_path| - it "with path: #{test_path}" do - kusto = described_class.new(options.merge( {'path' => test_path} )) - expect { kusto.register }.to raise_error(LogStash::ConfigurationError) - kusto.close - end - end - end - - it 'allow to have dynamic part after the file root' do - kusto = described_class.new(options.merge({'path' => '/tmp/%{name}'})) - expect { kusto.register }.not_to raise_error - kusto.close - end - - end - -end diff --git a/version b/version index ed35d092..4a36342f 100644 --- a/version +++ b/version @@ -1 +1 @@ -2.0.9 \ No newline at end of file +3.0.0