Skip to content
This repository was archived by the owner on Feb 24, 2025. It is now read-only.

Commit dc79ff2

Browse files
authored
Merge pull request #26 from TimotejKovacka/main
test(output): add integration tests
2 parents 133ea7f + 14cfd08 commit dc79ff2

File tree

2 files changed

+338
-2
lines changed

2 files changed

+338
-2
lines changed

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@ require (
1414
go.opentelemetry.io/otel/metric v1.28.0
1515
go.opentelemetry.io/otel/sdk v1.28.0
1616
go.opentelemetry.io/otel/sdk/metric v1.28.0
17+
go.opentelemetry.io/proto/otlp v1.3.1
1718
google.golang.org/grpc v1.64.1
19+
google.golang.org/protobuf v1.34.2
1820
gopkg.in/guregu/null.v3 v3.3.0
1921
)
2022

@@ -36,13 +38,11 @@ require (
3638
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.24.0 // indirect
3739
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.24.0 // indirect
3840
go.opentelemetry.io/otel/trace v1.28.0 // indirect
39-
go.opentelemetry.io/proto/otlp v1.3.1 // indirect
4041
golang.org/x/net v0.27.0 // indirect
4142
golang.org/x/sys v0.22.0 // indirect
4243
golang.org/x/text v0.16.0 // indirect
4344
golang.org/x/time v0.5.0 // indirect
4445
google.golang.org/genproto/googleapis/api v0.0.0-20240701130421-f6361c86f094 // indirect
4546
google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094 // indirect
46-
google.golang.org/protobuf v1.34.2 // indirect
4747
gopkg.in/yaml.v3 v3.0.1 // indirect
4848
)

pkg/opentelemetry/output_test.go

