From 2dae498123007696db2f8432c28b73c86623c17a Mon Sep 17 00:00:00 2001 From: Louis Ruch Date: Fri, 16 Jul 2021 18:39:43 -0400 Subject: [PATCH] Ensure job progress is always reported (#1396) * Update completed and total when job finishes * Remove unused option in test * Add comment * Use writer for updates and inserts --- .../scheduler/additional_verification_test.go | 100 ++++++++++++++++++ .../job/additional_verification_test.go | 4 +- internal/scheduler/job/query.go | 12 ++- internal/scheduler/job/repository_run.go | 30 ++++-- internal/scheduler/job/repository_run_test.go | 46 +++++++- internal/scheduler/scheduler.go | 10 +- 6 files changed, 178 insertions(+), 24 deletions(-) diff --git a/internal/scheduler/additional_verification_test.go b/internal/scheduler/additional_verification_test.go index d1f5335aa8..d89b8f674a 100644 --- a/internal/scheduler/additional_verification_test.go +++ b/internal/scheduler/additional_verification_test.go @@ -2,6 +2,7 @@ package scheduler import ( "context" + "errors" "fmt" "runtime" "testing" @@ -338,3 +339,102 @@ func TestSchedulerMonitorLoop(t *testing.T) { require.NoError(err) assert.Equal(string(job.Interrupted), run.Status) } + +func TestSchedulerFinalStatusUpdate(t *testing.T) { + t.Parallel() + assert, require := assert.New(t), require.New(t) + conn, _ := db.TestSetup(t, "postgres") + wrapper := db.TestWrapper(t) + rw := db.New(conn) + kmsCache := kms.TestKms(t, conn, wrapper) + iam.TestRepo(t, conn, wrapper) + + sched := TestScheduler(t, conn, wrapper, WithRunJobsLimit(10), WithRunJobsInterval(time.Second)) + + jobReady := make(chan struct{}) + jobErr := make(chan error) + fn := func(_ context.Context) error { + jobReady <- struct{}{} + return <-jobErr + } + + jobStatus := make(chan JobStatus) + status := func() JobStatus { + return <-jobStatus + } + tj := testJob{name: "name", description: "desc", fn: fn, statusFn: status, nextRunIn: time.Hour} + err := sched.RegisterJob(context.Background(), tj) + require.NoError(err) + + baseCtx, baseCnl := context.WithCancel(context.Background()) + // call unexported start in order to bypass monitor loop + go sched.start(baseCtx) + + // Wait for scheduler to run job + <-jobReady + + require.Equal(mapLen(sched.runningJobs), 1) + runJob, ok := sched.runningJobs.Load(tj.name) + require.True(ok) + runId := runJob.(*runningJob).runId + + // Complete job with error so FailRun is called + jobErr <- errors.New("scary error") + + // Report status + jobStatus <- JobStatus{Total: 10, Completed: 10} + + repo, err := job.NewRepository(rw, rw, kmsCache) + require.NoError(err) + + run := waitForRunStatus(t, repo, runId, string(job.Failed)) + assert.Equal(uint32(10), run.TotalCount) + assert.Equal(uint32(10), run.CompletedCount) + + // Wait for scheduler to run job again + <-jobReady + + require.Equal(mapLen(sched.runningJobs), 1) + runJob, ok = sched.runningJobs.Load(tj.name) + require.True(ok) + runId = runJob.(*runningJob).runId + + // Complete job without error so CompleteRun is called + jobErr <- nil + + // Report status + jobStatus <- JobStatus{Total: 20, Completed: 20} + + repo, err = job.NewRepository(rw, rw, kmsCache) + require.NoError(err) + + run = waitForRunStatus(t, repo, runId, string(job.Completed)) + assert.Equal(uint32(20), run.TotalCount) + assert.Equal(uint32(20), run.CompletedCount) + + baseCnl() +} + +func waitForRunStatus(t *testing.T, repo *job.Repository, runId, status string) *job.Run { + t.Helper() + var run *job.Run + + // Fail test if waiting for run status change takes longer than 5 seconds + timeout := time.NewTimer(5 * time.Second) + for { + select { + case <-timeout.C: + t.Fatal(fmt.Errorf("timed out waiting for job run %v to reach status: %v", runId, status)) + case <-time.After(100 * time.Millisecond): + } + + var err error + run, err = repo.LookupRun(context.Background(), runId) + require.NoError(t, err) + if run.Status == status { + break + } + } + + return run +} diff --git a/internal/scheduler/job/additional_verification_test.go b/internal/scheduler/job/additional_verification_test.go index 4117822301..420ff74f02 100644 --- a/internal/scheduler/job/additional_verification_test.go +++ b/internal/scheduler/job/additional_verification_test.go @@ -49,7 +49,7 @@ func TestJobWorkflow(t *testing.T) { require.NoError(err) assert.Nil(newRuns) - run, err = repo.CompleteRun(context.Background(), run.PrivateId, time.Hour) + run, err = repo.CompleteRun(context.Background(), run.PrivateId, time.Hour, 0, 0) require.NoError(err) assert.Equal(Completed.string(), run.Status) @@ -80,7 +80,7 @@ func TestJobWorkflow(t *testing.T) { require.NoError(err) assert.Nil(newRuns) - newRun, err = repo.FailRun(context.Background(), newRun.PrivateId) + newRun, err = repo.FailRun(context.Background(), newRun.PrivateId, 0, 0) require.NoError(err) assert.Equal(Failed.string(), newRun.Status) diff --git a/internal/scheduler/job/query.go b/internal/scheduler/job/query.go index d334f019b6..fc057d4c3f 100644 --- a/internal/scheduler/job/query.go +++ b/internal/scheduler/job/query.go @@ -65,8 +65,10 @@ const completeRunQuery = ` update job_run set - status = 'completed', - end_time = current_timestamp + completed_count = ?, + total_count = ?, + status = 'completed', + end_time = current_timestamp where private_id = ? and status = 'running' @@ -77,8 +79,10 @@ const failRunQuery = ` update job_run set - status = 'failed', - end_time = current_timestamp + completed_count = ?, + total_count = ?, + status = 'failed', + end_time = current_timestamp where private_id = ? and status = 'running' diff --git a/internal/scheduler/job/repository_run.go b/internal/scheduler/job/repository_run.go index b1f3f648b3..13512ed4cb 100644 --- a/internal/scheduler/job/repository_run.go +++ b/internal/scheduler/job/repository_run.go @@ -26,7 +26,7 @@ func (r *Repository) RunJobs(ctx context.Context, serverId string, opt ...Option var runs []*Run _, err := r.writer.DoTx(ctx, db.StdRetryCnt, db.ExpBackoff{}, func(r db.Reader, w db.Writer) error { - rows, err := r.Query(ctx, runJobsQuery, []interface{}{serverId, opts.withRunJobsLimit}) + rows, err := w.Query(ctx, runJobsQuery, []interface{}{serverId, opts.withRunJobsLimit}) if err != nil { return errors.Wrap(err, op) } @@ -65,7 +65,7 @@ func (r *Repository) UpdateProgress(ctx context.Context, runId string, completed run.PrivateId = runId _, err := r.writer.DoTx(ctx, db.StdRetryCnt, db.ExpBackoff{}, func(r db.Reader, w db.Writer) error { - rows, err := r.Query(ctx, updateProgressQuery, []interface{}{completed, total, runId}) + rows, err := w.Query(ctx, updateProgressQuery, []interface{}{completed, total, runId}) if err != nil { return errors.Wrap(err, op) } @@ -105,7 +105,8 @@ func (r *Repository) UpdateProgress(ctx context.Context, runId string, completed } // CompleteRun updates the Run repository entry for the provided runId. -// It sets the status to 'completed' and updates the run's EndTime to the current database time. +// It sets the status to 'completed', updates the run's EndTime to the current database +// time, and sets the completed and total counts. // CompleteRun also updates the Job repository entry that is associated with this run, // setting the job's NextScheduledRun to the current database time incremented by the nextRunIn // parameter. @@ -114,7 +115,7 @@ func (r *Repository) UpdateProgress(ctx context.Context, runId string, completed // or interrupted), any future calls to CompleteRun will return an error with Code // errors.InvalidJobRunState. // All options are ignored. -func (r *Repository) CompleteRun(ctx context.Context, runId string, nextRunIn time.Duration, _ ...Option) (*Run, error) { +func (r *Repository) CompleteRun(ctx context.Context, runId string, nextRunIn time.Duration, completed, total int, _ ...Option) (*Run, error) { const op = "job.(Repository).CompleteRun" if runId == "" { return nil, errors.New(errors.InvalidParameter, op, "missing run id") @@ -124,7 +125,11 @@ func (r *Repository) CompleteRun(ctx context.Context, runId string, nextRunIn ti run.PrivateId = runId _, err := r.writer.DoTx(ctx, db.StdRetryCnt, db.ExpBackoff{}, func(r db.Reader, w db.Writer) error { - rows, err := r.Query(ctx, completeRunQuery, []interface{}{runId}) + // TODO (lcr 07/2021) this can potentially overwrite completed and total values + // persisted by the scheduler's monitor jobs loop. + // Add an on update sql trigger to protect the job_run table, once progress + // values are used in the critical path. + rows, err := w.Query(ctx, completeRunQuery, []interface{}{completed, total, runId}) if err != nil { return errors.Wrap(err, op) } @@ -153,7 +158,7 @@ func (r *Repository) CompleteRun(ctx context.Context, runId string, nextRunIn ti return errors.New(errors.InvalidJobRunState, op, fmt.Sprintf("job run was in a final run state: %v", run.Status)) } - rows1, err := r.Query(ctx, setNextScheduledRunQuery, []interface{}{int(nextRunIn.Round(time.Second).Seconds()), run.JobPluginId, run.JobName}) + rows1, err := w.Query(ctx, setNextScheduledRunQuery, []interface{}{int(nextRunIn.Round(time.Second).Seconds()), run.JobPluginId, run.JobName}) if err != nil { return errors.Wrap(err, op, errors.WithMsg(fmt.Sprintf("failed to set next scheduled run time for job: %s", run.JobName))) } @@ -183,13 +188,14 @@ func (r *Repository) CompleteRun(ctx context.Context, runId string, nextRunIn ti } // FailRun updates the Run repository entry for the provided runId. -// It sets the status to 'failed' and updates the run's EndTime to the current database time. +// It sets the status to 'failed' and updates the run's EndTime to the current database +// time, and sets the completed and total counts. // // Once a run has been persisted with a final run status (completed, failed // or interrupted), any future calls to FailRun will return an error with Code // errors.InvalidJobRunState. // All options are ignored. -func (r *Repository) FailRun(ctx context.Context, runId string, _ ...Option) (*Run, error) { +func (r *Repository) FailRun(ctx context.Context, runId string, completed, total int, _ ...Option) (*Run, error) { const op = "job.(Repository).FailRun" if runId == "" { return nil, errors.New(errors.InvalidParameter, op, "missing run id") @@ -199,7 +205,11 @@ func (r *Repository) FailRun(ctx context.Context, runId string, _ ...Option) (*R run.PrivateId = runId _, err := r.writer.DoTx(ctx, db.StdRetryCnt, db.ExpBackoff{}, func(r db.Reader, w db.Writer) error { - rows, err := r.Query(ctx, failRunQuery, []interface{}{runId}) + // TODO (lcr 07/2021) this can potentially overwrite completed and total values + // persisted by the scheduler's monitor jobs loop. + // Add an on update sql trigger to protect the job_run table, once progress + // values are used in the critical path. + rows, err := w.Query(ctx, failRunQuery, []interface{}{completed, total, runId}) if err != nil { return errors.Wrap(err, op) } @@ -263,7 +273,7 @@ func (r *Repository) InterruptRuns(ctx context.Context, interruptThreshold time. var runs []*Run _, err := r.writer.DoTx(ctx, db.StdRetryCnt, db.ExpBackoff{}, func(r db.Reader, w db.Writer) error { - rows, err := r.Query(ctx, query, args) + rows, err := w.Query(ctx, query, args) if err != nil { return errors.Wrap(err, op) } diff --git a/internal/scheduler/job/repository_run_test.go b/internal/scheduler/job/repository_run_test.go index cf0b577caf..879963630d 100644 --- a/internal/scheduler/job/repository_run_test.go +++ b/internal/scheduler/job/repository_run_test.go @@ -188,7 +188,7 @@ func TestRepository_RunJobsOrder(t *testing.T) { assert.Equal(run.JobPluginId, firstJob.PluginId) // End first job with time between last and middle - _, err = repo.CompleteRun(context.Background(), run.PrivateId, -6*time.Hour) + _, err = repo.CompleteRun(context.Background(), run.PrivateId, -6*time.Hour, 0, 0) require.NoError(err) runs, err = repo.RunJobs(context.Background(), server.PrivateId) @@ -445,10 +445,14 @@ func TestRepository_CompleteRun(t *testing.T) { server := testController(t, conn, wrapper) job := testJob(t, conn, "name", "description", wrapper) + type args struct { + completed, total int + } tests := []struct { name string orig *Run nextRunIn time.Duration + args args wantErr bool wantErrCode errors.Code wantErrMsg string @@ -513,6 +517,18 @@ func TestRepository_CompleteRun(t *testing.T) { }, nextRunIn: time.Hour, }, + { + name: "valid-with-progress", + orig: &Run{ + JobRun: &store.JobRun{ + JobName: job.Name, + JobPluginId: job.PluginId, + ServerId: server.PrivateId, + Status: Running.string(), + }, + }, + args: args{completed: 10, total: 20}, + }, } for _, tt := range tests { @@ -531,7 +547,7 @@ func TestRepository_CompleteRun(t *testing.T) { privateId = tt.orig.PrivateId } - got, err := repo.CompleteRun(context.Background(), privateId, tt.nextRunIn) + got, err := repo.CompleteRun(context.Background(), privateId, tt.nextRunIn, tt.args.completed, tt.args.total) if tt.wantErr { require.Error(err) assert.Truef(errors.Match(errors.T(tt.wantErrCode), err), "Unexpected error %s", err) @@ -549,6 +565,8 @@ func TestRepository_CompleteRun(t *testing.T) { require.NotNil(got) assert.NotEmpty(got.EndTime) assert.Equal(Completed.string(), got.Status) + assert.Equal(tt.args.completed, int(got.CompletedCount)) + assert.Equal(tt.args.total, int(got.TotalCount)) updatedJob, err := repo.LookupJob(context.Background(), tt.orig.JobName) assert.NoError(err) @@ -573,7 +591,7 @@ func TestRepository_CompleteRun(t *testing.T) { require.NoError(err) require.NotNil(repo) - got, err := repo.CompleteRun(context.Background(), "fake-run-id", time.Hour) + got, err := repo.CompleteRun(context.Background(), "fake-run-id", time.Hour, 0, 0) require.Error(err) require.Nil(got) assert.Truef(errors.Match(errors.T(errors.RecordNotFound), err), "Unexpected error %s", err) @@ -592,9 +610,13 @@ func TestRepository_FailRun(t *testing.T) { server := testController(t, conn, wrapper) job := testJob(t, conn, "name", "description", wrapper) + type args struct { + completed, total int + } tests := []struct { name string orig *Run + args args wantErr bool wantErrCode errors.Code wantErrMsg string @@ -658,6 +680,18 @@ func TestRepository_FailRun(t *testing.T) { }, }, }, + { + name: "valid-with-progress", + orig: &Run{ + JobRun: &store.JobRun{ + JobName: job.Name, + JobPluginId: job.PluginId, + ServerId: server.PrivateId, + Status: Running.string(), + }, + }, + args: args{completed: 10, total: 20}, + }, } for _, tt := range tests { @@ -676,7 +710,7 @@ func TestRepository_FailRun(t *testing.T) { privateId = tt.orig.PrivateId } - got, err := repo.FailRun(context.Background(), privateId) + got, err := repo.FailRun(context.Background(), privateId, tt.args.completed, tt.args.total) if tt.wantErr { require.Error(err) assert.Truef(errors.Match(errors.T(tt.wantErrCode), err), "Unexpected error %s", err) @@ -694,6 +728,8 @@ func TestRepository_FailRun(t *testing.T) { require.NotNil(got) assert.NotEmpty(got.EndTime) assert.Equal(Failed.string(), got.Status) + assert.Equal(tt.args.completed, int(got.CompletedCount)) + assert.Equal(tt.args.total, int(got.TotalCount)) // Delete job run so it does not clash with future runs _, err = repo.deleteRun(context.Background(), privateId) @@ -707,7 +743,7 @@ func TestRepository_FailRun(t *testing.T) { require.NoError(err) require.NotNil(repo) - got, err := repo.FailRun(context.Background(), "fake-run-id") + got, err := repo.FailRun(context.Background(), "fake-run-id", 0, 0) require.Error(err) require.Nil(got) assert.Truef(errors.Match(errors.T(errors.RecordNotFound), err), "Unexpected error %s", err) diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index 8be6cd0134..9d5f87f02f 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -186,7 +186,7 @@ func (s *Scheduler) start(ctx context.Context) { err := s.runJob(ctx, r) if err != nil { s.logger.Error("error starting job", "error", err) - if _, inner := repo.FailRun(ctx, r.PrivateId); inner != nil { + if _, inner := repo.FailRun(ctx, r.PrivateId, 0, 0); inner != nil { s.logger.Error("error updating failed job run", "error", inner) } } @@ -223,6 +223,10 @@ func (s *Scheduler) runJob(ctx context.Context, r *job.Run) error { defer rj.cancelCtx() runErr := j.Run(jobContext) var updateErr error + + // Get final status report to update run progress with + status := j.Status() + switch runErr { case nil: s.logger.Debug("job run complete", "run id", r.PrivateId, "name", j.Name()) @@ -230,10 +234,10 @@ func (s *Scheduler) runJob(ctx context.Context, r *job.Run) error { if inner != nil { s.logger.Error("error getting next run time", "name", j.Name(), "error", inner) } - _, updateErr = repo.CompleteRun(jobContext, r.PrivateId, nextRun) + _, updateErr = repo.CompleteRun(jobContext, r.PrivateId, nextRun, status.Completed, status.Total) default: s.logger.Debug("job run failed", "run id", r.PrivateId, "name", j.Name(), "error", runErr) - _, updateErr = repo.FailRun(jobContext, r.PrivateId) + _, updateErr = repo.FailRun(jobContext, r.PrivateId, status.Completed, status.Total) } if updateErr != nil {