diff --git a/internal/credential/vault/repository_credential_store.go b/internal/credential/vault/repository_credential_store.go index 1b58a36f21..8ba3d3e27a 100644 --- a/internal/credential/vault/repository_credential_store.go +++ b/internal/credential/vault/repository_credential_store.go @@ -11,6 +11,7 @@ import ( "github.com/hashicorp/boundary/internal/errors" "github.com/hashicorp/boundary/internal/kms" "github.com/hashicorp/boundary/internal/oplog" + "github.com/hashicorp/boundary/internal/scheduler" "github.com/hashicorp/go-secure-stdlib/parseutil" vault "github.com/hashicorp/vault/api" ) @@ -724,8 +725,8 @@ func (r *Repository) DeleteCredentialStore(ctx context.Context, publicId string, if rows > 0 { // Schedule token revocation and credential store cleanup jobs to run immediately - _ = r.scheduler.UpdateJobNextRunInAtLeast(ctx, tokenRevocationJobName, 0) - _ = r.scheduler.UpdateJobNextRunInAtLeast(ctx, credentialStoreCleanupJobName, 0) + _ = r.scheduler.UpdateJobNextRunInAtLeast(ctx, tokenRevocationJobName, 0, scheduler.WithRunNow(true)) + _ = r.scheduler.UpdateJobNextRunInAtLeast(ctx, credentialStoreCleanupJobName, 0, scheduler.WithRunNow(true)) } return rows, nil } diff --git a/internal/host/plugin/job_set_sync.go b/internal/host/plugin/job_set_sync.go index 30a73b6ef4..be49d0fb5b 100644 --- a/internal/host/plugin/job_set_sync.go +++ b/internal/host/plugin/job_set_sync.go @@ -191,7 +191,7 @@ func (r *SetSyncJob) syncSets(ctx context.Context, setIds []string) error { return errors.New(ctx, errors.InvalidParameter, op, "no set ids") } - // Fist, look up the sets corresponding to the set IDs + // First, look up the sets corresponding to the set IDs var setAggs []*hostSetAgg if err := r.reader.SearchWhere(ctx, &setAggs, "public_id in (?)", []interface{}{setIds}); err != nil { return errors.Wrap(ctx, err, op, errors.WithMsg(fmt.Sprintf("can't retrieve sets %v", setIds))) diff --git a/internal/host/plugin/repository_host_catalog.go b/internal/host/plugin/repository_host_catalog.go index f914a34977..2302521f64 100644 --- a/internal/host/plugin/repository_host_catalog.go +++ b/internal/host/plugin/repository_host_catalog.go @@ -13,6 +13,7 @@ import ( "github.com/hashicorp/boundary/internal/observability/event" "github.com/hashicorp/boundary/internal/oplog" hostplugin "github.com/hashicorp/boundary/internal/plugin/host" + "github.com/hashicorp/boundary/internal/scheduler" pb "github.com/hashicorp/boundary/sdk/pbs/controller/api/resources/hostcatalogs" pbset "github.com/hashicorp/boundary/sdk/pbs/controller/api/resources/hostsets" plgpb "github.com/hashicorp/boundary/sdk/pbs/plugin" @@ -503,7 +504,7 @@ func (r *Repository) UpdateCatalog(ctx context.Context, c *HostCatalog, version } if runSyncJob { - _ = r.scheduler.UpdateJobNextRunInAtLeast(ctx, setSyncJobName, 0) + _ = r.scheduler.UpdateJobNextRunInAtLeast(ctx, setSyncJobName, 0, scheduler.WithRunNow(true)) } // Even if we didn't update any records, if we were able to find the record diff --git a/internal/host/plugin/repository_host_set.go b/internal/host/plugin/repository_host_set.go index 482bb16888..ec65ba5a8a 100644 --- a/internal/host/plugin/repository_host_set.go +++ b/internal/host/plugin/repository_host_set.go @@ -15,6 +15,7 @@ import ( "github.com/hashicorp/boundary/internal/libs/patchstruct" "github.com/hashicorp/boundary/internal/oplog" hostplugin "github.com/hashicorp/boundary/internal/plugin/host" + "github.com/hashicorp/boundary/internal/scheduler" pb "github.com/hashicorp/boundary/sdk/pbs/controller/api/resources/hostsets" plgpb "github.com/hashicorp/boundary/sdk/pbs/plugin" "google.golang.org/grpc/codes" @@ -164,7 +165,7 @@ func (r *Repository) CreateSet(ctx context.Context, scopeId string, s *HostSet, } // The set now exists in the plugin, sync it immediately. - _ = r.scheduler.UpdateJobNextRunInAtLeast(ctx, setSyncJobName, 0) + _ = r.scheduler.UpdateJobNextRunInAtLeast(ctx, setSyncJobName, 0, scheduler.WithRunNow(true)) plg, err := r.getPlugin(ctx, c.GetPluginId()) if err != nil { @@ -522,13 +523,15 @@ func (r *Repository) UpdateSet(ctx context.Context, scopeId string, s *HostSet, switch { case updateAttributes: // Request a host sync since we have updated attributes. - _ = r.scheduler.UpdateJobNextRunInAtLeast(ctx, setSyncJobName, 0) + _ = r.scheduler.UpdateJobNextRunInAtLeast(ctx, setSyncJobName, 0, scheduler.WithRunNow(true)) case updateSyncInterval: + var schOpt []scheduler.Option tilNextSync := time.Until(returnedSet.LastSyncTime.AsTime().Add(time.Duration(returnedSet.SyncIntervalSeconds) * time.Second)) - if tilNextSync < 0 { + if tilNextSync <= 0 { tilNextSync = 0 + schOpt = append(schOpt, scheduler.WithRunNow(true)) } - _ = r.scheduler.UpdateJobNextRunInAtLeast(ctx, setSyncJobName, tilNextSync) + _ = r.scheduler.UpdateJobNextRunInAtLeast(ctx, setSyncJobName, tilNextSync, schOpt...) } return returnedSet, hosts, plg, numUpdated, nil diff --git a/internal/scheduler/additional_verification_test.go b/internal/scheduler/additional_verification_test.go index ec2779c703..16fb614abc 100644 --- a/internal/scheduler/additional_verification_test.go +++ b/internal/scheduler/additional_verification_test.go @@ -518,6 +518,106 @@ func TestSchedulerFinalStatusUpdate(t *testing.T) { close(jobStatus) } +func TestSchedulerRunNow(t *testing.T) { + // do not use t.Parallel() since it relies on the sys eventer + require := require.New(t) + conn, _ := db.TestSetup(t, "postgres") + wrapper := db.TestWrapper(t) + rw := db.New(conn) + kmsCache := kms.TestKms(t, conn, wrapper) + iam.TestRepo(t, conn, wrapper) + event.TestEnableEventing(t, true) + testConfig := event.DefaultEventerConfig() + testLock := &sync.Mutex{} + testLogger := hclog.New(&hclog.LoggerOptions{ + Mutex: testLock, + }) + err := event.InitSysEventer(testLogger, testLock, "TestSchedulerWorkflow", event.WithEventerConfig(testConfig)) + require.NoError(err) + + // Create test scheduler that only runs jobs every hour + sched := TestScheduler(t, conn, wrapper, WithRunJobsLimit(10), WithRunJobsInterval(time.Hour)) + + jobCh := make(chan struct{}) + jobReady := make(chan struct{}) + testDone := make(chan struct{}) + fn := func(_ context.Context) error { + select { + case <-testDone: + return nil + case jobReady <- struct{}{}: + } + <-jobCh + return nil + } + tj := testJob{name: "name", description: "desc", fn: fn, nextRunIn: time.Hour} + err = sched.RegisterJob(context.Background(), tj) + require.NoError(err) + + baseCtx, baseCnl := context.WithCancel(context.Background()) + defer baseCnl() + var wg sync.WaitGroup + err = sched.Start(baseCtx, &wg) + require.NoError(err) + + // Wait for scheduler to run job + <-jobReady + require.Equal(mapLen(sched.runningJobs), 1) + + runJob, ok := sched.runningJobs.Load(tj.name) + require.True(ok) + runId := runJob.(*runningJob).runId + + // Complete job + jobCh <- struct{}{} + + repo, err := job.NewRepository(rw, rw, kmsCache) + require.NoError(err) + waitForRunStatus(t, repo, runId, string(job.Completed)) + + // Update job to run immediately once scheduling loop is called + err = sched.UpdateJobNextRunInAtLeast(context.Background(), tj.name, 0) + require.NoError(err) + + // Verify no job is not running + select { + case <-jobReady: + t.Fatal("expected not job to be running") + default: + } + + // Trigger scheduling loop + sched.RunNow() + + // Wait for scheduler to run job + <-jobReady + require.Equal(mapLen(sched.runningJobs), 1) + + runJob, ok = sched.runningJobs.Load(tj.name) + require.True(ok) + runId = runJob.(*runningJob).runId + + // Complete job + jobCh <- struct{}{} + + waitForRunStatus(t, repo, runId, string(job.Completed)) + + // Update job to run again with RunNow option + err = sched.UpdateJobNextRunInAtLeast(context.Background(), tj.name, 0, WithRunNow(true)) + require.NoError(err) + + // Wait for scheduler to run job + <-jobReady + require.Equal(mapLen(sched.runningJobs), 1) + + // Complete job + jobCh <- struct{}{} + + // Cleanup tests + close(testDone) + close(jobCh) +} + func waitForRunStatus(t *testing.T, repo *job.Repository, runId, status string) *job.Run { t.Helper() var run *job.Run diff --git a/internal/scheduler/options.go b/internal/scheduler/options.go index 65ad7dd9ae..d8e4816590 100644 --- a/internal/scheduler/options.go +++ b/internal/scheduler/options.go @@ -28,6 +28,7 @@ type options struct { withRunJobInterval time.Duration withMonitorInterval time.Duration withInterruptThreshold time.Duration + withRunNow bool } func getDefaultOptions() options { @@ -95,3 +96,12 @@ func WithNextRunIn(d time.Duration) Option { o.withNextRunIn = d } } + +// WithRunNow provides an option to trigger the scheduling loop after updating the next run time +// of a specific job. Note this does not guarantee the job will run on the scheduler that updated +// the job run time. +func WithRunNow(b bool) Option { + return func(o *options) { + o.withRunNow = b + } +} diff --git a/internal/scheduler/options_test.go b/internal/scheduler/options_test.go index 4453215b90..d7dd6f25e3 100644 --- a/internal/scheduler/options_test.go +++ b/internal/scheduler/options_test.go @@ -66,4 +66,12 @@ func Test_GetOpts(t *testing.T) { testOpts := getDefaultOptions() assert.Equal(opts, testOpts) }) + t.Run("WithRunNow", func(t *testing.T) { + assert := assert.New(t) + opts := getOpts(WithRunNow(true)) + testOpts := getDefaultOptions() + assert.NotEqual(opts, testOpts) + testOpts.withRunNow = true + assert.Equal(opts, testOpts) + }) } diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index dcc7ecef83..6f2b5a764d 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -33,6 +33,7 @@ type Scheduler struct { runJobsInterval time.Duration monitorInterval time.Duration interruptThreshold time.Duration + runNow chan struct{} } // New creates a new Scheduler @@ -62,6 +63,7 @@ func New(serverId string, jobRepoFn jobRepoFactory, opt ...Option) (*Scheduler, runJobsInterval: opts.withRunJobInterval, monitorInterval: opts.withMonitorInterval, interruptThreshold: opts.withInterruptThreshold, + runNow: make(chan struct{}, 1), }, nil } @@ -101,8 +103,8 @@ func (s *Scheduler) RegisterJob(ctx context.Context, j Job, opt ...Option) error // the nextRunInAtLeast parameter or the current NextScheduledRun time value, which ever is sooner. // If nextRunInAtLeast == 0 the job will be available to run immediately. // -// All options are ignored. -func (s *Scheduler) UpdateJobNextRunInAtLeast(ctx context.Context, name string, nextRunInAtLeast time.Duration, _ ...Option) error { +// WithRunNow is the only supported option. +func (s *Scheduler) UpdateJobNextRunInAtLeast(ctx context.Context, name string, nextRunInAtLeast time.Duration, opt ...Option) error { const op = "scheduler.(Scheduler).UpdateJobNextRunInAtLeast" if name == "" { return errors.New(ctx, errors.InvalidParameter, op, "missing name") @@ -116,6 +118,11 @@ func (s *Scheduler) UpdateJobNextRunInAtLeast(ctx context.Context, name string, if err != nil { return errors.Wrap(ctx, err, op) } + + opts := getOpts(opt...) + if opts.withRunNow { + s.RunNow() + } return nil } @@ -165,9 +172,19 @@ func (s *Scheduler) Start(ctx context.Context, wg *sync.WaitGroup) error { return nil } +// RunNow attempts to trigger the scheduling loop, if the scheduling loop is actively running it will +// cause the loop to run again immediately after finishing. +func (s *Scheduler) RunNow() { + // Do not block on writing to runNow chan. + select { + case s.runNow <- struct{}{}: + default: + } +} + func (s *Scheduler) start(ctx context.Context) { const op = "scheduler.(Scheduler).start" - timer := time.NewTimer(s.runJobsInterval) + timer := time.NewTimer(0) var wg sync.WaitGroup for { select { @@ -177,30 +194,36 @@ func (s *Scheduler) start(ctx context.Context) { event.WriteSysEvent(ctx, op, "scheduling loop shutting down", "server id", s.serverId) return case <-timer.C: - repo, err := s.jobRepoFn() - if err != nil { - event.WriteError(ctx, op, err, event.WithInfoMsg("error creating job repo")) - break - } + case <-s.runNow: + } - runs, err := repo.RunJobs(ctx, s.serverId, job.WithRunJobsLimit(s.runJobsLimit)) - if err != nil { - event.WriteError(ctx, op, err, event.WithInfoMsg("error getting jobs to run from repo")) - break - } + s.schedule(ctx, &wg) + timer.Reset(s.runJobsInterval) + } +} - for _, r := range runs { - err := s.runJob(ctx, &wg, r) - if err != nil { - event.WriteError(ctx, op, err, event.WithInfoMsg("error starting job")) - if _, inner := repo.FailRun(ctx, r.PrivateId, 0, 0); inner != nil { - event.WriteError(ctx, op, inner, event.WithInfoMsg("error updating failed job run")) - } - } +func (s *Scheduler) schedule(ctx context.Context, wg *sync.WaitGroup) { + const op = "scheduler.(Scheduler).schedule" + repo, err := s.jobRepoFn() + if err != nil { + event.WriteError(ctx, op, err, event.WithInfoMsg("error creating job repo")) + return + } + + runs, err := repo.RunJobs(ctx, s.serverId, job.WithRunJobsLimit(s.runJobsLimit)) + if err != nil { + event.WriteError(ctx, op, err, event.WithInfoMsg("error getting jobs to run from repo")) + return + } + + for _, r := range runs { + err := s.runJob(ctx, wg, r) + if err != nil { + event.WriteError(ctx, op, err, event.WithInfoMsg("error starting job")) + if _, inner := repo.FailRun(ctx, r.PrivateId, 0, 0); inner != nil { + event.WriteError(ctx, op, inner, event.WithInfoMsg("error updating failed job run")) } } - - timer.Reset(s.runJobsInterval) } } diff --git a/internal/servers/controller/controller.go b/internal/servers/controller/controller.go index 74b1822c49..4844396ddd 100644 --- a/internal/servers/controller/controller.go +++ b/internal/servers/controller/controller.go @@ -250,12 +250,12 @@ func (c *Controller) Start() error { if err := c.registerJobs(); err != nil { return fmt.Errorf("error registering jobs: %w", err) } - if err := c.scheduler.Start(c.baseContext, c.schedulerWg); err != nil { - return fmt.Errorf("error starting scheduler: %w", err) - } if err := c.startListeners(c.baseContext); err != nil { return fmt.Errorf("error starting controller listeners: %w", err) } + if err := c.scheduler.Start(c.baseContext, c.schedulerWg); err != nil { + return fmt.Errorf("error starting scheduler: %w", err) + } c.tickerWg.Add(5) go func() { diff --git a/internal/tests/api/hosts/host_test.go b/internal/tests/api/hosts/host_test.go index 688fdee106..26a2b8193c 100644 --- a/internal/tests/api/hosts/host_test.go +++ b/internal/tests/api/hosts/host_test.go @@ -84,7 +84,7 @@ func comparableHostSlice(in []*hosts.Host) []hosts.Host { func TestPluginHosts(t *testing.T) { assert, require := assert.New(t), require.New(t) - tc := controller.NewTestController(t, &controller.TestControllerOpts{SchedulerRunJobInterval: 100 * time.Millisecond}) + tc := controller.NewTestController(t, nil) defer tc.Shutdown() client := tc.Client() @@ -94,6 +94,7 @@ func TestPluginHosts(t *testing.T) { hc, err := hostcatalogs.NewClient(client).Create(tc.Context(), "plugin", proj.GetPublicId(), hostcatalogs.WithPluginId("pl_1234567890")) + require.NoError(err) require.NotNil(hc) hset, err := hostsets.NewClient(client).Create(tc.Context(), hc.Item.Id, hostsets.WithAttributes(map[string]interface{}{ diff --git a/internal/tests/api/hostsets/host_set_test.go b/internal/tests/api/hostsets/host_set_test.go index 3fda81dd09..e94cf2af75 100644 --- a/internal/tests/api/hostsets/host_set_test.go +++ b/internal/tests/api/hostsets/host_set_test.go @@ -3,8 +3,11 @@ package hostsets_test import ( "fmt" "net/http" + "strings" "testing" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" "github.com/hashicorp/boundary/api" "github.com/hashicorp/boundary/api/hostcatalogs" "github.com/hashicorp/boundary/api/hosts" @@ -143,7 +146,17 @@ func TestList_Plugin(t *testing.T) { } ul, err = hClient.List(tc.Context(), hc.Item.Id) require.NoError(err) - assert.ElementsMatch(comparableSetSlice(expected), comparableSetSlice(ul.Items)) + assert.Empty( + cmp.Diff( + expected, + ul.Items, + cmpopts.IgnoreUnexported(hostsets.HostSet{}), + cmpopts.IgnoreFields(hostsets.HostSet{}, "Version", "UpdatedTime"), + cmpopts.SortSlices(func(x, y *hostsets.HostSet) bool { + return x.Id < y.Id + }), + ), + ) filterItem := ul.Items[3] ul, err = hClient.List(tc.Context(), hc.Item.Id, @@ -183,31 +196,41 @@ func TestCrud(t *testing.T) { require.NoError(err) require.NotNil(hc) - checkHost := func(t *testing.T, step string, h *hostsets.HostSet, err error, wantedName string, wantVersion uint32) { - t.Helper() - require.NoError(err, step) - assert.NotNil(h, "returned no resource", step) - gotName := "" - if h.Name != "" { - gotName = h.Name + retryableUpdate := func(c *hostsets.Client, hcId string, version uint32, opts ...hostsets.Option) (*hostsets.HostSetUpdateResult, bool) { + var retried bool + h, err := c.Update(tc.Context(), hcId, version, opts...) + if err != nil && strings.Contains(err.Error(), "set version mismatch") { + // Got a version mismatch, this happens because the sync set job runs in the background + // and can increment the version between operations in this test, try again + retried = true + h, err = c.Update(tc.Context(), hcId, version+1, opts...) } - assert.Equal(wantedName, gotName, step) - assert.Equal(wantVersion, h.Version) + require.NoError(err) + assert.NotNil(h) + return h, retried } hClient := hostsets.NewClient(client) h, err := hClient.Create(tc.Context(), hc.Item.Id, hostsets.WithName("foo")) - checkHost(t, "create", h.Item, err, "foo", 1) + require.NoError(err) + assert.Equal("foo", h.Item.Name) + assert.Equal(uint32(1), h.Item.Version) h, err = hClient.Read(tc.Context(), h.Item.Id) - checkHost(t, "read", h.Item, err, "foo", 1) + require.NoError(err) + assert.Equal("foo", h.Item.Name) + assert.Equal(uint32(1), h.Item.Version) h, err = hClient.Update(tc.Context(), h.Item.Id, h.Item.Version, hostsets.WithName("bar")) - checkHost(t, "update", h.Item, err, "bar", 2) + require.NoError(err) + assert.Equal("bar", h.Item.Name) + assert.Equal(uint32(2), h.Item.Version) h, err = hClient.Update(tc.Context(), h.Item.Id, h.Item.Version, hostsets.DefaultName()) - checkHost(t, "update", h.Item, err, "", 3) + require.NoError(err) + assert.Equal("", h.Item.Name) + assert.Equal(uint32(3), h.Item.Version) _, err = hClient.Delete(tc.Context(), h.Item.Id) assert.NoError(err) @@ -224,20 +247,31 @@ func TestCrud(t *testing.T) { h, err = hClient.Create(tc.Context(), c.Item.Id, hostsets.WithName("foo"), hostsets.WithAttributes(map[string]interface{}{"foo": "bar"}), hostsets.WithPreferredEndpoints([]string{"dns:test"})) - checkHost(t, "create", h.Item, err, "foo", 1) + require.NoError(err) + assert.Equal("foo", h.Item.Name) + assert.Equal(uint32(1), h.Item.Version) h, err = hClient.Read(tc.Context(), h.Item.Id) - checkHost(t, "read", h.Item, err, "foo", 1) + require.NoError(err) + assert.Equal("foo", h.Item.Name) + assert.Equal(uint32(1), h.Item.Version) - h, err = hClient.Update(tc.Context(), h.Item.Id, h.Item.Version, hostsets.WithName("bar"), + h, retried := retryableUpdate(hClient, h.Item.Id, h.Item.Version, hostsets.WithName("bar"), hostsets.WithAttributes(map[string]interface{}{"foo": nil, "key": "val"}), hostsets.WithPreferredEndpoints([]string{"dns:update"})) - checkHost(t, "update", h.Item, err, "bar", 2) + require.NoError(err) + assert.Equal("bar", h.Item.Name) + switch retried { + case true: + assert.Equal(uint32(3), h.Item.Version) + default: + assert.Equal(uint32(2), h.Item.Version) + } assert.Equal(h.Item.Attributes, map[string]interface{}{"key": "val"}) assert.Equal(h.Item.PreferredEndpoints, []string{"dns:update"}) - h, err = hClient.Update(tc.Context(), h.Item.Id, h.Item.Version, hostsets.WithSyncIntervalSeconds(42)) + h, _ = retryableUpdate(hClient, h.Item.Id, h.Item.Version, hostsets.WithSyncIntervalSeconds(42)) require.NoError(err) require.NotNil(h) assert.Equal(int32(42), h.Item.SyncIntervalSeconds)