Update the cluster server metrics to use the gRPC status.Handler. (#2000)

gRPC status.Handler gives us visibility into gRPC returned errors. In contrast, there are some situations where no metric would be counted at all using the interceptor approach (for example, in out of memory cases where gRPC would return a resource exhausted error).
pull/2009/head
Todd 4 years ago committed by GitHub
parent e3f72bd0a4
commit 36b7f3e1f8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -3,16 +3,14 @@ package metric
import (
"context"
"strings"
"time"
"github.com/hashicorp/boundary/globals"
"github.com/hashicorp/boundary/internal/errors"
"github.com/hashicorp/boundary/internal/observability/event"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
)
const (
@ -23,9 +21,6 @@ const (
)
var (
// 100 bytes, 1kb, 10kb, 100kb, 1mb, 10mb
gRpcMsgSizeBuckets = prometheus.ExponentialBuckets(100, 10, 6)
// gRpcRequestLatency collects measurements of how long it takes
// the boundary system to reply to a request to the controller cluster
// from the time that boundary received the request.
@ -39,32 +34,6 @@ var (
},
[]string{labelGRpcCode, labelGRpcService, labelGRpcMethod},
)
// gRpcRequestSize collections measurements of how large each request
// to the boundary controller cluster is.
gRpcRequestSize prometheus.ObserverVec = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: globals.MetricNamespace,
Subsystem: clusterSubSystem,
Name: "grpc_request_size_bytes",
Help: "Histogram of request sizes for gRPC requests.",
Buckets: gRpcMsgSizeBuckets,
},
[]string{labelGRpcCode, labelGRpcService, labelGRpcMethod},
)
// gRpcResponseSize collections measurements of how large each response
// from the boundary controller cluster is.
gRpcResponseSize prometheus.ObserverVec = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: globals.MetricNamespace,
Subsystem: clusterSubSystem,
Name: "grpc_response_size_bytes",
Help: "Histogram of response sizes for gRPC responses.",
Buckets: gRpcMsgSizeBuckets,
},
[]string{labelGRpcCode, labelGRpcService, labelGRpcMethod},
)
)
// statusFromError retrieves the *status.Status from the provided error. It'll
@ -94,71 +63,50 @@ func splitMethodName(fullMethodName string) (string, string) {
return "unknown", "unknown"
}
type requestRecorder struct {
labels prometheus.Labels
type metricMethodNameContextKey struct{}
type statsHandler struct{}
// measurements
reqSize *int
start time.Time
func (sh statsHandler) TagRPC(ctx context.Context, i *stats.RPCTagInfo) context.Context {
return context.WithValue(ctx, metricMethodNameContextKey{}, i.FullMethodName)
}
func (sh statsHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context {
return ctx
}
func (sh statsHandler) HandleConn(context.Context, stats.ConnStats) {
}
func newRequestRecorder(ctx context.Context, req interface{}, fullMethodName string) requestRecorder {
const op = "metric.newRequestRecorder"
service, method := splitMethodName(fullMethodName)
r := requestRecorder{
labels: prometheus.Labels{
func (sh statsHandler) HandleRPC(ctx context.Context, s stats.RPCStats) {
switch v := s.(type) {
case *stats.End:
// Accept the ok, but ignore it. This code doesn't need to panic
// and if "fullName" is an empty string splitMethodName will
// set service and method to "unknown".
fullName, _ := ctx.Value(metricMethodNameContextKey{}).(string)
service, method := splitMethodName(fullName)
l := prometheus.Labels{
labelGRpcMethod: method,
labelGRpcService: service,
},
start: time.Now(),
}
reqProto, ok := req.(proto.Message)
switch {
case ok:
reqSize := proto.Size(reqProto)
r.reqSize = &reqSize
default:
event.WriteError(ctx, op, errors.New(ctx, errors.Internal, op, "unable to cast to proto.Message"))
labelGRpcCode: statusFromError(v.Error).Code().String(),
}
gRpcRequestLatency.With(l).Observe(v.EndTime.Sub(v.BeginTime).Seconds())
}
return r
}
func (r requestRecorder) record(ctx context.Context, resp interface{}, err error) {
const op = "metric.(requestRecorder).record"
st := statusFromError(err)
r.labels[labelGRpcCode] = st.Code().String()
gRpcRequestLatency.With(r.labels).Observe(time.Since(r.start).Seconds())
if r.reqSize != nil {
gRpcRequestSize.With(r.labels).Observe(float64(*r.reqSize))
}
if respProto, ok := resp.(proto.Message); ok {
respSize := proto.Size(respProto)
gRpcResponseSize.With(r.labels).Observe(float64(respSize))
} else {
event.WriteError(ctx, op, errors.New(ctx, errors.Internal, op, "unable to cast to proto.Message"))
}
}
var allCodes = []codes.Code{
codes.OK, codes.InvalidArgument, codes.PermissionDenied,
codes.FailedPrecondition,
// InstrumentClusterInterceptor wraps a UnaryServerInterceptor and records
// observations for the collectors associated with the cluster's grpc service.
func InstrumentClusterInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
recorder := newRequestRecorder(ctx, req, info.FullMethod)
resp, err := handler(ctx, req)
recorder.record(ctx, resp, err)
return resp, err
}
// Codes which can be generated by the gRPC framework
codes.Canceled, codes.Unknown, codes.DeadlineExceeded,
codes.ResourceExhausted, codes.Unimplemented, codes.Internal,
codes.Unavailable, codes.Unauthenticated,
}
var allCodes = []codes.Code{
codes.OK, codes.Canceled, codes.Unknown, codes.InvalidArgument, codes.DeadlineExceeded, codes.NotFound,
codes.AlreadyExists, codes.PermissionDenied, codes.Unauthenticated, codes.ResourceExhausted,
codes.FailedPrecondition, codes.Aborted, codes.OutOfRange, codes.Unimplemented, codes.Internal,
codes.Unavailable, codes.DataLoss,
// InstrumentClusterStatsHandler returns a gRPC stats.Handler which observes
// cluster specific metrics. Use with the cluster gRPC server.
func InstrumentClusterStatsHandler() statsHandler {
return statsHandler{}
}
// InitializeClusterCollectors registers the cluster metrics to the default
@ -168,7 +116,7 @@ func InitializeClusterCollectors(r prometheus.Registerer, server *grpc.Server) {
if r == nil {
return
}
r.MustRegister(gRpcRequestLatency, gRpcRequestSize, gRpcResponseSize)
r.MustRegister(gRpcRequestLatency)
for serviceName, info := range server.GetServiceInfo() {
for _, mInfo := range info.Methods {
@ -179,8 +127,6 @@ func InitializeClusterCollectors(r prometheus.Registerer, server *grpc.Server) {
labelGRpcCode: c.String(),
}
gRpcRequestLatency.With(l)
gRpcRequestSize.With(l)
gRpcResponseSize.With(l)
}
}
}

@ -2,227 +2,168 @@ package metric
import (
"context"
"fmt"
"testing"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/wrapperspb"
)
// testableObserverVec allows us to assert which observations are being made
// with which labels.
type testableObserverVec struct {
observations []*testableObserver
prometheus.ObserverVec
}
func (v *testableObserverVec) With(l prometheus.Labels) prometheus.Observer {
ret := &testableObserver{labels: l}
v.observations = append(v.observations, ret)
return ret
}
type testableObserver struct {
labels prometheus.Labels
observation float64
}
func (o *testableObserver) Observe(f float64) {
o.observation = f
}
func floatPtr(i int) *float64 {
f := float64(i)
return &f
}
func TestRecorder(t *testing.T) {
func TestStatsHandler(t *testing.T) {
bkpLatency := gRpcRequestLatency
bkpRespSize := gRpcResponseSize
bkpReqSize := gRpcRequestSize
defer func() {
gRpcRequestLatency = bkpLatency
gRpcResponseSize = bkpRespSize
gRpcRequestSize = bkpReqSize
}()
ctx := context.Background()
handler := InstrumentClusterStatsHandler()
cases := []struct {
name string
methodName string
req interface{}
resp interface{}
err error
stats []stats.RPCStats
fullMethodName string
wantedLabels prometheus.Labels
wantedReqSize *float64
wantedRespSize *float64
wantedLatency float64
}{
{
name: "basic",
methodName: "/some.service.path/method",
req: wrapperspb.Bytes([]byte{1, 2, 3}),
resp: wrapperspb.Bytes([]byte{1, 2, 3, 4}),
err: nil,
wantedLabels: map[string]string{
labelGRpcCode: "OK",
labelGRpcMethod: "method",
labelGRpcService: "some.service.path",
name: "basic",
fullMethodName: "/some.service.path/method",
stats: []stats.RPCStats{
&stats.End{
BeginTime: time.Time{}.Add(time.Second),
EndTime: time.Time{}.Add(5 * time.Second),
},
},
wantedReqSize: floatPtr(proto.Size(wrapperspb.Bytes([]byte{1, 2, 3}))),
wantedRespSize: floatPtr(proto.Size(wrapperspb.Bytes([]byte{1, 2, 3, 4}))),
},
{
name: "empty request",
methodName: "/some.service.path/method",
req: wrapperspb.Bytes(nil),
resp: wrapperspb.Bytes([]byte{1, 2, 3, 4}),
err: nil,
wantedLabels: map[string]string{
labelGRpcCode: "OK",
labelGRpcMethod: "method",
labelGRpcService: "some.service.path",
},
wantedReqSize: floatPtr(0),
wantedRespSize: floatPtr(proto.Size(wrapperspb.Bytes([]byte{1, 2, 3, 4}))),
wantedLatency: (4 * time.Second).Seconds(),
},
{
name: "empty response",
methodName: "/some.service.path/method",
req: wrapperspb.Bytes([]byte{1, 2, 3}),
resp: wrapperspb.Bytes(nil),
err: nil,
name: "ignored stats",
fullMethodName: "/some.service.path/method",
stats: []stats.RPCStats{
&stats.Begin{
BeginTime: time.Time{},
IsTransparentRetryAttempt: true,
},
&stats.InPayload{
Length: 5,
WireLength: 15,
RecvTime: time.Time{}.Add(time.Second).Add(500 * time.Millisecond),
},
&stats.OutPayload{
Length: 5,
WireLength: 15,
SentTime: time.Time{}.Add(2 * time.Second),
},
&stats.End{
BeginTime: time.Time{}.Add(time.Second),
EndTime: time.Time{}.Add(5 * time.Second),
},
},
wantedLabels: map[string]string{
labelGRpcCode: "OK",
labelGRpcMethod: "method",
labelGRpcService: "some.service.path",
},
wantedReqSize: floatPtr(proto.Size(wrapperspb.Bytes([]byte{1, 2, 3}))),
wantedRespSize: floatPtr(0),
wantedLatency: (4 * time.Second).Seconds(),
},
{
name: "unrecognized method path format",
methodName: "unrecognized",
req: wrapperspb.Bytes([]byte{1, 2, 3}),
resp: wrapperspb.Bytes([]byte{1, 2, 3, 4}),
err: nil,
name: "bad method name",
fullMethodName: "",
stats: []stats.RPCStats{
&stats.End{
BeginTime: time.Time{}.Add(time.Second),
EndTime: time.Time{}.Add(5 * time.Second),
},
},
wantedLabels: map[string]string{
labelGRpcCode: "OK",
labelGRpcMethod: "unknown",
labelGRpcService: "unknown",
},
wantedReqSize: floatPtr(proto.Size(wrapperspb.Bytes([]byte{1, 2, 3}))),
wantedRespSize: floatPtr(proto.Size(wrapperspb.Bytes([]byte{1, 2, 3, 4}))),
wantedLatency: (4 * time.Second).Seconds(),
},
{
name: "cancel error",
methodName: "/some.service.path/method",
req: wrapperspb.Bytes([]byte{1, 2, 3}),
resp: (*wrapperspb.BytesValue)(nil),
err: status.Error(codes.Canceled, ""),
wantedLabels: map[string]string{
labelGRpcCode: "Canceled",
labelGRpcMethod: "method",
labelGRpcService: "some.service.path",
name: "error code",
fullMethodName: "/some.service.path/method",
stats: []stats.RPCStats{
&stats.End{
BeginTime: time.Time{}.Add(time.Second),
EndTime: time.Time{}.Add(5 * time.Second),
Error: status.Error(codes.Canceled, "test"),
},
},
wantedReqSize: floatPtr(proto.Size(wrapperspb.Bytes([]byte{1, 2, 3}))),
wantedRespSize: floatPtr(0),
},
{
name: "permission error",
methodName: "/some.service.path/method",
req: wrapperspb.Bytes([]byte{1, 2, 3}),
resp: (*wrapperspb.BytesValue)(nil),
err: status.Error(codes.PermissionDenied, ""),
wantedLabels: map[string]string{
labelGRpcCode: "PermissionDenied",
labelGRpcCode: "Canceled",
labelGRpcMethod: "method",
labelGRpcService: "some.service.path",
},
wantedReqSize: floatPtr(proto.Size(wrapperspb.Bytes([]byte{1, 2, 3}))),
wantedRespSize: floatPtr(0),
wantedLatency: (4 * time.Second).Seconds(),
},
{
name: "error and response",
methodName: "/some.service.path/method",
req: wrapperspb.Bytes([]byte{1, 2, 3}),
resp: wrapperspb.Bytes([]byte{1, 2, 3, 4}),
err: status.Error(codes.PermissionDenied, ""),
wantedLabels: map[string]string{
labelGRpcCode: "PermissionDenied",
labelGRpcMethod: "method",
labelGRpcService: "some.service.path",
name: "wrapped error",
fullMethodName: "/some.service.path/method",
stats: []stats.RPCStats{
&stats.End{
BeginTime: time.Time{}.Add(time.Second),
EndTime: time.Time{}.Add(5 * time.Second),
Error: fmt.Errorf("%w", status.Error(codes.InvalidArgument, "test")),
},
},
wantedReqSize: floatPtr(proto.Size(wrapperspb.Bytes([]byte{1, 2, 3}))),
wantedRespSize: floatPtr(proto.Size(wrapperspb.Bytes([]byte{1, 2, 3, 4}))),
},
{
name: "bad request",
methodName: "/some.service.path/method",
req: "foo",
resp: wrapperspb.Bytes([]byte{1, 2, 3, 4}),
err: status.Error(codes.PermissionDenied, ""),
wantedLabels: map[string]string{
labelGRpcCode: "PermissionDenied",
labelGRpcCode: "InvalidArgument",
labelGRpcMethod: "method",
labelGRpcService: "some.service.path",
},
wantedReqSize: nil,
wantedRespSize: floatPtr(proto.Size(wrapperspb.Bytes([]byte{1, 2, 3, 4}))),
},
{
name: "bad response",
methodName: "/some.service.path/method",
req: wrapperspb.Bytes([]byte{1, 2, 3, 4}),
resp: "foo",
err: status.Error(codes.PermissionDenied, ""),
wantedLabels: map[string]string{
labelGRpcCode: "PermissionDenied",
labelGRpcMethod: "method",
labelGRpcService: "some.service.path",
},
wantedReqSize: floatPtr(proto.Size(wrapperspb.Bytes([]byte{1, 2, 3, 4}))),
wantedRespSize: nil,
wantedLatency: (4 * time.Second).Seconds(),
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
testableLatency := &testableObserverVec{}
testableReqSize := &testableObserverVec{}
testableRespSize := &testableObserverVec{}
gRpcRequestLatency = testableLatency
gRpcResponseSize = testableRespSize
gRpcRequestSize = testableReqSize
// record something
start := time.Now()
tested := newRequestRecorder(ctx, tc.req, tc.methodName)
tested.record(ctx, tc.resp, tc.err)
ctx := context.Background()
ctx = handler.TagRPC(ctx, &stats.RPCTagInfo{
FullMethodName: tc.fullMethodName,
})
require.Len(t, testableLatency.observations, 1)
assert.LessOrEqual(t, testableLatency.observations[0].observation, time.Since(start).Seconds())
assert.Greater(t, testableLatency.observations[0].observation, float64(0))
assert.Equal(t, testableLatency.observations[0].labels, tc.wantedLabels)
if tc.wantedReqSize == nil {
require.Len(t, testableReqSize.observations, 0)
} else {
require.Len(t, testableReqSize.observations, 1)
assert.Equal(t, testableReqSize.observations[0],
&testableObserver{observation: *tc.wantedReqSize, labels: tc.wantedLabels})
for _, i := range tc.stats {
handler.HandleRPC(ctx, i)
}
if tc.wantedRespSize == nil {
require.Len(t, testableRespSize.observations, 0)
} else {
require.Len(t, testableRespSize.observations, 1)
assert.Equal(t, testableRespSize.observations[0],
&testableObserver{observation: *tc.wantedRespSize, labels: tc.wantedLabels})
}
assert.Len(t, testableLatency.observations, 1)
assert.Equal(t, testableLatency.observations[0].observation, tc.wantedLatency)
assert.Equal(t, testableLatency.observations[0].labels, tc.wantedLabels)
})
}
}
// testableObserverVec allows us to assert which observations are being made
// with which labels.
type testableObserverVec struct {
observations []*testableObserver
prometheus.ObserverVec
}
func (v *testableObserverVec) With(l prometheus.Labels) prometheus.Observer {
ret := &testableObserver{labels: l}
v.observations = append(v.observations, ret)
return ret
}
type testableObserver struct {
labels prometheus.Labels
observation float64
}
func (o *testableObserver) Observe(f float64) {
o.observation = f
}

@ -113,11 +113,11 @@ func (c *Controller) configureForCluster(ln *base.ServerListener) (func(), error
}
workerServer := grpc.NewServer(
grpc.StatsHandler(metric.InstrumentClusterStatsHandler()),
grpc.MaxRecvMsgSize(math.MaxInt32),
grpc.MaxSendMsgSize(math.MaxInt32),
grpc.UnaryInterceptor(
grpc_middleware.ChainUnaryServer(
metric.InstrumentClusterInterceptor(),
workerReqInterceptor,
auditRequestInterceptor(c.baseContext), // before we get started, audit the request
auditResponseInterceptor(c.baseContext), // as we finish, audit the response

Loading…
Cancel
Save