diff --git a/internal/servers/controller/internal/metric/cluster.go b/internal/servers/controller/internal/metric/cluster.go index 4d82bcb0e2..1f73091e56 100644 --- a/internal/servers/controller/internal/metric/cluster.go +++ b/internal/servers/controller/internal/metric/cluster.go @@ -3,16 +3,14 @@ package metric import ( "context" "strings" - "time" "github.com/hashicorp/boundary/globals" "github.com/hashicorp/boundary/internal/errors" - "github.com/hashicorp/boundary/internal/observability/event" "github.com/prometheus/client_golang/prometheus" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/stats" "google.golang.org/grpc/status" - "google.golang.org/protobuf/proto" ) const ( @@ -23,9 +21,6 @@ const ( ) var ( - // 100 bytes, 1kb, 10kb, 100kb, 1mb, 10mb - gRpcMsgSizeBuckets = prometheus.ExponentialBuckets(100, 10, 6) - // gRpcRequestLatency collects measurements of how long it takes // the boundary system to reply to a request to the controller cluster // from the time that boundary received the request. @@ -39,32 +34,6 @@ var ( }, []string{labelGRpcCode, labelGRpcService, labelGRpcMethod}, ) - - // gRpcRequestSize collections measurements of how large each request - // to the boundary controller cluster is. - gRpcRequestSize prometheus.ObserverVec = prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Namespace: globals.MetricNamespace, - Subsystem: clusterSubSystem, - Name: "grpc_request_size_bytes", - Help: "Histogram of request sizes for gRPC requests.", - Buckets: gRpcMsgSizeBuckets, - }, - []string{labelGRpcCode, labelGRpcService, labelGRpcMethod}, - ) - - // gRpcResponseSize collections measurements of how large each response - // from the boundary controller cluster is. - gRpcResponseSize prometheus.ObserverVec = prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Namespace: globals.MetricNamespace, - Subsystem: clusterSubSystem, - Name: "grpc_response_size_bytes", - Help: "Histogram of response sizes for gRPC responses.", - Buckets: gRpcMsgSizeBuckets, - }, - []string{labelGRpcCode, labelGRpcService, labelGRpcMethod}, - ) ) // statusFromError retrieves the *status.Status from the provided error. It'll @@ -94,71 +63,50 @@ func splitMethodName(fullMethodName string) (string, string) { return "unknown", "unknown" } -type requestRecorder struct { - labels prometheus.Labels +type metricMethodNameContextKey struct{} + +type statsHandler struct{} - // measurements - reqSize *int - start time.Time +func (sh statsHandler) TagRPC(ctx context.Context, i *stats.RPCTagInfo) context.Context { + return context.WithValue(ctx, metricMethodNameContextKey{}, i.FullMethodName) +} +func (sh statsHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context { + return ctx +} +func (sh statsHandler) HandleConn(context.Context, stats.ConnStats) { } -func newRequestRecorder(ctx context.Context, req interface{}, fullMethodName string) requestRecorder { - const op = "metric.newRequestRecorder" - service, method := splitMethodName(fullMethodName) - r := requestRecorder{ - labels: prometheus.Labels{ +func (sh statsHandler) HandleRPC(ctx context.Context, s stats.RPCStats) { + switch v := s.(type) { + case *stats.End: + // Accept the ok, but ignore it. This code doesn't need to panic + // and if "fullName" is an empty string splitMethodName will + // set service and method to "unknown". + fullName, _ := ctx.Value(metricMethodNameContextKey{}).(string) + service, method := splitMethodName(fullName) + l := prometheus.Labels{ labelGRpcMethod: method, labelGRpcService: service, - }, - start: time.Now(), - } - - reqProto, ok := req.(proto.Message) - switch { - case ok: - reqSize := proto.Size(reqProto) - r.reqSize = &reqSize - default: - event.WriteError(ctx, op, errors.New(ctx, errors.Internal, op, "unable to cast to proto.Message")) + labelGRpcCode: statusFromError(v.Error).Code().String(), + } + gRpcRequestLatency.With(l).Observe(v.EndTime.Sub(v.BeginTime).Seconds()) } - return r } -func (r requestRecorder) record(ctx context.Context, resp interface{}, err error) { - const op = "metric.(requestRecorder).record" - st := statusFromError(err) - r.labels[labelGRpcCode] = st.Code().String() - - gRpcRequestLatency.With(r.labels).Observe(time.Since(r.start).Seconds()) - - if r.reqSize != nil { - gRpcRequestSize.With(r.labels).Observe(float64(*r.reqSize)) - } - - if respProto, ok := resp.(proto.Message); ok { - respSize := proto.Size(respProto) - gRpcResponseSize.With(r.labels).Observe(float64(respSize)) - } else { - event.WriteError(ctx, op, errors.New(ctx, errors.Internal, op, "unable to cast to proto.Message")) - } -} +var allCodes = []codes.Code{ + codes.OK, codes.InvalidArgument, codes.PermissionDenied, + codes.FailedPrecondition, -// InstrumentClusterInterceptor wraps a UnaryServerInterceptor and records -// observations for the collectors associated with the cluster's grpc service. -func InstrumentClusterInterceptor() grpc.UnaryServerInterceptor { - return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { - recorder := newRequestRecorder(ctx, req, info.FullMethod) - resp, err := handler(ctx, req) - recorder.record(ctx, resp, err) - return resp, err - } + // Codes which can be generated by the gRPC framework + codes.Canceled, codes.Unknown, codes.DeadlineExceeded, + codes.ResourceExhausted, codes.Unimplemented, codes.Internal, + codes.Unavailable, codes.Unauthenticated, } -var allCodes = []codes.Code{ - codes.OK, codes.Canceled, codes.Unknown, codes.InvalidArgument, codes.DeadlineExceeded, codes.NotFound, - codes.AlreadyExists, codes.PermissionDenied, codes.Unauthenticated, codes.ResourceExhausted, - codes.FailedPrecondition, codes.Aborted, codes.OutOfRange, codes.Unimplemented, codes.Internal, - codes.Unavailable, codes.DataLoss, +// InstrumentClusterStatsHandler returns a gRPC stats.Handler which observes +// cluster specific metrics. Use with the cluster gRPC server. +func InstrumentClusterStatsHandler() statsHandler { + return statsHandler{} } // InitializeClusterCollectors registers the cluster metrics to the default @@ -168,7 +116,7 @@ func InitializeClusterCollectors(r prometheus.Registerer, server *grpc.Server) { if r == nil { return } - r.MustRegister(gRpcRequestLatency, gRpcRequestSize, gRpcResponseSize) + r.MustRegister(gRpcRequestLatency) for serviceName, info := range server.GetServiceInfo() { for _, mInfo := range info.Methods { @@ -179,8 +127,6 @@ func InitializeClusterCollectors(r prometheus.Registerer, server *grpc.Server) { labelGRpcCode: c.String(), } gRpcRequestLatency.With(l) - gRpcRequestSize.With(l) - gRpcResponseSize.With(l) } } } diff --git a/internal/servers/controller/internal/metric/cluster_test.go b/internal/servers/controller/internal/metric/cluster_test.go index 853ea6bdf2..292a3fcb35 100644 --- a/internal/servers/controller/internal/metric/cluster_test.go +++ b/internal/servers/controller/internal/metric/cluster_test.go @@ -2,227 +2,168 @@ package metric import ( "context" + "fmt" "testing" "time" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" "google.golang.org/grpc/codes" + "google.golang.org/grpc/stats" "google.golang.org/grpc/status" - "google.golang.org/protobuf/proto" - "google.golang.org/protobuf/types/known/wrapperspb" ) -// testableObserverVec allows us to assert which observations are being made -// with which labels. -type testableObserverVec struct { - observations []*testableObserver - prometheus.ObserverVec -} - -func (v *testableObserverVec) With(l prometheus.Labels) prometheus.Observer { - ret := &testableObserver{labels: l} - v.observations = append(v.observations, ret) - return ret -} - -type testableObserver struct { - labels prometheus.Labels - observation float64 -} - -func (o *testableObserver) Observe(f float64) { - o.observation = f -} - -func floatPtr(i int) *float64 { - f := float64(i) - return &f -} - -func TestRecorder(t *testing.T) { +func TestStatsHandler(t *testing.T) { bkpLatency := gRpcRequestLatency - bkpRespSize := gRpcResponseSize - bkpReqSize := gRpcRequestSize defer func() { gRpcRequestLatency = bkpLatency - gRpcResponseSize = bkpRespSize - gRpcRequestSize = bkpReqSize }() - ctx := context.Background() + + handler := InstrumentClusterStatsHandler() cases := []struct { name string - methodName string - req interface{} - resp interface{} - err error + stats []stats.RPCStats + fullMethodName string wantedLabels prometheus.Labels - wantedReqSize *float64 - wantedRespSize *float64 + wantedLatency float64 }{ { - name: "basic", - methodName: "/some.service.path/method", - req: wrapperspb.Bytes([]byte{1, 2, 3}), - resp: wrapperspb.Bytes([]byte{1, 2, 3, 4}), - err: nil, - wantedLabels: map[string]string{ - labelGRpcCode: "OK", - labelGRpcMethod: "method", - labelGRpcService: "some.service.path", + name: "basic", + fullMethodName: "/some.service.path/method", + stats: []stats.RPCStats{ + &stats.End{ + BeginTime: time.Time{}.Add(time.Second), + EndTime: time.Time{}.Add(5 * time.Second), + }, }, - wantedReqSize: floatPtr(proto.Size(wrapperspb.Bytes([]byte{1, 2, 3}))), - wantedRespSize: floatPtr(proto.Size(wrapperspb.Bytes([]byte{1, 2, 3, 4}))), - }, - { - name: "empty request", - methodName: "/some.service.path/method", - req: wrapperspb.Bytes(nil), - resp: wrapperspb.Bytes([]byte{1, 2, 3, 4}), - err: nil, wantedLabels: map[string]string{ labelGRpcCode: "OK", labelGRpcMethod: "method", labelGRpcService: "some.service.path", }, - wantedReqSize: floatPtr(0), - wantedRespSize: floatPtr(proto.Size(wrapperspb.Bytes([]byte{1, 2, 3, 4}))), + wantedLatency: (4 * time.Second).Seconds(), }, { - name: "empty response", - methodName: "/some.service.path/method", - req: wrapperspb.Bytes([]byte{1, 2, 3}), - resp: wrapperspb.Bytes(nil), - err: nil, + name: "ignored stats", + fullMethodName: "/some.service.path/method", + stats: []stats.RPCStats{ + &stats.Begin{ + BeginTime: time.Time{}, + IsTransparentRetryAttempt: true, + }, + &stats.InPayload{ + Length: 5, + WireLength: 15, + RecvTime: time.Time{}.Add(time.Second).Add(500 * time.Millisecond), + }, + &stats.OutPayload{ + Length: 5, + WireLength: 15, + SentTime: time.Time{}.Add(2 * time.Second), + }, + &stats.End{ + BeginTime: time.Time{}.Add(time.Second), + EndTime: time.Time{}.Add(5 * time.Second), + }, + }, wantedLabels: map[string]string{ labelGRpcCode: "OK", labelGRpcMethod: "method", labelGRpcService: "some.service.path", }, - wantedReqSize: floatPtr(proto.Size(wrapperspb.Bytes([]byte{1, 2, 3}))), - wantedRespSize: floatPtr(0), + wantedLatency: (4 * time.Second).Seconds(), }, { - name: "unrecognized method path format", - methodName: "unrecognized", - req: wrapperspb.Bytes([]byte{1, 2, 3}), - resp: wrapperspb.Bytes([]byte{1, 2, 3, 4}), - err: nil, + name: "bad method name", + fullMethodName: "", + stats: []stats.RPCStats{ + &stats.End{ + BeginTime: time.Time{}.Add(time.Second), + EndTime: time.Time{}.Add(5 * time.Second), + }, + }, wantedLabels: map[string]string{ labelGRpcCode: "OK", labelGRpcMethod: "unknown", labelGRpcService: "unknown", }, - wantedReqSize: floatPtr(proto.Size(wrapperspb.Bytes([]byte{1, 2, 3}))), - wantedRespSize: floatPtr(proto.Size(wrapperspb.Bytes([]byte{1, 2, 3, 4}))), + wantedLatency: (4 * time.Second).Seconds(), }, { - name: "cancel error", - methodName: "/some.service.path/method", - req: wrapperspb.Bytes([]byte{1, 2, 3}), - resp: (*wrapperspb.BytesValue)(nil), - err: status.Error(codes.Canceled, ""), - wantedLabels: map[string]string{ - labelGRpcCode: "Canceled", - labelGRpcMethod: "method", - labelGRpcService: "some.service.path", + name: "error code", + fullMethodName: "/some.service.path/method", + stats: []stats.RPCStats{ + &stats.End{ + BeginTime: time.Time{}.Add(time.Second), + EndTime: time.Time{}.Add(5 * time.Second), + Error: status.Error(codes.Canceled, "test"), + }, }, - wantedReqSize: floatPtr(proto.Size(wrapperspb.Bytes([]byte{1, 2, 3}))), - wantedRespSize: floatPtr(0), - }, - { - name: "permission error", - methodName: "/some.service.path/method", - req: wrapperspb.Bytes([]byte{1, 2, 3}), - resp: (*wrapperspb.BytesValue)(nil), - err: status.Error(codes.PermissionDenied, ""), wantedLabels: map[string]string{ - labelGRpcCode: "PermissionDenied", + labelGRpcCode: "Canceled", labelGRpcMethod: "method", labelGRpcService: "some.service.path", }, - wantedReqSize: floatPtr(proto.Size(wrapperspb.Bytes([]byte{1, 2, 3}))), - wantedRespSize: floatPtr(0), + wantedLatency: (4 * time.Second).Seconds(), }, { - name: "error and response", - methodName: "/some.service.path/method", - req: wrapperspb.Bytes([]byte{1, 2, 3}), - resp: wrapperspb.Bytes([]byte{1, 2, 3, 4}), - err: status.Error(codes.PermissionDenied, ""), - wantedLabels: map[string]string{ - labelGRpcCode: "PermissionDenied", - labelGRpcMethod: "method", - labelGRpcService: "some.service.path", + name: "wrapped error", + fullMethodName: "/some.service.path/method", + stats: []stats.RPCStats{ + &stats.End{ + BeginTime: time.Time{}.Add(time.Second), + EndTime: time.Time{}.Add(5 * time.Second), + Error: fmt.Errorf("%w", status.Error(codes.InvalidArgument, "test")), + }, }, - wantedReqSize: floatPtr(proto.Size(wrapperspb.Bytes([]byte{1, 2, 3}))), - wantedRespSize: floatPtr(proto.Size(wrapperspb.Bytes([]byte{1, 2, 3, 4}))), - }, - { - name: "bad request", - methodName: "/some.service.path/method", - req: "foo", - resp: wrapperspb.Bytes([]byte{1, 2, 3, 4}), - err: status.Error(codes.PermissionDenied, ""), wantedLabels: map[string]string{ - labelGRpcCode: "PermissionDenied", + labelGRpcCode: "InvalidArgument", labelGRpcMethod: "method", labelGRpcService: "some.service.path", }, - wantedReqSize: nil, - wantedRespSize: floatPtr(proto.Size(wrapperspb.Bytes([]byte{1, 2, 3, 4}))), - }, - { - name: "bad response", - methodName: "/some.service.path/method", - req: wrapperspb.Bytes([]byte{1, 2, 3, 4}), - resp: "foo", - err: status.Error(codes.PermissionDenied, ""), - wantedLabels: map[string]string{ - labelGRpcCode: "PermissionDenied", - labelGRpcMethod: "method", - labelGRpcService: "some.service.path", - }, - wantedReqSize: floatPtr(proto.Size(wrapperspb.Bytes([]byte{1, 2, 3, 4}))), - wantedRespSize: nil, + wantedLatency: (4 * time.Second).Seconds(), }, } for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { testableLatency := &testableObserverVec{} - testableReqSize := &testableObserverVec{} - testableRespSize := &testableObserverVec{} gRpcRequestLatency = testableLatency - gRpcResponseSize = testableRespSize - gRpcRequestSize = testableReqSize - // record something - start := time.Now() - tested := newRequestRecorder(ctx, tc.req, tc.methodName) - tested.record(ctx, tc.resp, tc.err) + ctx := context.Background() + ctx = handler.TagRPC(ctx, &stats.RPCTagInfo{ + FullMethodName: tc.fullMethodName, + }) - require.Len(t, testableLatency.observations, 1) - assert.LessOrEqual(t, testableLatency.observations[0].observation, time.Since(start).Seconds()) - assert.Greater(t, testableLatency.observations[0].observation, float64(0)) - assert.Equal(t, testableLatency.observations[0].labels, tc.wantedLabels) - - if tc.wantedReqSize == nil { - require.Len(t, testableReqSize.observations, 0) - } else { - require.Len(t, testableReqSize.observations, 1) - assert.Equal(t, testableReqSize.observations[0], - &testableObserver{observation: *tc.wantedReqSize, labels: tc.wantedLabels}) + for _, i := range tc.stats { + handler.HandleRPC(ctx, i) } - if tc.wantedRespSize == nil { - require.Len(t, testableRespSize.observations, 0) - } else { - require.Len(t, testableRespSize.observations, 1) - assert.Equal(t, testableRespSize.observations[0], - &testableObserver{observation: *tc.wantedRespSize, labels: tc.wantedLabels}) - } + assert.Len(t, testableLatency.observations, 1) + assert.Equal(t, testableLatency.observations[0].observation, tc.wantedLatency) + assert.Equal(t, testableLatency.observations[0].labels, tc.wantedLabels) }) } } + +// testableObserverVec allows us to assert which observations are being made +// with which labels. +type testableObserverVec struct { + observations []*testableObserver + prometheus.ObserverVec +} + +func (v *testableObserverVec) With(l prometheus.Labels) prometheus.Observer { + ret := &testableObserver{labels: l} + v.observations = append(v.observations, ret) + return ret +} + +type testableObserver struct { + labels prometheus.Labels + observation float64 +} + +func (o *testableObserver) Observe(f float64) { + o.observation = f +} diff --git a/internal/servers/controller/listeners.go b/internal/servers/controller/listeners.go index cc9543c3f0..aa9df9ed6b 100644 --- a/internal/servers/controller/listeners.go +++ b/internal/servers/controller/listeners.go @@ -113,11 +113,11 @@ func (c *Controller) configureForCluster(ln *base.ServerListener) (func(), error } workerServer := grpc.NewServer( + grpc.StatsHandler(metric.InstrumentClusterStatsHandler()), grpc.MaxRecvMsgSize(math.MaxInt32), grpc.MaxSendMsgSize(math.MaxInt32), grpc.UnaryInterceptor( grpc_middleware.ChainUnaryServer( - metric.InstrumentClusterInterceptor(), workerReqInterceptor, auditRequestInterceptor(c.baseContext), // before we get started, audit the request auditResponseInterceptor(c.baseContext), // as we finish, audit the response