Skip to content
This repository was archived by the owner on Feb 24, 2025. It is now read-only.

Commit c38f7ef

Browse files
test(output): add integration tests
1 parent 133ea7f commit c38f7ef

File tree

2 files changed

+316
-2
lines changed

2 files changed

+316
-2
lines changed

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@ require (
1414
go.opentelemetry.io/otel/metric v1.28.0
1515
go.opentelemetry.io/otel/sdk v1.28.0
1616
go.opentelemetry.io/otel/sdk/metric v1.28.0
17+
go.opentelemetry.io/proto/otlp v1.3.1
1718
google.golang.org/grpc v1.64.1
19+
google.golang.org/protobuf v1.34.2
1820
gopkg.in/guregu/null.v3 v3.3.0
1921
)
2022

@@ -36,13 +38,11 @@ require (
3638
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.24.0 // indirect
3739
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.24.0 // indirect
3840
go.opentelemetry.io/otel/trace v1.28.0 // indirect
39-
go.opentelemetry.io/proto/otlp v1.3.1 // indirect
4041
golang.org/x/net v0.27.0 // indirect
4142
golang.org/x/sys v0.22.0 // indirect
4243
golang.org/x/text v0.16.0 // indirect
4344
golang.org/x/time v0.5.0 // indirect
4445
google.golang.org/genproto/googleapis/api v0.0.0-20240701130421-f6361c86f094 // indirect
4546
google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094 // indirect
46-
google.golang.org/protobuf v1.34.2 // indirect
4747
gopkg.in/yaml.v3 v3.0.1 // indirect
4848
)

pkg/opentelemetry/output_test.go

