Skip to content
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
d19d27e
Add RumInjectorHealthMetrics
sarahchen6 Jul 28, 2025
e01d1a9
Add telemetry collector and methods to RumInjector
sarahchen6 Jul 28, 2025
8c71ef6
Initialize health metrics and telemetry collector
sarahchen6 Jul 28, 2025
6c01e10
Get injectionsucceed count
sarahchen6 Jul 28, 2025
ceb3e11
Add comments
sarahchen6 Jul 28, 2025
92f0c54
Reorganize classes
sarahchen6 Jul 31, 2025
c5c860f
Connect rum injector, telemetry collector, and statsdclient
sarahchen6 Jul 31, 2025
3d4ac53
Add tests
sarahchen6 Aug 1, 2025
3fd498d
Get and test metrics for injection failures and skips
sarahchen6 Aug 1, 2025
14c338b
Add Content Security Policy and HTTP response size telemetry
sarahchen6 Aug 1, 2025
c5b5389
Add injection duration telemetry
sarahchen6 Aug 1, 2025
feb40be
Fix some things
sarahchen6 Aug 2, 2025
23e0455
Fix content-length retrieval and add test for injection timing
sarahchen6 Aug 6, 2025
07db174
Add injection initialization success telemetry
sarahchen6 Aug 7, 2025
a5352ff
Fix CoreTracer compilation with InstrumenterConfig
sarahchen6 Aug 8, 2025
2a1f676
Add tags to all metrics
sarahchen6 Aug 8, 2025
b32874c
Update InjectingPipeOutputStreamTest
sarahchen6 Aug 8, 2025
7a5cd68
Tweaks
sarahchen6 Aug 8, 2025
412a3f8
Address jacoco coverage and injectingpipeoutstream interface updates
sarahchen6 Aug 9, 2025
b3fcde4
Add content-length detection for InjectingPipeWriter and improve tests
sarahchen6 Aug 11, 2025
4b2f6b3
Address review comments
sarahchen6 Aug 11, 2025
3e462d4
Fix header retrieval
sarahchen6 Aug 12, 2025
b1ab871
Add lots of improvements from review comments
sarahchen6 Aug 12, 2025
93bd0a4
Fix constructors and address review comment
sarahchen6 Aug 13, 2025
5a24d7d
Merge branch 'master' into sarahchen6/implement-rum-injector-telemetry
sarahchen6 Aug 13, 2025
b20d6c4
Clarify bytes written and address review comment
sarahchen6 Aug 13, 2025
f7607ba
Use dynamic servlet version retrieval
sarahchen6 Aug 18, 2025
e6afc52
Change injection timing logic
sarahchen6 Aug 18, 2025
6f2dc97
Clean up
sarahchen6 Aug 19, 2025
32372c1
Use dynamic servlet version retrieval for tagging as well
sarahchen6 Aug 19, 2025
821dc87
Address merge conflicts
sarahchen6 Aug 19, 2025
150ab05
Add a telemetry check to HttpServerTest
sarahchen6 Aug 19, 2025
e9095b5
Clean up
sarahchen6 Aug 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ public class InjectingPipeOutputStreamBenchmark {
public void withPipe() throws Exception {
try (final PrintWriter out =
new PrintWriter(
new InjectingPipeOutputStream(new ByteArrayOutputStream(), marker, content, null))) {
new InjectingPipeOutputStream(
new ByteArrayOutputStream(), marker, content, null, null))) {
htmlContent.forEach(out::println);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.io.IOException;
import java.io.OutputStream;
import java.util.function.LongConsumer;
import javax.annotation.concurrent.NotThreadSafe;

/**
Expand All @@ -23,18 +24,22 @@ public class InjectingPipeOutputStream extends OutputStream {
private final Runnable onContentInjected;
private final int bulkWriteThreshold;
private final OutputStream downstream;
private final LongConsumer onBytesWritten;
private long bytesWritten = 0;

/**
* @param downstream the delegate output stream
* @param marker the marker to find in the stream. Must at least be one byte.
* @param contentToInject the content to inject once before the marker if found.
* @param onContentInjected callback called when and if the content is injected.
* @param onBytesWritten callback called when stream is closed to report total bytes written.
*/
public InjectingPipeOutputStream(
final OutputStream downstream,
final byte[] marker,
final byte[] contentToInject,
final Runnable onContentInjected) {
final Runnable onContentInjected,
final LongConsumer onBytesWritten) {
this.downstream = downstream;
this.marker = marker;
this.lookbehind = new byte[marker.length];
Expand All @@ -46,11 +51,13 @@ public InjectingPipeOutputStream(
this.filter = true;
this.contentToInject = contentToInject;
this.onContentInjected = onContentInjected;
this.onBytesWritten = onBytesWritten;
this.bulkWriteThreshold = marker.length * 2 - 2;
}

@Override
public void write(int b) throws IOException {
bytesWritten++;
if (!filter) {
if (wasDraining) {
// continue draining
Expand Down Expand Up @@ -85,6 +92,7 @@ public void write(int b) throws IOException {

@Override
public void write(byte[] array, int off, int len) throws IOException {
bytesWritten += len;
if (!filter) {
if (wasDraining) {
// needs drain
Expand Down Expand Up @@ -113,6 +121,7 @@ public void write(byte[] array, int off, int len) throws IOException {
// we don't have a full match. write everything in a bulk except the lookbehind buffer
// sequentially
for (int i = off; i < off + marker.length - 1; i++) {
bytesWritten--; // avoid double counting
write(array[i]);
}
drain();
Expand All @@ -123,12 +132,14 @@ public void write(byte[] array, int off, int len) throws IOException {
downstream.write(array, off + marker.length - 1, len - bulkWriteThreshold);
filter = wasFiltering;
for (int i = len - marker.length + 1; i < len; i++) {
bytesWritten--; // avoid double counting
write(array[i]);
}
}
} else {
// use slow path because the length to write is small and within the lookbehind buffer size
for (int i = off; i < off + len; i++) {
bytesWritten--; // avoid double counting
write(array[i]);
}
}
Expand Down Expand Up @@ -185,6 +196,11 @@ public void flush() throws IOException {
public void close() throws IOException {
try {
commit();
// report the size of the original HTTP response before injecting via callback
if (onBytesWritten != null) {
onBytesWritten.accept(bytesWritten);
}
bytesWritten = 0;
} finally {
downstream.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class InjectingPipeOutputStreamTest extends DDSpecification {
def 'should filter a buffer and inject if found #found'() {
setup:
def downstream = new ByteArrayOutputStream()
def piped = new OutputStreamWriter(new InjectingPipeOutputStream(downstream, marker.getBytes("UTF-8"), contentToInject.getBytes("UTF-8"), null),
def piped = new OutputStreamWriter(new InjectingPipeOutputStream(downstream, marker.getBytes("UTF-8"), contentToInject.getBytes("UTF-8"), null, null),
"UTF-8")
when:
try (def closeme = piped) {
Expand All @@ -55,7 +55,7 @@ class InjectingPipeOutputStreamTest extends DDSpecification {
setup:
def baos = new ByteArrayOutputStream()
def downstream = new GlitchedOutputStream(baos, glichesAt)
def piped = new InjectingPipeOutputStream(downstream, marker.getBytes("UTF-8"), contentToInject.getBytes("UTF-8"), null)
def piped = new InjectingPipeOutputStream(downstream, marker.getBytes("UTF-8"), contentToInject.getBytes("UTF-8"), null, null)
when:
try {
for (String line : body) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ public class RumHttpServletResponseWrapper extends HttpServletResponseWrapper {
private PrintWriter printWriter;
private InjectingPipeWriter wrappedPipeWriter;
private boolean shouldInject = true;
private long injectionStartTime = -1;
private String contentEncoding = "none";

private static final MethodHandle SET_CONTENT_LENGTH_LONG = getMh("setContentLengthLong");

Expand Down Expand Up @@ -45,18 +47,30 @@ public ServletOutputStream getOutputStream() throws IOException {
return outputStream;
}
if (!shouldInject) {
RumInjector.getTelemetryCollector().onInjectionSkipped("3");
return super.getOutputStream();
}
String encoding = getCharacterEncoding();
if (encoding == null) {
encoding = Charset.defaultCharset().name();
// start timing injection
if (injectionStartTime == -1) {
injectionStartTime = System.nanoTime();
}
try {
String encoding = getCharacterEncoding();
if (encoding == null) {
encoding = Charset.defaultCharset().name();
}
outputStream =
new WrappedServletOutputStream(
super.getOutputStream(),
rumInjector.getMarkerBytes(encoding),
rumInjector.getSnippetBytes(encoding),
this::onInjected,
bytes -> RumInjector.getTelemetryCollector().onInjectionResponseSize("3", bytes));
} catch (Exception e) {
injectionStartTime = -1;
RumInjector.getTelemetryCollector().onInjectionFailed("3", contentEncoding);
throw e;
}
outputStream =
new WrappedServletOutputStream(
super.getOutputStream(),
rumInjector.getMarkerBytes(encoding),
rumInjector.getSnippetBytes(encoding),
this::onInjected);

return outputStream;
}
Expand All @@ -67,19 +81,56 @@ public PrintWriter getWriter() throws IOException {
return printWriter;
}
if (!shouldInject) {
RumInjector.getTelemetryCollector().onInjectionSkipped("3");
return super.getWriter();
}
wrappedPipeWriter =
new InjectingPipeWriter(
super.getWriter(),
rumInjector.getMarkerChars(),
rumInjector.getSnippetChars(),
this::onInjected);
printWriter = new PrintWriter(wrappedPipeWriter);
// start timing injection
if (injectionStartTime == -1) {
injectionStartTime = System.nanoTime();
}
try {
wrappedPipeWriter =
new InjectingPipeWriter(
super.getWriter(),
rumInjector.getMarkerChars(),
rumInjector.getSnippetChars(),
this::onInjected);
printWriter = new PrintWriter(wrappedPipeWriter);
} catch (Exception e) {
injectionStartTime = -1;
RumInjector.getTelemetryCollector().onInjectionFailed("3", contentEncoding);
throw e;
}

return printWriter;
}

@Override
public void setHeader(String name, String value) {
if (name != null) {
String lowerName = name.toLowerCase();
if (lowerName.startsWith("content-security-policy")) {
RumInjector.getTelemetryCollector().onContentSecurityPolicyDetected("3");
} else if (lowerName.contains("content-encoding")) {
this.contentEncoding = value;
}
}
super.setHeader(name, value);
}

@Override
public void addHeader(String name, String value) {
if (name != null) {
String lowerName = name.toLowerCase();
if (lowerName.startsWith("content-security-policy")) {
RumInjector.getTelemetryCollector().onContentSecurityPolicyDetected("3");
} else if (lowerName.contains("content-encoding")) {
this.contentEncoding = value;
}
}
super.addHeader(name, value);
}

@Override
public void setContentLength(int len) {
// don't set it since we don't know if we will inject
Expand All @@ -105,6 +156,7 @@ public void reset() {
this.wrappedPipeWriter = null;
this.printWriter = null;
this.shouldInject = false;
this.injectionStartTime = -1;
super.reset();
}

Expand All @@ -117,6 +169,16 @@ public void resetBuffer() {
}

public void onInjected() {
RumInjector.getTelemetryCollector().onInjectionSucceed("3");

// calculate total injection time
if (injectionStartTime != -1) {
long nanoseconds = System.nanoTime() - injectionStartTime;
long milliseconds = nanoseconds / 1_000_000L;
RumInjector.getTelemetryCollector().onInjectionTime("3", milliseconds);
injectionStartTime = -1;
}

try {
setHeader("x-datadog-rum-injected", "1");
} catch (Throwable ignored) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import datadog.trace.util.MethodHandles;
import java.io.IOException;
import java.lang.invoke.MethodHandle;
import java.util.function.LongConsumer;
import javax.servlet.ServletOutputStream;
import javax.servlet.WriteListener;

Expand All @@ -29,8 +30,14 @@ private static <E extends Throwable> void sneakyThrow(Throwable e) throws E {
}

public WrappedServletOutputStream(
ServletOutputStream delegate, byte[] marker, byte[] contentToInject, Runnable onInjected) {
this.filtered = new InjectingPipeOutputStream(delegate, marker, contentToInject, onInjected);
ServletOutputStream delegate,
byte[] marker,
byte[] contentToInject,
Runnable onInjected,
LongConsumer onBytesWritten) {
this.filtered =
new InjectingPipeOutputStream(
delegate, marker, contentToInject, onInjected, onBytesWritten);
this.delegate = delegate;
}

Expand Down
Loading