feat(metric): adds accepted/closed connections counters for controller and worker cluster connections (#2656)

Same contents as PR https://github.com/hashicorp/boundary/pull/2593, but this time actually pushed to main.
pull/2660/head
Haotian 4 years ago committed by GitHub
parent 7c2d0692f6
commit 80f72b8511
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -2,6 +2,7 @@ package metric
import (
"context"
"net"
"github.com/hashicorp/boundary/globals"
"github.com/hashicorp/boundary/internal/daemon/metric"
@ -33,6 +34,28 @@ var grpcRequestLatency prometheus.ObserverVec = prometheus.NewHistogramVec(
metric.ListGrpcLabels,
)
// acceptedConnsTotal keeps a count of the total accepted connections to a
// controller, partitioned by the purpose of the connection (the
// LabelConnectionPurpose label).
var acceptedConnsTotal = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Namespace: globals.MetricNamespace,
		Subsystem: clusterSubSystem,
		Name:      "accepted_connections_total",
		Help:      "Count of total accepted network connections to this controller.",
	},
	[]string{metric.LabelConnectionPurpose},
)
// closedConnsTotal keeps a count of the total closed connections to a
// controller, partitioned by the purpose of the connection (the
// LabelConnectionPurpose label).
var closedConnsTotal = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Namespace: globals.MetricNamespace,
		Subsystem: clusterSubSystem,
		Name:      "closed_connections_total",
		Help:      "Count of total closed network connections to this controller.",
	},
	[]string{metric.LabelConnectionPurpose},
)
// All the codes expected to be returned by boundary or the grpc framework to
// requests to the cluster server.
var expectedGrpcCodes = []codes.Code{
@ -45,6 +68,11 @@ var expectedGrpcCodes = []codes.Code{
codes.Unavailable, codes.Unauthenticated,
}
// InstrumentClusterTrackingListener wraps the given listener so that
// connections it accepts and closes are counted in the controller cluster
// accepted/closed connection counters, labeled with the given purpose.
// The counters themselves are registered separately via
// InitializeConnectionCounters.
func InstrumentClusterTrackingListener(l net.Listener, purpose string) net.Listener {
	p := prometheus.Labels{metric.LabelConnectionPurpose: purpose}
	return metric.NewConnectionTrackingListener(l, acceptedConnsTotal.With(p), closedConnsTotal.With(p))
}
// InstrumentClusterStatsHandler returns a gRPC stats.Handler which observes
// cluster specific metrics. Use with the cluster gRPC server.
func InstrumentClusterStatsHandler(ctx context.Context) (stats.Handler, error) {
@ -57,3 +85,7 @@ func InstrumentClusterStatsHandler(ctx context.Context) (stats.Handler, error) {
// InitializeClusterCollectors registers the cluster gRPC request latency
// histogram with r, using the provided gRPC server to enumerate the
// service/method label values to pre-populate.
func InitializeClusterCollectors(r prometheus.Registerer, server *grpc.Server) {
	metric.InitializeGrpcCollectorsFromServer(r, grpcRequestLatency, server, expectedGrpcCodes)
}
// InitializeConnectionCounters registers the controller cluster
// accepted/closed connection counter vectors with the provided Prometheus
// registerer.
func InitializeConnectionCounters(r prometheus.Registerer) {
	metric.InitializeConnectionCounters(r, []prometheus.CounterVec{*acceptedConnsTotal, *closedConnsTotal})
}

@ -237,6 +237,7 @@ func (c *Controller) configureForCluster(ln *base.ServerListener) (func(), error
}
}
metric.InitializeConnectionCounters(c.conf.PrometheusRegisterer)
metric.InitializeClusterCollectors(c.conf.PrometheusRegisterer, workerServer)
ln.GrpcServer = workerServer
@ -249,13 +250,13 @@ func (c *Controller) configureForCluster(ln *base.ServerListener) (func(), error
}
}()
go func() {
err := handleSecondaryConnection(c.baseContext, multiplexingReverseGrpcListener, c.downstreamRoutes, -1)
err := handleSecondaryConnection(c.baseContext, metric.InstrumentClusterTrackingListener(revPkiWorkerTrackingListener, "reverse-grpc"), c.downstreamRoutes, -1)
if err != nil {
event.WriteError(c.baseContext, op, err, event.WithInfoMsg("handleSecondaryConnection error"))
}
}()
go func() {
err := ln.GrpcServer.Serve(multiplexingAuthedListener)
err := ln.GrpcServer.Serve(metric.InstrumentClusterTrackingListener(multiplexingAuthedListener, "grpc"))
if err != nil {
event.WriteError(c.baseContext, op, err, event.WithInfoMsg("multiplexingAuthedListener error"))
}

@ -15,12 +15,13 @@ import (
)
const (
LabelGrpcService = "grpc_service"
LabelGrpcMethod = "grpc_method"
LabelGrpcCode = "grpc_code"
LabelHttpPath = "path"
LabelHttpMethod = "method"
LabelHttpCode = "code"
LabelConnectionPurpose = "purpose"
LabelGrpcService = "grpc_service"
LabelGrpcMethod = "grpc_method"
LabelGrpcCode = "grpc_code"
LabelHttpPath = "path"
LabelHttpMethod = "method"
LabelHttpCode = "code"
invalidPathValue = "invalid"
)
@ -86,6 +87,15 @@ func InitializeGrpcCollectorsFromPackage(r prometheus.Registerer, v prometheus.O
}
}
// InitializeConnectionCounters registers the provided connection counter
// vectors with r. It is a no-op when r is nil, allowing callers to skip
// metric collection when no registerer is configured.
func InitializeConnectionCounters(r prometheus.Registerer, counters []prometheus.CounterVec) {
	if r == nil {
		return
	}
	// Index rather than range-by-value to avoid copying each CounterVec
	// struct per iteration.
	for i := range counters {
		r.MustRegister(counters[i])
	}
}
// InitializeGrpcCollectorsFromServer registers and zeroes a Prometheus
// histogram, finding all service and method labels from the provided gRPC
// server.

@ -4,7 +4,9 @@ package metric
import (
"context"
"net"
"strings"
"sync"
"time"
"github.com/hashicorp/boundary/internal/errors"
@ -95,6 +97,41 @@ func (r requestRecorder) Record(err error) {
r.reqLatency.With(r.labels).Observe(time.Since(r.start).Seconds())
}
// connectionTrackingListener wraps a net.Listener, incrementing
// acceptedConns on each successful Accept. Connections it hands out are
// wrapped so that closedConns is incremented on their first Close.
type connectionTrackingListener struct {
	net.Listener
	acceptedConns prometheus.Counter
	closedConns   prometheus.Counter
}
// Accept waits for the next connection from the wrapped listener. On
// success it increments the accepted-connections counter and returns the
// connection wrapped so that its eventual Close is counted as well.
// Accept errors are returned unchanged and are not counted.
func (l *connectionTrackingListener) Accept() (net.Conn, error) {
	conn, err := l.Listener.Accept()
	if err != nil {
		return nil, err
	}
	l.acceptedConns.Inc()
	tracked := &connectionTrackingListenerConn{
		Conn:        conn,
		closedConns: l.closedConns,
	}
	return tracked, nil
}
// NewConnectionTrackingListener wraps an existing listener so that the
// provided counters are incremented when connections are accepted (ac) and
// closed (cc). The counters are not registered here; register them
// separately (see InitializeConnectionCounters).
// Multiple calls to Close() on a tracked connection will only increment the
// closed-connections counter once. A call to Close() will increment the
// counter even if Close() errors.
func NewConnectionTrackingListener(l net.Listener, ac prometheus.Counter, cc prometheus.Counter) *connectionTrackingListener {
	return &connectionTrackingListener{Listener: l, acceptedConns: ac, closedConns: cc}
}
// connectionTrackingListenerConn wraps an accepted net.Conn. The sync.Once
// guarantees closedConns is incremented exactly once, regardless of how
// many times Close is called.
type connectionTrackingListenerConn struct {
	net.Conn
	dec         sync.Once
	closedConns prometheus.Counter
}
// Close increments the closed-connections counter (at most once across
// repeated calls) and then closes the underlying connection, returning
// whatever error that produces.
func (c *connectionTrackingListenerConn) Close() error {
	// Count the close even when the underlying Close fails; the sync.Once
	// guards against double counting.
	c.dec.Do(c.closedConns.Inc)
	return c.Conn.Close()
}
// StatusFromError retrieves the *status.Status from the provided error. It'll
// attempt to unwrap the *status.Error, which is something status.FromError
// does not do.

@ -2,7 +2,9 @@ package metric
import (
"context"
"errors"
"fmt"
"net"
"testing"
"time"
@ -15,10 +17,12 @@ import (
"google.golang.org/grpc/status"
)
const testSubsystem = "test_metric"
var grpcRequestLatency prometheus.ObserverVec = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: globals.MetricNamespace,
Subsystem: "test_metric",
Subsystem: testSubsystem,
Name: "grpc_request_duration_seconds",
Help: "Test histogram.",
Buckets: prometheus.DefBuckets,
@ -26,6 +30,225 @@ var grpcRequestLatency prometheus.ObserverVec = prometheus.NewHistogramVec(
ListGrpcLabels,
)
// testAcceptedConns is a CounterVec used by tests to verify that accepted
// connections are counted under the expected purpose label.
var testAcceptedConns prometheus.CounterVec = *prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Namespace: globals.MetricNamespace,
		Subsystem: testSubsystem,
		Name:      "test_accepted_connections_total",
		Help:      "Test CounterVec.",
	},
	[]string{LabelConnectionPurpose},
)
// testClosedConns is a CounterVec used by tests to verify that closed
// connections are counted under the expected purpose label.
var testClosedConns prometheus.CounterVec = *prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Namespace: globals.MetricNamespace,
		Subsystem: testSubsystem,
		Name:      "test_closed_connections_total",
		Help:      "Test CounterVec.",
	},
	[]string{LabelConnectionPurpose},
)
// testPrometheusCounter is a prometheus.Counter test double that records
// how many times Inc is called. The embedded Metric and Collector
// interfaces exist only to satisfy the prometheus.Counter interface.
type testPrometheusCounter struct {
	prometheus.Metric
	prometheus.Collector
	incCalledN int // number of times Inc has been called
	t          *testing.T
}

