diff --git a/internal/daemon/controller/internal/metric/cluster.go b/internal/daemon/controller/internal/metric/cluster.go
index 10790a70ef..0b4baadab5 100644
--- a/internal/daemon/controller/internal/metric/cluster.go
+++ b/internal/daemon/controller/internal/metric/cluster.go
@@ -2,6 +2,7 @@ package metric
 
 import (
 	"context"
+	"net"
 
 	"github.com/hashicorp/boundary/globals"
 	"github.com/hashicorp/boundary/internal/daemon/metric"
@@ -33,6 +34,28 @@ var grpcRequestLatency prometheus.ObserverVec = prometheus.NewHistogramVec(
 	metric.ListGrpcLabels,
 )
 
+// acceptedConnsTotal keeps a count of the total accepted connections to a controller.
+var acceptedConnsTotal = prometheus.NewCounterVec(
+	prometheus.CounterOpts{
+		Namespace: globals.MetricNamespace,
+		Subsystem: clusterSubSystem,
+		Name:      "accepted_connections_total",
+		Help:      "Count of total accepted network connections to this controller.",
+	},
+	[]string{metric.LabelConnectionPurpose},
+)
+
+// closedConnsTotal keeps a count of the total closed connections to a controller.
+var closedConnsTotal = prometheus.NewCounterVec(
+	prometheus.CounterOpts{
+		Namespace: globals.MetricNamespace,
+		Subsystem: clusterSubSystem,
+		Name:      "closed_connections_total",
+		Help:      "Count of total closed network connections to this controller.",
+	},
+	[]string{metric.LabelConnectionPurpose},
+)
+
 // All the codes expected to be returned by boundary or the grpc framework to
 // requests to the cluster server.
 var expectedGrpcCodes = []codes.Code{
@@ -45,7 +68,15 @@ var expectedGrpcCodes = []codes.Code{
 	codes.Unavailable,
 	codes.Unauthenticated,
 }
+
+// InstrumentClusterTrackingListener wraps the given listener so that
+// connections accepted on it, and later closed, increment this controller's
+// connection counters for the given purpose.
+func InstrumentClusterTrackingListener(l net.Listener, purpose string) net.Listener {
+	p := prometheus.Labels{metric.LabelConnectionPurpose: purpose}
+	return metric.NewConnectionTrackingListener(l, acceptedConnsTotal.With(p), closedConnsTotal.With(p))
+}
 
 // InstrumentClusterStatsHandler returns a gRPC stats.Handler which observes
 // cluster specific metrics. Use with the cluster gRPC server.
 func InstrumentClusterStatsHandler(ctx context.Context) (stats.Handler, error) {
@@ -57,3 +85,9 @@ func InstrumentClusterStatsHandler(ctx context.Context) (stats.Handler, error) {
 func InitializeClusterCollectors(r prometheus.Registerer, server *grpc.Server) {
 	metric.InitializeGrpcCollectorsFromServer(r, grpcRequestLatency, server, expectedGrpcCodes)
 }
+
+// InitializeConnectionCounters registers this controller's connection
+// counters with the provided Prometheus registerer.
+func InitializeConnectionCounters(r prometheus.Registerer) {
+	metric.InitializeConnectionCounters(r, []prometheus.CounterVec{*acceptedConnsTotal, *closedConnsTotal})
+}
diff --git a/internal/daemon/controller/listeners.go b/internal/daemon/controller/listeners.go
index 32b6af46ce..e826678da2 100644
--- a/internal/daemon/controller/listeners.go
+++ b/internal/daemon/controller/listeners.go
@@ -237,6 +237,7 @@ func (c *Controller) configureForCluster(ln *base.ServerListener) (func(), error
 		}
 	}
 
+	metric.InitializeConnectionCounters(c.conf.PrometheusRegisterer)
 	metric.InitializeClusterCollectors(c.conf.PrometheusRegisterer, workerServer)
 
 	ln.GrpcServer = workerServer
@@ -249,13 +250,13 @@ func (c *Controller) configureForCluster(ln *base.ServerListener) (func(), error
 		}
 	}()
 	go func() {
-		err := handleSecondaryConnection(c.baseContext, multiplexingReverseGrpcListener, c.downstreamRoutes, -1)
+		err := handleSecondaryConnection(c.baseContext, metric.InstrumentClusterTrackingListener(multiplexingReverseGrpcListener, "reverse-grpc"), c.downstreamRoutes, -1)
 		if err != nil {
 			event.WriteError(c.baseContext, op, err, event.WithInfoMsg("handleSecondaryConnection error"))
 		}
 	}()
 	go func() {
-		err := ln.GrpcServer.Serve(multiplexingAuthedListener)
+		err := ln.GrpcServer.Serve(metric.InstrumentClusterTrackingListener(multiplexingAuthedListener, "grpc"))
 		if err != nil {
 			event.WriteError(c.baseContext, op, err, event.WithInfoMsg("multiplexingAuthedListener error"))
 		}
diff --git a/internal/daemon/metric/initialize_metrics.go b/internal/daemon/metric/initialize_metrics.go
index 47668d2a14..c5c41130a3 100644
--- a/internal/daemon/metric/initialize_metrics.go
+++ b/internal/daemon/metric/initialize_metrics.go
@@ -15,12 +15,13 @@ import (
 )
 
 const (
-	LabelGrpcService = "grpc_service"
-	LabelGrpcMethod  = "grpc_method"
-	LabelGrpcCode    = "grpc_code"
-	LabelHttpPath    = "path"
-	LabelHttpMethod  = "method"
-	LabelHttpCode    = "code"
+	LabelConnectionPurpose = "purpose"
+	LabelGrpcService       = "grpc_service"
+	LabelGrpcMethod        = "grpc_method"
+	LabelGrpcCode          = "grpc_code"
+	LabelHttpPath          = "path"
+	LabelHttpMethod        = "method"
+	LabelHttpCode          = "code"
 
 	invalidPathValue = "invalid"
 )
@@ -86,6 +87,17 @@ func InitializeGrpcCollectorsFromPackage(r prometheus.Registerer, v prometheus.O
 	}
 }
 
+// InitializeConnectionCounters registers the provided connection counters
+// with the Prometheus registerer, if one is configured.
+func InitializeConnectionCounters(r prometheus.Registerer, counters []prometheus.CounterVec) {
+	if r == nil {
+		return
+	}
+	for _, c := range counters {
+		r.MustRegister(c)
+	}
+}
+
 // InitializeGrpcCollectorsFromServer registers and zeroes a Prometheus
 // histogram, finding all service and method labels from the provided gRPC
 // server.
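Note: for reviewers unfamiliar with the registration flow above, here is a minimal, self-contained sketch of the pattern this change follows — the CounterVecs are created up front with a `purpose` label, registered once against the daemon's Registerer (a nil registerer is a no-op, as in `InitializeConnectionCounters`), and labeled children are handed out per listener. All names in this sketch are illustrative, not part of the diff.

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// demoAccepted mirrors the acceptedConnsTotal CounterVec above; the name and
// subsystem are placeholders.
var demoAccepted = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Namespace: "boundary",
		Subsystem: "demo_cluster",
		Name:      "accepted_connections_total",
		Help:      "Count of total accepted network connections.",
	},
	[]string{"purpose"},
)

func register(r prometheus.Registerer) {
	// As in the diff: when metrics are disabled there is no registerer,
	// and registration is skipped.
	if r == nil {
		return
	}
	r.MustRegister(demoAccepted)
}

func main() {
	r := prometheus.NewRegistry()
	register(r)
	// Each listener purpose gets its own labeled child counter.
	demoAccepted.With(prometheus.Labels{"purpose": "grpc"}).Inc()
	fmt.Println("registered and incremented")
}
```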
diff --git a/internal/daemon/metric/instrument_handlers.go b/internal/daemon/metric/instrument_handlers.go
index ae47e97790..e4c79ad890 100644
--- a/internal/daemon/metric/instrument_handlers.go
+++ b/internal/daemon/metric/instrument_handlers.go
@@ -4,7 +4,9 @@ package metric
 
 import (
 	"context"
+	"net"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/hashicorp/boundary/internal/errors"
@@ -95,6 +97,41 @@ func (r requestRecorder) Record(err error) {
 	r.reqLatency.With(r.labels).Observe(time.Since(r.start).Seconds())
 }
 
+type connectionTrackingListener struct {
+	net.Listener
+	acceptedConns prometheus.Counter
+	closedConns   prometheus.Counter
+}
+
+func (l *connectionTrackingListener) Accept() (net.Conn, error) {
+	conn, err := l.Listener.Accept()
+	if err != nil {
+		return nil, err
+	}
+	l.acceptedConns.Inc()
+	return &connectionTrackingListenerConn{Conn: conn, closedConns: l.closedConns}, nil
+}
+
+// NewConnectionTrackingListener wraps an existing listener so that the
+// provided counters are incremented when connections are accepted and closed.
+// Multiple calls to Close() on a tracked connection will only increment the
+// closed counter once. A call to Close() increments the counter even if
+// Close() returns an error.
+func NewConnectionTrackingListener(l net.Listener, ac prometheus.Counter, cc prometheus.Counter) *connectionTrackingListener {
+	return &connectionTrackingListener{Listener: l, acceptedConns: ac, closedConns: cc}
+}
+
+type connectionTrackingListenerConn struct {
+	net.Conn
+	closeOnce   sync.Once
+	closedConns prometheus.Counter
+}
+
+func (c *connectionTrackingListenerConn) Close() error {
+	c.closeOnce.Do(func() { c.closedConns.Inc() })
+	return c.Conn.Close()
+}
+
 // StatusFromError retrieves the *status.Status from the provided error. It'll
 // attempt to unwrap the *status.Error, which is something status.FromError
 // does not do.
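Note: since the wrapper above lives in an `internal` package, here is a self-contained sketch of the same pattern for anyone wanting to try it outside the repo — a listener wrapper that counts successful accepts (failed accepts are not counted) and a conn wrapper that uses `sync.Once` so repeated or failing `Close()` calls increment the closed counter exactly once. The types and metric names are re-declared locally and are illustrative only.

```go
package main

import (
	"fmt"
	"net"
	"sync"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

type trackingListener struct {
	net.Listener
	accepted, closed prometheus.Counter
}

func (l *trackingListener) Accept() (net.Conn, error) {
	conn, err := l.Listener.Accept()
	if err != nil {
		return nil, err // failed accepts are not counted
	}
	l.accepted.Inc()
	return &trackingConn{Conn: conn, closed: l.closed}, nil
}

type trackingConn struct {
	net.Conn
	once   sync.Once
	closed prometheus.Counter
}

func (c *trackingConn) Close() error {
	c.once.Do(c.closed.Inc) // count only the first Close(), even on error
	return c.Conn.Close()
}

func main() {
	accepted := prometheus.NewCounter(prometheus.CounterOpts{Name: "demo_accepted_total"})
	closed := prometheus.NewCounter(prometheus.CounterOpts{Name: "demo_closed_total"})

	inner, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		panic(err)
	}
	// In Boundary this would be metric.NewConnectionTrackingListener(inner, accepted, closed).
	ln := &trackingListener{Listener: inner, accepted: accepted, closed: closed}

	go func() {
		c, _ := net.Dial("tcp", ln.Addr().String())
		if c != nil {
			c.Close()
		}
	}()

	conn, err := ln.Accept()
	if err != nil {
		panic(err)
	}
	conn.Close()
	conn.Close() // the second Close() does not double count

	fmt.Println(testutil.ToFloat64(accepted), testutil.ToFloat64(closed)) // 1 1
}
```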
diff --git a/internal/daemon/metric/instrument_handlers_test.go b/internal/daemon/metric/instrument_handlers_test.go
index b528dceb0a..264d789f21 100644
--- a/internal/daemon/metric/instrument_handlers_test.go
+++ b/internal/daemon/metric/instrument_handlers_test.go
@@ -2,7 +2,9 @@ package metric
 
 import (
 	"context"
+	"errors"
 	"fmt"
+	"net"
 	"testing"
 	"time"
 
@@ -15,10 +17,12 @@ import (
 	"google.golang.org/grpc/status"
 )
 
+const testSubsystem = "test_metric"
+
 var grpcRequestLatency prometheus.ObserverVec = prometheus.NewHistogramVec(
 	prometheus.HistogramOpts{
 		Namespace: globals.MetricNamespace,
-		Subsystem: "test_metric",
+		Subsystem: testSubsystem,
 		Name:      "grpc_request_duration_seconds",
 		Help:      "Test histogram.",
 		Buckets:   prometheus.DefBuckets,
@@ -26,6 +30,225 @@ var grpcRequestLatency prometheus.ObserverVec = prometheus.NewHistogramVec(
 	ListGrpcLabels,
 )
 
+var testAcceptedConns prometheus.CounterVec = *prometheus.NewCounterVec(
+	prometheus.CounterOpts{
+		Namespace: globals.MetricNamespace,
+		Subsystem: testSubsystem,
+		Name:      "test_accepted_connections_total",
+		Help:      "Test CounterVec.",
+	},
+	[]string{LabelConnectionPurpose},
+)
+
+var testClosedConns prometheus.CounterVec = *prometheus.NewCounterVec(
+	prometheus.CounterOpts{
+		Namespace: globals.MetricNamespace,
+		Subsystem: testSubsystem,
+		Name:      "test_closed_connections_total",
+		Help:      "Test CounterVec.",
+	},
+	[]string{LabelConnectionPurpose},
+)
+
+type testPrometheusCounter struct {
+	prometheus.Metric
+	prometheus.Collector
+
+	incCalledN int
+	t          *testing.T
+}
+
+func (tpc *testPrometheusCounter) Inc()        { tpc.incCalledN++ }
+func (tpc *testPrometheusCounter) Add(float64) { tpc.t.Fatal("testPrometheusCounter Add() called") }
+
+type testListener struct {
+	net.Listener
+	lastClientConn net.Conn
+}
+
+func (l *testListener) Accept() (net.Conn, error) {
+	s, c := net.Pipe()
+	l.lastClientConn = c
+	return s, nil
+}
+
+func (l *testListener) Close() error {
+	return l.lastClientConn.Close()
+}
+
+type erroringAcceptListener struct {
+	net.Listener
+}
+
+func (l *erroringAcceptListener) Accept() (net.Conn, error) {
+	return nil, errors.New("error for testcase")
+}
+
+type erroringCloseListener struct {
+	net.Listener
+	lastClientConn net.Conn
+}
+
+func (l *erroringCloseListener) Accept() (net.Conn, error) {
+	s, c := net.Pipe()
+	l.lastClientConn = c
+	return &erroringConn{Conn: s}, nil
+}
+
+type erroringConn struct {
+	net.Conn
+}
+
+func (c *erroringConn) Close() error {
+	c.Conn.Close()
+	return errors.New("error for testcase")
+}
+
+func TestNewConnectionTrackingListener(t *testing.T) {
+	t.Run("set-label",
+		func(t *testing.T) {
+			l := &testListener{}
+			acceptedConns := testAcceptedConns.With(prometheus.Labels{LabelConnectionPurpose: "test_label"})
+			closedConns := testClosedConns.With(prometheus.Labels{LabelConnectionPurpose: "test_label"})
+			ctl := NewConnectionTrackingListener(l, acceptedConns, closedConns)
+			require.NotNil(t, ctl)
+
+			assert.Equal(t, l, ctl.Listener)
+			cc, err := ctl.Accept()
+			require.NoError(t, err)
+			require.NotNil(t, cc)
+
+			// check purpose label was populated correctly by attempting to delete it
+			assert.True(t, testAcceptedConns.DeleteLabelValues("test_label"))
+			assert.True(t, testClosedConns.DeleteLabelValues("test_label"))
+			require.NoError(t, cc.Close())
+		})
+	t.Run("accept-err",
+		func(t *testing.T) {
+			tpcAcc := &testPrometheusCounter{t: t}
+			tpcClo := &testPrometheusCounter{t: t}
+			el := &erroringAcceptListener{}
+			ctl := NewConnectionTrackingListener(el, tpcAcc, tpcClo)
+			require.NotNil(t, ctl)
+
+			cc, err := ctl.Accept()
+			assert.Nil(t, cc)
+			assert.Contains(t, err.Error(), "error for testcase")
+			assert.Equal(t, 0, tpcAcc.incCalledN)
+			assert.Equal(t, 0, tpcClo.incCalledN)
+		})
+	t.Run("accept-multiple",
+		func(t *testing.T) {
+			tpcAcc := &testPrometheusCounter{t: t}
+			tpcClo := &testPrometheusCounter{t: t}
+			n := 10
+			for i := 0; i < n; i++ {
+				l := &testListener{}
+				ctl := NewConnectionTrackingListener(l, tpcAcc, tpcClo)
+				require.NotNil(t, ctl)
+				cc, err := ctl.Accept()
+				require.NotNil(t, cc)
+				require.NoError(t, err)
+				assert.Equal(t, i+1, tpcAcc.incCalledN)
+
+				require.NoError(t, cc.Close())
+				assert.Equal(t, i+1, tpcClo.incCalledN)
+			}
+		})
+	t.Run("close-err",
+		func(t *testing.T) {
+			tpcAcc := &testPrometheusCounter{t: t}
+			tpcClo := &testPrometheusCounter{t: t}
+			el := &erroringCloseListener{}
+			ctl := NewConnectionTrackingListener(el, tpcAcc, tpcClo)
+			require.NotNil(t, ctl)
+
+			cc, err := ctl.Accept()
+			require.NotNil(t, cc)
+			require.NoError(t, err)
+			assert.Equal(t, 1, tpcAcc.incCalledN)
+
+			assert.Error(t, cc.Close())
+			assert.Equal(t, 1, tpcClo.incCalledN)
+		})
+	t.Run("close-repeat-calls",
+		func(t *testing.T) {
+			tpcAcc := &testPrometheusCounter{t: t}
+			tpcClo := &testPrometheusCounter{t: t}
+			l := &testListener{}
+			ctl := NewConnectionTrackingListener(l, tpcAcc, tpcClo)
+			require.NotNil(t, ctl)
+
+			cc, err := ctl.Accept()
+			require.NoError(t, err)
+			require.NotNil(t, cc)
+			assert.Equal(t, 0, tpcClo.incCalledN)
+
+			for i := 0; i <= 5; i++ {
+				assert.NoError(t, cc.Close())
+			}
+			assert.Equal(t, 1, tpcClo.incCalledN)
+		})
+	t.Run("inc-dec",
+		func(t *testing.T) {
+			tpcAcc := &testPrometheusCounter{t: t}
+			tpcClo := &testPrometheusCounter{t: t}
+			l := &testListener{}
+
+			ctl := NewConnectionTrackingListener(l, tpcAcc, tpcClo)
+			require.NotNil(t, ctl)
+			assert.Equal(t, 0, tpcAcc.incCalledN)
+
+			cc, err := ctl.Accept()
+			require.NoError(t, err)
+			require.NotNil(t, cc)
+			assert.Equal(t, 1, tpcAcc.incCalledN)
+
+			require.NoError(t, cc.Close())
+			assert.Equal(t, 1, tpcClo.incCalledN)
+		},
+	)
+	t.Run("more-inc-dec",
+		func(t *testing.T) {
+			tpcAcc := &testPrometheusCounter{t: t}
+			tpcClo := &testPrometheusCounter{t: t}
+			l1 := &testListener{}
+			l2 := &testListener{}
+			l3 := &testListener{}
+
+			ctl1 := NewConnectionTrackingListener(l1, tpcAcc, tpcClo)
+			require.NotNil(t, ctl1)
+			assert.Equal(t, 0, tpcAcc.incCalledN)
+
+			cc1, err := ctl1.Accept()
+			require.NoError(t, err)
+			require.NotNil(t, cc1)
+			assert.Equal(t, 1, tpcAcc.incCalledN)
+
+			ctl2 := NewConnectionTrackingListener(l2, tpcAcc, tpcClo)
+			require.NotNil(t, ctl2)
+			cc2, err := ctl2.Accept()
+			require.NoError(t, err)
+			require.NotNil(t, cc2)
+			assert.Equal(t, 2, tpcAcc.incCalledN)
+
+			require.NoError(t, cc1.Close())
+			require.NoError(t, cc2.Close())
+			assert.Equal(t, 2, tpcClo.incCalledN)
+
+			ctl3 := NewConnectionTrackingListener(l3, tpcAcc, tpcClo)
+			require.NotNil(t, ctl3)
+			cc3, err := ctl3.Accept()
+			require.NoError(t, err)
+			require.NotNil(t, cc3)
+			assert.Equal(t, 3, tpcAcc.incCalledN)
+
+			require.NoError(t, cc3.Close())
+			assert.Equal(t, 3, tpcClo.incCalledN)
+		},
+	)
+}
+
 func TestNewStatsHandler(t *testing.T) {
 	cases := []struct {
 		name string
diff --git a/internal/daemon/worker/internal/metric/cluster_server.go b/internal/daemon/worker/internal/metric/cluster_server.go
index 904409c9c9..f9015c62b2 100644
--- a/internal/daemon/worker/internal/metric/cluster_server.go
+++ b/internal/daemon/worker/internal/metric/cluster_server.go
@@ -2,6 +2,7 @@ package metric
 
 import (
 	"context"
+	"net"
"github.com/hashicorp/boundary/globals" "github.com/hashicorp/boundary/internal/daemon/metric" @@ -24,12 +25,34 @@ var grpcServerRequestLatency prometheus.ObserverVec = prometheus.NewHistogramVec Namespace: globals.MetricNamespace, Subsystem: workerClusterSubsystem, Name: "grpc_request_duration_seconds", - Help: "Histogram of latencies for gRPC requests between the a worker server and a worker client.", + Help: "Histogram of latencies for gRPC requests between a worker server and a worker client.", Buckets: prometheus.DefBuckets, }, metric.ListGrpcLabels, ) +// acceptedConnsTotal keeps a count of the total accepted connections to a worker. +var acceptedConnsTotal = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: globals.MetricNamespace, + Subsystem: workerClusterSubsystem, + Name: "accepted_connections_total", + Help: "Count of total accepted network connections to this worker.", + }, + []string{metric.LabelConnectionPurpose}, +) + +// closedConnsTotal keeps a count of the total closed connections to a worker. +var closedConnsTotal = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: globals.MetricNamespace, + Subsystem: workerClusterSubsystem, + Name: "closed_connections_total", + Help: "Count of total closed network connections to this worker.", + }, + []string{metric.LabelConnectionPurpose}, +) + // All the codes expected to be returned by boundary or the grpc framework to // requests to the cluster server. var expectedGrpcCodes = []codes.Code{ @@ -42,6 +65,11 @@ var expectedGrpcCodes = []codes.Code{ codes.Unavailable, codes.Unauthenticated, } +func InstrumentWorkerClusterTrackingListener(l net.Listener, purpose string) net.Listener { + p := prometheus.Labels{metric.LabelConnectionPurpose: purpose} + return metric.NewConnectionTrackingListener(l, acceptedConnsTotal.With(p), closedConnsTotal.With(p)) +} + // InstrumentClusterStatsHandler returns a gRPC stats.Handler which observes // cluster-specific metrics for a gRPC server. 
 func InstrumentClusterStatsHandler(ctx context.Context) (stats.Handler, error) {
@@ -54,3 +82,9 @@ func InstrumentClusterStatsHandler(ctx context.Context) (stats.Handler, error) {
 func InitializeClusterServerCollectors(r prometheus.Registerer, server *grpc.Server) {
 	metric.InitializeGrpcCollectorsFromServer(r, grpcServerRequestLatency, server, expectedGrpcCodes)
 }
+
+// InitializeConnectionCounters registers this worker's connection counters
+// with the provided Prometheus registerer.
+func InitializeConnectionCounters(r prometheus.Registerer) {
+	metric.InitializeConnectionCounters(r, []prometheus.CounterVec{*acceptedConnsTotal, *closedConnsTotal})
+}
diff --git a/internal/daemon/worker/listeners.go b/internal/daemon/worker/listeners.go
index e091f738ef..cd214a7f45 100644
--- a/internal/daemon/worker/listeners.go
+++ b/internal/daemon/worker/listeners.go
@@ -196,6 +196,7 @@ func (w *Worker) configureForWorker(ln *base.ServerListener, logger *log.Logger,
 		}
 	}
 
+	metric.InitializeConnectionCounters(w.conf.PrometheusRegisterer)
 	metric.InitializeClusterServerCollectors(w.conf.PrometheusRegisterer, downstreamServer)
 
 	ln.GrpcServer = downstreamServer
@@ -214,9 +215,9 @@ func (w *Worker) configureForWorker(ln *base.ServerListener, logger *log.Logger,
 
 	return func() {
 		go w.workerAuthSplitListener.Start()
-		go httpServer.Serve(proxyListener)
-		go ln.GrpcServer.Serve(pkiWorkerTrackingListener)
-		go handleSecondaryConnection(cancelCtx, revPkiWorkerTrackingListener, w.downstreamRoutes, -1)
+		go httpServer.Serve(metric.InstrumentWorkerClusterTrackingListener(proxyListener, "proxy"))
+		go ln.GrpcServer.Serve(metric.InstrumentWorkerClusterTrackingListener(pkiWorkerTrackingListener, "grpc"))
+		go handleSecondaryConnection(cancelCtx, metric.InstrumentWorkerClusterTrackingListener(revPkiWorkerTrackingListener, "reverse-grpc"), w.downstreamRoutes, -1)
 	}, nil
 }
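Note on the design: the change tracks two monotonic counters rather than a single open-connections gauge, which means accept/close rates survive process restarts and the number of currently open connections can still be derived at query time (roughly `accepted_connections_total - closed_connections_total` per matching labels in PromQL; the exported names also carry the namespace/subsystem prefixes, whose values this diff does not show). A minimal, hedged sketch of exposing a registry holding such counters for scraping — the address and wiring here are illustrative, not Boundary's actual setup:

```go
package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	r := prometheus.NewRegistry()
	// In Boundary, the controller/worker InitializeConnectionCounters(r)
	// calls above would register the accepted/closed CounterVecs against r.
	mux := http.NewServeMux()
	mux.Handle("/metrics", promhttp.HandlerFor(r, promhttp.HandlerOpts{}))
	log.Fatal(http.ListenAndServe("127.0.0.1:9090", mux))
}
```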