feat(scheduler): Support unlimited job runs (#2173)

pull/2148/head^2
Louis Ruch 4 years ago committed by GitHub
parent c4285b29cc
commit 9663404f1a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -247,9 +247,8 @@ func New(ctx context.Context, conf *Config) (*Controller, error) {
jobRepoFn := func() (*job.Repository, error) {
return job.NewRepository(dbase, dbase, c.kms)
}
// TODO: the RunJobsLimit is temporary until a better fix gets in. This
// currently caps the scheduler at running 10 jobs per interval.
schedulerOpts := []scheduler.Option{scheduler.WithRunJobsLimit(10)}
// TODO: Allow setting run jobs limit from config
schedulerOpts := []scheduler.Option{scheduler.WithRunJobsLimit(-1)}
if c.conf.RawConfig.Controller.SchedulerRunJobInterval > 0 {
schedulerOpts = append(schedulerOpts, scheduler.WithRunJobsInterval(c.conf.RawConfig.Controller.SchedulerRunJobInterval))
}

@ -24,7 +24,7 @@ type Option func(*options)
// options = how options are represented
type options struct {
withNextRunIn time.Duration
withRunJobsLimit uint
withRunJobsLimit int
withLimit int
withName string
withServerId string
@ -47,7 +47,8 @@ func WithNextRunIn(d time.Duration) Option {
// WithRunJobsLimit provides an option to provide the number of jobs to run.
// If WithRunJobsLimit == 0, then default run jobs limit is used.
func WithRunJobsLimit(l uint) Option {
// If WithRunJobsLimit < 0, then no limit is used.
func WithRunJobsLimit(l int) Option {
return func(o *options) {
o.withRunJobsLimit = l
if o.withRunJobsLimit == 0 {

@ -31,7 +31,7 @@ func Test_GetOpts(t *testing.T) {
opts := getOpts(WithRunJobsLimit(0))
testOpts := getDefaultOptions()
assert.Equal(opts, testOpts)
assert.Equal(uint(defaultRunJobsLimit), opts.withRunJobsLimit)
assert.Equal(defaultRunJobsLimit, opts.withRunJobsLimit)
})
t.Run("WithLimit", func(t *testing.T) {
assert := assert.New(t)

@ -8,7 +8,7 @@ const runJobsQuery = `
job_plugin_id, job_name, ?
from job_jobs_to_run
order by next_scheduled_run asc
limit ?
%s
on conflict
(job_plugin_id, job_name)
where status = 'running'

@ -23,10 +23,20 @@ func (r *Repository) RunJobs(ctx context.Context, serverId string, opt ...Option
}
opts := getOpts(opt...)
var limit string
switch {
case opts.withRunJobsLimit == 0:
// zero signals the defaults should be used.
limit = fmt.Sprintf("limit %d", defaultRunJobsLimit)
case opts.withRunJobsLimit > 0:
limit = fmt.Sprintf("limit %d", opts.withRunJobsLimit)
}
query := fmt.Sprintf(runJobsQuery, limit)
var runs []*Run
_, err := r.writer.DoTx(ctx, db.StdRetryCnt, db.ExpBackoff{},
func(r db.Reader, w db.Writer) error {
rows, err := w.Query(ctx, runJobsQuery, []interface{}{serverId, opts.withRunJobsLimit})
rows, err := w.Query(ctx, query, []interface{}{serverId})
if err != nil {
return errors.Wrap(ctx, err, op)
}

@ -112,7 +112,7 @@ func TestRepository_RunJobs_Limits(t *testing.T) {
kms := kms.TestKms(t, conn, wrapper)
iam.TestRepo(t, conn, wrapper)
numJobs := 10
numJobs := 20
server := testController(t, conn, wrapper)
tests := []struct {
@ -122,7 +122,7 @@ func TestRepository_RunJobs_Limits(t *testing.T) {
}{
{
name: "with-more-than-available",
opts: []Option{WithRunJobsLimit(uint(numJobs * 2))},
opts: []Option{WithRunJobsLimit(numJobs * 2)},
wantLen: numJobs,
},
{
@ -139,6 +139,11 @@ func TestRepository_RunJobs_Limits(t *testing.T) {
opts: []Option{WithRunJobsLimit(0)},
wantLen: defaultRunJobsLimit,
},
{
name: "unlimited",
opts: []Option{WithRunJobsLimit(-1)},
wantLen: numJobs,
},
}
for _, tt := range tests {
@ -156,6 +161,11 @@ func TestRepository_RunJobs_Limits(t *testing.T) {
got, err := repo.RunJobs(context.Background(), server.PrivateId, tt.opts...)
require.NoError(err)
assert.Len(got, tt.wantLen)
// Clean up jobs for next run
rows, err := rw.Query(context.Background(), "delete from job", nil)
require.NoError(err)
_ = rows.Close()
})
}
}

@ -24,7 +24,7 @@ type Option func(*options)
// options = how options are represented
type options struct {
withNextRunIn time.Duration
withRunJobsLimit uint
withRunJobsLimit int
withRunJobInterval time.Duration
withMonitorInterval time.Duration
withInterruptThreshold time.Duration
@ -43,7 +43,8 @@ func getDefaultOptions() options {
// WithRunJobsLimit provides an option to provide the number of jobs that will be requested
// by the scheduler when querying for jobs to run.
// If WithRunJobsLimit == 0, then default run jobs limit is used.
func WithRunJobsLimit(l uint) Option {
// If WithRunJobsLimit < 0, then no limit is used.
func WithRunJobsLimit(l int) Option {
return func(o *options) {
o.withRunJobsLimit = l
if o.withRunJobsLimit == 0 {

@ -29,7 +29,7 @@ type Scheduler struct {
runningJobs *sync.Map
started ua.Bool
runJobsLimit uint
runJobsLimit int
runJobsInterval time.Duration
monitorInterval time.Duration
interruptThreshold time.Duration

@ -30,7 +30,7 @@ func TestScheduler_New(t *testing.T) {
type args struct {
serverId string
jobRepo jobRepoFactory
runLimit uint
runLimit int
runInterval time.Duration
}
tests := []struct {
@ -84,6 +84,21 @@ func TestScheduler_New(t *testing.T) {
runInterval: time.Hour,
},
},
{
name: "valid-with-unlimited",
args: args{
serverId: "test-server",
jobRepo: jobRepoFn,
},
opts: []Option{
WithRunJobsLimit(-1),
},
want: args{
serverId: "test-server",
runLimit: -1,
runInterval: time.Minute,
},
},
{
name: "valid-with-limit",
args: args{

Loading…
Cancel
Save