feat(worker): Instrumentation for Controller Cluster Communication (#1998)

* feat(worker): Instrumentation for Controller Cluster Communication

Adds request latency observations to the worker side (client) of
worker <-> controller cluster communications by integrating a new
grpc client interceptor.

* style(controller): make-gen-deltas changes
pull/2010/head
Hugo 4 years ago committed by GitHub
parent 36b7f3e1f8
commit cc54a834e8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -20,20 +20,18 @@ const (
clusterSubSystem = "controller_cluster"
)
var (
// gRpcRequestLatency collects measurements of how long it takes
// the boundary system to reply to a request to the controller cluster
// from the time that boundary received the request.
gRpcRequestLatency prometheus.ObserverVec = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: globals.MetricNamespace,
Subsystem: clusterSubSystem,
Name: "grpc_request_duration_seconds",
Help: "Histogram of latencies for gRPC requests.",
Buckets: prometheus.DefBuckets,
},
[]string{labelGRpcCode, labelGRpcService, labelGRpcMethod},
)
// gRpcRequestLatency collects measurements of how long it takes
// the boundary system to reply to a request to the controller cluster
// from the time that boundary received the request. Observations are
// partitioned by gRPC status code, service, and method.
var gRpcRequestLatency prometheus.ObserverVec = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Namespace: globals.MetricNamespace,
		Subsystem: clusterSubSystem,
		Name:      "grpc_request_duration_seconds",
		Help:      "Histogram of latencies for gRPC requests.",
		Buckets:   prometheus.DefBuckets,
	},
	[]string{labelGRpcCode, labelGRpcService, labelGRpcMethod},
)
// statusFromError retrieves the *status.Status from the provided error. It'll
@ -70,9 +68,11 @@ type statsHandler struct{}
// TagRPC stores the RPC's full method name in the returned context
// under metricMethodNameContextKey so later per-RPC callbacks can
// retrieve it — presumably to label metric observations; confirm
// against the handler's HandleRPC implementation.
func (sh statsHandler) TagRPC(ctx context.Context, i *stats.RPCTagInfo) context.Context {
	return context.WithValue(ctx, metricMethodNameContextKey{}, i.FullMethodName)
}
// TagConn is a no-op required to satisfy the stats.Handler interface;
// it returns the context unchanged.
func (sh statsHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context {
	return ctx
}
// HandleConn is a no-op; no connection-level stats are recorded here.
func (sh statsHandler) HandleConn(context.Context, stats.ConnStats) {
}

@ -21,6 +21,7 @@ import (
"github.com/hashicorp/boundary/internal/cmd/base"
pbs "github.com/hashicorp/boundary/internal/gen/controller/servers/services"
"github.com/hashicorp/boundary/internal/observability/event"
"github.com/hashicorp/boundary/internal/servers/worker/internal/metric"
"github.com/hashicorp/go-secure-stdlib/base62"
"google.golang.org/grpc"
"google.golang.org/grpc/resolver"
@ -115,6 +116,7 @@ func (w *Worker) createClientConn(addr string) error {
cc, err := grpc.DialContext(w.baseContext,
fmt.Sprintf("%s:///%s", res.Scheme(), addr),
grpc.WithResolvers(res),
grpc.WithUnaryInterceptor(metric.InstrumentClusterClient()),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32)),
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(math.MaxInt32)),
grpc.WithContextDialer(w.controllerDialerFunc()),

@ -0,0 +1,155 @@
package metric
import (
"context"
"strings"
"time"
"github.com/hashicorp/boundary/globals"
"github.com/hashicorp/boundary/internal/errors"
"github.com/hashicorp/boundary/internal/gen/controller/servers/services"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/reflect/protoregistry"
)
const (
	// Prometheus label names attached to every latency observation.
	labelGrpcCode    = "grpc_code"
	labelGrpcService = "grpc_service"
	labelGrpcMethod  = "grpc_method"

	// clusterClientSubsystem names the metric subsystem for the
	// client (worker) side of worker <-> controller cluster traffic.
	clusterClientSubsystem = "cluster_client"
)
// grpcRequestLatency collects measurements of how long a gRPC
// request between a cluster and its clients takes, partitioned by
// status code, service, and method. It is declared as an ObserverVec
// (rather than *prometheus.HistogramVec) so tests can swap in a
// test double.
var grpcRequestLatency prometheus.ObserverVec = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Namespace: globals.MetricNamespace,
		Subsystem: clusterClientSubsystem,
		Name:      "grpc_request_duration_seconds",
		Help:      "Histogram of latencies for gRPC requests between the cluster and any of its clients.",
		Buckets:   prometheus.DefBuckets,
	},
	[]string{labelGrpcCode, labelGrpcService, labelGrpcMethod},
)
// statusFromError extracts the *status.Status carried by the provided
// error. Unlike status.FromError alone, it also unwraps wrapped errors
// whose cause exposes GRPCStatus(). If no status can be recovered, a
// codes.Unknown status is returned.
func statusFromError(err error) *status.Status {
	if s, ok := status.FromError(err); ok {
		return s
	}

	// status.FromError does not unwrap, so walk the error chain for
	// anything that can produce a *status.Status.
	type grpcStatusProvider interface {
		GRPCStatus() *status.Status
	}
	var provider grpcStatusProvider
	if errors.As(err, &provider) {
		return provider.GRPCStatus()
	}

	return status.New(codes.Unknown, "Unknown Code")
}
// splitMethodName breaks a gRPC full method name of the form
// "/service/method" into its service and method components. Inputs
// without a separating "/" (after the optional leading slash) yield
// ("unknown", "unknown").
func splitMethodName(fullMethodName string) (string, string) {
	trimmed := strings.TrimPrefix(fullMethodName, "/")
	if service, method, found := strings.Cut(trimmed, "/"); found {
		return service, method
	}
	return "unknown", "unknown"
}
// requestRecorder captures the labels and start time of a single gRPC
// request so its latency can be observed once the request completes.
type requestRecorder struct {
	// labels holds the grpc_service/grpc_method pair; the grpc_code
	// label is filled in by record once the outcome is known.
	labels prometheus.Labels

	// start is when the request began; record measures elapsed time
	// from this point.
	start time.Time
}
// newRequestRecorder builds a requestRecorder for the given full gRPC
// method name, stamping the start time and pre-populating the service
// and method labels.
func newRequestRecorder(fullMethodName string) requestRecorder {
	service, method := splitMethodName(fullMethodName)
	return requestRecorder{
		start: time.Now(),
		labels: prometheus.Labels{
			labelGrpcService: service,
			labelGrpcMethod:  method,
		},
	}
}
// record finalizes the measurement: it derives the grpc_code label
// from err (nil maps to "OK", per the package tests) and observes the
// elapsed time since the recorder was created.
func (r requestRecorder) record(err error) {
	r.labels[labelGrpcCode] = statusFromError(err).Code().String()
	grpcRequestLatency.With(r.labels).Observe(time.Since(r.start).Seconds())
}
// InstrumentClusterClient returns a grpc.UnaryClientInterceptor that
// records a latency observation — labeled with service, method, and
// resulting status code — for every unary RPC made through the client
// connection.
func InstrumentClusterClient() grpc.UnaryClientInterceptor {
	return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
		r := newRequestRecorder(method)
		err := invoker(ctx, method, req, reply, cc, opts...)
		// Record even when the invoker fails; the error's status code
		// becomes the grpc_code label.
		r.record(err)
		return err
	}
}
// allCodes enumerates every gRPC status code an observation may be
// labeled with. InitializeClusterClientCollectors uses it to
// pre-create one histogram series per (code, service, method)
// combination.
var allCodes = []codes.Code{
	codes.OK, codes.Canceled, codes.Unknown, codes.InvalidArgument, codes.DeadlineExceeded, codes.NotFound,
	codes.AlreadyExists, codes.PermissionDenied, codes.Unauthenticated, codes.ResourceExhausted,
	codes.FailedPrecondition, codes.Aborted, codes.OutOfRange, codes.Unimplemented, codes.Internal,
	codes.Unavailable, codes.DataLoss,
}
// InitializeClusterClientCollectors registers the cluster-client
// collectors on r and pre-initializes a latency series for every
// (status code, service, method) combination found in the registered
// controller services protos, so each series is exported (at zero)
// before any request is observed. A nil Registerer is a no-op.
func InitializeClusterClientCollectors(r prometheus.Registerer) {
	if r == nil {
		return
	}
	r.MustRegister(grpcRequestLatency)

	// Walk every proto file in the controller services package and
	// collect its fully-qualified service names and method names.
	serviceNamesToMethodNames := make(map[string][]string)
	protoregistry.GlobalFiles.RangeFilesByPackage(
		services.File_controller_servers_services_v1_session_service_proto.Package(),
		func(fd protoreflect.FileDescriptor) bool { return rangeProtofiles(serviceNamesToMethodNames, fd) },
	)

	for serviceName, serviceMethods := range serviceNamesToMethodNames {
		for _, sm := range serviceMethods {
			for _, c := range allCodes {
				// With creates the child series (initialized to zero)
				// as a side effect; its return value is intentionally
				// discarded.
				grpcRequestLatency.With(prometheus.Labels{
					labelGrpcCode:    c.String(),
					labelGrpcMethod:  sm,
					labelGrpcService: serviceName,
				})
			}
		}
	}
}
// rangeProtofiles records, for each service declared in fd, the
// service's full name mapped to the names of all its methods. It
// always returns true so the caller's Range keeps iterating.
func rangeProtofiles(m map[string][]string, fd protoreflect.FileDescriptor) bool {
	svcs := fd.Services()
	for i := 0; i < svcs.Len(); i++ {
		svc := svcs.Get(i)
		count := svc.Methods().Len()
		if count == 0 {
			// Services without methods contribute no series.
			continue
		}
		names := make([]string, 0, count)
		for j := 0; j < count; j++ {
			names = append(names, string(svc.Methods().Get(j).Name()))
		}
		m[string(svc.FullName())] = names
	}
	return true
}

