Ensure job progress is always reported (#1396)

* Update completed and total when job finishes

* Remove unused option in test

* Add comment

* Use writer for updates and inserts
pull/1399/head
Louis Ruch 5 years ago committed by GitHub
parent 74aba58c73
commit 2dae498123
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -2,6 +2,7 @@ package scheduler
import (
"context"
"errors"
"fmt"
"runtime"
"testing"
@ -338,3 +339,102 @@ func TestSchedulerMonitorLoop(t *testing.T) {
require.NoError(err)
assert.Equal(string(job.Interrupted), run.Status)
}
// TestSchedulerFinalStatusUpdate verifies that when a job finishes (both the
// failure and the success paths) the scheduler persists the job's final
// reported progress (completed/total counts) on the run record.
func TestSchedulerFinalStatusUpdate(t *testing.T) {
	t.Parallel()
	assert, require := assert.New(t), require.New(t)
	conn, _ := db.TestSetup(t, "postgres")
	wrapper := db.TestWrapper(t)
	rw := db.New(conn)
	kmsCache := kms.TestKms(t, conn, wrapper)
	iam.TestRepo(t, conn, wrapper)

	sched := TestScheduler(t, conn, wrapper, WithRunJobsLimit(10), WithRunJobsInterval(time.Second))

	// jobReady signals that the job body has started; jobErr supplies the
	// job's return value so the test controls when and how each run ends.
	jobReady := make(chan struct{})
	jobErr := make(chan error)
	fn := func(_ context.Context) error {
		jobReady <- struct{}{}
		return <-jobErr
	}

	// jobStatus feeds the status the scheduler reads after the run ends.
	jobStatus := make(chan JobStatus)
	status := func() JobStatus {
		return <-jobStatus
	}

	tj := testJob{name: "name", description: "desc", fn: fn, statusFn: status, nextRunIn: time.Hour}
	err := sched.RegisterJob(context.Background(), tj)
	require.NoError(err)

	baseCtx, baseCnl := context.WithCancel(context.Background())
	// Cancel via defer so the scheduler goroutine is stopped even when a
	// require assertion fails part way through the test.
	defer baseCnl()

	// call unexported start in order to bypass monitor loop
	go sched.start(baseCtx)

	// Wait for scheduler to run job
	<-jobReady
	require.Equal(mapLen(sched.runningJobs), 1)
	runJob, ok := sched.runningJobs.Load(tj.name)
	require.True(ok)
	runId := runJob.(*runningJob).runId

	// Complete job with error so FailRun is called
	jobErr <- errors.New("scary error")
	// Report status
	jobStatus <- JobStatus{Total: 10, Completed: 10}

	repo, err := job.NewRepository(rw, rw, kmsCache)
	require.NoError(err)

	run := waitForRunStatus(t, repo, runId, string(job.Failed))
	assert.Equal(uint32(10), run.TotalCount)
	assert.Equal(uint32(10), run.CompletedCount)

	// Wait for scheduler to run job again
	<-jobReady
	require.Equal(mapLen(sched.runningJobs), 1)
	runJob, ok = sched.runningJobs.Load(tj.name)
	require.True(ok)
	runId = runJob.(*runningJob).runId

	// Complete job without error so CompleteRun is called
	jobErr <- nil
	// Report status
	jobStatus <- JobStatus{Total: 20, Completed: 20}

	// Reuse the repository created above; constructing a second one is
	// unnecessary.
	run = waitForRunStatus(t, repo, runId, string(job.Completed))
	assert.Equal(uint32(20), run.TotalCount)
	assert.Equal(uint32(20), run.CompletedCount)
}
// waitForRunStatus polls the repository every 100ms until the run identified
// by runId reaches the given status, then returns the run. It fails the test
// if the status is not reached within 5 seconds.
func waitForRunStatus(t *testing.T, repo *job.Repository, runId, status string) *job.Run {
	t.Helper()
	// Fail test if waiting for run status change takes longer than 5 seconds
	timeout := time.NewTimer(5 * time.Second)
	// Release the timer's resources when the run reaches the wanted status
	// before the deadline.
	defer timeout.Stop()
	for {
		select {
		case <-timeout.C:
			t.Fatal(fmt.Errorf("timed out waiting for job run %v to reach status: %v", runId, status))
		case <-time.After(100 * time.Millisecond):
		}

		run, err := repo.LookupRun(context.Background(), runId)
		require.NoError(t, err)
		if run.Status == status {
			// Return directly instead of breaking out to a shared variable.
			return run
		}
	}
}

@ -49,7 +49,7 @@ func TestJobWorkflow(t *testing.T) {
require.NoError(err)
assert.Nil(newRuns)
run, err = repo.CompleteRun(context.Background(), run.PrivateId, time.Hour)
run, err = repo.CompleteRun(context.Background(), run.PrivateId, time.Hour, 0, 0)
require.NoError(err)
assert.Equal(Completed.string(), run.Status)
@ -80,7 +80,7 @@ func TestJobWorkflow(t *testing.T) {
require.NoError(err)
assert.Nil(newRuns)
newRun, err = repo.FailRun(context.Background(), newRun.PrivateId)
newRun, err = repo.FailRun(context.Background(), newRun.PrivateId, 0, 0)
require.NoError(err)
assert.Equal(Failed.string(), newRun.Status)

@ -65,8 +65,10 @@ const completeRunQuery = `
update
job_run
set
status = 'completed',
end_time = current_timestamp
completed_count = ?,
total_count = ?,
status = 'completed',
end_time = current_timestamp
where
private_id = ?
and status = 'running'
@ -77,8 +79,10 @@ const failRunQuery = `
update
job_run
set
status = 'failed',
end_time = current_timestamp
completed_count = ?,
total_count = ?,
status = 'failed',
end_time = current_timestamp
where
private_id = ?
and status = 'running'

@ -26,7 +26,7 @@ func (r *Repository) RunJobs(ctx context.Context, serverId string, opt ...Option
var runs []*Run
_, err := r.writer.DoTx(ctx, db.StdRetryCnt, db.ExpBackoff{},
func(r db.Reader, w db.Writer) error {
rows, err := r.Query(ctx, runJobsQuery, []interface{}{serverId, opts.withRunJobsLimit})
rows, err := w.Query(ctx, runJobsQuery, []interface{}{serverId, opts.withRunJobsLimit})
if err != nil {
return errors.Wrap(err, op)
}
@ -65,7 +65,7 @@ func (r *Repository) UpdateProgress(ctx context.Context, runId string, completed
run.PrivateId = runId
_, err := r.writer.DoTx(ctx, db.StdRetryCnt, db.ExpBackoff{},
func(r db.Reader, w db.Writer) error {
rows, err := r.Query(ctx, updateProgressQuery, []interface{}{completed, total, runId})
rows, err := w.Query(ctx, updateProgressQuery, []interface{}{completed, total, runId})
if err != nil {
return errors.Wrap(err, op)
}
@ -105,7 +105,8 @@ func (r *Repository) UpdateProgress(ctx context.Context, runId string, completed
}
// CompleteRun updates the Run repository entry for the provided runId.
// It sets the status to 'completed' and updates the run's EndTime to the current database time.
// It sets the status to 'completed', updates the run's EndTime to the current database
// time, and sets the completed and total counts.
// CompleteRun also updates the Job repository entry that is associated with this run,
// setting the job's NextScheduledRun to the current database time incremented by the nextRunIn
// parameter.
@ -114,7 +115,7 @@ func (r *Repository) UpdateProgress(ctx context.Context, runId string, completed
// or interrupted), any future calls to CompleteRun will return an error with Code
// errors.InvalidJobRunState.
// All options are ignored.
func (r *Repository) CompleteRun(ctx context.Context, runId string, nextRunIn time.Duration, _ ...Option) (*Run, error) {
func (r *Repository) CompleteRun(ctx context.Context, runId string, nextRunIn time.Duration, completed, total int, _ ...Option) (*Run, error) {
const op = "job.(Repository).CompleteRun"
if runId == "" {
return nil, errors.New(errors.InvalidParameter, op, "missing run id")
@ -124,7 +125,11 @@ func (r *Repository) CompleteRun(ctx context.Context, runId string, nextRunIn ti
run.PrivateId = runId
_, err := r.writer.DoTx(ctx, db.StdRetryCnt, db.ExpBackoff{},
func(r db.Reader, w db.Writer) error {
rows, err := r.Query(ctx, completeRunQuery, []interface{}{runId})
// TODO (lcr 07/2021) this can potentially overwrite completed and total values
// persisted by the scheduler's monitor jobs loop.
// Add an on update sql trigger to protect the job_run table, once progress
// values are used in the critical path.
rows, err := w.Query(ctx, completeRunQuery, []interface{}{completed, total, runId})
if err != nil {
return errors.Wrap(err, op)
}
@ -153,7 +158,7 @@ func (r *Repository) CompleteRun(ctx context.Context, runId string, nextRunIn ti
return errors.New(errors.InvalidJobRunState, op, fmt.Sprintf("job run was in a final run state: %v", run.Status))
}
rows1, err := r.Query(ctx, setNextScheduledRunQuery, []interface{}{int(nextRunIn.Round(time.Second).Seconds()), run.JobPluginId, run.JobName})
rows1, err := w.Query(ctx, setNextScheduledRunQuery, []interface{}{int(nextRunIn.Round(time.Second).Seconds()), run.JobPluginId, run.JobName})
if err != nil {
return errors.Wrap(err, op, errors.WithMsg(fmt.Sprintf("failed to set next scheduled run time for job: %s", run.JobName)))
}
@ -183,13 +188,14 @@ func (r *Repository) CompleteRun(ctx context.Context, runId string, nextRunIn ti
}
// FailRun updates the Run repository entry for the provided runId.
// It sets the status to 'failed' and updates the run's EndTime to the current database time.
// It sets the status to 'failed', updates the run's EndTime to the current database
// time, and sets the completed and total counts.
//
// Once a run has been persisted with a final run status (completed, failed
// or interrupted), any future calls to FailRun will return an error with Code
// errors.InvalidJobRunState.
// All options are ignored.
func (r *Repository) FailRun(ctx context.Context, runId string, _ ...Option) (*Run, error) {
func (r *Repository) FailRun(ctx context.Context, runId string, completed, total int, _ ...Option) (*Run, error) {
const op = "job.(Repository).FailRun"
if runId == "" {
return nil, errors.New(errors.InvalidParameter, op, "missing run id")
@ -199,7 +205,11 @@ func (r *Repository) FailRun(ctx context.Context, runId string, _ ...Option) (*R
run.PrivateId = runId
_, err := r.writer.DoTx(ctx, db.StdRetryCnt, db.ExpBackoff{},
func(r db.Reader, w db.Writer) error {
rows, err := r.Query(ctx, failRunQuery, []interface{}{runId})
// TODO (lcr 07/2021) this can potentially overwrite completed and total values
// persisted by the scheduler's monitor jobs loop.
// Add an on update sql trigger to protect the job_run table, once progress
// values are used in the critical path.
rows, err := w.Query(ctx, failRunQuery, []interface{}{completed, total, runId})
if err != nil {
return errors.Wrap(err, op)
}
@ -263,7 +273,7 @@ func (r *Repository) InterruptRuns(ctx context.Context, interruptThreshold time.
var runs []*Run
_, err := r.writer.DoTx(ctx, db.StdRetryCnt, db.ExpBackoff{},
func(r db.Reader, w db.Writer) error {
rows, err := r.Query(ctx, query, args)
rows, err := w.Query(ctx, query, args)
if err != nil {
return errors.Wrap(err, op)
}

@ -188,7 +188,7 @@ func TestRepository_RunJobsOrder(t *testing.T) {
assert.Equal(run.JobPluginId, firstJob.PluginId)
// End first job with time between last and middle
_, err = repo.CompleteRun(context.Background(), run.PrivateId, -6*time.Hour)
_, err = repo.CompleteRun(context.Background(), run.PrivateId, -6*time.Hour, 0, 0)
require.NoError(err)
runs, err = repo.RunJobs(context.Background(), server.PrivateId)
@ -445,10 +445,14 @@ func TestRepository_CompleteRun(t *testing.T) {
server := testController(t, conn, wrapper)
job := testJob(t, conn, "name", "description", wrapper)
type args struct {
completed, total int
}
tests := []struct {
name string
orig *Run
nextRunIn time.Duration
args args
wantErr bool
wantErrCode errors.Code
wantErrMsg string
@ -513,6 +517,18 @@ func TestRepository_CompleteRun(t *testing.T) {
},
nextRunIn: time.Hour,
},
{
name: "valid-with-progress",
orig: &Run{
JobRun: &store.JobRun{
JobName: job.Name,
JobPluginId: job.PluginId,
ServerId: server.PrivateId,
Status: Running.string(),
},
},
args: args{completed: 10, total: 20},
},
}
for _, tt := range tests {
@ -531,7 +547,7 @@ func TestRepository_CompleteRun(t *testing.T) {
privateId = tt.orig.PrivateId
}
got, err := repo.CompleteRun(context.Background(), privateId, tt.nextRunIn)
got, err := repo.CompleteRun(context.Background(), privateId, tt.nextRunIn, tt.args.completed, tt.args.total)
if tt.wantErr {
require.Error(err)
assert.Truef(errors.Match(errors.T(tt.wantErrCode), err), "Unexpected error %s", err)
@ -549,6 +565,8 @@ func TestRepository_CompleteRun(t *testing.T) {
require.NotNil(got)
assert.NotEmpty(got.EndTime)
assert.Equal(Completed.string(), got.Status)
assert.Equal(tt.args.completed, int(got.CompletedCount))
assert.Equal(tt.args.total, int(got.TotalCount))
updatedJob, err := repo.LookupJob(context.Background(), tt.orig.JobName)
assert.NoError(err)
@ -573,7 +591,7 @@ func TestRepository_CompleteRun(t *testing.T) {
require.NoError(err)
require.NotNil(repo)
got, err := repo.CompleteRun(context.Background(), "fake-run-id", time.Hour)
got, err := repo.CompleteRun(context.Background(), "fake-run-id", time.Hour, 0, 0)
require.Error(err)
require.Nil(got)
assert.Truef(errors.Match(errors.T(errors.RecordNotFound), err), "Unexpected error %s", err)
@ -592,9 +610,13 @@ func TestRepository_FailRun(t *testing.T) {
server := testController(t, conn, wrapper)
job := testJob(t, conn, "name", "description", wrapper)
type args struct {
completed, total int
}
tests := []struct {
name string
orig *Run
args args
wantErr bool
wantErrCode errors.Code
wantErrMsg string
@ -658,6 +680,18 @@ func TestRepository_FailRun(t *testing.T) {
},
},
},
{
name: "valid-with-progress",
orig: &Run{
JobRun: &store.JobRun{
JobName: job.Name,
JobPluginId: job.PluginId,
ServerId: server.PrivateId,
Status: Running.string(),
},
},
args: args{completed: 10, total: 20},
},
}
for _, tt := range tests {
@ -676,7 +710,7 @@ func TestRepository_FailRun(t *testing.T) {
privateId = tt.orig.PrivateId
}
got, err := repo.FailRun(context.Background(), privateId)
got, err := repo.FailRun(context.Background(), privateId, tt.args.completed, tt.args.total)
if tt.wantErr {
require.Error(err)
assert.Truef(errors.Match(errors.T(tt.wantErrCode), err), "Unexpected error %s", err)
@ -694,6 +728,8 @@ func TestRepository_FailRun(t *testing.T) {
require.NotNil(got)
assert.NotEmpty(got.EndTime)
assert.Equal(Failed.string(), got.Status)
assert.Equal(tt.args.completed, int(got.CompletedCount))
assert.Equal(tt.args.total, int(got.TotalCount))
// Delete job run so it does not clash with future runs
_, err = repo.deleteRun(context.Background(), privateId)
@ -707,7 +743,7 @@ func TestRepository_FailRun(t *testing.T) {
require.NoError(err)
require.NotNil(repo)
got, err := repo.FailRun(context.Background(), "fake-run-id")
got, err := repo.FailRun(context.Background(), "fake-run-id", 0, 0)
require.Error(err)
require.Nil(got)
assert.Truef(errors.Match(errors.T(errors.RecordNotFound), err), "Unexpected error %s", err)

@ -186,7 +186,7 @@ func (s *Scheduler) start(ctx context.Context) {
err := s.runJob(ctx, r)
if err != nil {
s.logger.Error("error starting job", "error", err)
if _, inner := repo.FailRun(ctx, r.PrivateId); inner != nil {
if _, inner := repo.FailRun(ctx, r.PrivateId, 0, 0); inner != nil {
s.logger.Error("error updating failed job run", "error", inner)
}
}
@ -223,6 +223,10 @@ func (s *Scheduler) runJob(ctx context.Context, r *job.Run) error {
defer rj.cancelCtx()
runErr := j.Run(jobContext)
var updateErr error
// Get final status report to update run progress with
status := j.Status()
switch runErr {
case nil:
s.logger.Debug("job run complete", "run id", r.PrivateId, "name", j.Name())
@ -230,10 +234,10 @@ func (s *Scheduler) runJob(ctx context.Context, r *job.Run) error {
if inner != nil {
s.logger.Error("error getting next run time", "name", j.Name(), "error", inner)
}
_, updateErr = repo.CompleteRun(jobContext, r.PrivateId, nextRun)
_, updateErr = repo.CompleteRun(jobContext, r.PrivateId, nextRun, status.Completed, status.Total)
default:
s.logger.Debug("job run failed", "run id", r.PrivateId, "name", j.Name(), "error", runErr)
_, updateErr = repo.FailRun(jobContext, r.PrivateId)
_, updateErr = repo.FailRun(jobContext, r.PrivateId, status.Completed, status.Total)
}
if updateErr != nil {

Loading…
Cancel
Save