// Inc records the call.
func (tpg *testPrometheusCounter) Inc() { tpg.incCalledN++ }

// Add fails the test: the code under test is expected to use Inc only.
func (tpg *testPrometheusCounter) Add(float64) { tpg.t.Fatal("testPrometheusCounter Add() called") }
// testListener is a net.Listener test double. Accept hands out the server
// half of an in-memory pipe and remembers the client half so tests (and
// Close) can reach it.
type testListener struct {
	net.Listener
	lastClientConn net.Conn // client end of the most recently accepted pipe
}

// Accept creates a fresh net.Pipe, records its client end, and returns
// the server end. It never fails.
func (l *testListener) Accept() (net.Conn, error) {
	s, c := net.Pipe()
	l.lastClientConn = c
	return s, nil
}

// Close closes the client end of the last accepted connection. It is a
// no-op when nothing has been accepted yet, rather than panicking on a
// nil lastClientConn.
func (l *testListener) Close() error {
	if l.lastClientConn == nil {
		return nil
	}
	return l.lastClientConn.Close()
}
// erroringAcceptListener is a net.Listener test double whose Accept
// always fails, for exercising the accept-error path of wrappers.
type erroringAcceptListener struct {
	net.Listener
}

// Accept unconditionally returns a nil connection and a fixed error.
func (l *erroringAcceptListener) Accept() (net.Conn, error) {
	err := errors.New("error for testcase")
	return nil, err
}
// erroringCloseListener accepts in-memory pipe connections whose Close
// always reports an error. The client half of the latest pipe is kept so
// tests can drive it.
type erroringCloseListener struct {
	net.Listener
	lastClientConn net.Conn
}

