feat: Worker Connectivity Health Check (#3650)

* feat: Worker Connectivity Health Check

Add worker upstream connection state. API calls to the worker's ops `/health?worker_info=1` endpoint should now return a new field called `upstream_connection_state`:

```
{
   "worker_process_info":{
      "state":"active",
      "active_session_count":0,
      "upstream_connection_state":"READY"
   }
}
```

The states returned will be GRPC connection states: https://grpc.github.io/grpc/core/md_doc_connectivity-semantics-and-api.html

There could be multiple upstream addresses, but the gRPC manual resolver determines the state of the connection.
jimlambrt-make-gen
Elim Tsiagbey 3 years ago committed by GitHub
parent c55df52d44
commit fabd4093de
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -38,6 +38,7 @@ import (
"github.com/mitchellh/cli"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/connectivity"
"google.golang.org/protobuf/testing/protocmp"
"google.golang.org/protobuf/types/known/wrapperspb"
)
@ -723,8 +724,9 @@ func TestCreateOpsHandler(t *testing.T) {
pbResp := &pbs.GetHealthResponse{}
require.NoError(t, jsonpb.Unmarshal(rsp.Body, pbResp))
want := &pbs.GetHealthResponse{WorkerProcessInfo: &pbhealth.HealthInfo{
State: server.ActiveOperationalState.String(),
ActiveSessionCount: wrapperspb.UInt32(0),
State: server.ActiveOperationalState.String(),
ActiveSessionCount: wrapperspb.UInt32(0),
UpstreamConnectionState: connectivity.TransientFailure.String(),
}}
assert.Empty(t, cmp.Diff(want, pbResp, protocmp.Transform()))
},
@ -747,8 +749,9 @@ func TestCreateOpsHandler(t *testing.T) {
pbResp := &pbs.GetHealthResponse{}
require.NoError(t, jsonpb.Unmarshal(rsp.Body, pbResp))
want := &pbs.GetHealthResponse{WorkerProcessInfo: &pbhealth.HealthInfo{
State: server.ActiveOperationalState.String(),
ActiveSessionCount: wrapperspb.UInt32(0),
State: server.ActiveOperationalState.String(),
ActiveSessionCount: wrapperspb.UInt32(0),
UpstreamConnectionState: connectivity.TransientFailure.String(),
}}
assert.Empty(t, cmp.Diff(want, pbResp, protocmp.Transform()))
},

@ -19,6 +19,7 @@ import (
"os"
"path/filepath"
"strings"
"sync/atomic"
"time"
"github.com/hashicorp/boundary/globals"
@ -30,6 +31,7 @@ import (
pbs "github.com/hashicorp/boundary/internal/gen/controller/servers/services"
"github.com/hashicorp/boundary/internal/observability/event"
"github.com/hashicorp/boundary/internal/server"
"github.com/hashicorp/boundary/internal/util"
"github.com/hashicorp/boundary/version"
"github.com/hashicorp/go-secure-stdlib/base62"
"github.com/hashicorp/nodeenrollment"
@ -38,6 +40,7 @@ import (
"github.com/hashicorp/nodeenrollment/util/toggledlogger"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/resolver"
"google.golang.org/protobuf/proto"
@ -211,6 +214,44 @@ func (w *Worker) v1KmsAuthDialFn(ctx context.Context, addr string, extraAlpnProt
// createClientConn dials the upstream at addr using the resolver found among
// the worker's address receivers, then wires the resulting client connection
// into the worker: it is kept as GrpcClientConn, used to build the multihop
// service client and the upstream message client producer, and handed to a
// background goroutine that records its connectivity state.
//
// An error is returned when no address receiver provides a resolver or when
// dialing fails.
func (w *Worker) createClientConn(addr string) error {
	const op = "worker.(Worker).createClientConn"

	// Scan every receiver; when more than one carries a resolver the last one
	// found is used.
	var builder resolver.Builder
	for _, receiver := range w.addressReceivers {
		rec, ok := receiver.(*grpcResolverReceiver)
		if !ok {
			continue
		}
		builder = rec.Resolver
	}
	if builder == nil {
		return errors.New(w.baseContext, errors.Internal, op, "unable to find a resolver.Builder amongst the address receivers")
	}

	opts := createDefaultGRPCDialOptions(builder, w.upstreamDialerFunc())
	target := fmt.Sprintf("%s:///%s", builder.Scheme(), addr)
	conn, err := grpc.DialContext(w.baseContext, target, opts...)
	if err != nil {
		return fmt.Errorf("error dialing controller for worker auth: %w", err)
	}

	w.GrpcClientConn = conn
	w.controllerMultihopConn.Store(multihop.NewMultihopServiceClient(conn))

	// The producer is stored by pointer, so it needs an addressable variable
	// of the named producer type.
	var producer handlers.UpstreamMessageServiceClientProducer = func(context.Context) (pbs.UpstreamMessageServiceClient, error) {
		return pbs.NewUpstreamMessageServiceClient(conn), nil
	}
	w.controllerUpstreamMsgConn.Store(&producer)

	// Keep w.upstreamConnectionState in sync with the connection in the
	// background.
	go monitorUpstreamConnectionState(w.baseContext, conn, w.upstreamConnectionState)

	return nil
}
// createDefaultGRPCDialOptions creates grpc.DialOption using default options
func createDefaultGRPCDialOptions(res resolver.Builder, upstreamDialerFn func(context.Context, string) (net.Conn, error)) []grpc.DialOption {
defaultTimeout := (time.Second + time.Nanosecond).String()
defServiceConfig := fmt.Sprintf(`
{
@ -224,21 +265,13 @@ func (w *Worker) createClientConn(addr string) error {
]
}
`, defaultTimeout)
var res resolver.Builder
for _, v := range w.addressReceivers {
if rec, ok := v.(*grpcResolverReceiver); ok {
res = rec.Resolver
}
}
if res == nil {
return errors.New(w.baseContext, errors.Internal, op, "unable to find a resolver.Builder amongst the address receivers")
}
dialOpts := []grpc.DialOption{
grpc.WithResolvers(res),
grpc.WithUnaryInterceptor(metric.InstrumentClusterClient()),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32)),
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(math.MaxInt32)),
grpc.WithContextDialer(w.upstreamDialerFunc()),
grpc.WithContextDialer(upstreamDialerFn),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(defServiceConfig),
// Don't have the resolver reach out for a service config from the
@ -253,25 +286,8 @@ func (w *Worker) createClientConn(addr string) error {
},
}),
}
cc, err := grpc.DialContext(w.baseContext,
fmt.Sprintf("%s:///%s", res.Scheme(), addr),
dialOpts...,
)
if err != nil {
return fmt.Errorf("error dialing controller for worker auth: %w", err)
}
w.GrpcClientConn = cc
w.controllerMultihopConn.Store(multihop.NewMultihopServiceClient(cc))
var producer handlers.UpstreamMessageServiceClientProducer
producer = func(context.Context) (pbs.UpstreamMessageServiceClient, error) {
return pbs.NewUpstreamMessageServiceClient(cc), nil
}
w.controllerUpstreamMsgConn.Store(&producer)
return nil
return dialOpts
}
func (w *Worker) workerAuthTLSConfig(extraAlpnProtos ...string) (*tls.Config, *base.WorkerAuthInfo, error) {
@ -403,3 +419,23 @@ func (w *Worker) workerConnectionInfo(addr string) (*structpb.Struct, error) {
}
return st, nil
}
// monitorUpstreamConnectionState listens for state changes on the grpc client
// connection cc and stores each new state in connectionState. It blocks until
// ctx is done or the client connection reports connectivity.Shutdown, so it is
// meant to be run in its own goroutine.
//
// The state previously stored in connectionState (or the zero value of
// connectivity.State when nothing has been stored yet) is the state the first
// wait starts from. The Shutdown state itself is not stored.
func monitorUpstreamConnectionState(ctx context.Context, cc *grpc.ClientConn, connectionState *atomic.Value) {
	var state connectivity.State
	if v := connectionState.Load(); !util.IsNil(v) {
		state = v.(connectivity.State)
	}

	for cc.WaitForStateChange(ctx, state) {
		newState := cc.GetState()

		// if the client is shutdown, exit function
		if newState == connectivity.Shutdown {
			return
		}

		connectionState.Store(newState)

		// Advance the state we wait from. WaitForStateChange returns
		// immediately when the connection is already in a state other than
		// the one passed in, so waiting on a stale state would turn this loop
		// into a busy spin as soon as the connection left its initial state.
		state = newState
	}
}

