diff --git a/CHANGELOG.md b/CHANGELOG.md index 2c20ff446be..a24009fc78c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - The `go.opentelemetry.io/contrib/detectors/autodetect` package is added to automatically compose user defined `resource.Detector`s at runtime. (#7522) - Add the `WithLoggerProviderOptions`, `WithMeterProviderOptions` and `WithTracerProviderOptions` options to `NewSDK` to allow passing custom options to providers in `go.opentelemetry.io/contrib/otelconf`. (#7552) - Added V2 version of AWS EC2 detector `go.opentelemetry.io/contrib/detectors/aws/ec2/v2` due to deprecation of `github.com/aws/aws-sdk-go`. (#6961) +- Handle the `WithMessageEvents` option for `Transport` in `go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp`. (#7513) diff --git a/instrumentation/net/http/otelhttp/config.go b/instrumentation/net/http/otelhttp/config.go index 6bd50d4c9b4..3e2d4ceea9d 100644 --- a/instrumentation/net/http/otelhttp/config.go +++ b/instrumentation/net/http/otelhttp/config.go @@ -152,15 +152,16 @@ const ( WriteEvents ) -// WithMessageEvents configures the Handler to record the specified events +// WithMessageEvents configures the Handler or Transport to record the specified events // (span.AddEvent) on spans. By default only summary attributes are added at the // end of the request. // // Valid events are: // - ReadEvents: Record the number of bytes read after every http.Request.Body.Read -// using the ReadBytesKey -// - WriteEvents: Record the number of bytes written after every http.ResponeWriter.Write -// using the WriteBytesKey +// using the ReadBytesKey for Handler. For Transport, it will record the number of +// bytes read after every http.Response.Body.Read. +// - WriteEvents: Record the number of bytes written after every http.ResponseWriter.Write +// using the WriteBytesKey for Handler. Ignored for Transport. func WithMessageEvents(events ...event) Option { return optionFunc(func(c *config) { for _, e := range events { diff --git a/instrumentation/net/http/otelhttp/transport.go b/instrumentation/net/http/otelhttp/transport.go index e4c02a4296b..a62c2a07fcd 100644 --- a/instrumentation/net/http/otelhttp/transport.go +++ b/instrumentation/net/http/otelhttp/transport.go @@ -33,6 +33,7 @@ type Transport struct { spanNameFormatter func(string, *http.Request) string clientTrace func(context.Context) *httptrace.ClientTrace metricAttributesFn func(*http.Request) []attribute.KeyValue + readEvent bool semconv semconv.HTTPClient } @@ -74,6 +75,7 @@ func (t *Transport) applyConfig(c *config) { t.clientTrace = c.ClientTrace t.semconv = semconv.NewHTTPClient(c.Meter) t.metricAttributesFn = c.MetricAttributesFn + t.readEvent = c.ReadEvent } func defaultTransportFormatter(_ string, r *http.Request) string { @@ -153,7 +155,7 @@ func (t *Transport) RoundTrip(r *http.Request) (*http.Response, error) { t.semconv.RecordResponseSize(ctx, n, metricOpts) } - res.Body = newWrappedBody(span, readRecordFunc, res.Body) + res.Body = newWrappedBody(span, readRecordFunc, res.Body, t.readEvent) } // Use floating point division here for higher precision (instead of Millisecond method). @@ -198,17 +200,17 @@ func (t *Transport) metricAttributesFromRequest(r *http.Request) []attribute.Key // newWrappedBody returns a new and appropriately scoped *wrappedBody as an // io.ReadCloser. If the passed body implements io.Writer, the returned value // will implement io.ReadWriteCloser. -func newWrappedBody(span trace.Span, record func(n int64), body io.ReadCloser) io.ReadCloser { +func newWrappedBody(span trace.Span, record func(n int64), body io.ReadCloser, readEvent bool) io.ReadCloser { // The successful protocol switch responses will have a body that // implement an io.ReadWriteCloser. Ensure this interface type continues // to be satisfied if that is the case. if _, ok := body.(io.ReadWriteCloser); ok { - return &wrappedBody{span: span, record: record, body: body} + return &wrappedBody{span: span, record: record, body: body, readEvent: readEvent} } // Remove the implementation of the io.ReadWriteCloser and only implement // the io.ReadCloser. - return struct{ io.ReadCloser }{&wrappedBody{span: span, record: record, body: body}} + return struct{ io.ReadCloser }{&wrappedBody{span: span, record: record, body: body, readEvent: readEvent}} } // wrappedBody is the response body type returned by the transport @@ -225,6 +227,8 @@ type wrappedBody struct { record func(n int64) body io.ReadCloser read atomic.Int64 + + readEvent bool } var _ io.ReadWriteCloser = &wrappedBody{} @@ -244,6 +248,10 @@ func (wb *wrappedBody) Read(b []byte) (int, error) { // Record the number of bytes read wb.read.Add(int64(n)) + if wb.readEvent { + wb.span.AddEvent("read", trace.WithAttributes(ReadBytesKey.Int64(int64(n)))) + } + switch err { case nil: // nothing to do here but fall through to the return diff --git a/instrumentation/net/http/otelhttp/transport_test.go b/instrumentation/net/http/otelhttp/transport_test.go index fa05936ce76..30efbd00bdf 100644 --- a/instrumentation/net/http/otelhttp/transport_test.go +++ b/instrumentation/net/http/otelhttp/transport_test.go @@ -260,7 +260,7 @@ func TestWrappedBodyRead(t *testing.T) { s := new(span) called := false record := func(numBytes int64) { called = true } - wb := newWrappedBody(s, record, readCloser{}) + wb := newWrappedBody(s, record, readCloser{}, false) n, err := wb.Read([]byte{}) assert.Equal(t, readSize, n, "wrappedBody returned wrong bytes") assert.NoError(t, err) @@ -276,7 +276,7 @@ func TestWrappedBodyReadEOFError(t *testing.T) { called = true numRecorded = numBytes } - wb := newWrappedBody(s, record, readCloser{readErr: io.EOF}) + wb := newWrappedBody(s, record, readCloser{readErr: io.EOF}, false) n, err := wb.Read([]byte{}) assert.Equal(t, readSize, n, "wrappedBody returned wrong bytes") assert.Equal(t, io.EOF, err) @@ -290,7 +290,7 @@ func TestWrappedBodyReadError(t *testing.T) { called := false record := func(int64) { called = true } expectedErr := errors.New("test") - wb := newWrappedBody(s, record, readCloser{readErr: expectedErr}) + wb := newWrappedBody(s, record, readCloser{readErr: expectedErr}, false) n, err := wb.Read([]byte{}) assert.Equal(t, readSize, n, "wrappedBody returned wrong bytes") assert.Equal(t, expectedErr, err) @@ -302,7 +302,7 @@ func TestWrappedBodyClose(t *testing.T) { s := new(span) called := false record := func(int64) { called = true } - wb := newWrappedBody(s, record, readCloser{}) + wb := newWrappedBody(s, record, readCloser{}, false) assert.NoError(t, wb.Close()) s.assert(t, true, nil, codes.Unset, "") assert.True(t, called, "record should have been called") @@ -311,7 +311,7 @@ func TestWrappedBodyClose(t *testing.T) { func TestWrappedBodyClosePanic(t *testing.T) { s := new(span) var body io.ReadCloser - wb := newWrappedBody(s, func(n int64) {}, body) + wb := newWrappedBody(s, func(n int64) {}, body, false) assert.NotPanics(t, func() { wb.Close() }, "nil body should not panic on close") } @@ -320,7 +320,7 @@ func TestWrappedBodyCloseError(t *testing.T) { called := false record := func(int64) { called = true } expectedErr := errors.New("test") - wb := newWrappedBody(s, record, readCloser{closeErr: expectedErr}) + wb := newWrappedBody(s, record, readCloser{closeErr: expectedErr}, false) assert.Equal(t, expectedErr, wb.Close()) s.assert(t, true, nil, codes.Unset, "") assert.True(t, called, "record should have been called") @@ -339,12 +339,12 @@ func (rwc readWriteCloser) Write([]byte) (int, error) { } func TestNewWrappedBodyReadWriteCloserImplementation(t *testing.T) { - wb := newWrappedBody(nil, func(n int64) {}, readWriteCloser{}) + wb := newWrappedBody(nil, func(n int64) {}, readWriteCloser{}, false) assert.Implements(t, (*io.ReadWriteCloser)(nil), wb) } func TestNewWrappedBodyReadCloserImplementation(t *testing.T) { - wb := newWrappedBody(nil, func(n int64) {}, readCloser{}) + wb := newWrappedBody(nil, func(n int64) {}, readCloser{}, false) assert.Implements(t, (*io.ReadCloser)(nil), wb) _, ok := wb.(io.ReadWriteCloser) @@ -355,7 +355,7 @@ func TestWrappedBodyWrite(t *testing.T) { s := new(span) var rwc io.ReadWriteCloser assert.NotPanics(t, func() { - rwc = newWrappedBody(s, func(n int64) {}, readWriteCloser{}).(io.ReadWriteCloser) + rwc = newWrappedBody(s, func(n int64) {}, readWriteCloser{}, false).(io.ReadWriteCloser) }) n, err := rwc.Write([]byte{}) @@ -373,7 +373,7 @@ func TestWrappedBodyWriteError(t *testing.T) { func(n int64) {}, readWriteCloser{ writeErr: expectedErr, - }).(io.ReadWriteCloser) + }, false).(io.ReadWriteCloser) }) n, err := rwc.Write([]byte{}) assert.Equal(t, writeSize, n, "wrappedBody returned wrong bytes") @@ -617,6 +617,66 @@ func TestTransportErrorStatus(t *testing.T) { } } +func TestTransportWithMessageEventsReadEvents(t *testing.T) { + // Prepare tracing stuff. + spanRecorder := tracetest.NewSpanRecorder() + provider := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(spanRecorder)) + + content := []byte("Hello, world!") + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, err := w.Write(content) + assert.NoError(t, err) + })) + defer ts.Close() + + // Create our Transport and make request. + tr := NewTransport( + http.DefaultTransport, + WithTracerProvider(provider), + WithMessageEvents(ReadEvents), + ) + c := http.Client{Transport: tr} + r, err := http.NewRequest(http.MethodGet, ts.URL, nil) + if err != nil { + t.Fatal(err) + } + res, err := c.Do(r) // nolint:bodyclose // False-positive. + require.NoError(t, err) + defer func() { assert.NoError(t, res.Body.Close()) }() + + _, err = io.ReadAll(res.Body) + require.NoError(t, err) + + // Check span. + spans := spanRecorder.Ended() + if len(spans) != 1 { + t.Fatalf("expected 1 span; got: %d", len(spans)) + } + span := spans[0] + + events := span.Events() + require.NotEmpty(t, events, "expected span to have events") + + var readEvent sdktrace.Event + for _, ev := range events { + if ev.Name == "read" { + readEvent = ev + break + } + } + require.NotEmpty(t, readEvent, "expected span to have a 'read' event") + + var readBytesAttr attribute.KeyValue + for _, attr := range readEvent.Attributes { + if attr.Key == ReadBytesKey { + readBytesAttr = attr + break + } + } + require.NotEmpty(t, readBytesAttr, "expected span to have a read bytes attribute") + require.Positive(t, readBytesAttr.Value.AsInt64(), "expected read bytes attribute to have a non-zero int64 value") +} + func TestTransportRequestWithTraceContext(t *testing.T) { spanRecorder := tracetest.NewSpanRecorder() provider := sdktrace.NewTracerProvider(