diff --git a/internal/cmd/config/config.go b/internal/cmd/config/config.go
index efb7c1e297..83a4764826 100644
--- a/internal/cmd/config/config.go
+++ b/internal/cmd/config/config.go
@@ -210,6 +210,12 @@ type Controller struct {
     LivenessTimeToStale         any           `hcl:"liveness_time_to_stale"`
     LivenessTimeToStaleDuration time.Duration `hcl:"-"`

+    // TODO: This isn't documented (on purpose) because the right place for this
+    // is central configuration, so you can't drift across controllers and workers,
+    // but we don't have that yet.
+    GetDownstreamWorkersTimeout         any           `hcl:"get_downstream_workers_timeout"`
+    GetDownstreamWorkersTimeoutDuration time.Duration `hcl:"-"`
+
     // SchedulerRunJobInterval is the time interval between waking up the
     // scheduler to run pending jobs.
     //
@@ -279,6 +285,13 @@ type Worker struct {
     StatusCallTimeout         any           `hcl:"status_call_timeout"`
     StatusCallTimeoutDuration time.Duration `hcl:"-"`

+    // GetDownstreamWorkersTimeout represents the timeout (as a duration) for the
+    // GetDownstreamWorkers call in the DownstreamWorkerTicker.
+    //
+    // TODO: This is currently not documented and considered internal.
+    GetDownstreamWorkersTimeout         any           `hcl:"get_downstream_workers_timeout"`
+    GetDownstreamWorkersTimeoutDuration time.Duration `hcl:"-"`
+
     // SuccessfulStatusGracePeriod represents the period of time (as a duration)
     // that the worker will wait before disconnecting connections if it cannot
     // successfully complete a status report to a controller. This cannot be
@@ -648,6 +661,21 @@ func Parse(d string) (*Config, error) {
         return nil, errors.New("Controller liveness time to stale value is negative")
     }

+    getDownstreamWorkersTimeout := result.Controller.GetDownstreamWorkersTimeout
+    if util.IsNil(getDownstreamWorkersTimeout) {
+        getDownstreamWorkersTimeout = os.Getenv("BOUNDARY_CONTROLLER_GET_DOWNSTREAM_WORKERS_TIMEOUT")
+    }
+    if getDownstreamWorkersTimeout != nil {
+        t, err := parseutil.ParseDurationSecond(getDownstreamWorkersTimeout)
+        if err != nil {
+            return result, err
+        }
+        result.Controller.GetDownstreamWorkersTimeoutDuration = t
+    }
+    if result.Controller.GetDownstreamWorkersTimeoutDuration < 0 {
+        return nil, errors.New("Controller get downstream workers timeout value is negative")
+    }
+
     if result.Controller.MaxPageSizeRaw != nil {
         switch t := result.Controller.MaxPageSizeRaw.(type) {
         case string:
@@ -785,6 +813,21 @@ func Parse(d string) (*Config, error) {
         return nil, errors.New("Status call timeout value is negative")
     }

+    getDownstreamWorkersTimeoutDuration := result.Worker.GetDownstreamWorkersTimeout
+    if util.IsNil(getDownstreamWorkersTimeoutDuration) {
+        getDownstreamWorkersTimeoutDuration = os.Getenv("BOUNDARY_WORKER_GET_DOWNSTREAM_WORKERS_TIMEOUT")
+    }
+    if getDownstreamWorkersTimeoutDuration != nil {
+        t, err := parseutil.ParseDurationSecond(getDownstreamWorkersTimeoutDuration)
+        if err != nil {
+            return result, fmt.Errorf("error parsing get_downstream_workers_timeout: %w", err)
+        }
+        result.Worker.GetDownstreamWorkersTimeoutDuration = t
+    }
+    if result.Worker.GetDownstreamWorkersTimeoutDuration < 0 {
+        return nil, errors.New("Worker get downstream workers timeout value is negative")
+    }
+
     successfulStatusGracePeriod := result.Worker.SuccessfulStatusGracePeriod
     if util.IsNil(successfulStatusGracePeriod) {
         successfulStatusGracePeriod = os.Getenv("BOUNDARY_WORKER_SUCCESSFUL_STATUS_GRACE_PERIOD")
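The config.go changes above resolve the new setting from the HCL value first, then the BOUNDARY_CONTROLLER_GET_DOWNSTREAM_WORKERS_TIMEOUT / BOUNDARY_WORKER_GET_DOWNSTREAM_WORKERS_TIMEOUT environment variables, and finally convert it with parseutil.ParseDurationSecond. Below is a minimal standalone sketch of that resolution order; resolveTimeout and its error strings are illustrative stand-ins, not the helpers used in config.go, and only parseutil is a real dependency.

package main

import (
    "errors"
    "fmt"
    "os"
    "time"

    "github.com/hashicorp/go-secure-stdlib/parseutil"
)

// resolveTimeout mirrors the resolution order used in Parse(): the raw HCL value
// wins, then the environment variable, and an unset value stays zero so the
// daemon can substitute its default at startup. The helper itself is hypothetical.
func resolveTimeout(raw any, envVar string) (time.Duration, error) {
    if raw == nil || raw == "" {
        if v := os.Getenv(envVar); v != "" {
            raw = v
        } else {
            return 0, nil // unset; caller applies the default
        }
    }
    // ParseDurationSecond accepts strings like "90s" as well as bare integers (seconds).
    d, err := parseutil.ParseDurationSecond(raw)
    if err != nil {
        return 0, fmt.Errorf("error parsing get_downstream_workers_timeout: %w", err)
    }
    if d < 0 {
        return 0, errors.New("get_downstream_workers_timeout value is negative")
    }
    return d, nil
}

func main() {
    d, err := resolveTimeout("90s", "BOUNDARY_CONTROLLER_GET_DOWNSTREAM_WORKERS_TIMEOUT")
    fmt.Println(d, err) // 1m30s <nil>
}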
diff --git a/internal/daemon/controller/controller.go b/internal/daemon/controller/controller.go
index 5dc3b3fb16..8f4d617d25 100644
--- a/internal/daemon/controller/controller.go
+++ b/internal/daemon/controller/controller.go
@@ -10,6 +10,7 @@ import (
     "strings"
     "sync"
     "sync/atomic"
+    "time"

     "github.com/hashicorp/boundary/internal/alias"
     talias "github.com/hashicorp/boundary/internal/alias/target"
@@ -91,7 +92,7 @@ var (
     downstreamReceiverFactory func() downstreamReceiver

     downstreamersFactory           func(context.Context, string, string) (common.Downstreamers, error)
-    downstreamWorkersTickerFactory func(context.Context, string, string, common.Downstreamers, downstreamReceiver) (downstreamWorkersTicker, error)
+    downstreamWorkersTickerFactory func(context.Context, string, string, common.Downstreamers, downstreamReceiver, time.Duration) (downstreamWorkersTicker, error)
     commandClientFactory           func(context.Context, *Controller) error
     extControllerFactory           func(ctx context.Context, c *Controller, r db.Reader, w db.Writer, kms *kms.Kms) (intglobals.ControllerExtension, error)
 )
@@ -121,8 +122,9 @@ type Controller struct {

     // Timing variables. These are atomics for SIGHUP support, and are int64
     // because they are casted to time.Duration.
-    workerStatusGracePeriod *atomic.Int64
-    livenessTimeToStale     *atomic.Int64
+    workerStatusGracePeriod     *atomic.Int64
+    livenessTimeToStale         *atomic.Int64
+    getDownstreamWorkersTimeout *atomic.Pointer[time.Duration]

     apiGrpcServer         *grpc.Server
     apiGrpcServerListener grpcServerListener
@@ -176,18 +178,19 @@ func New(ctx context.Context, conf *Config) (*Controller, error) {
     metric.InitializeApiCollectors(conf.PrometheusRegisterer)
     ratelimit.InitializeMetrics(conf.PrometheusRegisterer)
     c := &Controller{
-        conf:                     conf,
-        logger:                   conf.Logger.Named("controller"),
-        started:                  ua.NewBool(false),
-        tickerWg:                 new(sync.WaitGroup),
-        schedulerWg:              new(sync.WaitGroup),
-        workerAuthCache:          new(sync.Map),
-        workerStatusUpdateTimes:  new(sync.Map),
-        enabledPlugins:           conf.Server.EnabledPlugins,
-        apiListeners:             make([]*base.ServerListener, 0),
-        downstreamConnManager:    cluster.NewDownstreamManager(),
-        workerStatusGracePeriod:  new(atomic.Int64),
-        livenessTimeToStale:      new(atomic.Int64),
+        conf:                        conf,
+        logger:                      conf.Logger.Named("controller"),
+        started:                     ua.NewBool(false),
+        tickerWg:                    new(sync.WaitGroup),
+        schedulerWg:                 new(sync.WaitGroup),
+        workerAuthCache:             new(sync.Map),
+        workerStatusUpdateTimes:     new(sync.Map),
+        enabledPlugins:              conf.Server.EnabledPlugins,
+        apiListeners:                make([]*base.ServerListener, 0),
+        downstreamConnManager:       cluster.NewDownstreamManager(),
+        workerStatusGracePeriod:     new(atomic.Int64),
+        livenessTimeToStale:         new(atomic.Int64),
+        getDownstreamWorkersTimeout: new(atomic.Pointer[time.Duration]),
     }

     if downstreamReceiverFactory != nil {
@@ -238,6 +241,15 @@ func New(ctx context.Context, conf *Config) (*Controller, error) {
         c.livenessTimeToStale.Store(int64(conf.RawConfig.Controller.LivenessTimeToStaleDuration))
     }

+    switch conf.RawConfig.Controller.GetDownstreamWorkersTimeoutDuration {
+    case 0:
+        to := server.DefaultLiveness
+        c.getDownstreamWorkersTimeout.Store(&to)
+    default:
+        to := conf.RawConfig.Controller.GetDownstreamWorkersTimeoutDuration
+        c.getDownstreamWorkersTimeout.Store(&to)
+    }
+
     clusterListeners := make([]*base.ServerListener, 0)
     for i := range conf.Listeners {
         l := conf.Listeners[i]
@@ -579,7 +591,7 @@ func (c *Controller) Start() error {
     // we'll use "root" to designate that this is the root of the graph (aka
     // a controller)
     boundVer := version.Get().VersionNumber()
-    dswTicker, err := downstreamWorkersTickerFactory(c.baseContext, "root", boundVer, c.downstreamWorkers, c.downstreamConns)
+    dswTicker, err := downstreamWorkersTickerFactory(c.baseContext, "root", boundVer, c.downstreamWorkers, c.downstreamConns, *c.getDownstreamWorkersTimeout.Load())
     if err != nil {
         return fmt.Errorf("error creating downstream workers ticker: %w", err)
     }
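controller.go stores the timeout in an atomic.Pointer[time.Duration] rather than the int64 atomics used for the other timing values, so a zero config value can be replaced by server.DefaultLiveness and the stored duration can later be swapped without locking. A minimal sketch of that pattern follows; the controller type, method names, and the concrete default value are illustrative, not Boundary's actual API.

package main

import (
    "fmt"
    "sync/atomic"
    "time"
)

// defaultTimeout stands in for server.DefaultLiveness; the value is illustrative.
const defaultTimeout = 15 * time.Second

type controller struct {
    getDownstreamWorkersTimeout atomic.Pointer[time.Duration]
}

// setTimeout copies the configured value (or the default, when config left it at
// zero) into a fresh allocation and publishes it atomically, which is what makes a
// SIGHUP-driven reload safe without a mutex.
func (c *controller) setTimeout(configured time.Duration) {
    to := configured
    if to == 0 {
        to = defaultTimeout
    }
    c.getDownstreamWorkersTimeout.Store(&to)
}

// timeout dereferences the current pointer; readers always see a complete value.
func (c *controller) timeout() time.Duration {
    return *c.getDownstreamWorkersTimeout.Load()
}

func main() {
    var c controller
    c.setTimeout(0)                // get_downstream_workers_timeout unset in HCL
    fmt.Println(c.timeout())       // 15s (the stand-in default)
    c.setTimeout(30 * time.Second) // e.g. a reloaded config value
    fmt.Println(c.timeout())       // 30s
}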
diff --git a/internal/daemon/worker/worker.go b/internal/daemon/worker/worker.go
index bdbcbfaa55..002e52c9de 100644
--- a/internal/daemon/worker/worker.go
+++ b/internal/daemon/worker/worker.go
@@ -187,8 +187,9 @@ type Worker struct {

     // Timing variables. These are atomics for SIGHUP support, and are int64
     // because they are casted to time.Duration.
-    successfulStatusGracePeriod *atomic.Int64
-    statusCallTimeoutDuration   *atomic.Int64
+    successfulStatusGracePeriod         *atomic.Int64
+    statusCallTimeoutDuration           *atomic.Int64
+    getDownstreamWorkersTimeoutDuration *atomic.Pointer[time.Duration]

     // AuthRotationNextRotation is useful in tests to understand how long to
     // sleep
@@ -222,17 +223,18 @@ func New(ctx context.Context, conf *Config) (*Worker, error) {
         lastStatusSuccess:      new(atomic.Value),
         controllerMultihopConn: new(atomic.Value),
         // controllerUpstreamMsgConn: new(atomic.Value),
-        tags:                        new(atomic.Value),
-        updateTags:                  ua.NewBool(false),
-        nonceFn:                     base62.Random,
-        WorkerAuthCurrentKeyId:      new(ua.String),
-        operationalState:            new(atomic.Value),
-        downstreamConnManager:       cluster.NewDownstreamManager(),
-        localStorageState:           new(atomic.Value),
-        successfulStatusGracePeriod: new(atomic.Int64),
-        statusCallTimeoutDuration:   new(atomic.Int64),
-        upstreamConnectionState:     new(atomic.Value),
-        downstreamWorkers:           new(atomic.Pointer[downstreamersContainer]),
+        tags:                                new(atomic.Value),
+        updateTags:                          ua.NewBool(false),
+        nonceFn:                             base62.Random,
+        WorkerAuthCurrentKeyId:              new(ua.String),
+        operationalState:                    new(atomic.Value),
+        downstreamConnManager:               cluster.NewDownstreamManager(),
+        localStorageState:                   new(atomic.Value),
+        successfulStatusGracePeriod:         new(atomic.Int64),
+        statusCallTimeoutDuration:           new(atomic.Int64),
+        getDownstreamWorkersTimeoutDuration: new(atomic.Pointer[time.Duration]),
+        upstreamConnectionState:             new(atomic.Value),
+        downstreamWorkers:                   new(atomic.Pointer[downstreamersContainer]),
     }

     w.operationalState.Store(server.UnknownOperationalState)
@@ -334,6 +336,14 @@ func New(ctx context.Context, conf *Config) (*Worker, error) {
     default:
         w.statusCallTimeoutDuration.Store(int64(conf.RawConfig.Worker.StatusCallTimeoutDuration))
     }
+    switch conf.RawConfig.Worker.GetDownstreamWorkersTimeoutDuration {
+    case 0:
+        to := server.DefaultLiveness
+        w.getDownstreamWorkersTimeoutDuration.Store(&to)
+    default:
+        to := conf.RawConfig.Worker.GetDownstreamWorkersTimeoutDuration
+        w.getDownstreamWorkersTimeoutDuration.Store(&to)
+    }

     // FIXME: This is really ugly, but works.
     session.CloseCallTimeout.Store(w.successfulStatusGracePeriod.Load())
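worker.go applies the same default-or-configured logic, and on the controller side the resulting duration is what Start() hands to downstreamWorkersTickerFactory. The sketch below is a hypothetical illustration of how a ticker might bound each GetDownstreamWorkers call with that timeout via a context deadline; the type, fields, and loop structure are illustrative only, since the real ticker is constructed behind the factory and is not shown in this diff.

package main

import (
    "context"
    "fmt"
    "time"
)

// downstreamWorkersTicker is an illustrative stand-in for the value built by
// downstreamWorkersTickerFactory; only the timeout handling is sketched.
type downstreamWorkersTicker struct {
    interval time.Duration
    timeout  time.Duration // the value threaded through from the new config field
    fetch    func(ctx context.Context) ([]string, error)
}

// run polls on interval and bounds every fetch with the configured timeout so a
// slow or unreachable upstream cannot stall the ticker indefinitely.
func (t *downstreamWorkersTicker) run(ctx context.Context) {
    tick := time.NewTicker(t.interval)
    defer tick.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-tick.C:
            callCtx, cancel := context.WithTimeout(ctx, t.timeout)
            workers, err := t.fetch(callCtx)
            cancel()
            if err != nil {
                fmt.Println("get downstream workers:", err)
                continue
            }
            fmt.Println("downstream workers:", workers)
        }
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
    defer cancel()
    t := &downstreamWorkersTicker{
        interval: 100 * time.Millisecond,
        timeout:  5 * time.Second,
        fetch: func(context.Context) ([]string, error) {
            return []string{"w_1234567890"}, nil // hypothetical worker id
        },
    }
    t.run(ctx)
}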