@ -0,0 +1,144 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package worker
import (
"context"
"fmt"
"strconv"
"sync/atomic"
"testing"
"time"
opsservices "github.com/hashicorp/boundary/internal/gen/ops/services"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/net/nettest"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/resolver/manual"
)
// TestMonitorUpstreamConnectionState dials a client connection through a
// manual resolver, runs monitorUpstreamConnectionState against it, and checks
// that the recorded state follows the set of addresses pushed to the resolver.
func TestMonitorUpstreamConnectionState(t *testing.T) {
	monitorCtx, stopMonitor := context.WithCancel(context.Background())
	observed := &atomic.Value{}

	testServers, err := createTestServers(t)
	require.NoError(t, err)

	// A time-derived scheme name is used for the manual resolver.
	builder := manual.NewBuilderWithScheme(strconv.FormatInt(time.Now().UnixNano(), 36))
	receiver := &grpcResolverReceiver{builder}

	opts := createDefaultGRPCDialOptions(builder, nil)
	target := fmt.Sprintf("%s:///%s", builder.Scheme(), testServers[0].address)
	conn, err := grpc.Dial(target, opts...)
	require.NoError(t, err)

	// track GRPC state changes
	go monitorUpstreamConnectionState(monitorCtx, conn, observed)

	receiver.InitialAddresses([]string{testServers[0].address})

	t.Cleanup(func() {
		conn.Close()
		stopMonitor()
		assert.Equal(t, connectivity.Shutdown, conn.GetState())
		for _, ts := range testServers {
			ts.srv.GracefulStop()
		}
	})

	cases := []struct {
		name             string
		expectedResponse *opsservices.GetHealthResponse
		addresses        []string
		expectedState    connectivity.State
	}{
		{
			name:          "connection with 1 good address",
			addresses:     []string{testServers[0].address},
			expectedState: connectivity.Ready,
		},
		{
			name:          "connection with multiple good addresses",
			addresses:     []string{testServers[1].address, testServers[2].address},
			expectedState: connectivity.Ready,
		},
		{
			name:          "connection with bad address",
			addresses:     []string{"bad_address"},
			expectedState: connectivity.TransientFailure,
		},
		{
			name:          "connection with 1 bad address and 1 good address",
			addresses:     []string{testServers[0].address, "bad_address"},
			expectedState: connectivity.Ready,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			reached := make(chan struct{})
			receiver.SetAddresses(tc.addresses)

			// Poll in the background until the expected state is recorded, but
			// give up after two seconds.
			go waitForConnectionStateCondition(observed, tc.expectedState, reached)
			select {
			case <-time.After(2 * time.Second):
				t.Error("Time out waiting for condition")
			case <-reached:
				// Expected state was recorded; fall through to the assertion.
			}

			assert.Equal(t, tc.expectedState, observed.Load())
		})
	}
}
// waitForConnectionStateCondition polls upstreamConnectionState, sleeping one
// millisecond between checks, until the stored value equals expectedValue. It
// then sends a single value on ch and returns. It does not return until the
// condition is met and the send has been received.
func waitForConnectionStateCondition(upstreamConnectionState *atomic.Value, expectedValue connectivity.State, ch chan<- struct{}) {
	for upstreamConnectionState.Load() != expectedValue {
		// Small delay between state checks
		time.Sleep(time.Millisecond)
	}
	ch <- struct{}{}
}
// serverTestInfo pairs a test gRPC server with the address of the listener it
// serves on.
type serverTestInfo struct {
	// srv is the gRPC server instance.
	srv *grpc.Server
	// address is the string form of the listener address for srv.
	address string
}
// createTestServers starts four gRPC servers, each serving on its own local
// TCP listener, and returns them together with their listener addresses.
//
// If a listener cannot be created, the servers that were already started are
// stopped and the error is returned. On success the caller is responsible for
// stopping the returned servers.
func createTestServers(t *testing.T) ([]*serverTestInfo, error) {
	t.Helper()

	serverCount := 4
	servers := make([]*serverTestInfo, 0, serverCount)
	for i := 0; i < serverCount; i++ {
		listener, err := nettest.NewLocalListener("tcp")
		if err != nil {
			// Don't leak the servers that are already running.
			for _, s := range servers {
				s.srv.Stop()
			}
			return nil, err
		}

		srv := grpc.NewServer()
		servers = append(servers, &serverTestInfo{srv: srv, address: listener.Addr().String()})

		// The goroutine uses this iteration's own srv and listener rather than
		// indexing into servers: the loop keeps appending to that slice, so
		// reading it from another goroutine would be a data race.
		go func() {
			// Serve's error is deliberately ignored here; callers of this
			// helper only use the server and its address.
			_ = srv.Serve(listener)
		}()
	}
	return servers, nil
}

