backport of commit 5e111f7b31

pull/5018/head
Sorawis Nilparuk 2 years ago
parent 19982259dd
commit af4608dc2f

@@ -210,6 +210,12 @@ type Controller struct {
     LivenessTimeToStale         any           `hcl:"liveness_time_to_stale"`
     LivenessTimeToStaleDuration time.Duration `hcl:"-"`
+    // TODO: This isn't documented (on purpose) because the right place for this
+    // is central configuration so you can't drift across controllers and workers
+    // but we don't have that yet.
+    GetDownstreamWorkersTimeout         any           `hcl:"get_downstream_workers_timeout"`
+    GetDownstreamWorkersTimeoutDuration time.Duration `hcl:"-"`
+
     // SchedulerRunJobInterval is the time interval between waking up the
     // scheduler to run pending jobs.
     //
@@ -279,6 +285,13 @@ type Worker struct {
     StatusCallTimeout         any           `hcl:"status_call_timeout"`
     StatusCallTimeoutDuration time.Duration `hcl:"-"`
+    // GetDownstreamWorkersTimeout represents the period of time (as a duration)
+    // to use as the timeout for GetDownstreamWorkers calls in the DownstreamWorkerTicker.
+    //
+    // TODO: This is currently not documented and considered internal.
+    GetDownstreamWorkersTimeout         any           `hcl:"get_downstream_workers_timeout"`
+    GetDownstreamWorkersTimeoutDuration time.Duration `hcl:"-"`
+
     // SuccessfulStatusGracePeriod represents the period of time (as a duration)
     // that the worker will wait before disconnecting connections if it cannot
     // successfully complete a status report to a controller. This cannot be
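
Both structs follow the existing config pattern: the `any` field receives the raw get_downstream_workers_timeout value from HCL, and the companion `hcl:"-"` field holds the parsed time.Duration. A minimal sketch of how the new key might be set, embedded as a Go string for illustration (the stanza contents are examples, not part of this change):

// Illustrative only: a worker stanza using the new, undocumented key. Any
// value parseutil.ParseDurationSecond understands works, e.g. "30s" or 30.
const exampleWorkerConfig = `
worker {
  name = "example-worker"
  get_downstream_workers_timeout = "30s"
}
`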
@@ -648,6 +661,21 @@ func Parse(d string) (*Config, error) {
             return nil, errors.New("Controller liveness time to stale value is negative")
         }
+        getDownstreamWorkersTimeout := result.Controller.GetDownstreamWorkersTimeout
+        if util.IsNil(getDownstreamWorkersTimeout) {
+            getDownstreamWorkersTimeout = os.Getenv("BOUNDARY_CONTROLLER_GET_DOWNSTREAM_WORKERS_TIMEOUT")
+        }
+        if getDownstreamWorkersTimeout != nil {
+            t, err := parseutil.ParseDurationSecond(getDownstreamWorkersTimeout)
+            if err != nil {
+                return result, err
+            }
+            result.Controller.GetDownstreamWorkersTimeoutDuration = t
+        }
+        if result.Controller.GetDownstreamWorkersTimeoutDuration < 0 {
+            return nil, errors.New("GetDownstreamWorkersTimeoutDuration value is negative")
+        }
+
         if result.Controller.MaxPageSizeRaw != nil {
             switch t := result.Controller.MaxPageSizeRaw.(type) {
             case string:
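
The parse path relies on parseutil.ParseDurationSecond from hashicorp/go-secure-stdlib, which accepts either a duration string or a bare number interpreted as seconds; negative durations parse successfully, which is why the explicit < 0 guard above is needed. A runnable sketch of the conversion (values are examples):

package main

import (
    "fmt"

    "github.com/hashicorp/go-secure-stdlib/parseutil"
)

func main() {
    // Duration strings and bare integers (seconds) both parse.
    for _, raw := range []any{"30s", 30, "-5s"} {
        d, err := parseutil.ParseDurationSecond(raw)
        fmt.Println(raw, d, err) // "30s" -> 30s, 30 -> 30s, "-5s" -> -5s
    }
}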
@@ -785,6 +813,21 @@ func Parse(d string) (*Config, error) {
             return nil, errors.New("Status call timeout value is negative")
         }
+        getDownstreamWorkersTimeout := result.Worker.GetDownstreamWorkersTimeout
+        if util.IsNil(getDownstreamWorkersTimeout) {
+            getDownstreamWorkersTimeout = os.Getenv("BOUNDARY_WORKER_GET_DOWNSTREAM_WORKERS_TIMEOUT")
+        }
+        if getDownstreamWorkersTimeout != nil {
+            t, err := parseutil.ParseDurationSecond(getDownstreamWorkersTimeout)
+            if err != nil {
+                return result, fmt.Errorf("error parsing get_downstream_workers_timeout: %w", err)
+            }
+            result.Worker.GetDownstreamWorkersTimeoutDuration = t
+        }
+        if result.Worker.GetDownstreamWorkersTimeoutDuration < 0 {
+            return nil, errors.New("Get downstream workers timeout value is negative")
+        }
+
         successfulStatusGracePeriod := result.Worker.SuccessfulStatusGracePeriod
         if util.IsNil(successfulStatusGracePeriod) {
             successfulStatusGracePeriod = os.Getenv("BOUNDARY_WORKER_SUCCESSFUL_STATUS_GRACE_PERIOD")
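
As with the BOUNDARY_WORKER_SUCCESSFUL_STATUS_GRACE_PERIOD handling just below, the environment variable is consulted only when the key is absent from the config file. A hypothetical test-style sketch of the fallback (the import path, config body, and expected value are assumptions, not taken from this change):

package config_test

import (
    "testing"
    "time"

    "github.com/hashicorp/boundary/internal/cmd/config"
)

func TestGetDownstreamWorkersTimeoutEnvFallback(t *testing.T) {
    // The key is absent from the HCL, so Parse should fall back to the env var.
    t.Setenv("BOUNDARY_WORKER_GET_DOWNSTREAM_WORKERS_TIMEOUT", "45s")

    cfg, err := config.Parse(`
worker {
    name = "w1"
}
`)
    if err != nil {
        t.Fatal(err)
    }
    if got, want := cfg.Worker.GetDownstreamWorkersTimeoutDuration, 45*time.Second; got != want {
        t.Fatalf("got %v, want %v", got, want)
    }
}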

@@ -10,6 +10,7 @@ import (
     "strings"
     "sync"
     "sync/atomic"
+    "time"
 
     "github.com/hashicorp/boundary/internal/alias"
     talias "github.com/hashicorp/boundary/internal/alias/target"
@@ -91,7 +92,7 @@ var (
     downstreamReceiverFactory func() downstreamReceiver
 
     downstreamersFactory func(context.Context, string, string) (common.Downstreamers, error)
-    downstreamWorkersTickerFactory func(context.Context, string, string, common.Downstreamers, downstreamReceiver) (downstreamWorkersTicker, error)
+    downstreamWorkersTickerFactory func(context.Context, string, string, common.Downstreamers, downstreamReceiver, time.Duration) (downstreamWorkersTicker, error)
     commandClientFactory func(context.Context, *Controller) error
     extControllerFactory func(ctx context.Context, c *Controller, r db.Reader, w db.Writer, kms *kms.Kms) (intglobals.ControllerExtension, error)
 )
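
These package-level factory variables stay nil unless optional wiring assigns them (compare the `if downstreamReceiverFactory != nil` guard at the construction site further down); this change simply threads the timeout through as a new trailing parameter. A generic, self-contained sketch of that pattern, with hypothetical names:

package main

import (
    "fmt"
    "time"
)

type ticker interface{ run() }

// tickerFactory is nil by default; optional builds assign it at init time.
var tickerFactory func(timeout time.Duration) (ticker, error)

func start(timeout time.Duration) error {
    if tickerFactory == nil {
        return nil // feature not wired in; nothing to start
    }
    t, err := tickerFactory(timeout)
    if err != nil {
        return fmt.Errorf("error creating ticker: %w", err)
    }
    t.run()
    return nil
}

func main() { fmt.Println(start(10 * time.Second)) }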
@@ -121,8 +122,9 @@ type Controller struct {
     // Timing variables. These are atomics for SIGHUP support, and are int64
     // because they are casted to time.Duration.
-    workerStatusGracePeriod *atomic.Int64
-    livenessTimeToStale     *atomic.Int64
+    workerStatusGracePeriod     *atomic.Int64
+    livenessTimeToStale         *atomic.Int64
+    getDownstreamWorkersTimeout *atomic.Pointer[time.Duration]
 
     apiGrpcServer         *grpc.Server
     apiGrpcServerListener grpcServerListener
@@ -176,18 +178,19 @@ func New(ctx context.Context, conf *Config) (*Controller, error) {
     metric.InitializeApiCollectors(conf.PrometheusRegisterer)
     ratelimit.InitializeMetrics(conf.PrometheusRegisterer)
     c := &Controller{
-        conf:                    conf,
-        logger:                  conf.Logger.Named("controller"),
-        started:                 ua.NewBool(false),
-        tickerWg:                new(sync.WaitGroup),
-        schedulerWg:             new(sync.WaitGroup),
-        workerAuthCache:         new(sync.Map),
-        workerStatusUpdateTimes: new(sync.Map),
-        enabledPlugins:          conf.Server.EnabledPlugins,
-        apiListeners:            make([]*base.ServerListener, 0),
-        downstreamConnManager:   cluster.NewDownstreamManager(),
-        workerStatusGracePeriod: new(atomic.Int64),
-        livenessTimeToStale:     new(atomic.Int64),
+        conf:                        conf,
+        logger:                      conf.Logger.Named("controller"),
+        started:                     ua.NewBool(false),
+        tickerWg:                    new(sync.WaitGroup),
+        schedulerWg:                 new(sync.WaitGroup),
+        workerAuthCache:             new(sync.Map),
+        workerStatusUpdateTimes:     new(sync.Map),
+        enabledPlugins:              conf.Server.EnabledPlugins,
+        apiListeners:                make([]*base.ServerListener, 0),
+        downstreamConnManager:       cluster.NewDownstreamManager(),
+        workerStatusGracePeriod:     new(atomic.Int64),
+        livenessTimeToStale:         new(atomic.Int64),
+        getDownstreamWorkersTimeout: new(atomic.Pointer[time.Duration]),
     }
 
     if downstreamReceiverFactory != nil {
@@ -238,6 +241,15 @@ func New(ctx context.Context, conf *Config) (*Controller, error) {
         c.livenessTimeToStale.Store(int64(conf.RawConfig.Controller.LivenessTimeToStaleDuration))
     }
+    switch conf.RawConfig.Controller.GetDownstreamWorkersTimeoutDuration {
+    case 0:
+        to := server.DefaultLiveness
+        c.getDownstreamWorkersTimeout.Store(&to)
+    default:
+        to := conf.RawConfig.Controller.GetDownstreamWorkersTimeoutDuration
+        c.getDownstreamWorkersTimeout.Store(&to)
+    }
+
     clusterListeners := make([]*base.ServerListener, 0)
     for i := range conf.Listeners {
         l := conf.Listeners[i]
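
The switch treats a zero duration (key unset) as "use server.DefaultLiveness", and stores a pointer to a fresh local so the atomic never holds nil. The same fallback logic as a standalone helper, for reference (the 15s value below is an illustrative stand-in, not the actual value of server.DefaultLiveness):

package main

import (
    "fmt"
    "time"
)

// effectiveTimeout mirrors the switch above: zero means "unset", so fall
// back to the package default.
func effectiveTimeout(configured, fallback time.Duration) time.Duration {
    if configured == 0 {
        return fallback
    }
    return configured
}

func main() {
    def := 15 * time.Second // stand-in for server.DefaultLiveness
    fmt.Println(effectiveTimeout(0, def))              // 15s: key unset
    fmt.Println(effectiveTimeout(30*time.Second, def)) // 30s: key set
}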
@@ -579,7 +591,7 @@ func (c *Controller) Start() error {
         // we'll use "root" to designate that this is the root of the graph (aka
         // a controller)
         boundVer := version.Get().VersionNumber()
-        dswTicker, err := downstreamWorkersTickerFactory(c.baseContext, "root", boundVer, c.downstreamWorkers, c.downstreamConns)
+        dswTicker, err := downstreamWorkersTickerFactory(c.baseContext, "root", boundVer, c.downstreamWorkers, c.downstreamConns, *c.getDownstreamWorkersTimeout.Load())
         if err != nil {
             return fmt.Errorf("error creating downstream workers ticker: %w", err)
         }
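
One subtlety: *c.getDownstreamWorkersTimeout.Load() dereferences the atomic pointer once, so the ticker receives a snapshot taken at Start; a later Store (e.g. on SIGHUP) updates the atomic but not a ticker that was already created. A small demonstration of the distinction:

package main

import (
    "fmt"
    "sync/atomic"
    "time"
)

func main() {
    var timeout atomic.Pointer[time.Duration]

    d1 := 10 * time.Second
    timeout.Store(&d1)

    snapshot := *timeout.Load() // copied by value, as at the call site above

    d2 := 20 * time.Second
    timeout.Store(&d2) // e.g. a config reload

    fmt.Println(snapshot)        // 10s: the copy is unaffected
    fmt.Println(*timeout.Load()) // 20s: live readers observe the update
}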

@@ -187,8 +187,9 @@ type Worker struct {
     // Timing variables. These are atomics for SIGHUP support, and are int64
     // because they are casted to time.Duration.
-    successfulStatusGracePeriod *atomic.Int64
-    statusCallTimeoutDuration   *atomic.Int64
+    successfulStatusGracePeriod         *atomic.Int64
+    statusCallTimeoutDuration           *atomic.Int64
+    getDownstreamWorkersTimeoutDuration *atomic.Pointer[time.Duration]
 
     // AuthRotationNextRotation is useful in tests to understand how long to
     // sleep
@@ -222,17 +223,18 @@ func New(ctx context.Context, conf *Config) (*Worker, error) {
         lastStatusSuccess:      new(atomic.Value),
         controllerMultihopConn: new(atomic.Value),
         // controllerUpstreamMsgConn: new(atomic.Value),
-        tags:                        new(atomic.Value),
-        updateTags:                  ua.NewBool(false),
-        nonceFn:                     base62.Random,
-        WorkerAuthCurrentKeyId:      new(ua.String),
-        operationalState:            new(atomic.Value),
-        downstreamConnManager:       cluster.NewDownstreamManager(),
-        localStorageState:           new(atomic.Value),
-        successfulStatusGracePeriod: new(atomic.Int64),
-        statusCallTimeoutDuration:   new(atomic.Int64),
-        upstreamConnectionState:     new(atomic.Value),
-        downstreamWorkers:           new(atomic.Pointer[downstreamersContainer]),
+        tags:                                new(atomic.Value),
+        updateTags:                          ua.NewBool(false),
+        nonceFn:                             base62.Random,
+        WorkerAuthCurrentKeyId:              new(ua.String),
+        operationalState:                    new(atomic.Value),
+        downstreamConnManager:               cluster.NewDownstreamManager(),
+        localStorageState:                   new(atomic.Value),
+        successfulStatusGracePeriod:         new(atomic.Int64),
+        statusCallTimeoutDuration:           new(atomic.Int64),
+        getDownstreamWorkersTimeoutDuration: new(atomic.Pointer[time.Duration]),
+        upstreamConnectionState:             new(atomic.Value),
+        downstreamWorkers:                   new(atomic.Pointer[downstreamersContainer]),
     }
 
     w.operationalState.Store(server.UnknownOperationalState)
@@ -334,6 +336,14 @@ func New(ctx context.Context, conf *Config) (*Worker, error) {
     default:
         w.statusCallTimeoutDuration.Store(int64(conf.RawConfig.Worker.StatusCallTimeoutDuration))
     }
+    switch conf.RawConfig.Worker.GetDownstreamWorkersTimeoutDuration {
+    case 0:
+        to := server.DefaultLiveness
+        w.getDownstreamWorkersTimeoutDuration.Store(&to)
+    default:
+        to := conf.RawConfig.Worker.GetDownstreamWorkersTimeoutDuration
+        w.getDownstreamWorkersTimeoutDuration.Store(&to)
+    }
     // FIXME: This is really ugly, but works.
     session.CloseCallTimeout.Store(w.successfulStatusGracePeriod.Load())
