diff --git a/internal/cmd/commands/dev/dev.go b/internal/cmd/commands/dev/dev.go index d06a5cbd2d..32661c9427 100644 --- a/internal/cmd/commands/dev/dev.go +++ b/internal/cmd/commands/dev/dev.go @@ -627,7 +627,7 @@ func (c *Command) Run(args []string) int { if c.flagCreateLoopbackHostPlugin { c.DevLoopbackHostPluginId = "pl_1234567890" c.EnabledPlugins = append(c.EnabledPlugins, base.EnabledPluginHostLoopback) - c.Config.Controller.SchedulerRunJobInterval = 100 * time.Millisecond + c.Config.Controller.Scheduler.JobRunIntervalDuration = 100 * time.Millisecond } switch c.flagDatabaseUrl { case "": diff --git a/internal/cmd/config/config.go b/internal/cmd/config/config.go index 7243818165..244377f117 100644 --- a/internal/cmd/config/config.go +++ b/internal/cmd/config/config.go @@ -132,10 +132,11 @@ type Config struct { } type Controller struct { - Name string `hcl:"name"` - Description string `hcl:"description"` - Database *Database `hcl:"database"` - PublicClusterAddr string `hcl:"public_cluster_addr"` + Name string `hcl:"name"` + Description string `hcl:"description"` + Database *Database `hcl:"database"` + PublicClusterAddr string `hcl:"public_cluster_addr"` + Scheduler *Scheduler `hcl:"scheduler"` // AuthTokenTimeToLive is the total valid lifetime of a token denoted by time.Duration AuthTokenTimeToLive interface{} `hcl:"auth_token_time_to_live"` @@ -158,12 +159,6 @@ type Controller struct { // // TODO: This field is currently internal. StatusGracePeriodDuration time.Duration `hcl:"-"` - - // SchedulerRunJobInterval is the time interval between waking up the - // scheduler to run pending jobs. - // - // TODO: This field is currently internal. - SchedulerRunJobInterval time.Duration `hcl:"-"` } func (c *Controller) InitNameIfEmpty() error { @@ -238,6 +233,21 @@ type Database struct { SkipSharedLockAcquisition bool `hcl:"skip_shared_lock_acquisition"` } +// Scheduler is the configuration block that specifies the job scheduler behavior on the controller +type Scheduler struct { + // JobRunInterval is the time interval between waking up the + // scheduler to run pending jobs. + // + JobRunInterval interface{} `hcl:"job_run_interval"` + JobRunIntervalDuration time.Duration + + // MonitorInterval is the time interval between waking up the + // scheduler to monitor for jobs that are defunct. + // + MonitorInterval interface{} `hcl:"monitor_interval"` + MonitorIntervalDuration time.Duration +} + type Plugins struct { ExecutionDir string `hcl:"execution_dir"` } @@ -387,6 +397,24 @@ func Parse(d string) (*Config, error) { result.Controller.GracefulShutdownWaitDuration = t } + if result.Controller.Scheduler != nil { + if result.Controller.Scheduler.JobRunInterval != "" { + t, err := parseutil.ParseDurationSecond(result.Controller.Scheduler.JobRunInterval) + if err != nil { + return result, err + } + result.Controller.Scheduler.JobRunIntervalDuration = t + } + + if result.Controller.Scheduler.MonitorInterval != "" { + t, err := parseutil.ParseDurationSecond(result.Controller.Scheduler.MonitorInterval) + if err != nil { + return result, err + } + result.Controller.Scheduler.MonitorIntervalDuration = t + } + } + if result.Controller.Database != nil { if result.Controller.Database.MaxOpenConnectionsRaw != nil { switch t := result.Controller.Database.MaxOpenConnectionsRaw.(type) { diff --git a/internal/cmd/config/config_test.go b/internal/cmd/config/config_test.go index da1e8f912f..48858384f5 100644 --- a/internal/cmd/config/config_test.go +++ b/internal/cmd/config/config_test.go @@ -539,6 +539,96 @@ func TestParsingName(t *testing.T) { } } +func TestParsingSchedulerIntervals(t *testing.T) { + t.Parallel() + cases := []struct { + name string + config string + wantErr bool + wantMonitorInterval time.Duration + wantRunJobInterval time.Duration + }{ + { + name: "invalid-run-interval", + config: ` +controller { + scheduler { + job_run_interval = "hello" + } +} +`, + wantErr: true, + }, + { + name: "invalid-monitor-interval", + config: ` +controller { + scheduler { + monitor_interval = "hello" + } +} +`, + wantErr: true, + }, + { + name: "valid-undefined", + config: `controller { scheduler {} }`, + wantMonitorInterval: 0, + wantRunJobInterval: 0, + }, + { + name: "run-job-interval", + config: ` +controller { + scheduler { + job_run_interval = "10m" + } +} +`, + wantMonitorInterval: 0, + wantRunJobInterval: 10 * time.Minute, + }, + { + name: "monitor-interval", + config: ` +controller { + scheduler { + monitor_interval = "6h" + } +} +`, + wantMonitorInterval: 6 * time.Hour, + wantRunJobInterval: 0, + }, + { + name: "both", + config: ` +controller { + scheduler { + monitor_interval = "7d" + job_run_interval = "20s" + } +} +`, + wantMonitorInterval: 7 * 24 * time.Hour, + wantRunJobInterval: 20 * time.Second, + }, + } + for _, tt := range cases { + tt := tt + t.Run(tt.name, func(t *testing.T) { + out, err := Parse(tt.config) + if tt.wantErr { + require.Error(t, err) + return + } + require.NoError(t, err) + assert.Equal(t, tt.wantMonitorInterval, out.Controller.Scheduler.MonitorIntervalDuration) + assert.Equal(t, tt.wantRunJobInterval, out.Controller.Scheduler.JobRunIntervalDuration) + }) + } +} + func TestWorkerTags(t *testing.T) { defaultStateFn := func(t *testing.T, tags string) { t.Setenv("BOUNDARY_WORKER_TAGS", tags) diff --git a/internal/daemon/controller/controller.go b/internal/daemon/controller/controller.go index 1c94474660..a6dd86b540 100644 --- a/internal/daemon/controller/controller.go +++ b/internal/daemon/controller/controller.go @@ -294,8 +294,13 @@ func New(ctx context.Context, conf *Config) (*Controller, error) { } // TODO: Allow setting run jobs limit from config schedulerOpts := []scheduler.Option{scheduler.WithRunJobsLimit(-1)} - if c.conf.RawConfig.Controller.SchedulerRunJobInterval > 0 { - schedulerOpts = append(schedulerOpts, scheduler.WithRunJobsInterval(c.conf.RawConfig.Controller.SchedulerRunJobInterval)) + if sche := c.conf.RawConfig.Controller.Scheduler; sche != nil { + if sche.JobRunIntervalDuration > 0 { + schedulerOpts = append(schedulerOpts, scheduler.WithRunJobsInterval(sche.JobRunIntervalDuration)) + } + if sche.MonitorIntervalDuration > 0 { + schedulerOpts = append(schedulerOpts, scheduler.WithMonitorInterval(sche.MonitorIntervalDuration)) + } } c.scheduler, err = scheduler.New(c.conf.RawConfig.Controller.Name, jobRepoFn, schedulerOpts...) if err != nil { diff --git a/internal/daemon/controller/testing.go b/internal/daemon/controller/testing.go index 7bd35266a2..ed542ca314 100644 --- a/internal/daemon/controller/testing.go +++ b/internal/daemon/controller/testing.go @@ -569,7 +569,11 @@ func TestControllerConfig(t testing.TB, ctx context.Context, tc *TestController, if opts.Config.Controller.Name == "" { require.NoError(t, opts.Config.Controller.InitNameIfEmpty()) } - opts.Config.Controller.SchedulerRunJobInterval = opts.SchedulerRunJobInterval + + if opts.Config.Controller.Scheduler == nil { + opts.Config.Controller.Scheduler = new(config.Scheduler) + } + opts.Config.Controller.Scheduler.JobRunIntervalDuration = opts.SchedulerRunJobInterval switch { case opts.DisableEventing: diff --git a/internal/scheduler/scheduler_test.go b/internal/scheduler/scheduler_test.go index 5cbdefcd44..ad2da12e08 100644 --- a/internal/scheduler/scheduler_test.go +++ b/internal/scheduler/scheduler_test.go @@ -28,10 +28,11 @@ func TestScheduler_New(t *testing.T) { } type args struct { - serverId string - jobRepo jobRepoFactory - runLimit int - runInterval time.Duration + serverId string + jobRepo jobRepoFactory + runLimit int + runInterval time.Duration + monitorInterval time.Duration } tests := []struct { name string @@ -64,9 +65,10 @@ func TestScheduler_New(t *testing.T) { jobRepo: jobRepoFn, }, want: args{ - serverId: "test-server", - runLimit: 1, - runInterval: time.Minute, + serverId: "test-server", + runLimit: defaultRunJobsLimit, + runInterval: defaultRunJobsInterval, + monitorInterval: defaultMonitorInterval, }, }, { @@ -79,9 +81,10 @@ func TestScheduler_New(t *testing.T) { WithRunJobsInterval(time.Hour), }, want: args{ - serverId: "test-server", - runLimit: 1, - runInterval: time.Hour, + serverId: "test-server", + runLimit: defaultRunJobsLimit, + monitorInterval: defaultMonitorInterval, + runInterval: time.Hour, }, }, { @@ -94,9 +97,10 @@ func TestScheduler_New(t *testing.T) { WithRunJobsLimit(-1), }, want: args{ - serverId: "test-server", - runLimit: -1, - runInterval: time.Minute, + serverId: "test-server", + runLimit: -1, + runInterval: defaultRunJobsInterval, + monitorInterval: defaultMonitorInterval, }, }, { @@ -109,13 +113,30 @@ func TestScheduler_New(t *testing.T) { WithRunJobsLimit(20), }, want: args{ - serverId: "test-server", - runLimit: 20, - runInterval: time.Minute, + serverId: "test-server", + runLimit: 20, + runInterval: defaultRunJobsInterval, + monitorInterval: defaultMonitorInterval, }, }, { - name: "valid-with-limit-and-interval", + name: "valid-with-monitor", + args: args{ + serverId: "test-server", + jobRepo: jobRepoFn, + }, + opts: []Option{ + WithMonitorInterval(time.Hour), + }, + want: args{ + serverId: "test-server", + runLimit: defaultRunJobsLimit, + runInterval: defaultRunJobsInterval, + monitorInterval: time.Hour, + }, + }, + { + name: "valid-with-all", args: args{ serverId: "test-server", jobRepo: jobRepoFn, @@ -123,11 +144,13 @@ func TestScheduler_New(t *testing.T) { opts: []Option{ WithRunJobsInterval(time.Hour), WithRunJobsLimit(20), + WithMonitorInterval(2 * time.Hour), }, want: args{ - serverId: "test-server", - runLimit: 20, - runInterval: time.Hour, + serverId: "test-server", + runLimit: 20, + runInterval: time.Hour, + monitorInterval: 2 * time.Hour, }, }, } @@ -149,6 +172,7 @@ func TestScheduler_New(t *testing.T) { assert.Equal(tt.want.serverId, got.serverId) assert.Equal(tt.want.runLimit, got.runJobsLimit) assert.Equal(tt.want.runInterval, got.runJobsInterval) + assert.Equal(tt.want.monitorInterval, got.monitorInterval) assert.NotNil(got.jobRepoFn) assert.NotNil(got.runningJobs) assert.NotNil(got.registeredJobs) diff --git a/website/content/docs/configuration/controller.mdx b/website/content/docs/configuration/controller.mdx index b395a8edfe..b465614cfe 100644 --- a/website/content/docs/configuration/controller.mdx +++ b/website/content/docs/configuration/controller.mdx @@ -52,7 +52,7 @@ description will be read. - `max_idle_connections` - Can be used to control the maximum number of idle connections in the idle connection pool. If `max_open_connections` is greater than 0 but less than - `max_idle_connections`, then `max_idle_connections` will be reduced to match the `max_open_connections` limit. + `max_idle_connections`, then `max_idle_connections` will be reduced to match the `max_open_connections` limit. Setting this value to 0 will mean that no idle connections are retained. If not set or set to less than 0, the default [sql.DB](https://pkg.go.dev/database/sql#DB.SetMaxIdleConns) setting will be used. @@ -89,8 +89,18 @@ description will be read. to all tokens from all auth methods). Valid time units are anything specified by Golang's [ParseDuration()](https://golang.org/pkg/time/#ParseDuration) method. Default is 1 day. +- `scheduler` - The configuration block that specifies the job scheduler behavior on the controller. + + - `job_run_interval` - The interval at which the scheduler will call the database to check if + there are any jobs that need to run. Default is 1 minute. + + - `monitor_interval` - The interval at which the scheduler will check and interrupt any defuncted + jobs that were running on another scheduler. A job is considered defuncted if it has not reported + a status to the database for 5 minutes. Once a job is interrupted it will be run immediate on the + first controller available. Default is 30 seconds. + - `graceful_shutdown_wait_duration` - Amount of time Boundary will wait before initiating the shutdown procedure, - after receiving a shutdown signal. In this state, Boundary still processes requests as normal but replies + after receiving a shutdown signal. In this state, Boundary still processes requests as normal but replies with `503 Service Unavailable` to any health requests. This is designed to allow an operator to configure load-balancers to preemptively stop new traffic to a Boundary instance that is going away. Valid time units are anything specified by Go's [ParseDuration()](https://golang.org/pkg/time/#ParseDuration) method. Only