|
|
|
|
@ -275,22 +275,74 @@ func (w *Worker) workerConnectionInfo(addr string) (*structpb.Struct, error) {
|
|
|
|
|
return st, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// monitorUpstreamConnectionState listens for new state changes from grpc client
|
|
|
|
|
// connection and updates the state
|
|
|
|
|
// monitorUpstreamConnectionState listens for new state changes from grpc client connection,
|
|
|
|
|
// updates the state, and logs significant state transitions for observability
|
|
|
|
|
func monitorUpstreamConnectionState(ctx context.Context, cc *grpc.ClientConn, connectionState *atomic.Value) {
|
|
|
|
|
const op = "worker.monitorUpstreamConnectionState"
|
|
|
|
|
var state connectivity.State
|
|
|
|
|
if v := connectionState.Load(); !util.IsNil(v) {
|
|
|
|
|
state = v.(connectivity.State)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for cc.WaitForStateChange(ctx, state) {
|
|
|
|
|
previousState := state
|
|
|
|
|
state = cc.GetState()
|
|
|
|
|
|
|
|
|
|
// if the client is shutdown, exit function
|
|
|
|
|
if state == connectivity.Shutdown {
|
|
|
|
|
event.WriteSysEvent(ctx, op, "upstream connection shut down",
|
|
|
|
|
"upstream_address", cc.Target(),
|
|
|
|
|
"previous_state", previousState.String(),
|
|
|
|
|
"current_state", state.String(),
|
|
|
|
|
)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
connectionState.Store(state)
|
|
|
|
|
|
|
|
|
|
switch state {
|
|
|
|
|
case connectivity.TransientFailure:
|
|
|
|
|
event.WriteError(ctx, op,
|
|
|
|
|
fmt.Errorf("upstream connection entered transient failure state; worker cannot reach upstream"),
|
|
|
|
|
event.WithInfo(
|
|
|
|
|
"upstream_address", cc.Target(),
|
|
|
|
|
"previous_state", previousState.String(),
|
|
|
|
|
"current_state", state.String(),
|
|
|
|
|
),
|
|
|
|
|
)
|
|
|
|
|
case connectivity.Connecting:
|
|
|
|
|
if previousState == connectivity.Ready || previousState == connectivity.TransientFailure {
|
|
|
|
|
event.WriteSysEvent(ctx, op, "upstream connection is reconnecting",
|
|
|
|
|
"upstream_address", cc.Target(),
|
|
|
|
|
"previous_state", previousState.String(),
|
|
|
|
|
"current_state", state.String(),
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
case connectivity.Ready:
|
|
|
|
|
if previousState != connectivity.Ready {
|
|
|
|
|
event.WriteSysEvent(ctx, op, "upstream connection is ready",
|
|
|
|
|
"upstream_address", cc.Target(),
|
|
|
|
|
"previous_state", previousState.String(),
|
|
|
|
|
"current_state", state.String(),
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
case connectivity.Idle:
|
|
|
|
|
if previousState == connectivity.Ready {
|
|
|
|
|
event.WriteSysEvent(ctx, op, "upstream connection has become idle",
|
|
|
|
|
"upstream_address", cc.Target(),
|
|
|
|
|
"previous_state", previousState.String(),
|
|
|
|
|
"current_state", state.String(),
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
default:
|
|
|
|
|
event.WriteError(ctx, op,
|
|
|
|
|
fmt.Errorf("upstream connection entered unexpected state"),
|
|
|
|
|
event.WithInfo(
|
|
|
|
|
"upstream_address", cc.Target(),
|
|
|
|
|
"previous_state", previousState.String(),
|
|
|
|
|
"current_state", state.String(),
|
|
|
|
|
),
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|