From 9663404f1a460eefe20411a31bf3f7402bd12fd6 Mon Sep 17 00:00:00 2001 From: Louis Ruch Date: Sat, 11 Jun 2022 13:23:28 -0700 Subject: [PATCH] feat(scheduler): Support unlimited job runs (#2173) --- internal/daemon/controller/controller.go | 5 ++--- internal/scheduler/job/options.go | 5 +++-- internal/scheduler/job/options_test.go | 2 +- internal/scheduler/job/query.go | 2 +- internal/scheduler/job/repository_run.go | 12 +++++++++++- internal/scheduler/job/repository_run_test.go | 14 ++++++++++++-- internal/scheduler/options.go | 5 +++-- internal/scheduler/scheduler.go | 2 +- internal/scheduler/scheduler_test.go | 17 ++++++++++++++++- 9 files changed, 50 insertions(+), 14 deletions(-) diff --git a/internal/daemon/controller/controller.go b/internal/daemon/controller/controller.go index 6c651a21f1..9fa9162dcf 100644 --- a/internal/daemon/controller/controller.go +++ b/internal/daemon/controller/controller.go @@ -247,9 +247,8 @@ func New(ctx context.Context, conf *Config) (*Controller, error) { jobRepoFn := func() (*job.Repository, error) { return job.NewRepository(dbase, dbase, c.kms) } - // TODO: the RunJobsLimit is temporary until a better fix gets in. This - // currently caps the scheduler at running 10 jobs per interval. - schedulerOpts := []scheduler.Option{scheduler.WithRunJobsLimit(10)} + // TODO: Allow setting run jobs limit from config + schedulerOpts := []scheduler.Option{scheduler.WithRunJobsLimit(-1)} if c.conf.RawConfig.Controller.SchedulerRunJobInterval > 0 { schedulerOpts = append(schedulerOpts, scheduler.WithRunJobsInterval(c.conf.RawConfig.Controller.SchedulerRunJobInterval)) } diff --git a/internal/scheduler/job/options.go b/internal/scheduler/job/options.go index 5b0cc64cfe..f1413d3797 100644 --- a/internal/scheduler/job/options.go +++ b/internal/scheduler/job/options.go @@ -24,7 +24,7 @@ type Option func(*options) // options = how options are represented type options struct { withNextRunIn time.Duration - withRunJobsLimit uint + withRunJobsLimit int withLimit int withName string withServerId string @@ -47,7 +47,8 @@ func WithNextRunIn(d time.Duration) Option { // WithRunJobsLimit provides an option to provide the number of jobs to run. // If WithRunJobsLimit == 0, then default run jobs limit is used. -func WithRunJobsLimit(l uint) Option { +// If WithRunJobsLimit < 0, then no limit is used. +func WithRunJobsLimit(l int) Option { return func(o *options) { o.withRunJobsLimit = l if o.withRunJobsLimit == 0 { diff --git a/internal/scheduler/job/options_test.go b/internal/scheduler/job/options_test.go index 1b7e4e78db..cf7f768c90 100644 --- a/internal/scheduler/job/options_test.go +++ b/internal/scheduler/job/options_test.go @@ -31,7 +31,7 @@ func Test_GetOpts(t *testing.T) { opts := getOpts(WithRunJobsLimit(0)) testOpts := getDefaultOptions() assert.Equal(opts, testOpts) - assert.Equal(uint(defaultRunJobsLimit), opts.withRunJobsLimit) + assert.Equal(defaultRunJobsLimit, opts.withRunJobsLimit) }) t.Run("WithLimit", func(t *testing.T) { assert := assert.New(t) diff --git a/internal/scheduler/job/query.go b/internal/scheduler/job/query.go index 48f5e185e6..a4e93ee153 100644 --- a/internal/scheduler/job/query.go +++ b/internal/scheduler/job/query.go @@ -8,7 +8,7 @@ const runJobsQuery = ` job_plugin_id, job_name, ? from job_jobs_to_run order by next_scheduled_run asc - limit ? + %s on conflict (job_plugin_id, job_name) where status = 'running' diff --git a/internal/scheduler/job/repository_run.go b/internal/scheduler/job/repository_run.go index 0c630b3323..0137fb29d2 100644 --- a/internal/scheduler/job/repository_run.go +++ b/internal/scheduler/job/repository_run.go @@ -23,10 +23,20 @@ func (r *Repository) RunJobs(ctx context.Context, serverId string, opt ...Option } opts := getOpts(opt...) + var limit string + switch { + case opts.withRunJobsLimit == 0: + // zero signals the defaults should be used. + limit = fmt.Sprintf("limit %d", defaultRunJobsLimit) + case opts.withRunJobsLimit > 0: + limit = fmt.Sprintf("limit %d", opts.withRunJobsLimit) + } + + query := fmt.Sprintf(runJobsQuery, limit) var runs []*Run _, err := r.writer.DoTx(ctx, db.StdRetryCnt, db.ExpBackoff{}, func(r db.Reader, w db.Writer) error { - rows, err := w.Query(ctx, runJobsQuery, []interface{}{serverId, opts.withRunJobsLimit}) + rows, err := w.Query(ctx, query, []interface{}{serverId}) if err != nil { return errors.Wrap(ctx, err, op) } diff --git a/internal/scheduler/job/repository_run_test.go b/internal/scheduler/job/repository_run_test.go index 3647b32806..ed8b661600 100644 --- a/internal/scheduler/job/repository_run_test.go +++ b/internal/scheduler/job/repository_run_test.go @@ -112,7 +112,7 @@ func TestRepository_RunJobs_Limits(t *testing.T) { kms := kms.TestKms(t, conn, wrapper) iam.TestRepo(t, conn, wrapper) - numJobs := 10 + numJobs := 20 server := testController(t, conn, wrapper) tests := []struct { @@ -122,7 +122,7 @@ func TestRepository_RunJobs_Limits(t *testing.T) { }{ { name: "with-more-than-available", - opts: []Option{WithRunJobsLimit(uint(numJobs * 2))}, + opts: []Option{WithRunJobsLimit(numJobs * 2)}, wantLen: numJobs, }, { @@ -139,6 +139,11 @@ func TestRepository_RunJobs_Limits(t *testing.T) { opts: []Option{WithRunJobsLimit(0)}, wantLen: defaultRunJobsLimit, }, + { + name: "unlimited", + opts: []Option{WithRunJobsLimit(-1)}, + wantLen: numJobs, + }, } for _, tt := range tests { @@ -156,6 +161,11 @@ func TestRepository_RunJobs_Limits(t *testing.T) { got, err := repo.RunJobs(context.Background(), server.PrivateId, tt.opts...) require.NoError(err) assert.Len(got, tt.wantLen) + + // Clean up jobs for next run + rows, err := rw.Query(context.Background(), "delete from job", nil) + require.NoError(err) + _ = rows.Close() }) } } diff --git a/internal/scheduler/options.go b/internal/scheduler/options.go index d8e4816590..6acccef335 100644 --- a/internal/scheduler/options.go +++ b/internal/scheduler/options.go @@ -24,7 +24,7 @@ type Option func(*options) // options = how options are represented type options struct { withNextRunIn time.Duration - withRunJobsLimit uint + withRunJobsLimit int withRunJobInterval time.Duration withMonitorInterval time.Duration withInterruptThreshold time.Duration @@ -43,7 +43,8 @@ func getDefaultOptions() options { // WithRunJobsLimit provides an option to provide the number of jobs that will be requested // by the scheduler when querying for jobs to run. // If WithRunJobsLimit == 0, then default run jobs limit is used. -func WithRunJobsLimit(l uint) Option { +// If WithRunJobsLimit < 0, then no limit is used. +func WithRunJobsLimit(l int) Option { return func(o *options) { o.withRunJobsLimit = l if o.withRunJobsLimit == 0 { diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index 313a2ce55b..626b5d00c7 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -29,7 +29,7 @@ type Scheduler struct { runningJobs *sync.Map started ua.Bool - runJobsLimit uint + runJobsLimit int runJobsInterval time.Duration monitorInterval time.Duration interruptThreshold time.Duration diff --git a/internal/scheduler/scheduler_test.go b/internal/scheduler/scheduler_test.go index 6f25524ff7..5cbdefcd44 100644 --- a/internal/scheduler/scheduler_test.go +++ b/internal/scheduler/scheduler_test.go @@ -30,7 +30,7 @@ func TestScheduler_New(t *testing.T) { type args struct { serverId string jobRepo jobRepoFactory - runLimit uint + runLimit int runInterval time.Duration } tests := []struct { @@ -84,6 +84,21 @@ func TestScheduler_New(t *testing.T) { runInterval: time.Hour, }, }, + { + name: "valid-with-unlimited", + args: args{ + serverId: "test-server", + jobRepo: jobRepoFn, + }, + opts: []Option{ + WithRunJobsLimit(-1), + }, + want: args{ + serverId: "test-server", + runLimit: -1, + runInterval: time.Minute, + }, + }, { name: "valid-with-limit", args: args{