From cc54a834e80b7aac32f0518568901f78f954462a Mon Sep 17 00:00:00 2001 From: Hugo Date: Wed, 13 Apr 2022 23:52:54 +0100 Subject: [PATCH] feat(worker): Instrumentation for Controller Cluster Communication (#1998) * feat(worker): Instrumentation for Controller Cluster Communication Adds request latency observations to the worker side (client) of worker <-> controller cluster communications by integrating a new grpc client interceptor. * style(controller): make-gen-deltas changes --- .../controller/internal/metric/cluster.go | 28 +-- .../servers/worker/controller_connection.go | 2 + .../worker/internal/metric/cluster_client.go | 155 ++++++++++++++++ .../internal/metric/cluster_client_test.go | 168 ++++++++++++++++++ internal/servers/worker/worker.go | 2 + 5 files changed, 341 insertions(+), 14 deletions(-) create mode 100644 internal/servers/worker/internal/metric/cluster_client.go create mode 100644 internal/servers/worker/internal/metric/cluster_client_test.go diff --git a/internal/servers/controller/internal/metric/cluster.go b/internal/servers/controller/internal/metric/cluster.go index 1f73091e56..a66cc466f7 100644 --- a/internal/servers/controller/internal/metric/cluster.go +++ b/internal/servers/controller/internal/metric/cluster.go @@ -20,20 +20,18 @@ const ( clusterSubSystem = "controller_cluster" ) -var ( - // gRpcRequestLatency collects measurements of how long it takes - // the boundary system to reply to a request to the controller cluster - // from the time that boundary received the request. - gRpcRequestLatency prometheus.ObserverVec = prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Namespace: globals.MetricNamespace, - Subsystem: clusterSubSystem, - Name: "grpc_request_duration_seconds", - Help: "Histogram of latencies for gRPC requests.", - Buckets: prometheus.DefBuckets, - }, - []string{labelGRpcCode, labelGRpcService, labelGRpcMethod}, - ) +// gRpcRequestLatency collects measurements of how long it takes +// the boundary system to reply to a request to the controller cluster +// from the time that boundary received the request. +var gRpcRequestLatency prometheus.ObserverVec = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: globals.MetricNamespace, + Subsystem: clusterSubSystem, + Name: "grpc_request_duration_seconds", + Help: "Histogram of latencies for gRPC requests.", + Buckets: prometheus.DefBuckets, + }, + []string{labelGRpcCode, labelGRpcService, labelGRpcMethod}, ) // statusFromError retrieves the *status.Status from the provided error. It'll @@ -70,9 +68,11 @@ type statsHandler struct{} func (sh statsHandler) TagRPC(ctx context.Context, i *stats.RPCTagInfo) context.Context { return context.WithValue(ctx, metricMethodNameContextKey{}, i.FullMethodName) } + func (sh statsHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context { return ctx } + func (sh statsHandler) HandleConn(context.Context, stats.ConnStats) { } diff --git a/internal/servers/worker/controller_connection.go b/internal/servers/worker/controller_connection.go index 1b1402d952..210f31936a 100644 --- a/internal/servers/worker/controller_connection.go +++ b/internal/servers/worker/controller_connection.go @@ -21,6 +21,7 @@ import ( "github.com/hashicorp/boundary/internal/cmd/base" pbs "github.com/hashicorp/boundary/internal/gen/controller/servers/services" "github.com/hashicorp/boundary/internal/observability/event" + "github.com/hashicorp/boundary/internal/servers/worker/internal/metric" "github.com/hashicorp/go-secure-stdlib/base62" "google.golang.org/grpc" "google.golang.org/grpc/resolver" @@ -115,6 +116,7 @@ func (w *Worker) createClientConn(addr string) error { cc, err := grpc.DialContext(w.baseContext, fmt.Sprintf("%s:///%s", res.Scheme(), addr), grpc.WithResolvers(res), + grpc.WithUnaryInterceptor(metric.InstrumentClusterClient()), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32)), grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(math.MaxInt32)), grpc.WithContextDialer(w.controllerDialerFunc()), diff --git a/internal/servers/worker/internal/metric/cluster_client.go b/internal/servers/worker/internal/metric/cluster_client.go new file mode 100644 index 0000000000..a6f3810209 --- /dev/null +++ b/internal/servers/worker/internal/metric/cluster_client.go @@ -0,0 +1,155 @@ +package metric + +import ( + "context" + "strings" + "time" + + "github.com/hashicorp/boundary/globals" + "github.com/hashicorp/boundary/internal/errors" + "github.com/hashicorp/boundary/internal/gen/controller/servers/services" + "github.com/prometheus/client_golang/prometheus" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/reflect/protoreflect" + "google.golang.org/protobuf/reflect/protoregistry" +) + +const ( + labelGrpcCode = "grpc_code" + labelGrpcService = "grpc_service" + labelGrpcMethod = "grpc_method" + + clusterClientSubsystem = "cluster_client" +) + +// grpcRequestLatency collects measurements of how long a gRPC +// request between a cluster and its clients takes. +var grpcRequestLatency prometheus.ObserverVec = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: globals.MetricNamespace, + Subsystem: clusterClientSubsystem, + Name: "grpc_request_duration_seconds", + Help: "Histogram of latencies for gRPC requests between the cluster and any of its clients.", + Buckets: prometheus.DefBuckets, + }, + []string{labelGrpcCode, labelGrpcService, labelGrpcMethod}, +) + +// statusFromError retrieves the *status.Status from the provided error. It'll +// attempt to unwrap the *status.Error, which is something status.FromError +// does not do. +func statusFromError(err error) *status.Status { + if s, ok := status.FromError(err); ok { + return s + } + + type gRPCStatus interface { + GRPCStatus() *status.Status + } + var unwrappedStatus gRPCStatus + if ok := errors.As(err, &unwrappedStatus); ok { + return unwrappedStatus.GRPCStatus() + } + + return status.New(codes.Unknown, "Unknown Code") +} + +func splitMethodName(fullMethodName string) (string, string) { + fullMethodName = strings.TrimPrefix(fullMethodName, "/") // remove leading slash + if i := strings.Index(fullMethodName, "/"); i >= 0 { + return fullMethodName[:i], fullMethodName[i+1:] + } + return "unknown", "unknown" +} + +type requestRecorder struct { + labels prometheus.Labels + + // measurements + start time.Time +} + +func newRequestRecorder(fullMethodName string) requestRecorder { + service, method := splitMethodName(fullMethodName) + r := requestRecorder{ + labels: prometheus.Labels{ + labelGrpcMethod: method, + labelGrpcService: service, + }, + start: time.Now(), + } + + return r +} + +func (r requestRecorder) record(err error) { + r.labels[labelGrpcCode] = statusFromError(err).Code().String() + grpcRequestLatency.With(r.labels).Observe(time.Since(r.start).Seconds()) +} + +// InstrumentClusterClient wraps a UnaryClientInterceptor and records +// observations for the collectors associated with gRPC connections +// between the cluster and its clients. +func InstrumentClusterClient() grpc.UnaryClientInterceptor { + return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + r := newRequestRecorder(method) + err := invoker(ctx, method, req, reply, cc, opts...) + r.record(err) + return err + } +} + +var allCodes = []codes.Code{ + codes.OK, codes.Canceled, codes.Unknown, codes.InvalidArgument, codes.DeadlineExceeded, codes.NotFound, + codes.AlreadyExists, codes.PermissionDenied, codes.Unauthenticated, codes.ResourceExhausted, + codes.FailedPrecondition, codes.Aborted, codes.OutOfRange, codes.Unimplemented, codes.Internal, + codes.Unavailable, codes.DataLoss, +} + +func InitializeClusterClientCollectors(r prometheus.Registerer) { + if r == nil { + return + } + r.MustRegister(grpcRequestLatency) + + serviceNamesToMethodNames := make(map[string][]string, 0) + protoregistry.GlobalFiles.RangeFilesByPackage( + services.File_controller_servers_services_v1_session_service_proto.Package(), + func(fd protoreflect.FileDescriptor) bool { return rangeProtofiles(serviceNamesToMethodNames, fd) }, + ) + + for serviceName, serviceMethods := range serviceNamesToMethodNames { + for _, sm := range serviceMethods { + for _, c := range allCodes { + grpcRequestLatency.With(prometheus.Labels{ + labelGrpcCode: c.String(), + labelGrpcMethod: sm, + labelGrpcService: serviceName, + }) + } + } + } +} + +func rangeProtofiles(m map[string][]string, fd protoreflect.FileDescriptor) bool { + if fd.Services().Len() == 0 { + return true + } + + for i := 0; i < fd.Services().Len(); i++ { + s := fd.Services().Get(i) + if s.Methods().Len() == 0 { + continue + } + + methods := []string{} + for j := 0; j < s.Methods().Len(); j++ { + methods = append(methods, string(s.Methods().Get(j).Name())) + } + m[string(s.FullName())] = methods + } + + return true +} diff --git a/internal/servers/worker/internal/metric/cluster_client_test.go b/internal/servers/worker/internal/metric/cluster_client_test.go new file mode 100644 index 0000000000..9edfeb3bac --- /dev/null +++ b/internal/servers/worker/internal/metric/cluster_client_test.go @@ -0,0 +1,168 @@ +package metric + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/wrapperspb" +) + +func TestInitializeClusterClientCollectors(t *testing.T) { + require.NotPanics(t, func() { InitializeClusterClientCollectors(nil) }) + require.NotPanics(t, func() { InitializeClusterClientCollectors(prometheus.NewRegistry()) }) +} + +func TestRecorder(t *testing.T) { + cases := []struct { + name string + methodName string + err error + wantedLabels prometheus.Labels + }{ + { + name: "basic", + methodName: "/some.service.path/method", + err: nil, + wantedLabels: map[string]string{ + labelGrpcCode: "OK", + labelGrpcMethod: "method", + labelGrpcService: "some.service.path", + }, + }, + { + name: "unrecognized method path format", + methodName: "unrecognized", + err: nil, + wantedLabels: map[string]string{ + labelGrpcCode: "OK", + labelGrpcMethod: "unknown", + labelGrpcService: "unknown", + }, + }, + { + name: "cancel error", + methodName: "/some.service.path/method", + err: status.Error(codes.Canceled, ""), + wantedLabels: map[string]string{ + labelGrpcCode: "Canceled", + labelGrpcMethod: "method", + labelGrpcService: "some.service.path", + }, + }, + { + name: "permission error", + methodName: "/some.service.path/method", + err: status.Error(codes.PermissionDenied, ""), + wantedLabels: map[string]string{ + labelGrpcCode: "PermissionDenied", + labelGrpcMethod: "method", + labelGrpcService: "some.service.path", + }, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + ogReqLatency := grpcRequestLatency + defer func() { grpcRequestLatency = ogReqLatency }() + + testableLatency := &testableObserverVec{} + grpcRequestLatency = testableLatency + + start := time.Now() + tested := newRequestRecorder(tc.methodName) + tested.record(tc.err) + + require.Len(t, testableLatency.observations, 1) + assert.Greater(t, testableLatency.observations[0].observation, float64(0)) + assert.LessOrEqual(t, testableLatency.observations[0].observation, time.Since(start).Seconds()) + assert.Equal(t, testableLatency.observations[0].labels, tc.wantedLabels) + }) + } +} + +func TestInstrumentClusterClient(t *testing.T) { + ogReqLatency := grpcRequestLatency + defer func() { grpcRequestLatency = ogReqLatency }() + + testableLatency := &testableObserverVec{} + grpcRequestLatency = testableLatency + + interceptor := InstrumentClusterClient() + i := &testInvoker{t: t, retErr: nil} + + start := time.Now() + err := interceptor(context.Background(), "/some.service.path/method", wrapperspb.Bytes([]byte{1}), nil, nil, i.invoke, []grpc.CallOption{}...) + require.NoError(t, err) + require.True(t, i.called) + + require.Len(t, testableLatency.observations, 1) + assert.Greater(t, testableLatency.observations[0].observation, float64(0)) + assert.LessOrEqual(t, testableLatency.observations[0].observation, time.Since(start).Seconds()) +} + +func TestInstrumentClusterClient_InvokerError(t *testing.T) { + ogReqLatency := grpcRequestLatency + defer func() { grpcRequestLatency = ogReqLatency }() + + testableLatency := &testableObserverVec{} + grpcRequestLatency = testableLatency + + interceptor := InstrumentClusterClient() + i := &testInvoker{t: t, retErr: fmt.Errorf("oops!")} + + start := time.Now() + err := interceptor(context.Background(), "/some.service.path/method", wrapperspb.Bytes([]byte{1}), nil, nil, i.invoke, []grpc.CallOption{}...) + require.EqualError(t, err, "oops!") + require.True(t, i.called) + + // We still assert request latency in error states. + require.Len(t, testableLatency.observations, 1) + assert.Greater(t, testableLatency.observations[0].observation, float64(0)) + assert.LessOrEqual(t, testableLatency.observations[0].observation, time.Since(start).Seconds()) +} + +type testInvoker struct { + t *testing.T + called bool + retErr error +} + +func (i *testInvoker) invoke(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, opts ...grpc.CallOption) error { + i.called = true + + require.NotNil(i.t, ctx) + require.NotEmpty(i.t, method) + require.NotNil(i.t, req) + return i.retErr +} + +// testableObserverVec allows us to assert which observations are being made +// with which labels. +type testableObserverVec struct { + observations []*testableObserver + prometheus.ObserverVec +} + +func (v *testableObserverVec) With(l prometheus.Labels) prometheus.Observer { + ret := &testableObserver{labels: l} + v.observations = append(v.observations, ret) + return ret +} + +type testableObserver struct { + labels prometheus.Labels + observation float64 +} + +func (o *testableObserver) Observe(f float64) { + o.observation = f +} diff --git a/internal/servers/worker/worker.go b/internal/servers/worker/worker.go index ad403ef0a2..64c428edcf 100644 --- a/internal/servers/worker/worker.go +++ b/internal/servers/worker/worker.go @@ -68,6 +68,8 @@ type Worker struct { func New(conf *Config) (*Worker, error) { metric.InitializeHttpCollectors(conf.PrometheusRegisterer) metric.InitializeWebsocketCollectors(conf.PrometheusRegisterer) + metric.InitializeClusterClientCollectors(conf.PrometheusRegisterer) + w := &Worker{ conf: conf, logger: conf.Logger.Named("worker"),