Skip to content

Commit 0c8b7d6

Browse files
authored
feat(collector): improve run.ai metrics collection resilience (#2700)
1 parent 2a8287a commit 0c8b7d6

File tree

1 file changed

+37
-29
lines changed

1 file changed

+37
-29
lines changed

collector/benthos/input/runai/metrics.go

+37-29
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import (
44
"context"
55
"encoding/json"
66
"fmt"
7-
"net/http"
7+
"maps"
88
"strconv"
99
"strings"
1010
"time"
@@ -92,13 +92,13 @@ func (s *Service) GetWorkloadMetrics(ctx context.Context, workloadID string, par
9292
return m, err
9393
}
9494

95-
if resp.StatusCode() != http.StatusOK {
96-
return m, fmt.Errorf("failed to get workload metrics, status code: %d", resp.StatusCode())
95+
if resp.StatusCode() >= 400 {
96+
return m, fmt.Errorf("failed to get workload metrics after %d retries, status: %d %s", s.client.RetryCount, resp.StatusCode(), resp.Status())
9797
}
9898

99-
result := resp.Result().(*MeasurementResponse)
100-
if result == nil {
101-
return m, fmt.Errorf("failed to get workload metrics, result is nil")
99+
result, ok := resp.Result().(*MeasurementResponse)
100+
if !ok || result == nil {
101+
return m, fmt.Errorf("failed to get workload metrics due to invalid response")
102102
}
103103

104104
for _, measurement := range result.Measurements {
@@ -149,8 +149,8 @@ func (s *Service) GetAllWorkloadWithMetrics(ctx context.Context, params Measurem
149149
return nil, err
150150
}
151151

152-
workloadsWithMetrics := make([]WorkloadWithMetrics, len(workloads))
153-
for i, workload := range workloads {
152+
workloadsWithMetrics := make([]WorkloadWithMetrics, 0, len(workloads))
153+
for _, workload := range workloads {
154154
metrics := Metrics{
155155
Timestamp: params.EndTime.UTC(),
156156
Values: make(map[MetricType]float64),
@@ -164,17 +164,21 @@ func (s *Service) GetAllWorkloadWithMetrics(ctx context.Context, params Measurem
164164
EndTime: params.EndTime.UTC(),
165165
})
166166
if err != nil {
167-
return nil, err
167+
// We don't want to fail the whole operation if one workload fails to get metrics
168+
s.logger.With("startTime", params.StartTime.UTC(), "endTime", params.EndTime.UTC(), "workloadID", workload.ID, "metricTypes", metricTypes).Errorf("failed to get workload metrics: %w", err)
169+
continue
168170
}
169171

170-
for mt, v := range m.Values {
171-
metrics.Values[mt] = v
172-
}
172+
// Copy the metrics into the metrics struct
173+
maps.Copy(metrics.Values, m.Values)
173174
}
174175

175-
workloadsWithMetrics[i] = WorkloadWithMetrics{
176-
Workload: workload,
177-
Metrics: metrics,
176+
// Only add the workload to the list if it has metrics
177+
if len(metrics.Values) > 0 {
178+
workloadsWithMetrics = append(workloadsWithMetrics, WorkloadWithMetrics{
179+
Workload: workload,
180+
Metrics: metrics,
181+
})
178182
}
179183
}
180184

@@ -228,13 +232,13 @@ func (s *Service) GetPodMetrics(ctx context.Context, workloadID string, podID st
228232
return m, err
229233
}
230234

231-
if resp.StatusCode() != http.StatusOK {
232-
return m, fmt.Errorf("failed to get pod metrics, status code: %d", resp.StatusCode())
235+
if resp.StatusCode() >= 400 {
236+
return m, fmt.Errorf("failed to get pod metrics after %d retries, status: %d %s", s.client.RetryCount, resp.StatusCode(), resp.Status())
233237
}
234238

235-
result := resp.Result().(*MeasurementResponse)
236-
if result == nil {
237-
return m, fmt.Errorf("failed to get pod metrics, result is nil")
239+
result, ok := resp.Result().(*MeasurementResponse)
240+
if !ok || result == nil {
241+
return m, fmt.Errorf("failed to get pod metrics due to invalid response")
238242
}
239243

240244
for _, measurement := range result.Measurements {
@@ -285,8 +289,8 @@ func (s *Service) GetAllPodWithMetrics(ctx context.Context, params MeasurementPa
285289
return nil, err
286290
}
287291

288-
podsWithMetrics := make([]PodWithMetrics, len(pods))
289-
for i, pod := range pods {
292+
podsWithMetrics := make([]PodWithMetrics, 0, len(pods))
293+
for _, pod := range pods {
290294
metrics := Metrics{
291295
Timestamp: params.EndTime.UTC(),
292296
Values: make(map[MetricType]float64),
@@ -300,17 +304,21 @@ func (s *Service) GetAllPodWithMetrics(ctx context.Context, params MeasurementPa
300304
EndTime: params.EndTime.UTC(),
301305
})
302306
if err != nil {
303-
return nil, err
307+
// We don't want to fail the whole operation if one pod fails to get metrics
308+
s.logger.With("startTime", params.StartTime.UTC(), "endTime", params.EndTime.UTC(), "workloadID", pod.WorkloadID, "podID", pod.ID, "metricTypes", metricTypes).Errorf("failed to get pod metrics: %w", err)
309+
continue
304310
}
305311

306-
for mt, v := range m.Values {
307-
metrics.Values[mt] = v
308-
}
312+
// Copy the metrics into the metrics struct
313+
maps.Copy(metrics.Values, m.Values)
309314
}
310315

311-
podsWithMetrics[i] = PodWithMetrics{
312-
Pod: pod,
313-
Metrics: metrics,
316+
// Only add the pod to the list if it has metrics
317+
if len(metrics.Values) > 0 {
318+
podsWithMetrics = append(podsWithMetrics, PodWithMetrics{
319+
Pod: pod,
320+
Metrics: metrics,
321+
})
314322
}
315323
}
316324

0 commit comments

Comments
 (0)