Lines changed: 314 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,314 @@
1+
package opentelemetry
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"io"
7+
"net"
8+
"net/http"
9+
"net/http/httptest"
10+
"sync"
11+
"testing"
12+
"time"
13+
14+
"github.com/stretchr/testify/assert"
15+
"github.com/stretchr/testify/require"
16+
"go.k6.io/k6/lib/testutils"
17+
"go.k6.io/k6/metrics"
18+
"go.k6.io/k6/output"
19+
collectormetrics "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
20+
commonpb "go.opentelemetry.io/proto/otlp/common/v1"
21+
metricpb "go.opentelemetry.io/proto/otlp/metrics/v1"
22+
"google.golang.org/grpc"
23+
"google.golang.org/protobuf/proto"
24+
)
25+
26+
type MetricsServer interface {
27+
Start() error
28+
Stop()
29+
Endpoint() string
30+
LastMetrics() []byte
31+
}
32+
33+
type baseServer struct {
34+
mu sync.Mutex
35+
lastMetrics []byte
36+
}
37+
38+
func (s *baseServer) setLastMetrics(metrics []byte) {
39+
s.mu.Lock()
40+
s.lastMetrics = metrics
41+
s.mu.Unlock()
42+
}
43+
44+
func (s *baseServer) LastMetrics() []byte {
45+
s.mu.Lock()
46+
defer s.mu.Unlock()
47+
return s.lastMetrics
48+
}
49+
50+
type httpMetricsServer struct {
51+
baseServer
52+
server *httptest.Server
53+
}
54+
55+
func newHTTPServer() *httpMetricsServer {
56+
s := &httpMetricsServer{}
57+
s.server = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
58+
if r.Method != http.MethodPost || r.URL.Path != "/v1/metrics" {
59+
w.WriteHeader(http.StatusNotFound)
60+
return
61+
}
62+
63+
body, err := io.ReadAll(r.Body)
64+
if err != nil {
65+
w.WriteHeader(http.StatusInternalServerError)
66+
return
67+
}
68+
s.setLastMetrics(body)
69+
w.WriteHeader(http.StatusOK)
70+
}))
71+
return s
72+
}
73+
74+
func (s *httpMetricsServer) Start() error { return nil }
75+
func (s *httpMetricsServer) Stop() { s.server.Close() }
76+
func (s *httpMetricsServer) Endpoint() string { return s.server.Listener.Addr().String() }
77+
78+
type grpcMetricsServer struct {
79+
baseServer
80+
server *grpc.Server
81+
listener net.Listener
82+
}
83+
84+
func newGRPCServer() (*grpcMetricsServer, error) {
85+
listener, err := net.Listen("tcp", "localhost:0")
86+
if err != nil {
87+
return nil, fmt.Errorf("failed to create listener: %w", err)
88+
}
89+
90+
s := &grpcMetricsServer{
91+
server: grpc.NewServer(),
92+
listener: listener,
93+
}
94+
95+
collectormetrics.RegisterMetricsServiceServer(s.server, &grpcMetricsHandler{
96+
UnimplementedMetricsServiceServer: collectormetrics.UnimplementedMetricsServiceServer{},
97+
baseServer: &s.baseServer,
98+
})
99+
return s, nil
100+
}
101+
102+
func (s *grpcMetricsServer) Start() error {
103+
go s.server.Serve(s.listener)
104+
return nil
105+
}
106+
107+
func (s *grpcMetricsServer) Stop() {
108+
s.server.Stop()
109+
s.listener.Close()
110+
}
111+
112+
func (s *grpcMetricsServer) Endpoint() string { return s.listener.Addr().String() }
113+
114+
type grpcMetricsHandler struct {
115+
collectormetrics.UnimplementedMetricsServiceServer
116+
baseServer *baseServer
117+
}
118+
119+
func (h *grpcMetricsHandler) Export(ctx context.Context, req *collectormetrics.ExportMetricsServiceRequest) (*collectormetrics.ExportMetricsServiceResponse, error) {
120+
data, err := proto.Marshal(req)
121+
if err != nil {
122+
return nil, fmt.Errorf("failed to marshal request: %w", err)
123+
}
124+
h.baseServer.setLastMetrics(data)
125+
return &collectormetrics.ExportMetricsServiceResponse{}, nil
126+
}
127+
128+
func createServer(t *testing.T, protocol string) MetricsServer {
129+
switch protocol {
130+
case "http":
131+
return newHTTPServer()
132+
case "grpc":
133+
server, err := newGRPCServer()
134+
require.NoError(t, err)
135+
require.NoError(t, server.Start())
136+
return server
137+
default:
138+
t.Fatalf("unsupported protocol: %s", protocol)
139+
return nil
140+
}
141+
}
142+
143+
func TestOutput(t *testing.T) {
144+
testProtocols := []string{"http", "grpc"}
145+
testCases := []struct {
146+
name string
147+
metric struct {
148+
typ metrics.MetricType
149+
value float64
150+
}
151+
validate func(*testing.T, *collectormetrics.ExportMetricsServiceRequest)
152+
}{
153+
{
154+
name: "gauge_metric",
155+
metric: struct {
156+
typ metrics.MetricType
157+
value float64
158+
}{metrics.Gauge, 42.0},
159+
validate: validateGaugeMetric,
160+
},
161+
{
162+
name: "counter_metric",
163+
metric: struct {
164+
typ metrics.MetricType
165+
value float64
166+
}{metrics.Counter, 10.0},
167+
validate: validateCounterMetric,
168+
},
169+
{
170+
name: "trend_metric",
171+
metric: struct {
172+
typ metrics.MetricType
173+
value float64
174+
}{metrics.Trend, 25.0},
175+
validate: validateTrendMetric,
176+
},
177+
}
178+
179+
for _, protocol := range testProtocols {
180+
t.Run(fmt.Sprintf("%s collector", protocol), func(t *testing.T) {
181+
for _, tc := range testCases {
182+
tc := tc
183+
t.Run(tc.name, func(t *testing.T) {
184+
t.Parallel()
185+
186+
server := createServer(t, protocol)
187+
defer server.Stop()
188+
189+
config := createTestConfig(protocol, server.Endpoint())
190+
output := setupOutput(t, config)
191+
defer output.Stop()
192+
193+
sample := createTestSample(t, tc.metric.typ, tc.metric.value)
194+
output.AddMetricSamples([]metrics.SampleContainer{metrics.Samples([]metrics.Sample{sample})})
195+
196+
time.Sleep(300 * time.Millisecond)
197+
validateMetrics(t, server.LastMetrics(), tc.validate)
198+
})
199+
}
200+
})
201+
}
202+
}
203+
204+
func createTestConfig(protocol, endpoint string) map[string]string {
205+
config := map[string]string{
206+
"K6_OTEL_SERVICE_NAME": "test_service",
207+
"K6_OTEL_FLUSH_INTERVAL": "100ms",
208+
"K6_OTEL_EXPORT_INTERVAL": "100ms",
209+
"K6_OTEL_EXPORTER_TYPE": protocol,
210+
"K6_OTEL_METRIC_PREFIX": "test.",
211+
}
212+
213+
if protocol == "http" {
214+
config["K6_OTEL_HTTP_EXPORTER_INSECURE"] = "true"
215+
config["K6_OTEL_HTTP_EXPORTER_ENDPOINT"] = endpoint
216+
config["K6_OTEL_HTTP_EXPORTER_URL_PATH"] = "/v1/metrics"
217+
} else {
218+
config["K6_OTEL_GRPC_EXPORTER_INSECURE"] = "true"
219+
config["K6_OTEL_GRPC_EXPORTER_ENDPOINT"] = endpoint
220+
}
221+
222+
return config
223+
}
224+
225+
func setupOutput(t *testing.T, config map[string]string) *Output {
226+
o, err := New(output.Params{
227+
Logger: testutils.NewLogger(t),
228+
Environment: config,
229+
})
230+
require.NoError(t, err)
231+
require.NoError(t, o.Start())
232+
return o
233+
}
234+
235+
func createTestSample(t *testing.T, metricType metrics.MetricType, value float64) metrics.Sample {
236+
registry := metrics.NewRegistry()
237+
metricName := metricType.String() + "_metric"
238+
metric, err := registry.NewMetric(metricName, metricType)
239+
require.NoError(t, err)
240+
241+
return metrics.Sample{
242+
TimeSeries: metrics.TimeSeries{
243+
Metric: metric,
244+
Tags: registry.RootTagSet().WithTagsFromMap(map[string]string{
245+
"tag1": "value1",
246+
}),
247+
},
248+
Value: value,
249+
}
250+
}
251+
252+
func validateMetrics(t *testing.T, data []byte, validate func(*testing.T, *collectormetrics.ExportMetricsServiceRequest)) {
253+
require.NotNil(t, data, "No metrics were received by collector")
254+
255+
var metricsRequest collectormetrics.ExportMetricsServiceRequest
256+
err := proto.Unmarshal(data, &metricsRequest)
257+
require.NoError(t, err)
258+
259+
validate(t, &metricsRequest)
260+
}
261+
262+
func validateGaugeMetric(t *testing.T, mr *collectormetrics.ExportMetricsServiceRequest) {
263+
metric := findMetric(mr, "test.gauge_metric")
264+
require.NotNil(t, metric, "gauge metric not found")
265+
gauge := metric.GetGauge()
266+
require.NotNil(t, gauge)
267+
require.Len(t, gauge.DataPoints, 1)
268+
assert.Equal(t, 42.0, gauge.DataPoints[0].GetAsDouble())
269+
assertHasAttribute(t, gauge.DataPoints[0].Attributes, "tag1", "value1")
270+
}
271+
272+
func validateCounterMetric(t *testing.T, mr *collectormetrics.ExportMetricsServiceRequest) {
273+
metric := findMetric(mr, "test.counter_metric")
274+
require.NotNil(t, metric, "counter metric not found")
275+
sum := metric.GetSum()
276+
require.NotNil(t, sum)
277+
require.Len(t, sum.DataPoints, 1)
278+
assert.Equal(t, 10.0, sum.DataPoints[0].GetAsDouble())
279+
assertHasAttribute(t, sum.DataPoints[0].Attributes, "tag1", "value1")
280+
}
281+
282+
func validateTrendMetric(t *testing.T, mr *collectormetrics.ExportMetricsServiceRequest) {
283+
metric := findMetric(mr, "test.trend_metric")
284+
require.NotNil(t, metric, "trend metric not found")
285+
histogram := metric.GetHistogram()
286+
require.NotNil(t, histogram)
287+
require.Len(t, histogram.DataPoints, 1)
288+
assert.Equal(t, uint64(1), histogram.DataPoints[0].GetCount())
289+
assert.Equal(t, 25.0, histogram.DataPoints[0].GetSum())
290+
assertHasAttribute(t, histogram.DataPoints[0].Attributes, "tag1", "value1")
291+
}
292+
293+
func findMetric(mr *collectormetrics.ExportMetricsServiceRequest, name string) *metricpb.Metric {
294+
for _, rm := range mr.GetResourceMetrics() {
295+
for _, sm := range rm.ScopeMetrics {
296+
for _, m := range sm.Metrics {
297+
if m.Name == name {
298+
return m
299+
}
300+
}
301+
}
302+
}
303+
return nil
304+
}
305+
306+
func assertHasAttribute(t *testing.T, attrs []*commonpb.KeyValue, key, value string) {
307+
for _, attr := range attrs {
308+
if attr.Key == key {
309+
assert.Equal(t, value, attr.GetValue().GetStringValue())
310+
return
311+
}
312+
}
313+
t.Errorf("Attribute %s not found", key)
314+
}

0 commit comments

Comments
 (0)