Lines changed: 336 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,336 @@
1+
package opentelemetry
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"io"
7+
"net"
8+
"net/http"
9+
"net/http/httptest"
10+
"sync"
11+
"testing"
12+
"time"
13+
14+
"github.com/stretchr/testify/assert"
15+
"github.com/stretchr/testify/require"
16+
"go.k6.io/k6/lib/testutils"
17+
"go.k6.io/k6/metrics"
18+
"go.k6.io/k6/output"
19+
collectormetrics "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
20+
commonpb "go.opentelemetry.io/proto/otlp/common/v1"
21+
metricpb "go.opentelemetry.io/proto/otlp/metrics/v1"
22+
"google.golang.org/grpc"
23+
"google.golang.org/protobuf/proto"
24+
)
25+
26+
type MetricsServer interface {
27+
Start() error
28+
Stop()
29+
Endpoint() string
30+
LastMetrics() []byte
31+
}
32+
33+
type baseServer struct {
34+
mu sync.Mutex
35+
lastMetrics []byte
36+
}
37+
38+
func (s *baseServer) setLastMetrics(metrics []byte) {
39+
s.mu.Lock()
40+
s.lastMetrics = metrics
41+
s.mu.Unlock()
42+
}
43+
44+
func (s *baseServer) LastMetrics() []byte {
45+
s.mu.Lock()
46+
defer s.mu.Unlock()
47+
return s.lastMetrics
48+
}
49+
50+
type httpMetricsServer struct {
51+
baseServer
52+
server *httptest.Server
53+
}
54+
55+
func newHTTPServer() *httpMetricsServer {
56+
s := &httpMetricsServer{}
57+
s.server = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
58+
if r.Method != http.MethodPost || r.URL.Path != "/v1/metrics" {
59+
w.WriteHeader(http.StatusNotFound)
60+
return
61+
}
62+
63+
body, err := io.ReadAll(r.Body)
64+
if err != nil {
65+
w.WriteHeader(http.StatusInternalServerError)
66+
return
67+
}
68+
s.setLastMetrics(body)
69+
w.WriteHeader(http.StatusOK)
70+
}))
71+
return s
72+
}
73+
74+
func (s *httpMetricsServer) Start() error { return nil }
75+
func (s *httpMetricsServer) Stop() { s.server.Close() }
76+
func (s *httpMetricsServer) Endpoint() string { return s.server.Listener.Addr().String() }
77+
78+
type grpcMetricsServer struct {
79+
baseServer
80+
server *grpc.Server
81+
listener net.Listener
82+
}
83+
84+
func newGRPCServer() (*grpcMetricsServer, error) {
85+
listener, err := net.Listen("tcp", "localhost:0")
86+
if err != nil {
87+
return nil, fmt.Errorf("failed to create listener: %w", err)
88+
}
89+
90+
s := &grpcMetricsServer{
91+
server: grpc.NewServer(),
92+
listener: listener,
93+
}
94+
95+
collectormetrics.RegisterMetricsServiceServer(s.server, &grpcMetricsHandler{
96+
UnimplementedMetricsServiceServer: collectormetrics.UnimplementedMetricsServiceServer{},
97+
baseServer: &s.baseServer,
98+
})
99+
return s, nil
100+
}
101+
102+
func (s *grpcMetricsServer) Start() error {
103+
errChan := make(chan error, 1)
104+
go func() {
105+
if err := s.server.Serve(s.listener); err != nil {
106+
errChan <- fmt.Errorf("server failed to serve: %w", err)
107+
}
108+
close(errChan)
109+
}()
110+
111+
select {
112+
case err := <-errChan:
113+
return err
114+
case <-time.After(100 * time.Millisecond):
115+
return nil
116+
}
117+
}
118+
119+
func (s *grpcMetricsServer) Stop() {
120+
s.server.Stop()
121+
if err := s.listener.Close(); err != nil {
122+
_ = err
123+
}
124+
}
125+
126+
func (s *grpcMetricsServer) Endpoint() string { return s.listener.Addr().String() }
127+
128+
type grpcMetricsHandler struct {
129+
collectormetrics.UnimplementedMetricsServiceServer
130+
baseServer *baseServer
131+
}
132+
133+
func (h *grpcMetricsHandler) Export(_ context.Context, req *collectormetrics.ExportMetricsServiceRequest) (*collectormetrics.ExportMetricsServiceResponse, error) {
134+
data, err := proto.Marshal(req)
135+
if err != nil {
136+
return nil, fmt.Errorf("failed to marshal request: %w", err)
137+
}
138+
h.baseServer.setLastMetrics(data)
139+
return &collectormetrics.ExportMetricsServiceResponse{}, nil
140+
}
141+
142+
func createServer(t *testing.T, protocol string) MetricsServer {
143+
switch protocol {
144+
case "http":
145+
return newHTTPServer()
146+
case "grpc":
147+
server, err := newGRPCServer()
148+
require.NoError(t, err)
149+
require.NoError(t, server.Start())
150+
return server
151+
default:
152+
t.Fatalf("unsupported protocol: %s", protocol)
153+
return nil
154+
}
155+
}
156+
157+
func TestOutput(t *testing.T) {
158+
t.Parallel()
159+
160+
testProtocols := []string{"http", "grpc"}
161+
testCases := []struct {
162+
name string
163+
metric struct {
164+
typ metrics.MetricType
165+
value float64
166+
}
167+
validate func(*testing.T, *collectormetrics.ExportMetricsServiceRequest)
168+
}{
169+
{
170+
name: "gauge_metric",
171+
metric: struct {
172+
typ metrics.MetricType
173+
value float64
174+
}{metrics.Gauge, 42.0},
175+
validate: validateGaugeMetric,
176+
},
177+
{
178+
name: "counter_metric",
179+
metric: struct {
180+
typ metrics.MetricType
181+
value float64
182+
}{metrics.Counter, 10.0},
183+
validate: validateCounterMetric,
184+
},
185+
{
186+
name: "trend_metric",
187+
metric: struct {
188+
typ metrics.MetricType
189+
value float64
190+
}{metrics.Trend, 25.0},
191+
validate: validateTrendMetric,
192+
},
193+
}
194+
195+
for _, proto := range testProtocols {
196+
proto := proto
197+
t.Run(fmt.Sprintf("%s collector", proto), func(t *testing.T) {
198+
t.Parallel()
199+
for _, tc := range testCases {
200+
tc := tc
201+
t.Run(tc.name, func(t *testing.T) {
202+
t.Parallel()
203+
204+
server := createServer(t, proto)
205+
defer server.Stop()
206+
207+
config := createTestConfig(proto, server.Endpoint())
208+
output := setupOutput(t, config)
209+
defer func() {
210+
if err := output.Stop(); err != nil {
211+
t.Errorf("failed to stop output: %v", err)
212+
}
213+
}()
214+
215+
sample := createTestSample(t, tc.metric.typ, tc.metric.value)
216+
output.AddMetricSamples([]metrics.SampleContainer{metrics.Samples([]metrics.Sample{sample})})
217+
218+
time.Sleep(300 * time.Millisecond)
219+
validateMetrics(t, server.LastMetrics(), tc.validate)
220+
})
221+
}
222+
})
223+
}
224+
}
225+
226+
func createTestConfig(protocol, endpoint string) map[string]string {
227+
config := map[string]string{
228+
"K6_OTEL_SERVICE_NAME": "test_service",
229+
"K6_OTEL_FLUSH_INTERVAL": "100ms",
230+
"K6_OTEL_EXPORT_INTERVAL": "100ms",
231+
"K6_OTEL_EXPORTER_TYPE": protocol,
232+
"K6_OTEL_METRIC_PREFIX": "test.",
233+
}
234+
235+
if protocol == "http" {
236+
config["K6_OTEL_HTTP_EXPORTER_INSECURE"] = "true"
237+
config["K6_OTEL_HTTP_EXPORTER_ENDPOINT"] = endpoint
238+
config["K6_OTEL_HTTP_EXPORTER_URL_PATH"] = "/v1/metrics"
239+
} else {
240+
config["K6_OTEL_GRPC_EXPORTER_INSECURE"] = "true"
241+
config["K6_OTEL_GRPC_EXPORTER_ENDPOINT"] = endpoint
242+
}
243+
244+
return config
245+
}
246+
247+
func setupOutput(t *testing.T, config map[string]string) *Output {
248+
o, err := New(output.Params{
249+
Logger: testutils.NewLogger(t),
250+
Environment: config,
251+
})
252+
require.NoError(t, err)
253+
require.NoError(t, o.Start())
254+
return o
255+
}
256+
257+
func createTestSample(t *testing.T, metricType metrics.MetricType, value float64) metrics.Sample {
258+
registry := metrics.NewRegistry()
259+
metricName := metricType.String() + "_metric"
260+
metric, err := registry.NewMetric(metricName, metricType)
261+
require.NoError(t, err)
262+
263+
return metrics.Sample{
264+
TimeSeries: metrics.TimeSeries{
265+
Metric: metric,
266+
Tags: registry.RootTagSet().WithTagsFromMap(map[string]string{
267+
"tag1": "value1",
268+
}),
269+
},
270+
Value: value,
271+
}
272+
}
273+
274+
func validateMetrics(t *testing.T, data []byte, validate func(*testing.T, *collectormetrics.ExportMetricsServiceRequest)) {
275+
require.NotNil(t, data, "No metrics were received by collector")
276+
277+
var metricsRequest collectormetrics.ExportMetricsServiceRequest
278+
err := proto.Unmarshal(data, &metricsRequest)
279+
require.NoError(t, err)
280+
281+
validate(t, &metricsRequest)
282+
}
283+
284+
func validateGaugeMetric(t *testing.T, mr *collectormetrics.ExportMetricsServiceRequest) {
285+
metric := findMetric(mr, "test.gauge_metric")
286+
require.NotNil(t, metric, "gauge metric not found")
287+
gauge := metric.GetGauge()
288+
require.NotNil(t, gauge)
289+
require.Len(t, gauge.DataPoints, 1)
290+
assert.Equal(t, 42.0, gauge.DataPoints[0].GetAsDouble())
291+
assertHasAttribute(t, gauge.DataPoints[0].Attributes, "tag1", "value1")
292+
}
293+
294+
func validateCounterMetric(t *testing.T, mr *collectormetrics.ExportMetricsServiceRequest) {
295+
metric := findMetric(mr, "test.counter_metric")
296+
require.NotNil(t, metric, "counter metric not found")
297+
sum := metric.GetSum()
298+
require.NotNil(t, sum)
299+
require.Len(t, sum.DataPoints, 1)
300+
assert.Equal(t, 10.0, sum.DataPoints[0].GetAsDouble())
301+
assertHasAttribute(t, sum.DataPoints[0].Attributes, "tag1", "value1")
302+
}
303+
304+
func validateTrendMetric(t *testing.T, mr *collectormetrics.ExportMetricsServiceRequest) {
305+
metric := findMetric(mr, "test.trend_metric")
306+
require.NotNil(t, metric, "trend metric not found")
307+
histogram := metric.GetHistogram()
308+
require.NotNil(t, histogram)
309+
require.Len(t, histogram.DataPoints, 1)
310+
assert.Equal(t, uint64(1), histogram.DataPoints[0].GetCount())
311+
assert.Equal(t, 25.0, histogram.DataPoints[0].GetSum())
312+
assertHasAttribute(t, histogram.DataPoints[0].Attributes, "tag1", "value1")
313+
}
314+
315+
func findMetric(mr *collectormetrics.ExportMetricsServiceRequest, name string) *metricpb.Metric {
316+
for _, rm := range mr.GetResourceMetrics() {
317+
for _, sm := range rm.ScopeMetrics {
318+
for _, m := range sm.Metrics {
319+
if m.Name == name {
320+
return m
321+
}
322+
}
323+
}
324+
}
325+
return nil
326+
}
327+
328+
func assertHasAttribute(t *testing.T, attrs []*commonpb.KeyValue, key, value string) {
329+
for _, attr := range attrs {
330+
if attr.Key == key {
331+
assert.Equal(t, value, attr.GetValue().GetStringValue())
332+
return
333+
}
334+
}
335+
t.Errorf("Attribute %s not found", key)
336+
}

0 commit comments

Comments
 (0)