Skip to content
This repository was archived by the owner on Apr 18, 2023. It is now read-only.

Commit 3d3c285

Browse files
use stats.handler instead of interceptor and add message size monitor
1 parent b7dd0c7 commit 3d3c285

14 files changed

+828
-13
lines changed

client.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ var (
2020

2121
// StreamClientInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Streaming RPCs.
2222
StreamClientInterceptor = DefaultClientMetrics.StreamClientInterceptor()
23+
24+
// ClientStatsHandler is a gRPC client-side stats.Handler that provides Prometheus monitoring for RPCs.
25+
ClientStatsHandler = DefaultClientMetrics.NewClientStatsHandler()
2326
)
2427

2528
func init() {
@@ -55,3 +58,21 @@ func EnableClientStreamSendTimeHistogram(opts ...HistogramOption) {
5558
DefaultClientMetrics.EnableClientStreamSendTimeHistogram(opts...)
5659
prom.Register(DefaultClientMetrics.clientStreamSendHistogram)
5760
}
61+
62+
// EnableClientMsgSizeReceivedBytesHistogram turns on recording of
63+
// single message send time of streaming RPCs.
64+
// This function acts on the DefaultClientMetrics variable and the
65+
// default Prometheus metrics registry.
66+
func EnableClientMsgSizeReceivedBytesHistogram(opts ...HistogramOption) {
67+
DefaultClientMetrics.EnableMsgSizeReceivedBytesHistogram(opts...)
68+
prom.Register(DefaultClientMetrics.clientMsgSizeReceivedHistogram)
69+
}
70+
71+
// EnableClientMsgSizeSentBytesHistogram turns on recording of
72+
// single message send time of streaming RPCs.
73+
// This function acts on the DefaultClientMetrics variable and the
74+
// default Prometheus metrics registry.
75+
func EnableClientMsgSizeSentBytesHistogram(opts ...HistogramOption) {
76+
DefaultClientMetrics.EnableMsgSizeSentBytesHistogram(opts...)
77+
prom.Register(DefaultClientMetrics.clientMsgSizeSentHistogram)
78+
}

client_metrics.go

Lines changed: 80 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,18 @@ import (
77
prom "github.com/prometheus/client_golang/prometheus"
88
"google.golang.org/grpc"
99
"google.golang.org/grpc/codes"
10+
"google.golang.org/grpc/stats"
1011
"google.golang.org/grpc/status"
1112
)
1213

1314
// ClientMetrics represents a collection of metrics to be registered on a
1415
// Prometheus metrics registry for a gRPC client.
1516
type ClientMetrics struct {
16-
clientStartedCounter *prom.CounterVec
17-
clientHandledCounter *prom.CounterVec
18-
clientStreamMsgReceived *prom.CounterVec
19-
clientStreamMsgSent *prom.CounterVec
17+
clientStartedCounter *prom.CounterVec
18+
clientStartedCounterOpts prom.CounterOpts
19+
clientHandledCounter *prom.CounterVec
20+
clientStreamMsgReceived *prom.CounterVec
21+
clientStreamMsgSent *prom.CounterVec
2022

2123
clientHandledHistogramEnabled bool
2224
clientHandledHistogramOpts prom.HistogramOpts
@@ -29,6 +31,14 @@ type ClientMetrics struct {
2931
clientStreamSendHistogramEnabled bool
3032
clientStreamSendHistogramOpts prom.HistogramOpts
3133
clientStreamSendHistogram *prom.HistogramVec
34+
35+
clientMsgSizeReceivedHistogramEnabled bool
36+
clientMsgSizeReceivedHistogramOpts prom.HistogramOpts
37+
clientMsgSizeReceivedHistogram *prom.HistogramVec
38+
39+
clientMsgSizeSentHistogramEnabled bool
40+
clientMsgSizeSentHistogramOpts prom.HistogramOpts
41+
clientMsgSizeSentHistogram *prom.HistogramVec
3242
}
3343

3444
// NewClientMetrics returns a ClientMetrics object. Use a new instance of
@@ -82,7 +92,21 @@ func NewClientMetrics(counterOpts ...CounterOption) *ClientMetrics {
8292
Help: "Histogram of response latency (seconds) of the gRPC single message send.",
8393
Buckets: prom.DefBuckets,
8494
},
85-
clientStreamSendHistogram: nil,
95+
clientStreamSendHistogram: nil,
96+
clientMsgSizeReceivedHistogramEnabled: false,
97+
clientMsgSizeReceivedHistogramOpts: prom.HistogramOpts{
98+
Name: "grpc_client_msg_size_received_bytes",
99+
Help: "Histogram of message sizes received by the client.",
100+
Buckets: defMsgBytesBuckets,
101+
},
102+
clientMsgSizeReceivedHistogram: nil,
103+
clientMsgSizeSentHistogramEnabled: false,
104+
clientMsgSizeSentHistogramOpts: prom.HistogramOpts{
105+
Name: "grpc_client_msg_size_sent_bytes",
106+
Help: "Histogram of message sizes sent by the client.",
107+
Buckets: defMsgBytesBuckets,
108+
},
109+
clientMsgSizeSentHistogram: nil,
86110
}
87111
}
88112

