From fabd4093deb803e31bf9a2d8a0f1b8fc85a5bd75 Mon Sep 17 00:00:00 2001 From: Elim Tsiagbey Date: Fri, 1 Sep 2023 13:42:41 -0400 Subject: [PATCH] feat: Worker Connectivity Health Check (#3650) * feat: Worker Connectivity Health Check Add worker upstream connection state. API calls to the worker's ops `/heath?worker_info=1` endpoint should now return a new filed called `upstream_connection_state` ``` { "worker_process_info":{ "state":"active", "active_session_count":0, "upstream_connection_state":"READY" } } ``` The states returned will be GRPC connection states: https://grpc.github.io/grpc/core/md_doc_connectivity-semantics-and-api.html There could be multiple upstream addresses but GRPC manual resolver determines the state of the connection. --- internal/cmd/ops/server_test.go | 11 +- .../daemon/worker/controller_connection.go | 92 +++++++---- .../worker/controller_connection_test.go | 144 ++++++++++++++++++ internal/daemon/worker/health.go | 14 +- internal/daemon/worker/health_test.go | 9 +- internal/daemon/worker/worker.go | 10 +- .../gen/worker/health/health_service.pb.go | 40 +++-- .../worker/health/v1/health_service.proto | 1 + 8 files changed, 267 insertions(+), 54 deletions(-) create mode 100644 internal/daemon/worker/controller_connection_test.go diff --git a/internal/cmd/ops/server_test.go b/internal/cmd/ops/server_test.go index 7af33767a3..6fe324e236 100644 --- a/internal/cmd/ops/server_test.go +++ b/internal/cmd/ops/server_test.go @@ -38,6 +38,7 @@ import ( "github.com/mitchellh/cli" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "google.golang.org/grpc/connectivity" "google.golang.org/protobuf/testing/protocmp" "google.golang.org/protobuf/types/known/wrapperspb" ) @@ -723,8 +724,9 @@ func TestCreateOpsHandler(t *testing.T) { pbResp := &pbs.GetHealthResponse{} require.NoError(t, jsonpb.Unmarshal(rsp.Body, pbResp)) want := &pbs.GetHealthResponse{WorkerProcessInfo: &pbhealth.HealthInfo{ - State: server.ActiveOperationalState.String(), - ActiveSessionCount: wrapperspb.UInt32(0), + State: server.ActiveOperationalState.String(), + ActiveSessionCount: wrapperspb.UInt32(0), + UpstreamConnectionState: connectivity.TransientFailure.String(), }} assert.Empty(t, cmp.Diff(want, pbResp, protocmp.Transform())) }, @@ -747,8 +749,9 @@ func TestCreateOpsHandler(t *testing.T) { pbResp := &pbs.GetHealthResponse{} require.NoError(t, jsonpb.Unmarshal(rsp.Body, pbResp)) want := &pbs.GetHealthResponse{WorkerProcessInfo: &pbhealth.HealthInfo{ - State: server.ActiveOperationalState.String(), - ActiveSessionCount: wrapperspb.UInt32(0), + State: server.ActiveOperationalState.String(), + ActiveSessionCount: wrapperspb.UInt32(0), + UpstreamConnectionState: connectivity.TransientFailure.String(), }} assert.Empty(t, cmp.Diff(want, pbResp, protocmp.Transform())) }, diff --git a/internal/daemon/worker/controller_connection.go b/internal/daemon/worker/controller_connection.go index ecf27d9218..3505db6fdb 100644 --- a/internal/daemon/worker/controller_connection.go +++ b/internal/daemon/worker/controller_connection.go @@ -19,6 +19,7 @@ import ( "os" "path/filepath" "strings" + "sync/atomic" "time" "github.com/hashicorp/boundary/globals" @@ -30,6 +31,7 @@ import ( pbs "github.com/hashicorp/boundary/internal/gen/controller/servers/services" "github.com/hashicorp/boundary/internal/observability/event" "github.com/hashicorp/boundary/internal/server" + "github.com/hashicorp/boundary/internal/util" "github.com/hashicorp/boundary/version" "github.com/hashicorp/go-secure-stdlib/base62" "github.com/hashicorp/nodeenrollment" @@ -38,6 +40,7 @@ import ( "github.com/hashicorp/nodeenrollment/util/toggledlogger" "google.golang.org/grpc" "google.golang.org/grpc/backoff" + "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/resolver" "google.golang.org/protobuf/proto" @@ -211,6 +214,44 @@ func (w *Worker) v1KmsAuthDialFn(ctx context.Context, addr string, extraAlpnProt func (w *Worker) createClientConn(addr string) error { const op = "worker.(Worker).createClientConn" + + var res resolver.Builder + for _, v := range w.addressReceivers { + if rec, ok := v.(*grpcResolverReceiver); ok { + res = rec.Resolver + } + } + if res == nil { + return errors.New(w.baseContext, errors.Internal, op, "unable to find a resolver.Builder amongst the address receivers") + } + + dialOpts := createDefaultGRPCDialOptions(res, w.upstreamDialerFunc()) + + cc, err := grpc.DialContext(w.baseContext, + fmt.Sprintf("%s:///%s", res.Scheme(), addr), + dialOpts..., + ) + if err != nil { + return fmt.Errorf("error dialing controller for worker auth: %w", err) + } + + w.GrpcClientConn = cc + w.controllerMultihopConn.Store(multihop.NewMultihopServiceClient(cc)) + + var producer handlers.UpstreamMessageServiceClientProducer + producer = func(context.Context) (pbs.UpstreamMessageServiceClient, error) { + return pbs.NewUpstreamMessageServiceClient(cc), nil + } + + w.controllerUpstreamMsgConn.Store(&producer) + + go monitorUpstreamConnectionState(w.baseContext, cc, w.upstreamConnectionState) + + return nil +} + +// createDefaultGRPCDialOptions creates grpc.DialOption using default options +func createDefaultGRPCDialOptions(res resolver.Builder, upstreamDialerFn func(context.Context, string) (net.Conn, error)) []grpc.DialOption { defaultTimeout := (time.Second + time.Nanosecond).String() defServiceConfig := fmt.Sprintf(` { @@ -224,21 +265,13 @@ func (w *Worker) createClientConn(addr string) error { ] } `, defaultTimeout) - var res resolver.Builder - for _, v := range w.addressReceivers { - if rec, ok := v.(*grpcResolverReceiver); ok { - res = rec.Resolver - } - } - if res == nil { - return errors.New(w.baseContext, errors.Internal, op, "unable to find a resolver.Builder amongst the address receivers") - } + dialOpts := []grpc.DialOption{ grpc.WithResolvers(res), grpc.WithUnaryInterceptor(metric.InstrumentClusterClient()), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32)), grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(math.MaxInt32)), - grpc.WithContextDialer(w.upstreamDialerFunc()), + grpc.WithContextDialer(upstreamDialerFn), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(defServiceConfig), // Don't have the resolver reach out for a service config from the @@ -253,25 +286,8 @@ func (w *Worker) createClientConn(addr string) error { }, }), } - cc, err := grpc.DialContext(w.baseContext, - fmt.Sprintf("%s:///%s", res.Scheme(), addr), - dialOpts..., - ) - if err != nil { - return fmt.Errorf("error dialing controller for worker auth: %w", err) - } - w.GrpcClientConn = cc - w.controllerMultihopConn.Store(multihop.NewMultihopServiceClient(cc)) - - var producer handlers.UpstreamMessageServiceClientProducer - producer = func(context.Context) (pbs.UpstreamMessageServiceClient, error) { - return pbs.NewUpstreamMessageServiceClient(cc), nil - } - - w.controllerUpstreamMsgConn.Store(&producer) - - return nil + return dialOpts } func (w *Worker) workerAuthTLSConfig(extraAlpnProtos ...string) (*tls.Config, *base.WorkerAuthInfo, error) { @@ -403,3 +419,23 @@ func (w *Worker) workerConnectionInfo(addr string) (*structpb.Struct, error) { } return st, nil } + +// monitorUpstreamConnectionState listens for new state changes from grpc client +// connection and updates the state +func monitorUpstreamConnectionState(ctx context.Context, cc *grpc.ClientConn, connectionState *atomic.Value) { + var state connectivity.State + if v := connectionState.Load(); !util.IsNil(v) { + state = v.(connectivity.State) + } + + for cc.WaitForStateChange(ctx, state) { + newState := cc.GetState() + + // if the client is shutdown, exit function + if newState == connectivity.Shutdown { + return + } + + connectionState.Store(newState) + } +} diff --git a/internal/daemon/worker/controller_connection_test.go b/internal/daemon/worker/controller_connection_test.go new file mode 100644 index 0000000000..b15c092786 --- /dev/null +++ b/internal/daemon/worker/controller_connection_test.go @@ -0,0 +1,144 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package worker + +import ( + "context" + "fmt" + "strconv" + "sync/atomic" + "testing" + "time" + + opsservices "github.com/hashicorp/boundary/internal/gen/ops/services" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "golang.org/x/net/nettest" + "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/resolver/manual" +) + +func TestMonitorUpstreamConnectionState(t *testing.T) { + ctx := context.Background() + stateCtx, cancelStateCtx := context.WithCancel(ctx) + + upstreamConnectionState := new(atomic.Value) + + servers, err := createTestServers(t) + require.NoError(t, err) + + scheme := strconv.FormatInt(time.Now().UnixNano(), 36) + res := manual.NewBuilderWithScheme(scheme) + grpcResolver := &grpcResolverReceiver{res} + + dialOpts := createDefaultGRPCDialOptions(res, nil) + + cc, err := grpc.Dial( + fmt.Sprintf("%s:///%s", res.Scheme(), servers[0].address), + dialOpts..., + ) + + require.NoError(t, err) + + // track GRPC state changes + go monitorUpstreamConnectionState(stateCtx, cc, upstreamConnectionState) + + grpcResolver.InitialAddresses([]string{servers[0].address}) + + t.Cleanup(func() { + cc.Close() + cancelStateCtx() + + assert.Equal(t, connectivity.Shutdown, cc.GetState()) + + for _, s := range servers { + s.srv.GracefulStop() + } + }) + + tests := []struct { + name string + expectedResponse *opsservices.GetHealthResponse + addresses []string + expectedState connectivity.State + }{ + { + name: "connection with 1 good address", + addresses: []string{servers[0].address}, + expectedState: connectivity.Ready, + }, + { + name: "connection with multiple good addresses", + addresses: []string{servers[1].address, servers[2].address}, + expectedState: connectivity.Ready, + }, + { + name: "connection with bad address", + addresses: []string{"bad_address"}, + expectedState: connectivity.TransientFailure, + }, + { + name: "connection with 1 bad address and 1 good address", + addresses: []string{servers[0].address, "bad_address"}, + expectedState: connectivity.Ready, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + doneWait := make(chan struct{}) + grpcResolver.SetAddresses(tt.addresses) + + go waitForConnectionStateCondition(upstreamConnectionState, tt.expectedState, doneWait) + + select { + case <-doneWait: + // The connection condition was met, Proceed with assertions + case <-time.After(2 * time.Second): + t.Error("Time out waiting for condition") + } + + got := upstreamConnectionState.Load() + assert.Equal(t, tt.expectedState, got) + }) + } +} + +func waitForConnectionStateCondition(upstreamConnectionState *atomic.Value, expectedValue connectivity.State, ch chan<- struct{}) { + for { + currentValue := upstreamConnectionState.Load() + if expectedValue == currentValue { + ch <- struct{}{} + return + } + // Small delay for checking state + time.Sleep(time.Millisecond) + } +} + +type serverTestInfo struct { + srv *grpc.Server + address string +} + +func createTestServers(t *testing.T) ([]*serverTestInfo, error) { + serverCount := 4 + + servers := make([]*serverTestInfo, 0, serverCount) + for i := 0; i < serverCount; i++ { + listener, err := nettest.NewLocalListener("tcp") + if err != nil { + return nil, err + } + srv := grpc.NewServer() + lInfo := &serverTestInfo{srv: srv, address: listener.Addr().String()} + servers = append(servers, lInfo) + go func(i int) { + servers[i].srv.Serve(listener) + }(i) + } + + return servers, nil +} diff --git a/internal/daemon/worker/health.go b/internal/daemon/worker/health.go index 28961189f9..185b10297a 100644 --- a/internal/daemon/worker/health.go +++ b/internal/daemon/worker/health.go @@ -14,6 +14,7 @@ import ( pbhealth "github.com/hashicorp/boundary/internal/gen/worker/health" "github.com/hashicorp/boundary/internal/server" "github.com/hashicorp/boundary/internal/util" + "google.golang.org/grpc/connectivity" "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/types/known/wrapperspb" ) @@ -50,12 +51,19 @@ func (w workerHealthServer) GetHealth(ctx context.Context, req *opsservices.GetH // HealthInformation returns the current worker process health information. func (w *Worker) HealthInformation() *pbhealth.HealthInfo { - state := server.UnknownOperationalState + operationalState := server.UnknownOperationalState if v := w.operationalState.Load(); !util.IsNil(v) { - state = v.(server.OperationalState) + operationalState = v.(server.OperationalState) } + + var upstreamConnectionState connectivity.State + if v := w.upstreamConnectionState.Load(); !util.IsNil(v) { + upstreamConnectionState = v.(connectivity.State) + } + healthInfo := &pbhealth.HealthInfo{ - State: state.String(), + State: operationalState.String(), + UpstreamConnectionState: upstreamConnectionState.String(), } if w.sessionManager == nil { diff --git a/internal/daemon/worker/health_test.go b/internal/daemon/worker/health_test.go index 6176e73557..a2a7311afa 100644 --- a/internal/daemon/worker/health_test.go +++ b/internal/daemon/worker/health_test.go @@ -79,7 +79,14 @@ func TestGetHealth(t *testing.T) { resp := &opsservices.GetHealthResponse{} require.NoError(t, healthCheckMarshaler.Unmarshal(b, resp)) - assert.Empty(t, cmp.Diff(tt.expectedResponse, resp, protocmp.Transform())) + assert.Empty(t, + cmp.Diff( + tt.expectedResponse, + resp, + protocmp.Transform(), + protocmp.IgnoreFields(&pbhealth.HealthInfo{}, "upstream_connection_state"), + ), + ) }) } } diff --git a/internal/daemon/worker/worker.go b/internal/daemon/worker/worker.go index 5f28cbfb2e..972c2141c2 100644 --- a/internal/daemon/worker/worker.go +++ b/internal/daemon/worker/worker.go @@ -127,10 +127,11 @@ type Worker struct { recorderManager recorderManager - everAuthenticated *ua.Uint32 - lastStatusSuccess *atomic.Value - workerStartTime time.Time - operationalState *atomic.Value + everAuthenticated *ua.Uint32 + lastStatusSuccess *atomic.Value + workerStartTime time.Time + operationalState *atomic.Value + upstreamConnectionState *atomic.Value controllerMultihopConn *atomic.Value @@ -207,6 +208,7 @@ func New(ctx context.Context, conf *Config) (*Worker, error) { pkiConnManager: cluster.NewDownstreamManager(), successfulStatusGracePeriod: new(atomic.Int64), statusCallTimeoutDuration: new(atomic.Int64), + upstreamConnectionState: new(atomic.Value), } w.operationalState.Store(server.UnknownOperationalState) diff --git a/internal/gen/worker/health/health_service.pb.go b/internal/gen/worker/health/health_service.pb.go index cb6d178bef..a0e70eb1b3 100644 --- a/internal/gen/worker/health/health_service.pb.go +++ b/internal/gen/worker/health/health_service.pb.go @@ -29,9 +29,10 @@ type HealthInfo struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - State string `protobuf:"bytes,1,opt,name=state,proto3" json:"state,omitempty"` - ActiveSessionCount *wrapperspb.UInt32Value `protobuf:"bytes,2,opt,name=active_session_count,json=active_connection_count,proto3" json:"active_session_count,omitempty"` - SessionConnections map[string]uint32 `protobuf:"bytes,3,rep,name=session_connections,proto3" json:"session_connections,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"varint,2,opt,name=value,proto3"` + State string `protobuf:"bytes,1,opt,name=state,proto3" json:"state,omitempty"` + ActiveSessionCount *wrapperspb.UInt32Value `protobuf:"bytes,2,opt,name=active_session_count,json=active_connection_count,proto3" json:"active_session_count,omitempty"` + SessionConnections map[string]uint32 `protobuf:"bytes,3,rep,name=session_connections,proto3" json:"session_connections,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"varint,2,opt,name=value,proto3"` + UpstreamConnectionState string `protobuf:"bytes,4,opt,name=upstream_connection_state,proto3" json:"upstream_connection_state,omitempty"` } func (x *HealthInfo) Reset() { @@ -87,6 +88,13 @@ func (x *HealthInfo) GetSessionConnections() map[string]uint32 { return nil } +func (x *HealthInfo) GetUpstreamConnectionState() string { + if x != nil { + return x.UpstreamConnectionState + } + return "" +} + var File_worker_health_v1_health_service_proto protoreflect.FileDescriptor var file_worker_health_v1_health_service_proto_rawDesc = []byte{ @@ -95,7 +103,7 @@ var file_worker_health_v1_health_service_proto_rawDesc = []byte{ 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x10, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x2e, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x2e, 0x76, 0x31, 0x1a, 0x1e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x77, 0x72, 0x61, 0x70, 0x70, - 0x65, 0x72, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xa6, 0x02, 0x0a, 0x0a, 0x48, 0x65, + 0x65, 0x72, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xe4, 0x02, 0x0a, 0x0a, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x14, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x12, 0x53, 0x0a, 0x14, 0x61, 0x63, 0x74, 0x69, 0x76, 0x65, 0x5f, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, @@ -109,16 +117,20 @@ var file_worker_health_v1_health_service_proto_rawDesc = []byte{ 0x2e, 0x76, 0x31, 0x2e, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x49, 0x6e, 0x66, 0x6f, 0x2e, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x13, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x5f, - 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x1a, 0x45, 0x0a, 0x17, 0x53, - 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, - 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, - 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, - 0x38, 0x01, 0x42, 0x41, 0x5a, 0x3f, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, - 0x2f, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2f, 0x62, 0x6f, 0x75, 0x6e, 0x64, - 0x61, 0x72, 0x79, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x67, 0x65, 0x6e, - 0x2f, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x2f, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x3b, 0x68, - 0x65, 0x61, 0x6c, 0x74, 0x68, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x3c, 0x0a, 0x19, 0x75, + 0x70, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x5f, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, + 0x6f, 0x6e, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x19, + 0x75, 0x70, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x5f, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, + 0x69, 0x6f, 0x6e, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x65, 0x1a, 0x45, 0x0a, 0x17, 0x53, 0x65, 0x73, + 0x73, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x45, + 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, + 0x42, 0x41, 0x5a, 0x3f, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x68, + 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2f, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x61, 0x72, + 0x79, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x77, + 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x2f, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x3b, 0x68, 0x65, 0x61, + 0x6c, 0x74, 0x68, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/internal/proto/worker/health/v1/health_service.proto b/internal/proto/worker/health/v1/health_service.proto index 00869e1e35..97208e6245 100644 --- a/internal/proto/worker/health/v1/health_service.proto +++ b/internal/proto/worker/health/v1/health_service.proto @@ -13,4 +13,5 @@ message HealthInfo { string state = 1; google.protobuf.UInt32Value active_session_count = 2 [json_name = "active_connection_count"]; map session_connections = 3 [json_name = "session_connections"]; + string upstream_connection_state = 4 [json_name = "upstream_connection_state"]; }