Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ public boolean willExceedMaxRequestSizeBytes(final SinkBufferEntry sinkBufferEnt

@Override
public SinkFlushableBuffer getFlushableBuffer(final SinkFlushContext sinkFlushContext) {
numEvents = 0;
currentRequestSize = 0L;
lastFlushedTimeMs = Instant.now().toEpochMilli();
return sinkBufferWriter.getBuffer(sinkFlushContext);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
public class DefaultSinkFlushResult implements SinkFlushResult {
private final List<Event> events;
private final Throwable exception;
private final int statusCode;

public DefaultSinkFlushResult(final List<Event> events, final Throwable exception) {
public DefaultSinkFlushResult(final List<Event> events, final int statusCode, final Throwable exception) {
this.events = events;
this.exception = exception;
this.statusCode = statusCode;
}

public List<Event> getEvents() {
Expand All @@ -25,5 +27,9 @@ public List<Event> getEvents() {
public Throwable getException() {
return exception;
}

public int getStatusCode() {
return statusCode;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -31,24 +31,26 @@ public DefaultSinkOutputStrategy(final LockStrategy lockStrategy, final SinkBuff

public void flushBuffer() {
long startTime = System.nanoTime();
// getBuffer() should return the buffer contents
// getFlushableBuffer() should return the buffer contents
SinkFlushableBuffer flushableBuffer = sinkBuffer.getFlushableBuffer(sinkFlushContext);
List<Event> events = flushableBuffer.getEvents();
try {
SinkFlushResult flushResult = flushableBuffer.flush();
if (flushResult == null) { // success
sinkMetrics.recordRequestLatency((double)(System.nanoTime() - startTime));
List<Event> events = flushableBuffer.getEvents();
for (final Event event: events) {
event.getEventHandle().release(true);
}
} else {
// flush Result should contain the events that are
// failed to be delivered, so that these events can be forwarded to DLQ
addFailedEventsToDlq(flushResult.getEvents(), flushResult.getException());
addFailedEventsToDlq(flushResult.getEvents(), flushResult.getException(), flushResult.getStatusCode());
}
} catch (Exception e) {
// Add list of events to DLQ
addFailedEventsToDlq(flushableBuffer.getEvents(), e);
sinkMetrics.incrementRequestsFailedCounter(1);
sinkMetrics.incrementEventsFailedCounter(events.size());
addFailedEventsToDlq(events, e, 0);
}
}

Expand Down Expand Up @@ -91,7 +93,7 @@ public void execute(Collection<Record<Event>> records) {
}
} catch (Exception ex) {
LOG.warn(NOISY, "Failed process the event ", ex);
addFailedEventsToDlq(List.of(event), ex);
addFailedEventsToDlq(List.of(event), ex, 0);
}
}
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@

public interface SinkDlqHandler {
void flushDlqList();
void addFailedEventsToDlq(final List<Event> events, final Throwable ex);
void addFailedEventsToDlq(final List<Event> events, final Throwable ex, final int statusCode);

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@
public interface SinkFlushResult {
List<Event> getEvents();
Throwable getException();
int getStatusCode();
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
import org.mockito.Mock;
import static org.hamcrest.CoreMatchers.sameInstance;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.CoreMatchers.equalTo;

import java.util.List;
import java.util.Random;

public class DefaultSinkFlushResultTest {
private List<Event> events;
Expand All @@ -22,16 +24,21 @@ public class DefaultSinkFlushResultTest {

private DefaultSinkFlushResult defaultSinkFlushResult;

private int statusCode;

private DefaultSinkFlushResult createObjectUnderTest() {
return new DefaultSinkFlushResult(events, exception);
return new DefaultSinkFlushResult(events, statusCode, exception);
}

@Test
public void test_basic() {
exception = mock(Throwable.class);
events = List.of();
Random random = new Random();
statusCode = random.nextInt(500);
defaultSinkFlushResult = createObjectUnderTest();
assertThat(defaultSinkFlushResult.getEvents(), sameInstance(events));
assertThat(defaultSinkFlushResult.getException(), sameInstance(exception));
assertThat(defaultSinkFlushResult.getStatusCode(), equalTo(statusCode));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ public void flushDlqList() {
dlqEvents.clear();
}

public void addFailedEventsToDlq(final List<Event> events, final Throwable ex) {
public void addFailedEventsToDlq(final List<Event> events, final Throwable ex, final int statusCode) {
dlqEvents.addAll(events);
}

Expand Down
41 changes: 37 additions & 4 deletions data-prepper-plugins/prometheus-sink/build.gradle
Original file line number Diff line number Diff line change
@@ -1,9 +1,30 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

plugins {
id 'java'
id 'java-library'
}

sourceSets {
integrationTest {
java {
compileClasspath += main.output + test.output
runtimeClasspath += main.output + test.output
srcDir file('src/integrationTest/java')
}
}
}

dependencies {
implementation project(':data-prepper-api')
implementation project(path: ':data-prepper-plugins:common')
implementation 'com.arpnetworking.metrics:prometheus-remote-protocol:1.0.1'
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
implementation project(':data-prepper-plugins:aws-plugin-api')
implementation project(':data-prepper-plugins:otel-proto-common')
implementation 'io.micrometer:micrometer-core'
implementation 'com.fasterxml.jackson.core:jackson-core'
implementation 'com.fasterxml.jackson.core:jackson-databind'
Expand All @@ -13,16 +34,27 @@ dependencies {
implementation 'org.hibernate.validator:hibernate-validator:7.0.5.Final'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
implementation 'software.amazon.awssdk:auth'
implementation 'io.prometheus:client:0.0.10'
implementation libs.commons.lang3
implementation project(':data-prepper-plugins:failures-common')
implementation 'org.apache.httpcomponents.client5:httpclient5:5.2'
implementation 'org.apache.httpcomponents.core5:httpcore5:5.3.3'
implementation 'io.github.acm19:aws-request-signing-apache-interceptor:3.0.0'
implementation 'org.apache.httpcomponents.client5:httpclient5:5.4.2'
implementation 'org.xerial.snappy:snappy-java:1.1.10.1'
implementation 'software.amazon.awssdk:s3'
implementation 'software.amazon.awssdk:apache-client'
implementation 'software.amazon.awssdk:sts'
implementation 'software.amazon.awssdk:acm'
implementation 'com.github.scribejava:scribejava-core:8.3.3'
implementation project(':data-prepper-plugin-framework')
testImplementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
testImplementation project(':data-prepper-test:test-common')
testImplementation project(':data-prepper-core')
testImplementation project(':data-prepper-plugin-framework')
testImplementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'

// Armeria
implementation libs.armeria.core
}

test {
Expand Down Expand Up @@ -52,9 +84,10 @@ task integrationTest(type: Test) {
useJUnitPlatform()

classpath = sourceSets.integrationTest.runtimeClasspath
systemProperty 'tests.prometheus.sink.http.endpoint', System.getProperty('tests.prometheus.sink.http.endpoint')

systemProperty 'tests.prometheus.url', System.getProperty('tests.prometheus.url')
systemProperty 'tests.aws.region', System.getProperty('tests.aws.region')
systemProperty 'tests.aws.role', System.getProperty('tests.aws.role')
filter {
includeTestsMatching '*IT'
}
}
}
Loading
Loading