@ -14,6 +14,7 @@ import (
pbhealth "github.com/hashicorp/boundary/internal/gen/worker/health"
"github.com/hashicorp/boundary/internal/server"
"github.com/hashicorp/boundary/internal/util"
"google.golang.org/grpc/connectivity"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/types/known/wrapperspb"
)
@ -50,12 +51,19 @@ func (w workerHealthServer) GetHealth(ctx context.Context, req *opsservices.GetH
// HealthInformation returns the current worker process health information.
func (w *Worker) HealthInformation() *pbhealth.HealthInfo {
state := server.UnknownOperationalState
operationalState := server.UnknownOperationalState
if v := w.operationalState.Load(); !util.IsNil(v) {
state = v.(server.OperationalState)
operationalState = v.(server.OperationalState)
}
var upstreamConnectionState connectivity.State
if v := w.upstreamConnectionState.Load(); !util.IsNil(v) {
upstreamConnectionState = v.(connectivity.State)
}
healthInfo := &pbhealth.HealthInfo{
State: state.String(),
State: operationalState.String(),
UpstreamConnectionState: upstreamConnectionState.String(),
}
if w.sessionManager == nil {

@ -79,7 +79,14 @@ func TestGetHealth(t *testing.T) {
resp := &opsservices.GetHealthResponse{}
require.NoError(t, healthCheckMarshaler.Unmarshal(b, resp))
assert.Empty(t, cmp.Diff(tt.expectedResponse, resp, protocmp.Transform()))
assert.Empty(t,
cmp.Diff(
tt.expectedResponse,
resp,
protocmp.Transform(),
protocmp.IgnoreFields(&pbhealth.HealthInfo{}, "upstream_connection_state"),
),
)
})
}
}

