implement configurable getDownstreamWorker timeout (#5007)

* implement configurable getDownstreamWorker timeout

* add parsing test

* typos

* fix tests

* fix worker tests

* 0 is a valid value
pull/5025/head
Sorawis Nilparuk (Bo) 2 years ago committed by GitHub
parent 1a8e954513
commit 350ac90a57
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -210,6 +210,12 @@ type Controller struct {
LivenessTimeToStale any `hcl:"liveness_time_to_stale"`
LivenessTimeToStaleDuration time.Duration `hcl:"-"`
// TODO: This isn't documented (on purpose) because the right place for this
// is central configuration so you can't drift across controllers and workers
// but we don't have that yet.
GetDownstreamWorkersTimeout any `hcl:"get_downstream_workers_timeout"`
GetDownstreamWorkersTimeoutDuration time.Duration `hcl:"-"`
// SchedulerRunJobInterval is the time interval between waking up the
// scheduler to run pending jobs.
//
@ -279,6 +285,13 @@ type Worker struct {
StatusCallTimeout any `hcl:"status_call_timeout"`
StatusCallTimeoutDuration time.Duration `hcl:"-"`
// GetDownstreamWorkersTimeout is the timeout (as a duration) applied to the
// GetDownstreamWorkers call made by the DownstreamWorkerTicker.
//
// TODO: This is currently not documented and considered internal.
GetDownstreamWorkersTimeout any `hcl:"get_downstream_workers_timeout"`
GetDownstreamWorkersTimeoutDuration time.Duration `hcl:"-"`
// SuccessfulStatusGracePeriod represents the period of time (as a duration)
// that the worker will wait before disconnecting connections if it cannot
// successfully complete a status report to a controller. This cannot be
@ -648,6 +661,21 @@ func Parse(d string) (*Config, error) {
return nil, errors.New("Controller liveness time to stale value is negative")
}
// Resolve the controller's get_downstream_workers_timeout. A value set in
// the config file wins; otherwise fall back to the
// BOUNDARY_CONTROLLER_GET_DOWNSTREAM_WORKERS_TIMEOUT environment variable.
getDownstreamWorkersTimeout := result.Controller.GetDownstreamWorkersTimeout
if util.IsNil(getDownstreamWorkersTimeout) {
	getDownstreamWorkersTimeout = os.Getenv("BOUNDARY_CONTROLLER_GET_DOWNSTREAM_WORKERS_TIMEOUT")
}
if getDownstreamWorkersTimeout != nil {
	t, err := parseutil.ParseDurationSecond(getDownstreamWorkersTimeout)
	if err != nil {
		return result, fmt.Errorf("error trying to parse controller get_downstream_workers_timeout: %w", err)
	}
	result.Controller.GetDownstreamWorkersTimeoutDuration = t
}
// Zero is accepted here; only negative durations are rejected. The message
// names the controller block so it cannot be confused with the worker's
// setting of the same name.
if result.Controller.GetDownstreamWorkersTimeoutDuration < 0 {
	return nil, errors.New("controller get_downstream_workers_timeout value is negative")
}
if result.Controller.MaxPageSizeRaw != nil {
switch t := result.Controller.MaxPageSizeRaw.(type) {
case string:
@ -785,6 +813,21 @@ func Parse(d string) (*Config, error) {
return nil, errors.New("Status call timeout value is negative")
}
// Resolve the worker's get_downstream_workers_timeout. A value set in the
// config file wins; otherwise fall back to the
// BOUNDARY_WORKER_GET_DOWNSTREAM_WORKERS_TIMEOUT environment variable.
// Note: despite its name, this local holds the raw (unparsed) value.
getDownstreamWorkersTimeoutDuration := result.Worker.GetDownstreamWorkersTimeout
if util.IsNil(getDownstreamWorkersTimeoutDuration) {
	getDownstreamWorkersTimeoutDuration = os.Getenv("BOUNDARY_WORKER_GET_DOWNSTREAM_WORKERS_TIMEOUT")
}
if getDownstreamWorkersTimeoutDuration != nil {
	t, err := parseutil.ParseDurationSecond(getDownstreamWorkersTimeoutDuration)
	if err != nil {
		return result, fmt.Errorf("error trying to parse worker get_downstream_workers_timeout: %w", err)
	}
	result.Worker.GetDownstreamWorkersTimeoutDuration = t
}
// Zero is accepted here; only negative durations are rejected. The message
// names the worker block so it cannot be confused with the controller's
// setting of the same name.
if result.Worker.GetDownstreamWorkersTimeoutDuration < 0 {
	return nil, errors.New("worker get_downstream_workers_timeout value is negative")
}
successfulStatusGracePeriod := result.Worker.SuccessfulStatusGracePeriod
if util.IsNil(successfulStatusGracePeriod) {
successfulStatusGracePeriod = os.Getenv("BOUNDARY_WORKER_SUCCESSFUL_STATUS_GRACE_PERIOD")

@ -2832,6 +2832,190 @@ func TestSetupWorkerInitialUpstreams(t *testing.T) {
}
}
// TestGetDownstreamWorkersTimeout exercises Parse's handling of the
// get_downstream_workers_timeout setting in controller and worker blocks.
//
// Each case supplies an HCL snippet in `in`. wantController / wantWorker
// select which parsed blocks are inspected, and the matching
// want*Timeout field is the expected parsed duration. When assertErr is
// non-nil the case expects Parse to fail: assertErr receives the error and no
// further checks run.
func TestGetDownstreamWorkersTimeout(t *testing.T) {
	tests := []struct {
		name                  string
		in                    string
		wantController        bool
		wantWorker            bool
		wantControllerTimeout time.Duration
		wantWorkerTimeout     time.Duration
		assertErr             func(*testing.T, error)
	}{
		{
			name: "controller_valid_time_value",
			in: `
controller {
name = "example-controller"
get_downstream_workers_timeout = "10s"
}`,
			wantControllerTimeout: 10 * time.Second,
			wantWorkerTimeout:     0,
			wantController:        true,
			wantWorker:            false,
			assertErr:             nil,
		},
		{
			name: "worker_valid_time_value",
			in: `
worker {
name = "example-worker"
get_downstream_workers_timeout = "5s"
}`,
			wantControllerTimeout: 0,
			wantWorkerTimeout:     5 * time.Second,
			wantController:        false,
			wantWorker:            true,
			assertErr:             nil,
		},
		{
			name: "both_valid_time_value",
			in: `
controller {
name = "example-controller"
get_downstream_workers_timeout = "5s"
}
worker {
name = "example-worker"
get_downstream_workers_timeout = "500ms"
}`,
			wantControllerTimeout: 5 * time.Second,
			wantWorkerTimeout:     500 * time.Millisecond,
			wantController:        true,
			wantWorker:            true,
			assertErr:             nil,
		},
		{
			name: "both_unspecified_defaults_to_zero",
			in: `
controller {
name = "example-controller"
}
worker {
name = "example-worker"
}`,
			wantController:        true,
			wantWorker:            true,
			wantControllerTimeout: 0,
			wantWorkerTimeout:     0,
			assertErr:             nil,
		},
		{
			name: "controller_int_value_no_unit_assumes_seconds",
			in: `
controller {
name = "example-controller"
get_downstream_workers_timeout = 100
}`,
			wantController:        true,
			wantWorker:            false,
			wantControllerTimeout: 100 * time.Second,
			wantWorkerTimeout:     0,
		},
		{
			name: "worker_int_value_no_unit_assumes_seconds",
			in: `
worker {
name = "example-worker"
get_downstream_workers_timeout = 30
}`,
			wantController:    false,
			wantWorker:        true,
			wantWorkerTimeout: 30 * time.Second,
		},
		{
			name: "controller_invalid_bool_value",
			in: `
controller {
name = "example-controller"
get_downstream_workers_timeout = true
}`,
			wantController: true,
			wantWorker:     false,
			assertErr: func(t *testing.T, err error) {
				require.Error(t, err)
				require.ErrorContains(t, err, `error trying to parse controller get_downstream_workers_timeout`)
			},
		},
		{
			name: "worker_invalid_bool_value",
			in: `
worker {
name = "example-worker"
get_downstream_workers_timeout = false
}`,
			wantController: false,
			wantWorker:     true,
			assertErr: func(t *testing.T, err error) {
				require.Error(t, err)
				require.ErrorContains(t, err, `error trying to parse worker get_downstream_workers_timeout`)
			},
		},
		{
			// Negative durations parse successfully but must be rejected by
			// Parse. Only the presence of an error is asserted so the case
			// does not depend on the exact wording of the message.
			name: "controller_invalid_negative_value",
			in: `
controller {
name = "example-controller"
get_downstream_workers_timeout = "-5s"
}`,
			wantController: true,
			wantWorker:     false,
			assertErr: func(t *testing.T, err error) {
				require.Error(t, err)
			},
		},
		{
			name: "worker_invalid_negative_value",
			in: `
worker {
name = "example-worker"
get_downstream_workers_timeout = "-5s"
}`,
			wantController: false,
			wantWorker:     true,
			assertErr: func(t *testing.T, err error) {
				require.Error(t, err)
			},
		},
		{
			// An empty string is accepted and yields a zero duration.
			name: "controller_empty_value_defaults_to_zero",
			in: `
controller {
name = "example-controller"
get_downstream_workers_timeout = ""
}`,
			wantController:        true,
			wantWorker:            false,
			wantControllerTimeout: 0,
		},
		{
			name: "worker_empty_value_defaults_to_zero",
			in: `
worker {
name = "example-worker"
get_downstream_workers_timeout = ""
}`,
			wantController:    false,
			wantWorker:        true,
			wantWorkerTimeout: 0,
		},
		{
			// An explicit zero is a valid value.
			name: "controller_zero_value_is_valid",
			in: `
controller {
name = "example-controller"
get_downstream_workers_timeout = "0s"
}`,
			wantController:        true,
			wantWorker:            false,
			wantControllerTimeout: 0,
		},
		{
			name: "worker_zero_value_is_valid",
			in: `
worker {
name = "example-worker"
get_downstream_workers_timeout = "0s"
}`,
			wantController:    false,
			wantWorker:        true,
			wantWorkerTimeout: 0,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			c, err := Parse(tt.in)
			if tt.assertErr != nil {
				// Error cases: delegate to the case's own assertion and stop.
				tt.assertErr(t, err)
				return
			}
			require.NoError(t, err)
			if tt.wantController {
				require.NotNil(t, c.Controller)
				require.Equal(t, tt.wantControllerTimeout, c.Controller.GetDownstreamWorkersTimeoutDuration)
			}
			if tt.wantWorker {
				require.NotNil(t, c.Worker)
				require.Equal(t, tt.wantWorkerTimeout, c.Worker.GetDownstreamWorkersTimeoutDuration)
			}
		})
	}
}
func TestMaxPageSize(t *testing.T) {
tests := []struct {
name string

@ -10,6 +10,7 @@ import (
"strings"
"sync"
"sync/atomic"
"time"
"github.com/hashicorp/boundary/internal/alias"
talias "github.com/hashicorp/boundary/internal/alias/target"
@ -91,7 +92,7 @@ var (
downstreamReceiverFactory func() downstreamReceiver
downstreamersFactory func(context.Context, string, string) (common.Downstreamers, error)
downstreamWorkersTickerFactory func(context.Context, string, string, common.Downstreamers, downstreamReceiver) (downstreamWorkersTicker, error)
downstreamWorkersTickerFactory func(context.Context, string, string, common.Downstreamers, downstreamReceiver, time.Duration) (downstreamWorkersTicker, error)
commandClientFactory func(context.Context, *Controller) error
extControllerFactory func(ctx context.Context, c *Controller, r db.Reader, w db.Writer, kms *kms.Kms) (intglobals.ControllerExtension, error)
)
@ -121,8 +122,9 @@ type Controller struct {
// Timing variables. These are atomics for SIGHUP support. The Int64 fields are
// int64 because they are cast to time.Duration; getDownstreamWorkersTimeout
// stores a *time.Duration directly.
workerStatusGracePeriod *atomic.Int64
livenessTimeToStale *atomic.Int64
workerStatusGracePeriod *atomic.Int64
livenessTimeToStale *atomic.Int64
getDownstreamWorkersTimeout *atomic.Pointer[time.Duration]
apiGrpcServer *grpc.Server
apiGrpcServerListener grpcServerListener
@ -176,18 +178,19 @@ func New(ctx context.Context, conf *Config) (*Controller, error) {
metric.InitializeApiCollectors(conf.PrometheusRegisterer)
ratelimit.InitializeMetrics(conf.PrometheusRegisterer)
c := &Controller{
conf: conf,
logger: conf.Logger.Named("controller"),
started: ua.NewBool(false),
tickerWg: new(sync.WaitGroup),
schedulerWg: new(sync.WaitGroup),
workerAuthCache: new(sync.Map),
workerStatusUpdateTimes: new(sync.Map),
enabledPlugins: conf.Server.EnabledPlugins,
apiListeners: make([]*base.ServerListener, 0),
downstreamConnManager: cluster.NewDownstreamManager(),
workerStatusGracePeriod: new(atomic.Int64),
livenessTimeToStale: new(atomic.Int64),
conf: conf,
logger: conf.Logger.Named("controller"),
started: ua.NewBool(false),
tickerWg: new(sync.WaitGroup),
schedulerWg: new(sync.WaitGroup),
workerAuthCache: new(sync.Map),
workerStatusUpdateTimes: new(sync.Map),
enabledPlugins: conf.Server.EnabledPlugins,
apiListeners: make([]*base.ServerListener, 0),
downstreamConnManager: cluster.NewDownstreamManager(),
workerStatusGracePeriod: new(atomic.Int64),
livenessTimeToStale: new(atomic.Int64),
getDownstreamWorkersTimeout: new(atomic.Pointer[time.Duration]),
}
if downstreamReceiverFactory != nil {
@ -238,6 +241,15 @@ func New(ctx context.Context, conf *Config) (*Controller, error) {
c.livenessTimeToStale.Store(int64(conf.RawConfig.Controller.LivenessTimeToStaleDuration))
}
switch conf.RawConfig.Controller.GetDownstreamWorkersTimeoutDuration {
case 0:
to := server.DefaultLiveness
c.getDownstreamWorkersTimeout.Store(&to)
default:
to := conf.RawConfig.Controller.GetDownstreamWorkersTimeoutDuration
c.getDownstreamWorkersTimeout.Store(&to)
}
clusterListeners := make([]*base.ServerListener, 0)
for i := range conf.Listeners {
l := conf.Listeners[i]
@ -579,7 +591,7 @@ func (c *Controller) Start() error {
// we'll use "root" to designate that this is the root of the graph (aka
// a controller)
boundVer := version.Get().VersionNumber()
dswTicker, err := downstreamWorkersTickerFactory(c.baseContext, "root", boundVer, c.downstreamWorkers, c.downstreamConns)
dswTicker, err := downstreamWorkersTickerFactory(c.baseContext, "root", boundVer, c.downstreamWorkers, c.downstreamConns, *c.getDownstreamWorkersTimeout.Load())
if err != nil {
return fmt.Errorf("error creating downstream workers ticker: %w", err)
}

@ -187,8 +187,9 @@ type Worker struct {
// Timing variables. These are atomics for SIGHUP support. The Int64 fields are
// int64 because they are cast to time.Duration;
// getDownstreamWorkersTimeoutDuration stores a *time.Duration directly.
successfulStatusGracePeriod *atomic.Int64
statusCallTimeoutDuration *atomic.Int64
successfulStatusGracePeriod *atomic.Int64
statusCallTimeoutDuration *atomic.Int64
getDownstreamWorkersTimeoutDuration *atomic.Pointer[time.Duration]
// AuthRotationNextRotation is useful in tests to understand how long to
// sleep
@ -222,17 +223,18 @@ func New(ctx context.Context, conf *Config) (*Worker, error) {
lastStatusSuccess: new(atomic.Value),
controllerMultihopConn: new(atomic.Value),
// controllerUpstreamMsgConn: new(atomic.Value),
tags: new(atomic.Value),
updateTags: ua.NewBool(false),
nonceFn: base62.Random,
WorkerAuthCurrentKeyId: new(ua.String),
operationalState: new(atomic.Value),
downstreamConnManager: cluster.NewDownstreamManager(),
localStorageState: new(atomic.Value),
successfulStatusGracePeriod: new(atomic.Int64),
statusCallTimeoutDuration: new(atomic.Int64),
upstreamConnectionState: new(atomic.Value),
downstreamWorkers: new(atomic.Pointer[downstreamersContainer]),
tags: new(atomic.Value),
updateTags: ua.NewBool(false),
nonceFn: base62.Random,
WorkerAuthCurrentKeyId: new(ua.String),
operationalState: new(atomic.Value),
downstreamConnManager: cluster.NewDownstreamManager(),
localStorageState: new(atomic.Value),
successfulStatusGracePeriod: new(atomic.Int64),
statusCallTimeoutDuration: new(atomic.Int64),
getDownstreamWorkersTimeoutDuration: new(atomic.Pointer[time.Duration]),
upstreamConnectionState: new(atomic.Value),
downstreamWorkers: new(atomic.Pointer[downstreamersContainer]),
}
w.operationalState.Store(server.UnknownOperationalState)
@ -334,6 +336,14 @@ func New(ctx context.Context, conf *Config) (*Worker, error) {
default:
w.statusCallTimeoutDuration.Store(int64(conf.RawConfig.Worker.StatusCallTimeoutDuration))
}
switch conf.RawConfig.Worker.GetDownstreamWorkersTimeoutDuration {
case 0:
to := server.DefaultLiveness
w.getDownstreamWorkersTimeoutDuration.Store(&to)
default:
to := conf.RawConfig.Worker.GetDownstreamWorkersTimeoutDuration
w.getDownstreamWorkersTimeoutDuration.Store(&to)
}
// FIXME: This is really ugly, but works.
session.CloseCallTimeout.Store(w.successfulStatusGracePeriod.Load())

Loading…
Cancel
Save