@ -0,0 +1,168 @@
package metric
import (
"context"
"fmt"
"testing"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/wrapperspb"
)
// TestInitializeClusterClientCollectors verifies that initialization
// neither panics for a nil registerer (documented as a no-op) nor for
// a real registry.
func TestInitializeClusterClientCollectors(t *testing.T) {
	require.NotPanics(t, func() { InitializeClusterClientCollectors(nil) })
	require.NotPanics(t, func() { InitializeClusterClientCollectors(prometheus.NewRegistry()) })
}
// TestRecorder exercises newRequestRecorder + record directly,
// swapping the package-level collector for a testableObserverVec so
// the produced labels and observed latency can be asserted.
func TestRecorder(t *testing.T) {
	cases := []struct {
		name         string
		methodName   string
		err          error
		wantedLabels prometheus.Labels
	}{
		{
			name:       "basic",
			methodName: "/some.service.path/method",
			err:        nil,
			wantedLabels: map[string]string{
				labelGrpcCode:    "OK",
				labelGrpcMethod:  "method",
				labelGrpcService: "some.service.path",
			},
		},
		{
			name:       "unrecognized method path format",
			methodName: "unrecognized",
			err:        nil,
			wantedLabels: map[string]string{
				labelGrpcCode:    "OK",
				labelGrpcMethod:  "unknown",
				labelGrpcService: "unknown",
			},
		},
		{
			name:       "cancel error",
			methodName: "/some.service.path/method",
			err:        status.Error(codes.Canceled, ""),
			wantedLabels: map[string]string{
				labelGrpcCode:    "Canceled",
				labelGrpcMethod:  "method",
				labelGrpcService: "some.service.path",
			},
		},
		{
			name:       "permission error",
			methodName: "/some.service.path/method",
			err:        status.Error(codes.PermissionDenied, ""),
			wantedLabels: map[string]string{
				labelGrpcCode:    "PermissionDenied",
				labelGrpcMethod:  "method",
				labelGrpcService: "some.service.path",
			},
		},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			// Swap in the test double and restore the real collector
			// when this subtest finishes.
			ogReqLatency := grpcRequestLatency
			defer func() { grpcRequestLatency = ogReqLatency }()
			testableLatency := &testableObserverVec{}
			grpcRequestLatency = testableLatency
			start := time.Now()
			tested := newRequestRecorder(tc.methodName)
			tested.record(tc.err)
			// Exactly one observation, positive but no larger than the
			// wall time elapsed in this test body, with expected labels.
			require.Len(t, testableLatency.observations, 1)
			assert.Greater(t, testableLatency.observations[0].observation, float64(0))
			assert.LessOrEqual(t, testableLatency.observations[0].observation, time.Since(start).Seconds())
			assert.Equal(t, testableLatency.observations[0].labels, tc.wantedLabels)
		})
	}
}
// TestInstrumentClusterClient verifies that the interceptor invokes
// the wrapped invoker and records exactly one latency observation on
// the success path.
func TestInstrumentClusterClient(t *testing.T) {
	// Swap in the test double and restore the real collector on exit.
	ogReqLatency := grpcRequestLatency
	defer func() { grpcRequestLatency = ogReqLatency }()
	testableLatency := &testableObserverVec{}
	grpcRequestLatency = testableLatency
	interceptor := InstrumentClusterClient()
	i := &testInvoker{t: t, retErr: nil}
	start := time.Now()
	err := interceptor(context.Background(), "/some.service.path/method", wrapperspb.Bytes([]byte{1}), nil, nil, i.invoke, []grpc.CallOption{}...)
	require.NoError(t, err)
	require.True(t, i.called)
	require.Len(t, testableLatency.observations, 1)
	assert.Greater(t, testableLatency.observations[0].observation, float64(0))
	assert.LessOrEqual(t, testableLatency.observations[0].observation, time.Since(start).Seconds())
}
// TestInstrumentClusterClient_InvokerError verifies that the
// interceptor propagates the invoker's error unchanged and still
// records a latency observation in the error case.
func TestInstrumentClusterClient_InvokerError(t *testing.T) {
	// Swap in the test double and restore the real collector on exit.
	ogReqLatency := grpcRequestLatency
	defer func() { grpcRequestLatency = ogReqLatency }()
	testableLatency := &testableObserverVec{}
	grpcRequestLatency = testableLatency
	interceptor := InstrumentClusterClient()
	i := &testInvoker{t: t, retErr: fmt.Errorf("oops!")}
	start := time.Now()
	err := interceptor(context.Background(), "/some.service.path/method", wrapperspb.Bytes([]byte{1}), nil, nil, i.invoke, []grpc.CallOption{}...)
	require.EqualError(t, err, "oops!")
	require.True(t, i.called)
	// We still assert request latency in error states.
	require.Len(t, testableLatency.observations, 1)
	assert.Greater(t, testableLatency.observations[0].observation, float64(0))
	assert.LessOrEqual(t, testableLatency.observations[0].observation, time.Since(start).Seconds())
}
// testInvoker is a grpc.UnaryInvoker stand-in that records whether it
// was called and returns a configurable error.
type testInvoker struct {
	t *testing.T
	// called is set to true by invoke so tests can assert it ran.
	called bool
	// retErr is the error invoke returns (nil for success paths).
	retErr error
}

