Skip to content
Open
9 changes: 5 additions & 4 deletions instrumentation/net/http/otelhttp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,15 +152,16 @@ const (
WriteEvents
)

// WithMessageEvents configures the Handler to record the specified events
// WithMessageEvents configures the Handler or Transport to record the specified events
// (span.AddEvent) on spans. By default only summary attributes are added at the
// end of the request.
//
// Valid events are:
// - ReadEvents: Record the number of bytes read after every http.Request.Body.Read
// using the ReadBytesKey
// - WriteEvents: Record the number of bytes written after every http.ResponeWriter.Write
// using the WriteBytesKey
// using the ReadBytesKey for Handler. For Transport, it will record the number of
// bytes read after every http.Response.Body.Read.
// - WriteEvents: Record the number of bytes written after every http.ResponseWriter.Write
// using the WriteBytesKey for Handler. Ignored for Transport.
func WithMessageEvents(events ...event) Option {
return optionFunc(func(c *config) {
for _, e := range events {
Expand Down
16 changes: 12 additions & 4 deletions instrumentation/net/http/otelhttp/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
spanNameFormatter func(string, *http.Request) string
clientTrace func(context.Context) *httptrace.ClientTrace
metricAttributesFn func(*http.Request) []attribute.KeyValue
readEvent bool

semconv semconv.HTTPClient
}
Expand Down Expand Up @@ -74,6 +75,7 @@
t.clientTrace = c.ClientTrace
t.semconv = semconv.NewHTTPClient(c.Meter)
t.metricAttributesFn = c.MetricAttributesFn
t.readEvent = c.ReadEvent
}

func defaultTransportFormatter(_ string, r *http.Request) string {
Expand Down Expand Up @@ -153,7 +155,7 @@
t.semconv.RecordResponseSize(ctx, n, metricOpts)
}

res.Body = newWrappedBody(span, readRecordFunc, res.Body)
res.Body = newWrappedBody(span, readRecordFunc, res.Body, t.readEvent)
}

// Use floating point division here for higher precision (instead of Millisecond method).
Expand Down Expand Up @@ -198,17 +200,17 @@
// newWrappedBody returns a new and appropriately scoped *wrappedBody as an
// io.ReadCloser. If the passed body implements io.Writer, the returned value
// will implement io.ReadWriteCloser.
func newWrappedBody(span trace.Span, record func(n int64), body io.ReadCloser) io.ReadCloser {
func newWrappedBody(span trace.Span, record func(n int64), body io.ReadCloser, readEvent bool) io.ReadCloser {
// The successful protocol switch responses will have a body that
// implement an io.ReadWriteCloser. Ensure this interface type continues
// to be satisfied if that is the case.
if _, ok := body.(io.ReadWriteCloser); ok {
return &wrappedBody{span: span, record: record, body: body}
return &wrappedBody{span: span, record: record, body: body, readEvent: readEvent}
}

// Remove the implementation of the io.ReadWriteCloser and only implement
// the io.ReadCloser.
return struct{ io.ReadCloser }{&wrappedBody{span: span, record: record, body: body}}
return struct{ io.ReadCloser }{&wrappedBody{span: span, record: record, body: body, readEvent: readEvent}}
}

// wrappedBody is the response body type returned by the transport
Expand All @@ -225,6 +227,8 @@
record func(n int64)
body io.ReadCloser
read atomic.Int64

readEvent bool
}

var _ io.ReadWriteCloser = &wrappedBody{}
Expand All @@ -244,6 +248,10 @@
// Record the number of bytes read
wb.read.Add(int64(n))

if wb.readEvent {
wb.span.AddEvent("read", trace.WithAttributes(ReadBytesKey.Int64(int64(n))))
}

Check warning on line 253 in instrumentation/net/http/otelhttp/transport.go

View check run for this annotation

Codecov / codecov/patch

instrumentation/net/http/otelhttp/transport.go#L252-L253

Added lines #L252 - L253 were not covered by tests

switch err {
case nil:
// nothing to do here but fall through to the return
Expand Down
20 changes: 10 additions & 10 deletions instrumentation/net/http/otelhttp/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func TestWrappedBodyRead(t *testing.T) {
s := new(span)
called := false
record := func(numBytes int64) { called = true }
wb := newWrappedBody(s, record, readCloser{})
wb := newWrappedBody(s, record, readCloser{}, false)
n, err := wb.Read([]byte{})
assert.Equal(t, readSize, n, "wrappedBody returned wrong bytes")
assert.NoError(t, err)
Expand All @@ -276,7 +276,7 @@ func TestWrappedBodyReadEOFError(t *testing.T) {
called = true
numRecorded = numBytes
}
wb := newWrappedBody(s, record, readCloser{readErr: io.EOF})
wb := newWrappedBody(s, record, readCloser{readErr: io.EOF}, false)
n, err := wb.Read([]byte{})
assert.Equal(t, readSize, n, "wrappedBody returned wrong bytes")
assert.Equal(t, io.EOF, err)
Expand All @@ -290,7 +290,7 @@ func TestWrappedBodyReadError(t *testing.T) {
called := false
record := func(int64) { called = true }
expectedErr := errors.New("test")
wb := newWrappedBody(s, record, readCloser{readErr: expectedErr})
wb := newWrappedBody(s, record, readCloser{readErr: expectedErr}, false)
n, err := wb.Read([]byte{})
assert.Equal(t, readSize, n, "wrappedBody returned wrong bytes")
assert.Equal(t, expectedErr, err)
Expand All @@ -302,7 +302,7 @@ func TestWrappedBodyClose(t *testing.T) {
s := new(span)
called := false
record := func(int64) { called = true }
wb := newWrappedBody(s, record, readCloser{})
wb := newWrappedBody(s, record, readCloser{}, false)
assert.NoError(t, wb.Close())
s.assert(t, true, nil, codes.Unset, "")
assert.True(t, called, "record should have been called")
Expand All @@ -311,7 +311,7 @@ func TestWrappedBodyClose(t *testing.T) {
func TestWrappedBodyClosePanic(t *testing.T) {
s := new(span)
var body io.ReadCloser
wb := newWrappedBody(s, func(n int64) {}, body)
wb := newWrappedBody(s, func(n int64) {}, body, false)
assert.NotPanics(t, func() { wb.Close() }, "nil body should not panic on close")
}

Expand All @@ -320,7 +320,7 @@ func TestWrappedBodyCloseError(t *testing.T) {
called := false
record := func(int64) { called = true }
expectedErr := errors.New("test")
wb := newWrappedBody(s, record, readCloser{closeErr: expectedErr})
wb := newWrappedBody(s, record, readCloser{closeErr: expectedErr}, false)
assert.Equal(t, expectedErr, wb.Close())
s.assert(t, true, nil, codes.Unset, "")
assert.True(t, called, "record should have been called")
Expand All @@ -339,12 +339,12 @@ func (rwc readWriteCloser) Write([]byte) (int, error) {
}

func TestNewWrappedBodyReadWriteCloserImplementation(t *testing.T) {
wb := newWrappedBody(nil, func(n int64) {}, readWriteCloser{})
wb := newWrappedBody(nil, func(n int64) {}, readWriteCloser{}, false)
assert.Implements(t, (*io.ReadWriteCloser)(nil), wb)
}

func TestNewWrappedBodyReadCloserImplementation(t *testing.T) {
wb := newWrappedBody(nil, func(n int64) {}, readCloser{})
wb := newWrappedBody(nil, func(n int64) {}, readCloser{}, false)
assert.Implements(t, (*io.ReadCloser)(nil), wb)

_, ok := wb.(io.ReadWriteCloser)
Expand All @@ -355,7 +355,7 @@ func TestWrappedBodyWrite(t *testing.T) {
s := new(span)
var rwc io.ReadWriteCloser
assert.NotPanics(t, func() {
rwc = newWrappedBody(s, func(n int64) {}, readWriteCloser{}).(io.ReadWriteCloser)
rwc = newWrappedBody(s, func(n int64) {}, readWriteCloser{}, false).(io.ReadWriteCloser)
})

n, err := rwc.Write([]byte{})
Expand All @@ -373,7 +373,7 @@ func TestWrappedBodyWriteError(t *testing.T) {
func(n int64) {},
readWriteCloser{
writeErr: expectedErr,
}).(io.ReadWriteCloser)
}, false).(io.ReadWriteCloser)
})
n, err := rwc.Write([]byte{})
assert.Equal(t, writeSize, n, "wrappedBody returned wrong bytes")
Expand Down
Loading