diff --git a/internal/daemon/worker/controller_connection.go b/internal/daemon/worker/controller_connection.go index 724ce40d97..a01572a2d6 100644 --- a/internal/daemon/worker/controller_connection.go +++ b/internal/daemon/worker/controller_connection.go @@ -275,22 +275,74 @@ func (w *Worker) workerConnectionInfo(addr string) (*structpb.Struct, error) { return st, nil } -// monitorUpstreamConnectionState listens for new state changes from grpc client -// connection and updates the state +// monitorUpstreamConnectionState listens for new state changes from grpc client connection, +// updates the state, and logs significant state transitions for observability func monitorUpstreamConnectionState(ctx context.Context, cc *grpc.ClientConn, connectionState *atomic.Value) { + const op = "worker.monitorUpstreamConnectionState" var state connectivity.State if v := connectionState.Load(); !util.IsNil(v) { state = v.(connectivity.State) } for cc.WaitForStateChange(ctx, state) { + previousState := state state = cc.GetState() // if the client is shutdown, exit function if state == connectivity.Shutdown { + event.WriteSysEvent(ctx, op, "upstream connection shut down", + "upstream_address", cc.Target(), + "previous_state", previousState.String(), + "current_state", state.String(), + ) return } connectionState.Store(state) + + switch state { + case connectivity.TransientFailure: + event.WriteError(ctx, op, + fmt.Errorf("upstream connection entered transient failure state; worker cannot reach upstream"), + event.WithInfo( + "upstream_address", cc.Target(), + "previous_state", previousState.String(), + "current_state", state.String(), + ), + ) + case connectivity.Connecting: + if previousState == connectivity.Ready || previousState == connectivity.TransientFailure { + event.WriteSysEvent(ctx, op, "upstream connection is reconnecting", + "upstream_address", cc.Target(), + "previous_state", previousState.String(), + "current_state", state.String(), + ) + } + case connectivity.Ready: + if previousState != connectivity.Ready { + event.WriteSysEvent(ctx, op, "upstream connection is ready", + "upstream_address", cc.Target(), + "previous_state", previousState.String(), + "current_state", state.String(), + ) + } + case connectivity.Idle: + if previousState == connectivity.Ready { + event.WriteSysEvent(ctx, op, "upstream connection has become idle", + "upstream_address", cc.Target(), + "previous_state", previousState.String(), + "current_state", state.String(), + ) + } + default: + event.WriteError(ctx, op, + fmt.Errorf("upstream connection entered unexpected state"), + event.WithInfo( + "upstream_address", cc.Target(), + "previous_state", previousState.String(), + "current_state", state.String(), + ), + ) + } } }