@ -127,10 +127,11 @@ type Worker struct {
recorderManager recorderManager
everAuthenticated *ua.Uint32
lastStatusSuccess *atomic.Value
workerStartTime time.Time
operationalState *atomic.Value
everAuthenticated *ua.Uint32
lastStatusSuccess *atomic.Value
workerStartTime time.Time
operationalState *atomic.Value
upstreamConnectionState *atomic.Value
controllerMultihopConn *atomic.Value
@ -207,6 +208,7 @@ func New(ctx context.Context, conf *Config) (*Worker, error) {
pkiConnManager: cluster.NewDownstreamManager(),
successfulStatusGracePeriod: new(atomic.Int64),
statusCallTimeoutDuration: new(atomic.Int64),
upstreamConnectionState: new(atomic.Value),
}
w.operationalState.Store(server.UnknownOperationalState)

@ -29,9 +29,10 @@ type HealthInfo struct {
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
State string `protobuf:"bytes,1,opt,name=state,proto3" json:"state,omitempty"`
ActiveSessionCount *wrapperspb.UInt32Value `protobuf:"bytes,2,opt,name=active_session_count,json=active_connection_count,proto3" json:"active_session_count,omitempty"`
SessionConnections map[string]uint32 `protobuf:"bytes,3,rep,name=session_connections,proto3" json:"session_connections,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"varint,2,opt,name=value,proto3"`
State string `protobuf:"bytes,1,opt,name=state,proto3" json:"state,omitempty"`
ActiveSessionCount *wrapperspb.UInt32Value `protobuf:"bytes,2,opt,name=active_session_count,json=active_connection_count,proto3" json:"active_session_count,omitempty"`
SessionConnections map[string]uint32 `protobuf:"bytes,3,rep,name=session_connections,proto3" json:"session_connections,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"varint,2,opt,name=value,proto3"`
UpstreamConnectionState string `protobuf:"bytes,4,opt,name=upstream_connection_state,proto3" json:"upstream_connection_state,omitempty"`
}
func (x *HealthInfo) Reset() {
@ -87,6 +88,13 @@ func (x *HealthInfo) GetSessionConnections() map[string]uint32 {
return nil
}
// GetUpstreamConnectionState returns the UpstreamConnectionState field, or the
// empty string when the receiver is nil.
func (x *HealthInfo) GetUpstreamConnectionState() string {
	if x == nil {
		return ""
	}
	return x.UpstreamConnectionState
}
var File_worker_health_v1_health_service_proto protoreflect.FileDescriptor
var file_worker_health_v1_health_service_proto_rawDesc = []byte{
@ -95,7 +103,7 @@ var file_worker_health_v1_health_service_proto_rawDesc = []byte{
0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x10, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x2e,
0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x2e, 0x76, 0x31, 0x1a, 0x1e, 0x67, 0x6f, 0x6f, 0x67, 0x6c,
0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x77, 0x72, 0x61, 0x70, 0x70,
0x65, 0x72, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xa6, 0x02, 0x0a, 0x0a, 0x48, 0x65,
0x65, 0x72, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xe4, 0x02, 0x0a, 0x0a, 0x48, 0x65,
0x61, 0x6c, 0x74, 0x68, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x14, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74,
0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x12, 0x53,
0x0a, 0x14, 0x61, 0x63, 0x74, 0x69, 0x76, 0x65, 0x5f, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e,
@ -109,16 +117,20 @@ var file_worker_health_v1_health_service_proto_rawDesc = []byte{
0x2e, 0x76, 0x31, 0x2e, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x49, 0x6e, 0x66, 0x6f, 0x2e, 0x53,
0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e,
0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x13, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x5f,
0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x1a, 0x45, 0x0a, 0x17, 0x53,
0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e,
0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20,
0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75,
0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02,
0x38, 0x01, 0x42, 0x41, 0x5a, 0x3f, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d,
0x2f, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2f, 0x62, 0x6f, 0x75, 0x6e, 0x64,
0x61, 0x72, 0x79, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x67, 0x65, 0x6e,
0x2f, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x2f, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x3b, 0x68,
0x65, 0x61, 0x6c, 0x74, 0x68, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x3c, 0x0a, 0x19, 0x75,
0x70, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x5f, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69,
0x6f, 0x6e, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x19,
0x75, 0x70, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x5f, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74,
0x69, 0x6f, 0x6e, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x65, 0x1a, 0x45, 0x0a, 0x17, 0x53, 0x65, 0x73,
0x73, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x45,
0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28,
0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18,
0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01,
0x42, 0x41, 0x5a, 0x3f, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x68,
0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2f, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x61, 0x72,
0x79, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x77,
0x6f, 0x72, 0x6b, 0x65, 0x72, 0x2f, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x3b, 0x68, 0x65, 0x61,
0x6c, 0x74, 0x68, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (

@ -13,4 +13,5 @@ message HealthInfo {
string state = 1;
google.protobuf.UInt32Value active_session_count = 2 [json_name = "active_connection_count"];
map<string, uint32> session_connections = 3 [json_name = "session_connections"];
string upstream_connection_state = 4 [json_name = "upstream_connection_state"];
}

Loading…
Cancel
Save