@@ -103,6 +127,12 @@ func (m *ClientMetrics) Describe(ch chan<- *prom.Desc) {
103127
if m.clientStreamSendHistogramEnabled {
104128
m.clientStreamSendHistogram.Describe(ch)
105129
}
130+
if m.clientMsgSizeReceivedHistogramEnabled {
131+
m.clientMsgSizeReceivedHistogram.Describe(ch)
132+
}
133+
if m.clientMsgSizeSentHistogramEnabled {
134+
m.clientMsgSizeSentHistogram.Describe(ch)
135+
}
106136
}
107137

108138
// Collect is called by the Prometheus registry when collecting
@@ -122,6 +152,12 @@ func (m *ClientMetrics) Collect(ch chan<- prom.Metric) {
122152
if m.clientStreamSendHistogramEnabled {
123153
m.clientStreamSendHistogram.Collect(ch)
124154
}
155+
if m.clientMsgSizeReceivedHistogramEnabled {
156+
m.clientMsgSizeReceivedHistogram.Collect(ch)
157+
}
158+
if m.clientMsgSizeSentHistogramEnabled {
159+
m.clientMsgSizeSentHistogram.Collect(ch)
160+
}
125161
}
126162

127163
// EnableClientHandlingTimeHistogram turns on recording of handling time of RPCs.
@@ -173,6 +209,38 @@ func (m *ClientMetrics) EnableClientStreamSendTimeHistogram(opts ...HistogramOpt
173209
m.clientStreamSendHistogramEnabled = true
174210
}
175211

212+
// EnableMsgSizeReceivedBytesHistogram turns on recording of received message size of RPCs.
213+
// Histogram metrics can be very expensive for Prometheus to retain and query. It takes
214+
// options to configure histogram options such as the defined buckets.
215+
func (m *ClientMetrics) EnableMsgSizeReceivedBytesHistogram(opts ...HistogramOption) {
216+
for _, o := range opts {
217+
o(&m.clientMsgSizeReceivedHistogramOpts)
218+
}
219+
if !m.clientMsgSizeReceivedHistogramEnabled {
220+
m.clientMsgSizeReceivedHistogram = prom.NewHistogramVec(
221+
m.clientMsgSizeReceivedHistogramOpts,
222+
[]string{"grpc_service", "grpc_method", "grpc_stats"},
223+
)
224+
}
225+
m.clientMsgSizeReceivedHistogramEnabled = true
226+
}
227+
228+
// EnableMsgSizeSentBytesHistogram turns on recording of sent message size of RPCs.
229+
// Histogram metrics can be very expensive for Prometheus to retain and query. It
230+
// takes options to configure histogram options such as the defined buckets.
231+
func (m *ClientMetrics) EnableMsgSizeSentBytesHistogram(opts ...HistogramOption) {
232+
for _, o := range opts {
233+
o(&m.clientMsgSizeSentHistogramOpts)
234+
}
235+
if !m.clientMsgSizeSentHistogramEnabled {
236+
m.clientMsgSizeSentHistogram = prom.NewHistogramVec(
237+
m.clientMsgSizeSentHistogramOpts,
238+
[]string{"grpc_service", "grpc_method", "grpc_stats"},
239+
)
240+
}
241+
m.clientMsgSizeSentHistogramEnabled = true
242+
}
243+
176244
// UnaryClientInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Unary RPCs.
177245
func (m *ClientMetrics) UnaryClientInterceptor() func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
178246
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
@@ -202,6 +270,13 @@ func (m *ClientMetrics) StreamClientInterceptor() func(ctx context.Context, desc
202270
}
203271
}
204272

