From 36b7f3e1f8ca2912fba14c8ff936e220d5d75048 Mon Sep 17 00:00:00 2001 From: Todd Date: Wed, 13 Apr 2022 10:27:26 -0700 Subject: [PATCH] Update the cluster server metrics to use the gRPC stats.Handler. (#2000) gRPC stats.Handler gives us visibility into gRPC returned errors. In contrast, there are some situations where no metric would be counted at all using the interceptor approach (for example, in out of memory cases where gRPC would return a resource exhausted error). --- .../controller/internal/metric/cluster.go | 124 +++------ .../internal/metric/cluster_test.go | 257 +++++++----------- internal/servers/controller/listeners.go | 2 +- 3 files changed, 135 insertions(+), 248 deletions(-) diff --git a/internal/servers/controller/internal/metric/cluster.go b/internal/servers/controller/internal/metric/cluster.go index 4d82bcb0e2..1f73091e56 100644 --- a/internal/servers/controller/internal/metric/cluster.go +++ b/internal/servers/controller/internal/metric/cluster.go @@ -3,16 +3,14 @@ package metric import ( "context" "strings" - "time" "github.com/hashicorp/boundary/globals" "github.com/hashicorp/boundary/internal/errors" - "github.com/hashicorp/boundary/internal/observability/event" "github.com/prometheus/client_golang/prometheus" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/stats" "google.golang.org/grpc/status" - "google.golang.org/protobuf/proto" ) const ( @@ -23,9 +21,6 @@ const ( ) var ( - // 100 bytes, 1kb, 10kb, 100kb, 1mb, 10mb - gRpcMsgSizeBuckets = prometheus.ExponentialBuckets(100, 10, 6) - // gRpcRequestLatency collects measurements of how long it takes // the boundary system to reply to a request to the controller cluster // from the time that boundary received the request. @@ -39,32 +34,6 @@ var ( }, []string{labelGRpcCode, labelGRpcService, labelGRpcMethod}, ) - - // gRpcRequestSize collections measurements of how large each request - // to the boundary controller cluster is. 
- gRpcRequestSize prometheus.ObserverVec = prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Namespace: globals.MetricNamespace, - Subsystem: clusterSubSystem, - Name: "grpc_request_size_bytes", - Help: "Histogram of request sizes for gRPC requests.", - Buckets: gRpcMsgSizeBuckets, - }, - []string{labelGRpcCode, labelGRpcService, labelGRpcMethod}, - ) - - // gRpcResponseSize collections measurements of how large each response - // from the boundary controller cluster is. - gRpcResponseSize prometheus.ObserverVec = prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Namespace: globals.MetricNamespace, - Subsystem: clusterSubSystem, - Name: "grpc_response_size_bytes", - Help: "Histogram of response sizes for gRPC responses.", - Buckets: gRpcMsgSizeBuckets, - }, - []string{labelGRpcCode, labelGRpcService, labelGRpcMethod}, - ) ) // statusFromError retrieves the *status.Status from the provided error. It'll @@ -94,71 +63,50 @@ func splitMethodName(fullMethodName string) (string, string) { return "unknown", "unknown" } -type requestRecorder struct { - labels prometheus.Labels +type metricMethodNameContextKey struct{} + +type statsHandler struct{} - // measurements - reqSize *int - start time.Time +func (sh statsHandler) TagRPC(ctx context.Context, i *stats.RPCTagInfo) context.Context { + return context.WithValue(ctx, metricMethodNameContextKey{}, i.FullMethodName) +} +func (sh statsHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context { + return ctx +} +func (sh statsHandler) HandleConn(context.Context, stats.ConnStats) { } -func newRequestRecorder(ctx context.Context, req interface{}, fullMethodName string) requestRecorder { - const op = "metric.newRequestRecorder" - service, method := splitMethodName(fullMethodName) - r := requestRecorder{ - labels: prometheus.Labels{ +func (sh statsHandler) HandleRPC(ctx context.Context, s stats.RPCStats) { + switch v := s.(type) { + case *stats.End: + // Accept the ok, but ignore it. 
This code doesn't need to panic + // and if "fullName" is an empty string splitMethodName will + // set service and method to "unknown". + fullName, _ := ctx.Value(metricMethodNameContextKey{}).(string) + service, method := splitMethodName(fullName) + l := prometheus.Labels{ labelGRpcMethod: method, labelGRpcService: service, - }, - start: time.Now(), - } - - reqProto, ok := req.(proto.Message) - switch { - case ok: - reqSize := proto.Size(reqProto) - r.reqSize = &reqSize - default: - event.WriteError(ctx, op, errors.New(ctx, errors.Internal, op, "unable to cast to proto.Message")) + labelGRpcCode: statusFromError(v.Error).Code().String(), + } + gRpcRequestLatency.With(l).Observe(v.EndTime.Sub(v.BeginTime).Seconds()) } - return r } -func (r requestRecorder) record(ctx context.Context, resp interface{}, err error) { - const op = "metric.(requestRecorder).record" - st := statusFromError(err) - r.labels[labelGRpcCode] = st.Code().String() - - gRpcRequestLatency.With(r.labels).Observe(time.Since(r.start).Seconds()) - - if r.reqSize != nil { - gRpcRequestSize.With(r.labels).Observe(float64(*r.reqSize)) - } - - if respProto, ok := resp.(proto.Message); ok { - respSize := proto.Size(respProto) - gRpcResponseSize.With(r.labels).Observe(float64(respSize)) - } else { - event.WriteError(ctx, op, errors.New(ctx, errors.Internal, op, "unable to cast to proto.Message")) - } -} +var allCodes = []codes.Code{ + codes.OK, codes.InvalidArgument, codes.PermissionDenied, + codes.FailedPrecondition, -// InstrumentClusterInterceptor wraps a UnaryServerInterceptor and records -// observations for the collectors associated with the cluster's grpc service. 
-func InstrumentClusterInterceptor() grpc.UnaryServerInterceptor { - return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { - recorder := newRequestRecorder(ctx, req, info.FullMethod) - resp, err := handler(ctx, req) - recorder.record(ctx, resp, err) - return resp, err - } + // Codes which can be generated by the gRPC framework + codes.Canceled, codes.Unknown, codes.DeadlineExceeded, + codes.ResourceExhausted, codes.Unimplemented, codes.Internal, + codes.Unavailable, codes.Unauthenticated, } -var allCodes = []codes.Code{ - codes.OK, codes.Canceled, codes.Unknown, codes.InvalidArgument, codes.DeadlineExceeded, codes.NotFound, - codes.AlreadyExists, codes.PermissionDenied, codes.Unauthenticated, codes.ResourceExhausted, - codes.FailedPrecondition, codes.Aborted, codes.OutOfRange, codes.Unimplemented, codes.Internal, - codes.Unavailable, codes.DataLoss, +// InstrumentClusterStatsHandler returns a gRPC stats.Handler which observes +// cluster specific metrics. Use with the cluster gRPC server. 
+func InstrumentClusterStatsHandler() statsHandler { + return statsHandler{} } // InitializeClusterCollectors registers the cluster metrics to the default @@ -168,7 +116,7 @@ func InitializeClusterCollectors(r prometheus.Registerer, server *grpc.Server) { if r == nil { return } - r.MustRegister(gRpcRequestLatency, gRpcRequestSize, gRpcResponseSize) + r.MustRegister(gRpcRequestLatency) for serviceName, info := range server.GetServiceInfo() { for _, mInfo := range info.Methods { @@ -179,8 +127,6 @@ func InitializeClusterCollectors(r prometheus.Registerer, server *grpc.Server) { labelGRpcCode: c.String(), } gRpcRequestLatency.With(l) - gRpcRequestSize.With(l) - gRpcResponseSize.With(l) } } } diff --git a/internal/servers/controller/internal/metric/cluster_test.go b/internal/servers/controller/internal/metric/cluster_test.go index 853ea6bdf2..292a3fcb35 100644 --- a/internal/servers/controller/internal/metric/cluster_test.go +++ b/internal/servers/controller/internal/metric/cluster_test.go @@ -2,227 +2,168 @@ package metric import ( "context" + "fmt" "testing" "time" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" "google.golang.org/grpc/codes" + "google.golang.org/grpc/stats" "google.golang.org/grpc/status" - "google.golang.org/protobuf/proto" - "google.golang.org/protobuf/types/known/wrapperspb" ) -// testableObserverVec allows us to assert which observations are being made -// with which labels. 
-type testableObserverVec struct { - observations []*testableObserver - prometheus.ObserverVec -} - -func (v *testableObserverVec) With(l prometheus.Labels) prometheus.Observer { - ret := &testableObserver{labels: l} - v.observations = append(v.observations, ret) - return ret -} - -type testableObserver struct { - labels prometheus.Labels - observation float64 -} - -func (o *testableObserver) Observe(f float64) { - o.observation = f -} - -func floatPtr(i int) *float64 { - f := float64(i) - return &f -} - -func TestRecorder(t *testing.T) { +func TestStatsHandler(t *testing.T) { bkpLatency := gRpcRequestLatency - bkpRespSize := gRpcResponseSize - bkpReqSize := gRpcRequestSize defer func() { gRpcRequestLatency = bkpLatency - gRpcResponseSize = bkpRespSize - gRpcRequestSize = bkpReqSize }() - ctx := context.Background() + + handler := InstrumentClusterStatsHandler() cases := []struct { name string - methodName string - req interface{} - resp interface{} - err error + stats []stats.RPCStats + fullMethodName string wantedLabels prometheus.Labels - wantedReqSize *float64 - wantedRespSize *float64 + wantedLatency float64 }{ { - name: "basic", - methodName: "/some.service.path/method", - req: wrapperspb.Bytes([]byte{1, 2, 3}), - resp: wrapperspb.Bytes([]byte{1, 2, 3, 4}), - err: nil, - wantedLabels: map[string]string{ - labelGRpcCode: "OK", - labelGRpcMethod: "method", - labelGRpcService: "some.service.path", + name: "basic", + fullMethodName: "/some.service.path/method", + stats: []stats.RPCStats{ + &stats.End{ + BeginTime: time.Time{}.Add(time.Second), + EndTime: time.Time{}.Add(5 * time.Second), + }, }, - wantedReqSize: floatPtr(proto.Size(wrapperspb.Bytes([]byte{1, 2, 3}))), - wantedRespSize: floatPtr(proto.Size(wrapperspb.Bytes([]byte{1, 2, 3, 4}))), - }, - { - name: "empty request", - methodName: "/some.service.path/method", - req: wrapperspb.Bytes(nil), - resp: wrapperspb.Bytes([]byte{1, 2, 3, 4}), - err: nil, wantedLabels: map[string]string{ labelGRpcCode: "OK", 
labelGRpcMethod: "method", labelGRpcService: "some.service.path", }, - wantedReqSize: floatPtr(0), - wantedRespSize: floatPtr(proto.Size(wrapperspb.Bytes([]byte{1, 2, 3, 4}))), + wantedLatency: (4 * time.Second).Seconds(), }, { - name: "empty response", - methodName: "/some.service.path/method", - req: wrapperspb.Bytes([]byte{1, 2, 3}), - resp: wrapperspb.Bytes(nil), - err: nil, + name: "ignored stats", + fullMethodName: "/some.service.path/method", + stats: []stats.RPCStats{ + &stats.Begin{ + BeginTime: time.Time{}, + IsTransparentRetryAttempt: true, + }, + &stats.InPayload{ + Length: 5, + WireLength: 15, + RecvTime: time.Time{}.Add(time.Second).Add(500 * time.Millisecond), + }, + &stats.OutPayload{ + Length: 5, + WireLength: 15, + SentTime: time.Time{}.Add(2 * time.Second), + }, + &stats.End{ + BeginTime: time.Time{}.Add(time.Second), + EndTime: time.Time{}.Add(5 * time.Second), + }, + }, wantedLabels: map[string]string{ labelGRpcCode: "OK", labelGRpcMethod: "method", labelGRpcService: "some.service.path", }, - wantedReqSize: floatPtr(proto.Size(wrapperspb.Bytes([]byte{1, 2, 3}))), - wantedRespSize: floatPtr(0), + wantedLatency: (4 * time.Second).Seconds(), }, { - name: "unrecognized method path format", - methodName: "unrecognized", - req: wrapperspb.Bytes([]byte{1, 2, 3}), - resp: wrapperspb.Bytes([]byte{1, 2, 3, 4}), - err: nil, + name: "bad method name", + fullMethodName: "", + stats: []stats.RPCStats{ + &stats.End{ + BeginTime: time.Time{}.Add(time.Second), + EndTime: time.Time{}.Add(5 * time.Second), + }, + }, wantedLabels: map[string]string{ labelGRpcCode: "OK", labelGRpcMethod: "unknown", labelGRpcService: "unknown", }, - wantedReqSize: floatPtr(proto.Size(wrapperspb.Bytes([]byte{1, 2, 3}))), - wantedRespSize: floatPtr(proto.Size(wrapperspb.Bytes([]byte{1, 2, 3, 4}))), + wantedLatency: (4 * time.Second).Seconds(), }, { - name: "cancel error", - methodName: "/some.service.path/method", - req: wrapperspb.Bytes([]byte{1, 2, 3}), - resp: 
(*wrapperspb.BytesValue)(nil), - err: status.Error(codes.Canceled, ""), - wantedLabels: map[string]string{ - labelGRpcCode: "Canceled", - labelGRpcMethod: "method", - labelGRpcService: "some.service.path", + name: "error code", + fullMethodName: "/some.service.path/method", + stats: []stats.RPCStats{ + &stats.End{ + BeginTime: time.Time{}.Add(time.Second), + EndTime: time.Time{}.Add(5 * time.Second), + Error: status.Error(codes.Canceled, "test"), + }, }, - wantedReqSize: floatPtr(proto.Size(wrapperspb.Bytes([]byte{1, 2, 3}))), - wantedRespSize: floatPtr(0), - }, - { - name: "permission error", - methodName: "/some.service.path/method", - req: wrapperspb.Bytes([]byte{1, 2, 3}), - resp: (*wrapperspb.BytesValue)(nil), - err: status.Error(codes.PermissionDenied, ""), wantedLabels: map[string]string{ - labelGRpcCode: "PermissionDenied", + labelGRpcCode: "Canceled", labelGRpcMethod: "method", labelGRpcService: "some.service.path", }, - wantedReqSize: floatPtr(proto.Size(wrapperspb.Bytes([]byte{1, 2, 3}))), - wantedRespSize: floatPtr(0), + wantedLatency: (4 * time.Second).Seconds(), }, { - name: "error and response", - methodName: "/some.service.path/method", - req: wrapperspb.Bytes([]byte{1, 2, 3}), - resp: wrapperspb.Bytes([]byte{1, 2, 3, 4}), - err: status.Error(codes.PermissionDenied, ""), - wantedLabels: map[string]string{ - labelGRpcCode: "PermissionDenied", - labelGRpcMethod: "method", - labelGRpcService: "some.service.path", + name: "wrapped error", + fullMethodName: "/some.service.path/method", + stats: []stats.RPCStats{ + &stats.End{ + BeginTime: time.Time{}.Add(time.Second), + EndTime: time.Time{}.Add(5 * time.Second), + Error: fmt.Errorf("%w", status.Error(codes.InvalidArgument, "test")), + }, }, - wantedReqSize: floatPtr(proto.Size(wrapperspb.Bytes([]byte{1, 2, 3}))), - wantedRespSize: floatPtr(proto.Size(wrapperspb.Bytes([]byte{1, 2, 3, 4}))), - }, - { - name: "bad request", - methodName: "/some.service.path/method", - req: "foo", - resp: 
wrapperspb.Bytes([]byte{1, 2, 3, 4}), - err: status.Error(codes.PermissionDenied, ""), wantedLabels: map[string]string{ - labelGRpcCode: "PermissionDenied", + labelGRpcCode: "InvalidArgument", labelGRpcMethod: "method", labelGRpcService: "some.service.path", }, - wantedReqSize: nil, - wantedRespSize: floatPtr(proto.Size(wrapperspb.Bytes([]byte{1, 2, 3, 4}))), - }, - { - name: "bad response", - methodName: "/some.service.path/method", - req: wrapperspb.Bytes([]byte{1, 2, 3, 4}), - resp: "foo", - err: status.Error(codes.PermissionDenied, ""), - wantedLabels: map[string]string{ - labelGRpcCode: "PermissionDenied", - labelGRpcMethod: "method", - labelGRpcService: "some.service.path", - }, - wantedReqSize: floatPtr(proto.Size(wrapperspb.Bytes([]byte{1, 2, 3, 4}))), - wantedRespSize: nil, + wantedLatency: (4 * time.Second).Seconds(), }, } for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { testableLatency := &testableObserverVec{} - testableReqSize := &testableObserverVec{} - testableRespSize := &testableObserverVec{} gRpcRequestLatency = testableLatency - gRpcResponseSize = testableRespSize - gRpcRequestSize = testableReqSize - // record something - start := time.Now() - tested := newRequestRecorder(ctx, tc.req, tc.methodName) - tested.record(ctx, tc.resp, tc.err) + ctx := context.Background() + ctx = handler.TagRPC(ctx, &stats.RPCTagInfo{ + FullMethodName: tc.fullMethodName, + }) - require.Len(t, testableLatency.observations, 1) - assert.LessOrEqual(t, testableLatency.observations[0].observation, time.Since(start).Seconds()) - assert.Greater(t, testableLatency.observations[0].observation, float64(0)) - assert.Equal(t, testableLatency.observations[0].labels, tc.wantedLabels) - - if tc.wantedReqSize == nil { - require.Len(t, testableReqSize.observations, 0) - } else { - require.Len(t, testableReqSize.observations, 1) - assert.Equal(t, testableReqSize.observations[0], - &testableObserver{observation: *tc.wantedReqSize, labels: tc.wantedLabels}) + for _, i := 
range tc.stats { + handler.HandleRPC(ctx, i) } - if tc.wantedRespSize == nil { - require.Len(t, testableRespSize.observations, 0) - } else { - require.Len(t, testableRespSize.observations, 1) - assert.Equal(t, testableRespSize.observations[0], - &testableObserver{observation: *tc.wantedRespSize, labels: tc.wantedLabels}) - } + assert.Len(t, testableLatency.observations, 1) + assert.Equal(t, testableLatency.observations[0].observation, tc.wantedLatency) + assert.Equal(t, testableLatency.observations[0].labels, tc.wantedLabels) }) } } + +// testableObserverVec allows us to assert which observations are being made +// with which labels. +type testableObserverVec struct { + observations []*testableObserver + prometheus.ObserverVec +} + +func (v *testableObserverVec) With(l prometheus.Labels) prometheus.Observer { + ret := &testableObserver{labels: l} + v.observations = append(v.observations, ret) + return ret +} + +type testableObserver struct { + labels prometheus.Labels + observation float64 +} + +func (o *testableObserver) Observe(f float64) { + o.observation = f +} diff --git a/internal/servers/controller/listeners.go b/internal/servers/controller/listeners.go index cc9543c3f0..aa9df9ed6b 100644 --- a/internal/servers/controller/listeners.go +++ b/internal/servers/controller/listeners.go @@ -113,11 +113,11 @@ func (c *Controller) configureForCluster(ln *base.ServerListener) (func(), error } workerServer := grpc.NewServer( + grpc.StatsHandler(metric.InstrumentClusterStatsHandler()), grpc.MaxRecvMsgSize(math.MaxInt32), grpc.MaxSendMsgSize(math.MaxInt32), grpc.UnaryInterceptor( grpc_middleware.ChainUnaryServer( - metric.InstrumentClusterInterceptor(), workerReqInterceptor, auditRequestInterceptor(c.baseContext), // before we get started, audit the request auditResponseInterceptor(c.baseContext), // as we finish, audit the response