// Accept creates a net.Pipe, stores the client end, and returns the
// server end wrapped so that closing it fails.
func (l *erroringCloseListener) Accept() (net.Conn, error) {
	server, client := net.Pipe()
	l.lastClientConn = client
	wrapped := &erroringConn{Conn: server}
	return wrapped, nil
}

// erroringConn is a net.Conn whose Close always returns an error, after
// closing the underlying connection.
type erroringConn struct {
	net.Conn
}

// Close closes the wrapped connection (its result is intentionally
// discarded) and returns a fixed error for test assertions.
func (c *erroringConn) Close() error {
	_ = c.Conn.Close()
	return errors.New("error for testcase")
}
// TestNewConnectionTrackingListener verifies the connection tracking
// listener wrapper: purpose-label handling, counter increments on Accept
// and on Close (exactly once, even on repeated calls or Close errors),
// and the accept-error path.
func TestNewConnectionTrackingListener(t *testing.T) {
	t.Run("set-label",
		func(t *testing.T) {
			l := &testListener{}
			acceptedConns := testAcceptedConns.With(prometheus.Labels{LabelConnectionPurpose: "test_label"})
			closedConns := testClosedConns.With(prometheus.Labels{LabelConnectionPurpose: "test_label"})
			ctl := NewConnectionTrackingListener(l, acceptedConns, closedConns)
			require.NotNil(t, ctl)
			assert.Equal(t, ctl.Listener, l)
			cc, err := ctl.Accept()
			require.NoError(t, err)
			require.NotNil(t, cc)
			// check purpose label was populated correctly by attempting to delete it
			assert.True(t, testAcceptedConns.DeleteLabelValues("test_label"))
			assert.True(t, testClosedConns.DeleteLabelValues("test_label"))
			require.NoError(t, cc.Close())
		})
	t.Run("accept-err",
		func(t *testing.T) {
			tpcAcc := &testPrometheusCounter{t: t}
			tpcClo := &testPrometheusCounter{t: t}
			el := &erroringAcceptListener{}
			ctl := NewConnectionTrackingListener(el, tpcAcc, tpcClo)
			require.NotNil(t, ctl)
			cc, err := ctl.Accept()
			assert.Nil(t, cc)
			// assert.Contains takes (container, element): the error text
			// must contain the expected message. (Previously the arguments
			// were swapped.)
			assert.Contains(t, err.Error(), "error for testcase")
			// neither counter moves when Accept fails
			assert.Equal(t, 0, tpcAcc.incCalledN)
			assert.Equal(t, 0, tpcClo.incCalledN)
		})
	t.Run("accept-multiple",
		func(t *testing.T) {
			tpcAcc := &testPrometheusCounter{t: t}
			tpcClo := &testPrometheusCounter{t: t}
			n := 10
			for i := 0; i < n; i++ {
				l := &testListener{}
				ctl := NewConnectionTrackingListener(l, tpcAcc, tpcClo)
				require.NotNil(t, ctl)
				cc, err := ctl.Accept()
				require.NotNil(t, cc)
				require.NoError(t, err)
				assert.Equal(t, i+1, tpcAcc.incCalledN)
				require.NoError(t, cc.Close())
				assert.Equal(t, i+1, tpcClo.incCalledN)
			}
		})
	t.Run("close-err",
		func(t *testing.T) {
			tpcAcc := &testPrometheusCounter{t: t}
			tpcClo := &testPrometheusCounter{t: t}
			el := &erroringCloseListener{}
			ctl := NewConnectionTrackingListener(el, tpcAcc, tpcClo)
			require.NotNil(t, ctl)
			cc, err := ctl.Accept()
			require.NotNil(t, cc)
			require.NoError(t, err)
			assert.Equal(t, 1, tpcAcc.incCalledN)
			// the closed counter is incremented even though Close errors
			assert.Error(t, cc.Close())
			assert.Equal(t, 1, tpcClo.incCalledN)
		})
	t.Run("close-repeat-calls",
		func(t *testing.T) {
			tpcAcc := &testPrometheusCounter{t: t}
			tpcClo := &testPrometheusCounter{t: t}
			l := &testListener{}
			ctl := NewConnectionTrackingListener(l, tpcAcc, tpcClo)
			require.NotNil(t, ctl)
			cc, err := ctl.Accept()
			require.NoError(t, err)
			require.NotNil(t, cc)
			assert.Equal(t, 0, tpcClo.incCalledN)
			// repeated Close calls only count once
			for i := 0; i <= 5; i++ {
				assert.NoError(t, cc.Close())
			}
			assert.Equal(t, 1, tpcClo.incCalledN)
		})
	t.Run("inc-dec",
		func(t *testing.T) {
			tpcAcc := &testPrometheusCounter{t: t}
			tpcClo := &testPrometheusCounter{t: t}
			l := &testListener{}
			ctl := NewConnectionTrackingListener(l, tpcAcc, tpcClo)
			require.NotNil(t, ctl)
			assert.Equal(t, 0, tpcAcc.incCalledN)
			cc, err := ctl.Accept()
			require.NoError(t, err)
			require.NotNil(t, cc)
			assert.Equal(t, 1, tpcAcc.incCalledN)
			require.NoError(t, cc.Close())
			assert.Equal(t, 1, tpcClo.incCalledN)
		},
	)
	t.Run("more-inc-dec",
		func(t *testing.T) {
			tpcAcc := &testPrometheusCounter{t: t}
			tpcClo := &testPrometheusCounter{t: t}
			l1 := &testListener{}
			l2 := &testListener{}
			l3 := &testListener{}
			ctl1 := NewConnectionTrackingListener(l1, tpcAcc, tpcClo)
			require.NotNil(t, ctl1)
			assert.Equal(t, 0, tpcAcc.incCalledN)
			cc1, err := ctl1.Accept()
			require.NoError(t, err)
			require.NotNil(t, cc1)
			assert.Equal(t, 1, tpcAcc.incCalledN)
			ctl2 := NewConnectionTrackingListener(l2, tpcAcc, tpcClo)
			require.NotNil(t, ctl2)
			cc2, err := ctl2.Accept()
			require.NoError(t, err)
			require.NotNil(t, cc2)
			assert.Equal(t, 2, tpcAcc.incCalledN)
			require.NoError(t, cc1.Close())
			require.NoError(t, cc2.Close())
			assert.Equal(t, 2, tpcClo.incCalledN)
			ctl3 := NewConnectionTrackingListener(l3, tpcAcc, tpcClo)
			require.NotNil(t, ctl3)
			cc3, err := ctl3.Accept()
			require.NoError(t, err)
			require.NotNil(t, cc3)
			assert.Equal(t, 3, tpcAcc.incCalledN)
			require.NoError(t, cc3.Close())
			assert.Equal(t, 3, tpcClo.incCalledN)
		},
	)
}
func TestNewStatsHandler(t *testing.T) {
cases := []struct {
name string

@ -2,6 +2,7 @@ package metric
import (
"context"
"net"
"github.com/hashicorp/boundary/globals"
"github.com/hashicorp/boundary/internal/daemon/metric"
@ -24,12 +25,34 @@ var grpcServerRequestLatency prometheus.ObserverVec = prometheus.NewHistogramVec
Namespace: globals.MetricNamespace,
Subsystem: workerClusterSubsystem,
Name: "grpc_request_duration_seconds",
Help: "Histogram of latencies for gRPC requests between the a worker server and a worker client.",
Help: "Histogram of latencies for gRPC requests between a worker server and a worker client.",
Buckets: prometheus.DefBuckets,
},
metric.ListGrpcLabels,
)
// acceptedConnsTotal keeps a count of the total accepted connections to a
// worker, partitioned by the purpose of the connection (the
// LabelConnectionPurpose label).
var acceptedConnsTotal = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Namespace: globals.MetricNamespace,
		Subsystem: workerClusterSubsystem,
		Name:      "accepted_connections_total",
		Help:      "Count of total accepted network connections to this worker.",
	},
	[]string{metric.LabelConnectionPurpose},
)
// closedConnsTotal keeps a count of the total closed connections to a
// worker, partitioned by the purpose of the connection (the
// LabelConnectionPurpose label).
var closedConnsTotal = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Namespace: globals.MetricNamespace,
		Subsystem: workerClusterSubsystem,
		Name:      "closed_connections_total",
		Help:      "Count of total closed network connections to this worker.",
	},
	[]string{metric.LabelConnectionPurpose},
)
// All the codes expected to be returned by boundary or the grpc framework to
// requests to the cluster server.
var expectedGrpcCodes = []codes.Code{
@ -42,6 +65,11 @@ var expectedGrpcCodes = []codes.Code{
codes.Unavailable, codes.Unauthenticated,
}
// InstrumentWorkerClusterTrackingListener wraps the given listener so that
// connections it accepts and closes are counted in the worker cluster
// accepted/closed connection counters, labeled with the given purpose.
// The counters themselves are registered separately via
// InitializeConnectionCounters.
func InstrumentWorkerClusterTrackingListener(l net.Listener, purpose string) net.Listener {
	p := prometheus.Labels{metric.LabelConnectionPurpose: purpose}
	return metric.NewConnectionTrackingListener(l, acceptedConnsTotal.With(p), closedConnsTotal.With(p))
}
// InstrumentClusterStatsHandler returns a gRPC stats.Handler which observes
// cluster-specific metrics for a gRPC server.
func InstrumentClusterStatsHandler(ctx context.Context) (stats.Handler, error) {
@ -54,3 +82,7 @@ func InstrumentClusterStatsHandler(ctx context.Context) (stats.Handler, error) {
// InitializeClusterServerCollectors registers the worker cluster gRPC
// request latency histogram with r, using the provided gRPC server to
// enumerate the service/method label values to pre-populate.
func InitializeClusterServerCollectors(r prometheus.Registerer, server *grpc.Server) {
	metric.InitializeGrpcCollectorsFromServer(r, grpcServerRequestLatency, server, expectedGrpcCodes)
}
// InitializeConnectionCounters registers the worker cluster
// accepted/closed connection counter vectors with the provided Prometheus
// registerer.
func InitializeConnectionCounters(r prometheus.Registerer) {
	metric.InitializeConnectionCounters(r, []prometheus.CounterVec{*acceptedConnsTotal, *closedConnsTotal})
}

@ -196,6 +196,7 @@ func (w *Worker) configureForWorker(ln *base.ServerListener, logger *log.Logger,
}
}
metric.InitializeConnectionCounters(w.conf.PrometheusRegisterer)
metric.InitializeClusterServerCollectors(w.conf.PrometheusRegisterer, downstreamServer)
ln.GrpcServer = downstreamServer
@ -214,9 +215,9 @@ func (w *Worker) configureForWorker(ln *base.ServerListener, logger *log.Logger,
return func() {
go w.workerAuthSplitListener.Start()
go httpServer.Serve(proxyListener)
go ln.GrpcServer.Serve(pkiWorkerTrackingListener)
go handleSecondaryConnection(cancelCtx, revPkiWorkerTrackingListener, w.downstreamRoutes, -1)
go httpServer.Serve(metric.InstrumentWorkerClusterTrackingListener(proxyListener, "proxy"))
go ln.GrpcServer.Serve(metric.InstrumentWorkerClusterTrackingListener(pkiWorkerTrackingListener, "grpc"))
go handleSecondaryConnection(cancelCtx, metric.InstrumentWorkerClusterTrackingListener(revPkiWorkerTrackingListener, "reverse-grpc"), w.downstreamRoutes, -1)
}, nil
}

Loading…
Cancel
Save