273+
// NewClientStatsHandler is a gRPC client-side stats.Handler that providers Prometheus monitoring for RPCs.
274+
func (m *ClientMetrics) NewClientStatsHandler() stats.Handler {
275+
return &clientStatsHandler{
276+
clientMetrics: m,
277+
}
278+
}
279+
205280
func clientStreamType(desc *grpc.StreamDesc) grpcType {
206281
if desc.ClientStreams && !desc.ServerStreams {
207282
return ClientStream

client_reporter.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package grpc_prometheus
55

66
import (
7+
"fmt"
78
"time"
89

910
"github.com/prometheus/client_golang/prometheus"
@@ -31,6 +32,16 @@ func newClientReporter(m *ClientMetrics, rpcType grpcType, fullMethod string) *c
3132
return r
3233
}
3334

35+
func newClientReporterForStatsHanlder(startTime time.Time, m *ClientMetrics, fullMethod string) *clientReporter {
36+
r := &clientReporter{
37+
metrics: m,
38+
rpcType: Unary,
39+
startTime: startTime,
40+
}
41+
r.serviceName, r.methodName = splitMethodName(fullMethod)
42+
return r
43+
}
44+
3445
// timer is a helper interface to time functions.
3546
type timer interface {
3647
ObserveDuration() time.Duration
@@ -54,10 +65,25 @@ func (r *clientReporter) ReceiveMessageTimer() timer {
5465
return emptyTimer
5566
}
5667

68+
func (r *clientReporter) StartedConn() {
69+
r.metrics.clientStartedCounter.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc()
70+
}
71+
5772
func (r *clientReporter) ReceivedMessage() {
5873
r.metrics.clientStreamMsgReceived.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc()
5974
}
6075

76+
// ReceivedMessageSize counts the size of received messages on client-side
77+
func (r *clientReporter) ReceivedMessageSize(rpcStats grpcStats, size float64) {
78+
if rpcStats == Payload {
79+
r.ReceivedMessage()
80+
}
81+
82+
if r.metrics.clientMsgSizeReceivedHistogramEnabled {
83+
r.metrics.clientMsgSizeReceivedHistogram.WithLabelValues(r.serviceName, r.methodName, string(rpcStats)).Observe(size)
84+
}
85+
}
86+
6187
func (r *clientReporter) SendMessageTimer() timer {
6288
if r.metrics.clientStreamSendHistogramEnabled {
6389
hist := r.metrics.clientStreamSendHistogram.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName)
@@ -71,9 +97,26 @@ func (r *clientReporter) SentMessage() {
7197
r.metrics.clientStreamMsgSent.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc()
7298
}
7399

100+
// SentMessageSize counts the size of sent messages on client-side
101+
func (r *clientReporter) SentMessageSize(rpcStats grpcStats, size float64) {
102+
if rpcStats == Payload {
103+
r.SentMessage()
104+
}
105+
106+
if r.metrics.clientMsgSizeSentHistogramEnabled {
107+
r.metrics.clientMsgSizeSentHistogram.WithLabelValues(r.serviceName, r.methodName, string(rpcStats)).Observe(size)
108+
}
109+
}
110+
111+
// StartTime is used to reset the value of the startTime
112+
func (r *clientReporter) StartTime(t time.Time) {
113+
r.startTime = t
114+
}
115+
74116
func (r *clientReporter) Handled(code codes.Code) {
75117
r.metrics.clientHandledCounter.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName, code.String()).Inc()
76118
if r.metrics.clientHandledHistogramEnabled {
119+
fmt.Printf("client handled count + 1: %v,%f\n", code, time.Since(r.startTime).Seconds())
77120
r.metrics.clientHandledHistogram.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Observe(time.Since(r.startTime).Seconds())
78121
}
79122
}

client_stats_handler.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package grpc_prometheus
2+
3+
import (
4+
"context"
5+
6+
"google.golang.org/grpc/stats"
7+
"google.golang.org/grpc/status"
8+
)
9+
10+
type clientStatsHandler struct {
11+
clientMetrics *ClientMetrics
12+
}
13+
14+
// TagRPC implements the stats.Hanlder interface.
15+
func (h *clientStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
16+
rpcInfo := newRPCInfo(info.FullMethodName)
17+
return context.WithValue(ctx, &rpcInfoKey, rpcInfo)
18+
}
19+
20+
// HandleRPC implements the stats.Hanlder interface.
21+
func (h *clientStatsHandler) HandleRPC(ctx context.Context, s stats.RPCStats) {
22+
v, ok := ctx.Value(&rpcInfoKey).(*rpcInfo)
23+
if !ok {
24+
return
25+
}
26+
monitor := newClientReporterForStatsHanlder(v.startTime, h.clientMetrics, v.fullMethodName)
27+
switch s := s.(type) {
28+
case *stats.Begin:
29+
v.startTime = s.BeginTime
30+
monitor.StartedConn()
31+
case *stats.End:
32+
monitor.Handled(status.Code(s.Error))
33+
case *stats.InHeader:
34+
monitor.ReceivedMessageSize(Header, float64(s.WireLength))
35+
case *stats.InPayload:
36+
// TODO: remove the +5 offset on wire length here, which is a temporary stand-in for the missing grpc framing offset
37+
// See: https://github.com/grpc/grpc-go/issues/1647
38+
// TODO(tonywang): response latency (seconds) of the gRPC single message received
39+
monitor.ReceivedMessageSize(Payload, float64(s.WireLength+5))
40+
case *stats.InTrailer:
41+
monitor.ReceivedMessageSize(Tailer, float64(s.WireLength))
42+
case *stats.OutHeader:
43+
// TODO: Add the sent header message size stats, if the wire length of the send header is provided
44+
case *stats.OutPayload:
45+
// TODO(tonywang): response latency (seconds) of the gRPC single message send
46+
monitor.SentMessageSize(Payload, float64(s.WireLength))
47+
case *stats.OutTrailer:
48+
monitor.SentMessageSize(Tailer, float64(s.WireLength))
49+
}
50+
}
51+
52+
// TagConn implements the stats.Hanlder interface.
53+
func (h *clientStatsHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
54+
return ctx
55+
}
56+
57+
// HandleConn implements the stats.Hanlder interface.
58+
func (h *clientStatsHandler) HandleConn(ctx context.Context, s stats.ConnStats) {
59+
}

0 commit comments

Comments
 (0)