// invoke satisfies the grpc.UnaryInvoker signature; it sanity-checks
// its arguments, flags the call, and returns retErr.
func (i *testInvoker) invoke(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, opts ...grpc.CallOption) error {
	i.called = true
	require.NotNil(i.t, ctx)
	require.NotEmpty(i.t, method)
	require.NotNil(i.t, req)
	return i.retErr
}
// testableObserverVec allows us to assert which observations are being made
// with which labels. It substitutes for the package-level
// grpcRequestLatency in tests; the embedded ObserverVec is left nil
// and only With is implemented.
type testableObserverVec struct {
	// observations accumulates one entry per With call, in order.
	observations []*testableObserver
	prometheus.ObserverVec
}

// With records a new testableObserver carrying the given labels and
// returns it so the subsequent Observe call is captured too.
func (v *testableObserverVec) With(l prometheus.Labels) prometheus.Observer {
	ret := &testableObserver{labels: l}
	v.observations = append(v.observations, ret)
	return ret
}

// testableObserver captures a single labeled observation.
type testableObserver struct {
	labels      prometheus.Labels
	observation float64
}

// Observe stores the observed value for later assertion.
func (o *testableObserver) Observe(f float64) {
	o.observation = f
}

@ -68,6 +68,8 @@ type Worker struct {
func New(conf *Config) (*Worker, error) {
metric.InitializeHttpCollectors(conf.PrometheusRegisterer)
metric.InitializeWebsocketCollectors(conf.PrometheusRegisterer)
metric.InitializeClusterClientCollectors(conf.PrometheusRegisterer)
w := &Worker{
conf: conf,
logger: conf.Logger.Named("worker"),

Loading…
Cancel
Save