52 commits
675b9c6
added custom_size_based_buffer class
monishkadas-ms Sep 12, 2024
c82a6a8
added custom_size_based_buffer class
monishkadas-ms Sep 12, 2024
4b79a5d
added custom_size_based_buffer class
monishkadas-ms Sep 12, 2024
badeeb1
added custom_size_based_buffer class
monishkadas-ms Sep 12, 2024
654da30
Added warning for deprecated path config var
MonishkaDas Sep 24, 2024
9c352c3
Updated kusto_spec.rb and ingestor_spec.rb
MonishkaDas Sep 25, 2024
1adc29a
Updated kusto_spec.rb and ingestor_spec.rb
MonishkaDas Sep 25, 2024
86e10e4
Updated kusto_spec.rb and ingestor_spec.rb
MonishkaDas Sep 25, 2024
34e682c
Updated kusto_spec.rb and ingestor_spec.rb
MonishkaDas Sep 25, 2024
7cf7099
Updated kusto_spec.rb and ingestor_spec.rb
MonishkaDas Sep 25, 2024
0200f21
Updated kusto_spec.rb and ingestor_spec.rb
MonishkaDas Sep 25, 2024
8d49fc4
Updated kusto_spec.rb and ingestor_spec.rb
MonishkaDas Sep 25, 2024
5cb6af3
Updated kusto_spec.rb and ingestor_spec.rb
MonishkaDas Sep 25, 2024
5a859fa
Updated kusto_spec.rb and ingestor_spec.rb
MonishkaDas Sep 25, 2024
5cab147
Updated kusto_spec.rb and ingestor_spec.rb
MonishkaDas Sep 25, 2024
ab495f5
Updated kusto_spec.rb and ingestor_spec.rb
MonishkaDas Sep 25, 2024
76904bd
Updated max_size config
MonishkaDas Sep 27, 2024
86dc8ae
Updated max_size config
MonishkaDas Sep 27, 2024
47f8b1a
Added tests in kusto_spec.rb
MonishkaDas Oct 2, 2024
bbf9ed4
Added tests in kusto_spec.rb
MonishkaDas Oct 2, 2024
028eec9
Updated custom_size_based_buffer.rb
MonishkaDas Oct 15, 2024
fed3fc4
Updated custom_size_based_buffer.rb
MonishkaDas Oct 15, 2024
7ca57b9
Updated custom_size_based_buffer.rb
MonishkaDas Oct 16, 2024
1f9591f
Adds temp file buffer used during network downtime
MonishkaDas Oct 17, 2024
b033dee
Updated custom_size_based_buffer.rb
MonishkaDas Oct 17, 2024
71998af
Updated custom_size_based_buffer.rb
MonishkaDas Oct 17, 2024
9d574e3
Added temporary file buffer for persistant storage
MonishkaDas Oct 24, 2024
398a301
* Refactor to classes
ag-ramachandran Oct 16, 2024
f560b09
Added max_retries and failed_items_path() configs
MonishkaDas Oct 27, 2024
2adc63c
Updated README.md and configs
MonishkaDas Nov 5, 2024
42a0075
*Make some fixes for retries and upload
ag-ramachandran Nov 11, 2024
f252095
Added Latch Timeout and updated java sdk version
MonishkaDas Dec 12, 2024
357439f
Add completableFuture with timeout
MonishkaDas Dec 26, 2024
20dc04e
Updated README.md and removed interval.rb
MonishkaDas Jan 2, 2025
7292e27
Removed network_available method
MonishkaDas Jan 2, 2025
61e47d1
* Initial commit
ag-ramachandran Mar 20, 2025
54e8efb
WIP Commit
ag-ramachandran May 23, 2025
926e037
* Some more changes
ag-ramachandran Jun 3, 2025
1524461
*Update gitignore
ag-ramachandran Jun 3, 2025
55f6631
Persist on flush failure and add configs for retries
monishkadas-ms Jun 12, 2025
b32e6ac
Removed process_failed_batches_on_shutdown config and Updated Spec an…
monishkadas-ms Jun 16, 2025
0b8550e
Upgrade gradle version
monishkadas-ms Jun 16, 2025
f2024ee
Update e2e test
monishkadas-ms Jun 16, 2025
f29b5d8
*Minor edits
ag-ramachandran Jun 17, 2025
24f0f6f
Updated file persistance logic
monishkadas-ms Jun 18, 2025
19ed184
Updated spec tests
monishkadas-ms Jun 18, 2025
1fbeffd
Updated filePersistence class and added error handling. Updated spec …
monishkadas-ms Jun 19, 2025
5f7098a
Updated process_failed_files and shutdown fn
monishkadas-ms Jul 1, 2025
168b5b7
Removed loadbatches fn
monishkadas-ms Jul 1, 2025
f2a7599
Updated spec tests
monishkadas-ms Jul 1, 2025
6418f75
Updated process_failed_files
monishkadas-ms Jul 1, 2025
be288da
Updated Error Handling in buffer flush
monishkadas-ms Jul 2, 2025
3 changes: 3 additions & 0 deletions .gitignore
Expand Up @@ -59,9 +59,12 @@ gradle/wrapper/gradle-wrapper.properties
.vscode/settings.json
rspec.xml
e2e/output_file.txt
e2e/input_file.txt
logs.txt
docker-e2e/.env
local-run.sh
logs2.txt
**/.vscode/*.*
**/settings.json
Run.sh
vendor
65 changes: 34 additions & 31 deletions README.md
Expand Up @@ -35,19 +35,21 @@ Perform configuration before sending events from Logstash to Azure Data Explorer

```ruby
output {
kusto {
path => "/tmp/kusto/%{+YYYY-MM-dd-HH-mm}.txt"
ingest_url => "https://ingest-<cluster-name>.kusto.windows.net/"
app_id => "<application id>"
app_key => "<application key/secret>"
app_tenant => "<tenant id>"
database => "<database name>"
table => "<target table>"
json_mapping => "<mapping name>"
proxy_host => "<proxy host>"
proxy_port => <proxy port>
proxy_protocol => <"http"|"https">
}
kusto {
ingest_url => "https://ingest-<cluster-name>.kusto.windows.net/"
app_id => "<application id>"
app_key => "<application key/secret>"
app_tenant => "<tenant id>"
database => "<database name>"
table => "<target table>"
json_mapping => "<mapping name>"
proxy_host => "<proxy host>"
proxy_port => <proxy port>
proxy_protocol => <"http"|"https">
max_size => 10
max_interval => 10
latch_timeout => 60
}
}
```
More information about configuring Logstash can be found in the [logstash configuration guide](https://www.elastic.co/guide/en/logstash/current/configuration.html)
Expand All @@ -56,22 +58,22 @@ More information about configuring Logstash can be found in the [logstash config

| Parameter Name | Description | Notes |
| --- | --- | --- |
| **path** | The plugin writes events to temporary files before sending them to ADX. This parameter includes a path where files should be written and a time expression for file rotation to trigger an upload to the ADX service. The example above shows how to rotate the files every minute and check the Logstash docs for more information on time expressions. | Required
| **ingest_url** | The Kusto endpoint for ingestion-related communication. See it on the Azure Portal.| Required|
| **app_id, app_key, app_tenant**| Credentials required to connect to the ADX service. Be sure to use an application with 'ingest' privileges. | Optional|
| **managed_identity**| Managed Identity to authenticate. For user-based managed ID, use the Client ID GUID. For system-based, use the value `system`. The ID needs to have 'ingest' privileges on the cluster. | Optional|
| **database**| Database name to place events | Required |
| **table** | Target table name to place events | Required
| **ingest_url** | The Kusto endpoint for ingestion-related communication. See it on the Azure Portal. | Required |
| **app_id, app_key, app_tenant** | Credentials required to connect to the ADX service. Be sure to use an application with 'ingest' privileges. | Optional |
| **managed_identity** | Managed Identity to authenticate. For user-based managed ID, use the Client ID GUID. For system-based, use the value `system`. The ID needs to have 'ingest' privileges on the cluster. | Optional |
| **database** | Database name to place events | Required |
| **table** | Target table name to place events | Required |
| **json_mapping** | Maps each attribute from incoming event JSON strings to the appropriate column in the table. Note that this must be in JSON format, as this is the interface between Logstash and Kusto | Optional |
| **recovery** | If set to true (default), plugin will attempt to resend pre-existing temp files found in the path upon startup | |
| **delete_temp_files** | Determines if temp files will be deleted after a successful upload (true is default; set false for debug purposes only)| |
| **flush_interval** | The time (in seconds) for flushing writes to temporary files. Default is 2 seconds, 0 will flush on every event. Increase this value to reduce IO calls but keep in mind that events in the buffer will be lost in case of abrupt failure.| |
| **proxy_host** | The proxy hostname for redirecting traffic to Kusto.| |
| **proxy_port** | The proxy port for the proxy. Defaults to 80.| |
| **proxy_protocol** | The proxy server protocol , is one of http or https.| |
| **proxy_host** | The proxy hostname for redirecting traffic to Kusto. | Optional |
| **proxy_port** | The proxy port for the proxy. Defaults to 80. | Optional |
| **proxy_protocol** | The proxy server protocol; one of http or https. | Optional |
| **max_size** | Maximum size of the buffer before it gets flushed, defaults to 10MB. | Optional |
| **latch_timeout** | Latch timeout in seconds, defaults to 60. This is the maximum wait time after which the flushing attempt is timed out and the network is considered to be down. The system waits for the network to be back to retry flushing the same batch. | Optional |
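
To make the `max_size` and `max_interval` rows concrete, here is a minimal Ruby sketch of a buffer that flushes when either limit is reached. It is an illustration only, not the plugin's `custom_size_based_buffer` code: the class name `SketchBuffer`, its keyword arguments, the injectable `clock`, and the reading of `max_interval` as seconds are all assumptions made for this example.

```ruby
# Hypothetical sketch; NOT the plugin's custom_size_based_buffer implementation.
class SketchBuffer
  # max_size_mb mirrors the README's max_size (documented in MB).
  # max_interval_s mirrors max_interval; treating it as seconds is an assumption.
  # clock is injectable so the interval rule can be exercised without sleeping.
  def initialize(max_size_mb:, max_interval_s:,
                 clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) },
                 &on_flush)
    @max_bytes = max_size_mb * 1024 * 1024
    @max_interval_s = max_interval_s
    @clock = clock
    @on_flush = on_flush
    @items = []
    @bytes = 0
    @last_flush = @clock.call
  end

  # Append one serialized event and flush if a limit has been reached.
  def <<(event)
    @items << event
    @bytes += event.bytesize
    flush if due?
    self
  end

  # True when the buffer is non-empty and either limit is exceeded.
  def due?
    return false if @items.empty?
    @bytes >= @max_bytes || (@clock.call - @last_flush) >= @max_interval_s
  end

  # Hand the current batch to the callback and reset the counters.
  def flush
    return if @items.empty?
    batch = @items
    @items = []
    @bytes = 0
    @last_flush = @clock.call
    @on_flush.call(batch)
  end
end
```

The sketch is single-threaded and only checks the interval when an event arrives; a real output plugin would also need locking and a timer so that a quiet pipeline still flushes.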

> Note : LS_JAVA_OPTS can be used to set proxy parameters as well (using export or SET options)

> Note: the **path** config parameter is deprecated and no longer used as of release 3.0.0; it will be removed in a future release

```bash
export LS_JAVA_OPTS="-Dhttp.proxyHost=1.2.3.4 -Dhttp.proxyPort=8989 -Dhttps.proxyHost=1.2.3.4 -Dhttps.proxyPort=8989"
```
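
The `latch_timeout` row in the parameter table describes a flush that is abandoned after a maximum wait and then retried with the same batch. The Ruby sketch below shows one way such a loop could look. It is not the plugin's code (the commit list mentions a Java `CompletableFuture` with a timeout); the helper names `flush_with_timeout` and `flush_until_delivered`, and the `retry_delay_s` and `max_attempts` arguments, are hypothetical.

```ruby
# Hypothetical helper: run the flush block on a worker thread and wait at
# most timeout_s seconds for it to finish.
def flush_with_timeout(timeout_s, &flush)
  worker = Thread.new(&flush)
  worker.report_on_exception = false
  if worker.join(timeout_s)      # join returns nil when the wait times out
    :flushed
  else
    worker.kill                  # give up on this attempt
    :timed_out
  end
end

# Hypothetical retry loop: keep offering the SAME batch until one attempt
# completes within the timeout. Returns the attempt number that succeeded,
# or nil if every attempt timed out.
def flush_until_delivered(batch, timeout_s:, retry_delay_s:, max_attempts:, &sender)
  max_attempts.times do |attempt|
    status = flush_with_timeout(timeout_s) { sender.call(batch) }
    return attempt + 1 if status == :flushed
    sleep(retry_delay_s)         # stand-in for "wait for the network to come back"
  end
  nil
end
```

Killing a thread is a blunt tool and is used here only to keep the sketch short; an exception raised inside the block is re-raised by `join`, so real code would also decide how to handle failures that are not timeouts.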
Expand All @@ -81,12 +83,13 @@ export LS_JAVA_OPTS="-Dhttp.proxyHost=1.2.34 -Dhttp.proxyPort=8989 -Dhttps.prox

| Version | Release Date | Notes |
| --- | --- | --- |
| 2.0.8 | 2024-10-23 | - Fix library deprecations, fix issues in the Azure Identity library |
| 2.0.7 | 2024-01-01 | - Update Kusto JAVA SDK |
| 2.0.3 | 2023-12-12 | - Make JSON mapping field optional. If not provided logstash output JSON attribute names will be used for column resolution |
| 2.0.2 | 2023-11-28 | - Bugfix for the scenario where the plugin uses managed identity. Instead of providing the managed identity name as empty in the config,it can completely be skipped |
| 2.0.0 | 2023-09-19 | - Upgrade to the latest Java SDK version [5.0.2](https://github.com/Azure/azure-kusto-java/releases/tag/v5.0.2). Tests have been performed on **__Logstash 8.5__** and up (Does not work with 6.x or 7.x versions of Logstash - For these versions use 1.x.x versions of logstash-output-kusto gem) - Fixes CVE's in common-text & outdated Jackson libraries |
| 1.0.6 | 2022-11-29 | - Upgrade to the latest Java SDK [3.2.1](https://github.com/Azure/azure-kusto-java/releases/tag/v3.2.1) version. Tests have been performed on Logstash 6.x and up.|
| 3.0.0 | 2024-11-01 | Updated configuration options |
| 2.0.8 | 2024-10-23 | Fix library deprecations, fix issues in the Azure Identity library |
| 2.0.7 | 2024-01-01 | Update Kusto JAVA SDK |
| 2.0.3 | 2023-12-12 | Make JSON mapping field optional. If not provided logstash output JSON attribute names will be used for column resolution |
| 2.0.2 | 2023-11-28 | Bugfix for the scenario where the plugin uses managed identity. Instead of providing the managed identity name as empty in the config, it can be skipped completely |
| 2.0.0 | 2023-09-19 | Upgrade to the latest Java SDK version [5.0.2](https://github.com/Azure/azure-kusto-java/releases/tag/v5.0.2). Tests have been performed on **__Logstash 8.5__** and up (Does not work with 6.x or 7.x versions of Logstash - For these versions use 1.x.x versions of logstash-output-kusto gem) - Fixes CVE's in common-text & outdated Jackson libraries |
| 1.0.6 | 2022-11-29 | Upgrade to the latest Java SDK [3.2.1](https://github.com/Azure/azure-kusto-java/releases/tag/v3.2.1) version. Tests have been performed on Logstash 6.x and up.|


## Development Requirements
Expand Down
6 changes: 3 additions & 3 deletions build.gradle
Expand Up @@ -29,8 +29,8 @@ repositories {
// update dependencies to bom azure-sdk-bom/1.2.24

dependencies {
implementation 'com.microsoft.azure.kusto:kusto-data:5.2.0'
implementation 'com.microsoft.azure.kusto:kusto-ingest:5.2.0'
implementation 'com.microsoft.azure.kusto:kusto-data:6.0.0'
implementation 'com.microsoft.azure.kusto:kusto-ingest:6.0.0'
implementation 'com.azure:azure-core-http-netty:1.15.1'
implementation 'com.azure:azure-core:1.49.1'
implementation 'com.azure:azure-data-tables:12.4.2'
Expand All @@ -52,7 +52,7 @@ dependencies {
implementation 'com.nimbusds:nimbus-jose-jwt:9.40'
implementation 'com.nimbusds:oauth2-oidc-sdk:11.13'
implementation 'com.univocity:univocity-parsers:2.9.1'
implementation 'commons-codec:commons-codec:1.16.1'
implementation 'commons-codec:commons-codec:1.17.1'
implementation 'commons-logging:commons-logging:1.3.1'
implementation 'io.github.resilience4j:resilience4j-core:1.7.1'
implementation 'io.github.resilience4j:resilience4j-retry:1.7.1'
Expand Down
11 changes: 11 additions & 0 deletions build.sh
@@ -0,0 +1,11 @@
export LOGSTASH_SOURCE=1
export LOGSTASH_PATH=/softwares/logstash
export JRUBY_HOME=$LOGSTASH_PATH/vendor/jruby
export JAVA_HOME=$LOGSTASH_PATH/jdk
export PATH=$PATH:/softwares/logstash/vendor/jruby/bin:/softwares/logstash/bin
jruby -S gem install bundler -v 2.4.19
jruby -S bundle install
gem build *.gemspec
rm Gemfile.lock
/softwares/logstash/bin/logstash-plugin uninstall logstash-output-kusto
/softwares/logstash/bin/logstash-plugin install logstash-output-kusto-3.0.0-java.gem
18 changes: 8 additions & 10 deletions e2e/e2e.rb
Expand Up @@ -7,8 +7,8 @@ class E2E

def initialize
super
@input_file = "/tmp/input_file.txt"
@output_file = "output_file.txt"
@input_file = File.expand_path("input_file.txt", __dir__)
@output_file = File.expand_path("output_file.txt", __dir__)
@columns = "(rownumber:int, rowguid:string, xdouble:real, xfloat:real, xbool:bool, xint16:int, xint32:int, xint64:long, xuint8:long, xuint16:long, xuint32:long, xuint64:long, xdate:datetime, xsmalltext:string, xtext:string, xnumberAsText:string, xtime:timespan, xtextWithNulls:string, xdynamicWithNulls:dynamic)"
@csv_columns = '"rownumber", "rowguid", "xdouble", "xfloat", "xbool", "xint16", "xint32", "xint64", "xuint8", "xuint16", "xuint32", "xuint64", "xdate", "xsmalltext", "xtext", "xnumberAsText", "xtime", "xtextWithNulls", "xdynamicWithNulls"'
@column_count = 19
Expand All @@ -35,15 +35,13 @@ def initialize
file { path => "#{@output_file}"}
stdout { codec => rubydebug }
kusto {
path => "tmp%{+YYYY-MM-dd-HH-mm}.txt"
ingest_url => "#{@ingest_url}"
cli_auth => true
database => "#{@database}"
table => "#{@table_with_mapping}"
json_mapping => "#{@mapping_name}"
}
kusto {
path => "nomaptmp%{+YYYY-MM-dd-HH-mm}.txt"
cli_auth => true
ingest_url => "#{@ingest_url}"
database => "#{@database}"
Expand All @@ -56,20 +54,20 @@ def initialize
def create_table_and_mapping
Array[@table_with_mapping, @table_without_mapping].each { |tableop|
puts "Creating table #{tableop}"
@query_client.execute(@database, ".drop table #{tableop} ifexists")
@query_client.executeMgmt(@database, ".drop table #{tableop} ifexists")
sleep(1)
@query_client.execute(@database, ".create table #{tableop} #{@columns}")
@query_client.execute(@database, ".alter table #{tableop} policy ingestionbatching @'{\"MaximumBatchingTimeSpan\":\"00:00:10\", \"MaximumNumberOfItems\": 1, \"MaximumRawDataSizeMB\": 100}'")
@query_client.executeMgmt(@database, ".create table #{tableop} #{@columns}")
@query_client.executeMgmt(@database, ".alter table #{tableop} policy ingestionbatching @'{\"MaximumBatchingTimeSpan\":\"00:00:10\", \"MaximumNumberOfItems\": 1, \"MaximumRawDataSizeMB\": 100}'")
}
# Mapping only for one table
@query_client.execute(@database, ".create table #{@table_with_mapping} ingestion json mapping '#{@mapping_name}' '#{File.read("dataset_mapping.json")}'")
@query_client.executeMgmt(@database, ".create table #{@table_with_mapping} ingestion json mapping '#{@mapping_name}' '#{File.read("dataset_mapping.json")}'")
end


def drop_and_cleanup
Array[@table_with_mapping, @table_without_mapping].each { |tableop|
puts "Dropping table #{tableop}"
@query_client.execute(@database, ".drop table #{tableop} ifexists")
@query_client.executeMgmt(@database, ".drop table #{tableop} ifexists")
sleep(1)
}
end
Expand Down Expand Up @@ -99,7 +97,7 @@ def assert_data
(0...max_timeout).each do |_|
begin
sleep(5)
query = @query_client.execute(@database, "#{tableop} | sort by rownumber asc")
query = @query_client.executeQuery(@database, "#{tableop} | sort by rownumber asc")
result = query.getPrimaryResults()
raise "Wrong count - expected #{csv_data.length}, got #{result.count()} in table #{tableop}" unless result.count() == csv_data.length
rescue Exception => e
Expand Down
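
In the e2e.rb hunks above, every call to `execute` is replaced by either `executeMgmt` (for `.drop`, `.create` and `.alter` commands) or `executeQuery` (for the `sort by` query). A minimal Ruby sketch of that split follows. The helper `run_kusto` and the `RecordingClient` stand-in are hypothetical and exist only for this example; the only names taken from the diff are `executeMgmt` and `executeQuery`, and the leading-dot rule reflects how Kusto management commands are written.

```ruby
# Hypothetical helper, not part of e2e.rb: route management commands
# (which start with a dot) to executeMgmt and everything else to executeQuery.
def run_kusto(client, database, text)
  if text.lstrip.start_with?(".")
    client.executeMgmt(database, text)
  else
    client.executeQuery(database, text)
  end
end

# Stand-in for the real query client; it only records which method was used.
class RecordingClient
  attr_reader :calls

  def initialize
    @calls = []
  end

  def executeMgmt(database, command)
    @calls << [:mgmt, database, command]
  end

  def executeQuery(database, query)
    @calls << [:query, database, query]
  end
end

client = RecordingClient.new
run_kusto(client, "mydb", ".drop table mytable ifexists")
run_kusto(client, "mydb", "mytable | sort by rownumber asc")
client.calls.each { |kind, _db, text| puts "#{kind}: #{text}" }
```

The diff itself calls the two methods directly at each site, which keeps the intent explicit; a dispatcher like this is only worthwhile when the command text is not known ahead of time.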
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
@@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-7.2-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-8.7-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists