feat(scheduler): set intervals from config (#2443)

pull/2445/head
Louis Ruch 4 years ago committed by GitHub
parent d0a61b29b8
commit 5812a42ba3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -627,7 +627,7 @@ func (c *Command) Run(args []string) int {
if c.flagCreateLoopbackHostPlugin {
c.DevLoopbackHostPluginId = "pl_1234567890"
c.EnabledPlugins = append(c.EnabledPlugins, base.EnabledPluginHostLoopback)
c.Config.Controller.SchedulerRunJobInterval = 100 * time.Millisecond
c.Config.Controller.Scheduler.JobRunIntervalDuration = 100 * time.Millisecond
}
switch c.flagDatabaseUrl {
case "":

@ -132,10 +132,11 @@ type Config struct {
}
type Controller struct {
Name string `hcl:"name"`
Description string `hcl:"description"`
Database *Database `hcl:"database"`
PublicClusterAddr string `hcl:"public_cluster_addr"`
Name string `hcl:"name"`
Description string `hcl:"description"`
Database *Database `hcl:"database"`
PublicClusterAddr string `hcl:"public_cluster_addr"`
Scheduler *Scheduler `hcl:"scheduler"`
// AuthTokenTimeToLive is the total valid lifetime of a token denoted by time.Duration
AuthTokenTimeToLive interface{} `hcl:"auth_token_time_to_live"`
@ -158,12 +159,6 @@ type Controller struct {
//
// TODO: This field is currently internal.
StatusGracePeriodDuration time.Duration `hcl:"-"`
// SchedulerRunJobInterval is the time interval between waking up the
// scheduler to run pending jobs.
//
// TODO: This field is currently internal.
SchedulerRunJobInterval time.Duration `hcl:"-"`
}
func (c *Controller) InitNameIfEmpty() error {
@ -238,6 +233,21 @@ type Database struct {
SkipSharedLockAcquisition bool `hcl:"skip_shared_lock_acquisition"`
}
// Scheduler is the configuration block that specifies the job scheduler behavior on the controller
type Scheduler struct {
// JobRunInterval is the time interval between waking up the
// scheduler to run pending jobs.
//
JobRunInterval interface{} `hcl:"job_run_interval"`
JobRunIntervalDuration time.Duration
// MonitorInterval is the time interval between waking up the
// scheduler to monitor for jobs that are defunct.
//
MonitorInterval interface{} `hcl:"monitor_interval"`
MonitorIntervalDuration time.Duration
}
type Plugins struct {
ExecutionDir string `hcl:"execution_dir"`
}
@ -387,6 +397,24 @@ func Parse(d string) (*Config, error) {
result.Controller.GracefulShutdownWaitDuration = t
}
if result.Controller.Scheduler != nil {
if result.Controller.Scheduler.JobRunInterval != "" {
t, err := parseutil.ParseDurationSecond(result.Controller.Scheduler.JobRunInterval)
if err != nil {
return result, err
}
result.Controller.Scheduler.JobRunIntervalDuration = t
}
if result.Controller.Scheduler.MonitorInterval != "" {
t, err := parseutil.ParseDurationSecond(result.Controller.Scheduler.MonitorInterval)
if err != nil {
return result, err
}
result.Controller.Scheduler.MonitorIntervalDuration = t
}
}
if result.Controller.Database != nil {
if result.Controller.Database.MaxOpenConnectionsRaw != nil {
switch t := result.Controller.Database.MaxOpenConnectionsRaw.(type) {

@ -539,6 +539,96 @@ func TestParsingName(t *testing.T) {
}
}
func TestParsingSchedulerIntervals(t *testing.T) {
t.Parallel()
cases := []struct {
name string
config string
wantErr bool
wantMonitorInterval time.Duration
wantRunJobInterval time.Duration
}{
{
name: "invalid-run-interval",
config: `
controller {
scheduler {
job_run_interval = "hello"
}
}
`,
wantErr: true,
},
{
name: "invalid-monitor-interval",
config: `
controller {
scheduler {
monitor_interval = "hello"
}
}
`,
wantErr: true,
},
{
name: "valid-undefined",
config: `controller { scheduler {} }`,
wantMonitorInterval: 0,
wantRunJobInterval: 0,
},
{
name: "run-job-interval",
config: `
controller {
scheduler {
job_run_interval = "10m"
}
}
`,
wantMonitorInterval: 0,
wantRunJobInterval: 10 * time.Minute,
},
{
name: "monitor-interval",
config: `
controller {
scheduler {
monitor_interval = "6h"
}
}
`,
wantMonitorInterval: 6 * time.Hour,
wantRunJobInterval: 0,
},
{
name: "both",
config: `
controller {
scheduler {
monitor_interval = "7d"
job_run_interval = "20s"
}
}
`,
wantMonitorInterval: 7 * 24 * time.Hour,
wantRunJobInterval: 20 * time.Second,
},
}
for _, tt := range cases {
tt := tt
t.Run(tt.name, func(t *testing.T) {
out, err := Parse(tt.config)
if tt.wantErr {
require.Error(t, err)
return
}
require.NoError(t, err)
assert.Equal(t, tt.wantMonitorInterval, out.Controller.Scheduler.MonitorIntervalDuration)
assert.Equal(t, tt.wantRunJobInterval, out.Controller.Scheduler.JobRunIntervalDuration)
})
}
}
func TestWorkerTags(t *testing.T) {
defaultStateFn := func(t *testing.T, tags string) {
t.Setenv("BOUNDARY_WORKER_TAGS", tags)

@ -294,8 +294,13 @@ func New(ctx context.Context, conf *Config) (*Controller, error) {
}
// TODO: Allow setting run jobs limit from config
schedulerOpts := []scheduler.Option{scheduler.WithRunJobsLimit(-1)}
if c.conf.RawConfig.Controller.SchedulerRunJobInterval > 0 {
schedulerOpts = append(schedulerOpts, scheduler.WithRunJobsInterval(c.conf.RawConfig.Controller.SchedulerRunJobInterval))
if sche := c.conf.RawConfig.Controller.Scheduler; sche != nil {
if sche.JobRunIntervalDuration > 0 {
schedulerOpts = append(schedulerOpts, scheduler.WithRunJobsInterval(sche.JobRunIntervalDuration))
}
if sche.MonitorIntervalDuration > 0 {
schedulerOpts = append(schedulerOpts, scheduler.WithMonitorInterval(sche.MonitorIntervalDuration))
}
}
c.scheduler, err = scheduler.New(c.conf.RawConfig.Controller.Name, jobRepoFn, schedulerOpts...)
if err != nil {

@ -569,7 +569,11 @@ func TestControllerConfig(t testing.TB, ctx context.Context, tc *TestController,
if opts.Config.Controller.Name == "" {
require.NoError(t, opts.Config.Controller.InitNameIfEmpty())
}
opts.Config.Controller.SchedulerRunJobInterval = opts.SchedulerRunJobInterval
if opts.Config.Controller.Scheduler == nil {
opts.Config.Controller.Scheduler = new(config.Scheduler)
}
opts.Config.Controller.Scheduler.JobRunIntervalDuration = opts.SchedulerRunJobInterval
switch {
case opts.DisableEventing:

@ -28,10 +28,11 @@ func TestScheduler_New(t *testing.T) {
}
type args struct {
serverId string
jobRepo jobRepoFactory
runLimit int
runInterval time.Duration
serverId string
jobRepo jobRepoFactory
runLimit int
runInterval time.Duration
monitorInterval time.Duration
}
tests := []struct {
name string
@ -64,9 +65,10 @@ func TestScheduler_New(t *testing.T) {
jobRepo: jobRepoFn,
},
want: args{
serverId: "test-server",
runLimit: 1,
runInterval: time.Minute,
serverId: "test-server",
runLimit: defaultRunJobsLimit,
runInterval: defaultRunJobsInterval,
monitorInterval: defaultMonitorInterval,
},
},
{
@ -79,9 +81,10 @@ func TestScheduler_New(t *testing.T) {
WithRunJobsInterval(time.Hour),
},
want: args{
serverId: "test-server",
runLimit: 1,
runInterval: time.Hour,
serverId: "test-server",
runLimit: defaultRunJobsLimit,
monitorInterval: defaultMonitorInterval,
runInterval: time.Hour,
},
},
{
@ -94,9 +97,10 @@ func TestScheduler_New(t *testing.T) {
WithRunJobsLimit(-1),
},
want: args{
serverId: "test-server",
runLimit: -1,
runInterval: time.Minute,
serverId: "test-server",
runLimit: -1,
runInterval: defaultRunJobsInterval,
monitorInterval: defaultMonitorInterval,
},
},
{
@ -109,13 +113,30 @@ func TestScheduler_New(t *testing.T) {
WithRunJobsLimit(20),
},
want: args{
serverId: "test-server",
runLimit: 20,
runInterval: time.Minute,
serverId: "test-server",
runLimit: 20,
runInterval: defaultRunJobsInterval,
monitorInterval: defaultMonitorInterval,
},
},
{
name: "valid-with-limit-and-interval",
name: "valid-with-monitor",
args: args{
serverId: "test-server",
jobRepo: jobRepoFn,
},
opts: []Option{
WithMonitorInterval(time.Hour),
},
want: args{
serverId: "test-server",
runLimit: defaultRunJobsLimit,
runInterval: defaultRunJobsInterval,
monitorInterval: time.Hour,
},
},
{
name: "valid-with-all",
args: args{
serverId: "test-server",
jobRepo: jobRepoFn,
@ -123,11 +144,13 @@ func TestScheduler_New(t *testing.T) {
opts: []Option{
WithRunJobsInterval(time.Hour),
WithRunJobsLimit(20),
WithMonitorInterval(2 * time.Hour),
},
want: args{
serverId: "test-server",
runLimit: 20,
runInterval: time.Hour,
serverId: "test-server",
runLimit: 20,
runInterval: time.Hour,
monitorInterval: 2 * time.Hour,
},
},
}
@ -149,6 +172,7 @@ func TestScheduler_New(t *testing.T) {
assert.Equal(tt.want.serverId, got.serverId)
assert.Equal(tt.want.runLimit, got.runJobsLimit)
assert.Equal(tt.want.runInterval, got.runJobsInterval)
assert.Equal(tt.want.monitorInterval, got.monitorInterval)
assert.NotNil(got.jobRepoFn)
assert.NotNil(got.runningJobs)
assert.NotNil(got.registeredJobs)

@ -52,7 +52,7 @@ description will be read.
- `max_idle_connections` - Can be used to control the maximum number of
idle connections in the idle connection pool.
If `max_open_connections` is greater than 0 but less than
`max_idle_connections`, then `max_idle_connections` will be reduced to match the `max_open_connections` limit.
`max_idle_connections`, then `max_idle_connections` will be reduced to match the `max_open_connections` limit.
Setting this value to 0 will mean that no idle connections are retained.
If not set or set to less than 0, the default
[sql.DB](https://pkg.go.dev/database/sql#DB.SetMaxIdleConns) setting will be used.
@ -89,8 +89,18 @@ description will be read.
to all tokens from all auth methods). Valid time units are anything specified by Golang's
[ParseDuration()](https://golang.org/pkg/time/#ParseDuration) method. Default is 1 day.
- `scheduler` - The configuration block that specifies the job scheduler behavior on the controller.
- `job_run_interval` - The interval at which the scheduler will call the database to check if
there are any jobs that need to run. Default is 1 minute.
- `monitor_interval` - The interval at which the scheduler will check and interrupt any defuncted
jobs that were running on another scheduler. A job is considered defuncted if it has not reported
a status to the database for 5 minutes. Once a job is interrupted it will be run immediate on the
first controller available. Default is 30 seconds.
- `graceful_shutdown_wait_duration` - Amount of time Boundary will wait before initiating the shutdown procedure,
after receiving a shutdown signal. In this state, Boundary still processes requests as normal but replies
after receiving a shutdown signal. In this state, Boundary still processes requests as normal but replies
with `503 Service Unavailable` to any health requests. This is designed to allow an operator to configure
load-balancers to preemptively stop new traffic to a Boundary instance that is going away. Valid time units
are anything specified by Go's [ParseDuration()](https://golang.org/pkg/time/#ParseDuration) method. Only

Loading…
Cancel
Save