Allow reloading some server timings (#5071)

Allows changing the worker status grace period and a few other timing-related
settings via HUP instead of requiring a full restart.

Tested manually via config changes and HUP.
Jeff Mitchell, commit 66af7dba98 (parent 35d1df7388)
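For context, a minimal sketch of the kind of SIGHUP-driven reload loop this change plugs into. Illustrative only: Config, LoadConfig, and the field name are hypothetical stand-ins, not Boundary's actual wiring.

package main

import (
    "fmt"
    "os"
    "os/signal"
    "syscall"
)

// Config is a hypothetical stand-in for the server configuration.
type Config struct {
    WorkerStatusGracePeriod string // e.g. "30s"; hypothetical field name
}

// LoadConfig is a hypothetical stand-in for re-reading the config file.
func LoadConfig() (*Config, error) {
    return &Config{WorkerStatusGracePeriod: "30s"}, nil
}

func main() {
    hup := make(chan os.Signal, 1)
    signal.Notify(hup, syscall.SIGHUP)
    for range hup {
        conf, err := LoadConfig()
        if err != nil {
            fmt.Fprintln(os.Stderr, "reload failed:", err)
            continue // keep the old config on error
        }
        fmt.Println("reloaded grace period:", conf.WorkerStatusGracePeriod)
    }
}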

@@ -859,6 +859,10 @@ func (c *Command) Reload(newConf *config.Config) error {
         reloadErrors = stderrors.Join(reloadErrors, fmt.Errorf("failed to reload controller api rate limits: %w", err))
     }
+    if err := c.reloadControllerTimings(newConf); err != nil {
+        reloadErrors = stderrors.Join(reloadErrors, fmt.Errorf("failed to reload controller timings: %w", err))
+    }
     if newConf != nil && c.worker != nil {
         workerReloadErr := func() error {
             if newConf.Controller != nil {
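The hunk above aggregates failures with stderrors.Join so that one failed subsystem does not abort the remaining reload steps. A compressed sketch of that pattern, with hypothetical step functions:

package main

import (
    "errors"
    "fmt"
)

// Hypothetical reload steps standing in for the real subsystem reloads.
func reloadRateLimits() error { return nil }
func reloadTimings() error    { return errors.New("bad duration") }

func main() {
    var reloadErrors error
    if err := reloadRateLimits(); err != nil {
        reloadErrors = errors.Join(reloadErrors, fmt.Errorf("failed to reload rate limits: %w", err))
    }
    if err := reloadTimings(); err != nil {
        reloadErrors = errors.Join(reloadErrors, fmt.Errorf("failed to reload timings: %w", err))
    }
    // errors.Join discards nil errors, so this prints only the failures.
    fmt.Println(reloadErrors)
}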
@@ -977,6 +981,14 @@ func (c *Command) reloadControllerRateLimits(newConfig *config.Config) error {
     return c.controller.ReloadRateLimiter(newConfig)
 }
+
+func (c *Command) reloadControllerTimings(newConfig *config.Config) error {
+    if c.controller == nil || newConfig == nil || newConfig.Controller == nil {
+        return nil
+    }
+    return c.controller.ReloadTimings(newConfig)
+}
+
 // acquireSchemaManager returns a schema manager and generally acquires a shared lock on
 // the database. This is done as a mechanism to disallow running migration commands
 // while the database is in use.

@@ -682,3 +682,39 @@ func (c *Controller) Shutdown() error {
 func (c *Controller) WorkerStatusUpdateTimes() *sync.Map {
     return c.workerStatusUpdateTimes
 }
+
+// ReloadTimings reloads timing related parameters
+func (c *Controller) ReloadTimings(newConfig *config.Config) error {
+    const op = "controller.(Controller).ReloadTimings"
+
+    switch {
+    case newConfig == nil:
+        return errors.New(c.baseContext, errors.InvalidParameter, op, "nil config")
+    case newConfig.Controller == nil:
+        return errors.New(c.baseContext, errors.InvalidParameter, op, "nil config.Controller")
+    }
+
+    switch newConfig.Controller.WorkerStatusGracePeriodDuration {
+    case 0:
+        c.workerStatusGracePeriod.Store(int64(server.DefaultLiveness))
+    default:
+        c.workerStatusGracePeriod.Store(int64(newConfig.Controller.WorkerStatusGracePeriodDuration))
+    }
+    switch newConfig.Controller.LivenessTimeToStaleDuration {
+    case 0:
+        c.livenessTimeToStale.Store(int64(server.DefaultLiveness))
+    default:
+        c.livenessTimeToStale.Store(int64(newConfig.Controller.LivenessTimeToStaleDuration))
+    }
+    switch newConfig.Controller.GetDownstreamWorkersTimeoutDuration {
+    case 0:
+        to := server.DefaultLiveness
+        c.getDownstreamWorkersTimeout.Store(&to)
+    default:
+        to := newConfig.Controller.GetDownstreamWorkersTimeoutDuration
+        c.getDownstreamWorkersTimeout.Store(&to)
+    }
+
+    return nil
+}
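ReloadTimings uses a zero-means-default convention and keeps the durations in atomics, so readers never take a lock and a HUP can swap values under them safely. A self-contained sketch of that pattern; the 15s default here is a stand-in value, not necessarily Boundary's server.DefaultLiveness:

package main

import (
    "fmt"
    "sync/atomic"
    "time"
)

// Stand-in for server.DefaultLiveness.
const DefaultLiveness = 15 * time.Second

// Stores a time.Duration as its int64 nanosecond count.
var workerStatusGracePeriod atomic.Int64

// reloadGracePeriod applies the zero-means-default rule from the diff above.
func reloadGracePeriod(configured time.Duration) {
    switch configured {
    case 0:
        workerStatusGracePeriod.Store(int64(DefaultLiveness))
    default:
        workerStatusGracePeriod.Store(int64(configured))
    }
}

func main() {
    reloadGracePeriod(0)
    fmt.Println(time.Duration(workerStatusGracePeriod.Load())) // 15s (default)
    reloadGracePeriod(45 * time.Second)
    fmt.Println(time.Duration(workerStatusGracePeriod.Load())) // 45s
}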

@@ -414,6 +414,29 @@ func (w *Worker) Reload(ctx context.Context, newConf *config.Config) {
             ar.InitialAddresses(w.conf.RawConfig.Worker.InitialUpstreams)
         }
     }
+    switch newConf.Worker.SuccessfulStatusGracePeriodDuration {
+    case 0:
+        w.successfulStatusGracePeriod.Store(int64(server.DefaultLiveness))
+    default:
+        w.successfulStatusGracePeriod.Store(int64(newConf.Worker.SuccessfulStatusGracePeriodDuration))
+    }
+    switch newConf.Worker.StatusCallTimeoutDuration {
+    case 0:
+        w.statusCallTimeoutDuration.Store(int64(common.DefaultStatusTimeout))
+    default:
+        w.statusCallTimeoutDuration.Store(int64(newConf.Worker.StatusCallTimeoutDuration))
+    }
+    switch newConf.Worker.GetDownstreamWorkersTimeoutDuration {
+    case 0:
+        to := server.DefaultLiveness
+        w.getDownstreamWorkersTimeoutDuration.Store(&to)
+    default:
+        to := newConf.Worker.GetDownstreamWorkersTimeoutDuration
+        w.getDownstreamWorkersTimeoutDuration.Store(&to)
+    }
+
+    // See comment about this in worker.go
+    session.CloseCallTimeout.Store(w.successfulStatusGracePeriod.Load())
 }
 
 func (w *Worker) Start() error {
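Note the downstream-workers timeout is published via a pointer (Store(&to)) rather than as an int64. A sketch assuming an atomic.Pointer[time.Duration]-style container; whether the real field is atomic.Pointer or atomic.Value, the effect is the same: each reload allocates and stores a fresh pointer, so a reader holding an old pointer still sees a consistent value.

package main

import (
    "fmt"
    "sync/atomic"
    "time"
)

// Assumed container type; the actual field type in Boundary may differ.
var getDownstreamWorkersTimeout atomic.Pointer[time.Duration]

// reloadDownstreamTimeout publishes a new timeout, falling back to def
// when the configured value is zero, mirroring the switch in the diff.
func reloadDownstreamTimeout(configured, def time.Duration) {
    to := def
    if configured != 0 {
        to = configured
    }
    getDownstreamWorkersTimeout.Store(&to) // fresh allocation per reload
}

func main() {
    reloadDownstreamTimeout(0, 15*time.Second)
    fmt.Println(*getDownstreamWorkersTimeout.Load()) // 15s (default)
    reloadDownstreamTimeout(90*time.Second, 15*time.Second)
    fmt.Println(*getDownstreamWorkersTimeout.Load()) // 1m30s
}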

@@ -37,7 +37,7 @@ type sessionConnectionCleanupJob struct {
     // The amount of time to give disconnected workers before marking their
     // connections as closed. This should be larger than the liveness setting
     // for the worker.
-    gracePeriod *atomic.Int64
+    workerStatusGracePeriod *atomic.Int64
 
     // The total number of connections closed in the last run.
     totalClosed int
@@ -47,21 +47,21 @@ type sessionConnectionCleanupJob struct {
 func newSessionConnectionCleanupJob(
     ctx context.Context,
     writer db.Writer,
-    gracePeriod *atomic.Int64,
+    workerStatusGracePeriod *atomic.Int64,
 ) (*sessionConnectionCleanupJob, error) {
     const op = "session.newNewSessionConnectionCleanupJob"
     switch {
     case writer == nil:
         return nil, errors.New(ctx, errors.InvalidParameter, op, "missing db writer")
-    case gracePeriod == nil:
+    case workerStatusGracePeriod == nil:
         return nil, errors.New(ctx, errors.InvalidParameter, op, "missing grace period")
-    case gracePeriod.Load() == 0:
+    case workerStatusGracePeriod.Load() == 0:
         return nil, errors.New(ctx, errors.InvalidParameter, op, "grace period is zero")
     }
 
     return &sessionConnectionCleanupJob{
-        writer:      writer,
-        gracePeriod: gracePeriod,
+        writer:                  writer,
+        workerStatusGracePeriod: workerStatusGracePeriod,
     }, nil
 }
@@ -98,7 +98,7 @@ func (j *sessionConnectionCleanupJob) Run(ctx context.Context) error {
     j.totalClosed = 0
 
     // Run the atomic dead worker cleanup job.
-    gracePeriod := time.Duration(j.gracePeriod.Load())
+    gracePeriod := time.Duration(j.workerStatusGracePeriod.Load())
     results, err := j.closeConnectionsForDeadWorkers(ctx, gracePeriod)
     if err != nil {
         return errors.Wrap(ctx, err, op)

@@ -18,14 +18,14 @@ import (
 const deleteTerminatedThreshold = time.Hour
 
 // RegisterJobs registers session related jobs with the provided scheduler.
-func RegisterJobs(ctx context.Context, scheduler *scheduler.Scheduler, w db.Writer, r db.Reader, k *kms.Kms, gracePeriod *atomic.Int64) error {
+func RegisterJobs(ctx context.Context, scheduler *scheduler.Scheduler, w db.Writer, r db.Reader, k *kms.Kms, workerStatusGracePeriod *atomic.Int64) error {
     const op = "session.RegisterJobs"
-    if gracePeriod == nil {
+    if workerStatusGracePeriod == nil {
         return errors.New(ctx, errors.InvalidParameter, op, "nil grace period")
     }
-    sessionConnectionCleanupJob, err := newSessionConnectionCleanupJob(ctx, w, gracePeriod)
+    sessionConnectionCleanupJob, err := newSessionConnectionCleanupJob(ctx, w, workerStatusGracePeriod)
     if err != nil {
         return fmt.Errorf("error creating session cleanup job: %w", err)
     }
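The cleanup job takes *atomic.Int64 rather than a plain duration precisely so HUP reloads propagate: the controller stores a new value into the shared atomic, and the already-registered job re-reads it on every run without being re-registered. A compressed sketch; the job type and wiring here are simplified stand-ins, not the real scheduler plumbing:

package main

import (
    "fmt"
    "sync/atomic"
    "time"
)

// cleanupJob is a simplified stand-in for sessionConnectionCleanupJob.
type cleanupJob struct {
    workerStatusGracePeriod *atomic.Int64 // shared with the controller
}

// Run re-reads the grace period each invocation, so it always sees the
// most recently reloaded value.
func (j *cleanupJob) Run() {
    gracePeriod := time.Duration(j.workerStatusGracePeriod.Load())
    fmt.Println("closing connections for workers dead longer than", gracePeriod)
}

func main() {
    var shared atomic.Int64
    shared.Store(int64(15 * time.Second))
    job := &cleanupJob{workerStatusGracePeriod: &shared}

    job.Run()                            // 15s
    shared.Store(int64(1 * time.Minute)) // simulate a HUP reload
    job.Run()                            // 1m0s, no re-registration needed
}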
