Skip to content

Commit 4c540e0

Browse files
authored
otelgrpc: Add metrics support to NewServerHandler and NewClientHandler (#4356)
1 parent 23181f7 commit 4c540e0

File tree

8 files changed

+863
-53
lines changed

8 files changed

+863
-53
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
1717
- Add metrics support (No-op, OTLP and Prometheus) to `go.opentelemetry.io/contrib/exporters/autoexport`. (#4229, #4479)
1818
- Add support for `console` span exporter and metrics exporter in `go.opentelemetry.io/contrib/exporters/autoexport`. (#4486)
1919
- Set unit and description on all instruments in `go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp`. (#4500)
20+
- Add metric support for `grpc.StatsHandler` in `go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc`. (#4356)
2021

2122
### Changed
2223

instrumentation/google.golang.org/grpc/otelgrpc/config.go

+44-4
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,14 @@ type config struct {
4646
ReceivedEvent bool
4747
SentEvent bool
4848

49-
meter metric.Meter
50-
rpcServerDuration metric.Int64Histogram
49+
tracer trace.Tracer
50+
meter metric.Meter
51+
52+
rpcDuration metric.Float64Histogram
53+
rpcRequestSize metric.Int64Histogram
54+
rpcResponseSize metric.Int64Histogram
55+
rpcRequestsPerRPC metric.Int64Histogram
56+
rpcResponsesPerRPC metric.Int64Histogram
5157
}
5258

5359
// Option applies an option value for a config.
@@ -56,7 +62,7 @@ type Option interface {
5662
}
5763

5864
// newConfig returns a config configured with all the passed Options.
59-
func newConfig(opts []Option) *config {
65+
func newConfig(opts []Option, role string) *config {
6066
c := &config{
6167
Propagators: otel.GetTextMapPropagator(),
6268
TracerProvider: otel.GetTracerProvider(),
@@ -66,19 +72,53 @@ func newConfig(opts []Option) *config {
6672
o.apply(c)
6773
}
6874

75+
c.tracer = c.TracerProvider.Tracer(
76+
instrumentationName,
77+
trace.WithInstrumentationVersion(SemVersion()),
78+
)
79+
6980
c.meter = c.MeterProvider.Meter(
7081
instrumentationName,
7182
metric.WithInstrumentationVersion(Version()),
7283
metric.WithSchemaURL(semconv.SchemaURL),
7384
)
85+
7486
var err error
75-
c.rpcServerDuration, err = c.meter.Int64Histogram("rpc.server.duration",
87+
c.rpcDuration, err = c.meter.Float64Histogram("rpc."+role+".duration",
7688
metric.WithDescription("Measures the duration of inbound RPC."),
7789
metric.WithUnit("ms"))
7890
if err != nil {
7991
otel.Handle(err)
8092
}
8193

94+
c.rpcRequestSize, err = c.meter.Int64Histogram("rpc."+role+".request.size",
95+
metric.WithDescription("Measures size of RPC request messages (uncompressed)."),
96+
metric.WithUnit("By"))
97+
if err != nil {
98+
otel.Handle(err)
99+
}
100+
101+
c.rpcResponseSize, err = c.meter.Int64Histogram("rpc."+role+".response.size",
102+
metric.WithDescription("Measures size of RPC response messages (uncompressed)."),
103+
metric.WithUnit("By"))
104+
if err != nil {
105+
otel.Handle(err)
106+
}
107+
108+
c.rpcRequestsPerRPC, err = c.meter.Int64Histogram("rpc."+role+".requests_per_rpc",
109+
metric.WithDescription("Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs."),
110+
metric.WithUnit("{count}"))
111+
if err != nil {
112+
otel.Handle(err)
113+
}
114+
115+
c.rpcResponsesPerRPC, err = c.meter.Int64Histogram("rpc."+role+".responses_per_rpc",
116+
metric.WithDescription("Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs."),
117+
metric.WithUnit("{count}"))
118+
if err != nil {
119+
otel.Handle(err)
120+
}
121+
82122
return c
83123
}
84124

instrumentation/google.golang.org/grpc/otelgrpc/interceptor.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ var (
6161
// UnaryClientInterceptor returns a grpc.UnaryClientInterceptor suitable
6262
// for use in a grpc.Dial call.
6363
func UnaryClientInterceptor(opts ...Option) grpc.UnaryClientInterceptor {
64-
cfg := newConfig(opts)
64+
cfg := newConfig(opts, "client")
6565
tracer := cfg.TracerProvider.Tracer(
6666
instrumentationName,
6767
trace.WithInstrumentationVersion(Version()),
@@ -255,7 +255,7 @@ func (w *clientStream) sendStreamEvent(eventType streamEventType, err error) {
255255
// StreamClientInterceptor returns a grpc.StreamClientInterceptor suitable
256256
// for use in a grpc.Dial call.
257257
func StreamClientInterceptor(opts ...Option) grpc.StreamClientInterceptor {
258-
cfg := newConfig(opts)
258+
cfg := newConfig(opts, "client")
259259
tracer := cfg.TracerProvider.Tracer(
260260
instrumentationName,
261261
trace.WithInstrumentationVersion(Version()),
@@ -325,7 +325,7 @@ func StreamClientInterceptor(opts ...Option) grpc.StreamClientInterceptor {
325325
// UnaryServerInterceptor returns a grpc.UnaryServerInterceptor suitable
326326
// for use in a grpc.NewServer call.
327327
func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor {
328-
cfg := newConfig(opts)
328+
cfg := newConfig(opts, "server")
329329
tracer := cfg.TracerProvider.Tracer(
330330
instrumentationName,
331331
trace.WithInstrumentationVersion(Version()),
@@ -387,7 +387,7 @@ func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor {
387387

388388
elapsedTime := time.Since(before).Milliseconds()
389389
attr = append(attr, grpcStatusCodeAttr)
390-
cfg.rpcServerDuration.Record(ctx, elapsedTime, metric.WithAttributes(attr...))
390+
cfg.rpcDuration.Record(ctx, float64(elapsedTime), metric.WithAttributes(attr...))
391391

392392
return resp, err
393393
}
@@ -446,7 +446,7 @@ func wrapServerStream(ctx context.Context, ss grpc.ServerStream, cfg *config) *s
446446
// StreamServerInterceptor returns a grpc.StreamServerInterceptor suitable
447447
// for use in a grpc.NewServer call.
448448
func StreamServerInterceptor(opts ...Option) grpc.StreamServerInterceptor {
449-
cfg := newConfig(opts)
449+
cfg := newConfig(opts, "server")
450450
tracer := cfg.TracerProvider.Tracer(
451451
instrumentationName,
452452
trace.WithInstrumentationVersion(Version()),

instrumentation/google.golang.org/grpc/otelgrpc/metadata_supplier.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func (s *metadataSupplier) Keys() []string {
5656
// requests.
5757
// Deprecated: Unnecessary public func.
5858
func Inject(ctx context.Context, md *metadata.MD, opts ...Option) {
59-
c := newConfig(opts)
59+
c := newConfig(opts, "")
6060
c.Propagators.Inject(ctx, &metadataSupplier{
6161
metadata: md,
6262
})
@@ -78,7 +78,7 @@ func inject(ctx context.Context, propagators propagation.TextMapPropagator) cont
7878
// This function is meant to be used on incoming requests.
7979
// Deprecated: Unnecessary public func.
8080
func Extract(ctx context.Context, md *metadata.MD, opts ...Option) (baggage.Baggage, trace.SpanContext) {
81-
c := newConfig(opts)
81+
c := newConfig(opts, "")
8282
ctx = c.Propagators.Extract(ctx, &metadataSupplier{
8383
metadata: md,
8484
})

instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go

+80-36
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,16 @@ package otelgrpc // import "go.opentelemetry.io/contrib/instrumentation/google.g
1717
import (
1818
"context"
1919
"sync/atomic"
20+
"time"
2021

2122
grpc_codes "google.golang.org/grpc/codes"
2223
"google.golang.org/grpc/stats"
2324
"google.golang.org/grpc/status"
2425

2526
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/internal"
27+
"go.opentelemetry.io/otel/attribute"
2628
"go.opentelemetry.io/otel/codes"
29+
"go.opentelemetry.io/otel/metric"
2730
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
2831
"go.opentelemetry.io/otel/trace"
2932
)
@@ -33,24 +36,32 @@ type gRPCContextKey struct{}
3336
type gRPCContext struct {
3437
messagesReceived int64
3538
messagesSent int64
39+
metricAttrs []attribute.KeyValue
40+
}
41+
42+
type serverHandler struct {
43+
*config
3644
}
3745

3846
// NewServerHandler creates a stats.Handler for gRPC server.
3947
func NewServerHandler(opts ...Option) stats.Handler {
4048
h := &serverHandler{
41-
config: newConfig(opts),
49+
config: newConfig(opts, "server"),
4250
}
4351

44-
h.tracer = h.config.TracerProvider.Tracer(
45-
instrumentationName,
46-
trace.WithInstrumentationVersion(SemVersion()),
47-
)
4852
return h
4953
}
5054

51-
type serverHandler struct {
52-
*config
53-
tracer trace.Tracer
55+
// TagConn can attach some information to the given context.
56+
func (h *serverHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
57+
span := trace.SpanFromContext(ctx)
58+
attrs := peerAttr(peerFromCtx(ctx))
59+
span.SetAttributes(attrs...)
60+
return ctx
61+
}
62+
63+
// HandleConn processes the Conn stats.
64+
func (h *serverHandler) HandleConn(ctx context.Context, info stats.ConnStats) {
5465
}
5566

5667
// TagRPC can attach some information to the given context.
@@ -66,46 +77,30 @@ func (h *serverHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) cont
6677
trace.WithAttributes(attrs...),
6778
)
6879

69-
gctx := gRPCContext{}
80+
gctx := gRPCContext{
81+
metricAttrs: attrs,
82+
}
7083
return context.WithValue(ctx, gRPCContextKey{}, &gctx)
7184
}
7285

7386
// HandleRPC processes the RPC stats.
7487
func (h *serverHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
75-
handleRPC(ctx, rs)
76-
}
77-
78-
// TagConn can attach some information to the given context.
79-
func (h *serverHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
80-
span := trace.SpanFromContext(ctx)
81-
attrs := peerAttr(peerFromCtx(ctx))
82-
span.SetAttributes(attrs...)
83-
return ctx
88+
h.handleRPC(ctx, rs)
8489
}
8590

86-
// HandleConn processes the Conn stats.
87-
func (h *serverHandler) HandleConn(ctx context.Context, info stats.ConnStats) {
91+
type clientHandler struct {
92+
*config
8893
}
8994

9095
// NewClientHandler creates a stats.Handler for gRPC client.
9196
func NewClientHandler(opts ...Option) stats.Handler {
9297
h := &clientHandler{
93-
config: newConfig(opts),
98+
config: newConfig(opts, "client"),
9499
}
95100

96-
h.tracer = h.config.TracerProvider.Tracer(
97-
instrumentationName,
98-
trace.WithInstrumentationVersion(SemVersion()),
99-
)
100-
101101
return h
102102
}
103103

104-
type clientHandler struct {
105-
*config
106-
tracer trace.Tracer
107-
}
108-
109104
// TagRPC can attach some information to the given context.
110105
func (h *clientHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
111106
name, attrs := internal.ParseFullMethod(info.FullMethodName)
@@ -117,14 +112,16 @@ func (h *clientHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) cont
117112
trace.WithAttributes(attrs...),
118113
)
119114

120-
gctx := gRPCContext{}
115+
gctx := gRPCContext{
116+
metricAttrs: attrs,
117+
}
121118

122119
return inject(context.WithValue(ctx, gRPCContextKey{}, &gctx), h.config.Propagators)
123120
}
124121

125122
// HandleRPC processes the RPC stats.
126123
func (h *clientHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
127-
handleRPC(ctx, rs)
124+
h.handleRPC(ctx, rs)
128125
}
129126

130127
// TagConn can attach some information to the given context.
@@ -140,17 +137,22 @@ func (h *clientHandler) HandleConn(context.Context, stats.ConnStats) {
140137
// no-op
141138
}
142139

143-
func handleRPC(ctx context.Context, rs stats.RPCStats) {
140+
func (c *config) handleRPC(ctx context.Context, rs stats.RPCStats) {
144141
span := trace.SpanFromContext(ctx)
145142
gctx, _ := ctx.Value(gRPCContextKey{}).(*gRPCContext)
146143
var messageId int64
144+
metricAttrs := make([]attribute.KeyValue, 0, len(gctx.metricAttrs)+1)
145+
metricAttrs = append(metricAttrs, gctx.metricAttrs...)
146+
wctx := withoutCancel(ctx)
147147

148148
switch rs := rs.(type) {
149149
case *stats.Begin:
150150
case *stats.InPayload:
151151
if gctx != nil {
152152
messageId = atomic.AddInt64(&gctx.messagesReceived, 1)
153+
c.rpcRequestSize.Record(wctx, int64(rs.Length), metric.WithAttributes(metricAttrs...))
153154
}
155+
154156
span.AddEvent("message",
155157
trace.WithAttributes(
156158
semconv.MessageTypeReceived,
@@ -162,6 +164,7 @@ func handleRPC(ctx context.Context, rs stats.RPCStats) {
162164
case *stats.OutPayload:
163165
if gctx != nil {
164166
messageId = atomic.AddInt64(&gctx.messagesSent, 1)
167+
c.rpcResponseSize.Record(wctx, int64(rs.Length), metric.WithAttributes(metricAttrs...))
165168
}
166169

167170
span.AddEvent("message",
@@ -172,16 +175,57 @@ func handleRPC(ctx context.Context, rs stats.RPCStats) {
172175
semconv.MessageUncompressedSizeKey.Int(rs.Length),
173176
),
174177
)
178+
case *stats.OutTrailer:
175179
case *stats.End:
180+
var rpcStatusAttr attribute.KeyValue
181+
176182
if rs.Error != nil {
177183
s, _ := status.FromError(rs.Error)
178184
span.SetStatus(codes.Error, s.Message())
179-
span.SetAttributes(statusCodeAttr(s.Code()))
185+
rpcStatusAttr = semconv.RPCGRPCStatusCodeKey.Int(int(s.Code()))
180186
} else {
181-
span.SetAttributes(statusCodeAttr(grpc_codes.OK))
187+
rpcStatusAttr = semconv.RPCGRPCStatusCodeKey.Int(int(grpc_codes.OK))
182188
}
189+
span.SetAttributes(rpcStatusAttr)
183190
span.End()
191+
192+
metricAttrs = append(metricAttrs, rpcStatusAttr)
193+
c.rpcDuration.Record(wctx, float64(rs.EndTime.Sub(rs.BeginTime)), metric.WithAttributes(metricAttrs...))
194+
c.rpcRequestsPerRPC.Record(wctx, gctx.messagesReceived, metric.WithAttributes(metricAttrs...))
195+
c.rpcResponsesPerRPC.Record(wctx, gctx.messagesSent, metric.WithAttributes(metricAttrs...))
196+
184197
default:
185198
return
186199
}
187200
}
201+
202+
func withoutCancel(parent context.Context) context.Context {
203+
if parent == nil {
204+
panic("cannot create context from nil parent")
205+
}
206+
return withoutCancelCtx{parent}
207+
}
208+
209+
type withoutCancelCtx struct {
210+
c context.Context
211+
}
212+
213+
func (withoutCancelCtx) Deadline() (deadline time.Time, ok bool) {
214+
return
215+
}
216+
217+
func (withoutCancelCtx) Done() <-chan struct{} {
218+
return nil
219+
}
220+
221+
func (withoutCancelCtx) Err() error {
222+
return nil
223+
}
224+
225+
func (w withoutCancelCtx) Value(key any) any {
226+
return w.c.Value(key)
227+
}
228+
229+
func (w withoutCancelCtx) String() string {
230+
return "withoutCancel"
231+
}

0 commit comments

Comments
 (0)