Merge pull request #1819 from hashicorp/louis-scheduler

feat(scheduler): Add support for run now option
pull/1822/head
Louis Ruch 4 years ago committed by GitHub
commit ac78ee6315
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -11,6 +11,7 @@ import (
"github.com/hashicorp/boundary/internal/errors"
"github.com/hashicorp/boundary/internal/kms"
"github.com/hashicorp/boundary/internal/oplog"
"github.com/hashicorp/boundary/internal/scheduler"
"github.com/hashicorp/go-secure-stdlib/parseutil"
vault "github.com/hashicorp/vault/api"
)
@ -724,8 +725,8 @@ func (r *Repository) DeleteCredentialStore(ctx context.Context, publicId string,
if rows > 0 {
// Schedule token revocation and credential store cleanup jobs to run immediately
_ = r.scheduler.UpdateJobNextRunInAtLeast(ctx, tokenRevocationJobName, 0)
_ = r.scheduler.UpdateJobNextRunInAtLeast(ctx, credentialStoreCleanupJobName, 0)
_ = r.scheduler.UpdateJobNextRunInAtLeast(ctx, tokenRevocationJobName, 0, scheduler.WithRunNow(true))
_ = r.scheduler.UpdateJobNextRunInAtLeast(ctx, credentialStoreCleanupJobName, 0, scheduler.WithRunNow(true))
}
return rows, nil
}

@ -191,7 +191,7 @@ func (r *SetSyncJob) syncSets(ctx context.Context, setIds []string) error {
return errors.New(ctx, errors.InvalidParameter, op, "no set ids")
}
// Fist, look up the sets corresponding to the set IDs
// First, look up the sets corresponding to the set IDs
var setAggs []*hostSetAgg
if err := r.reader.SearchWhere(ctx, &setAggs, "public_id in (?)", []interface{}{setIds}); err != nil {
return errors.Wrap(ctx, err, op, errors.WithMsg(fmt.Sprintf("can't retrieve sets %v", setIds)))

@ -13,6 +13,7 @@ import (
"github.com/hashicorp/boundary/internal/observability/event"
"github.com/hashicorp/boundary/internal/oplog"
hostplugin "github.com/hashicorp/boundary/internal/plugin/host"
"github.com/hashicorp/boundary/internal/scheduler"
pb "github.com/hashicorp/boundary/sdk/pbs/controller/api/resources/hostcatalogs"
pbset "github.com/hashicorp/boundary/sdk/pbs/controller/api/resources/hostsets"
plgpb "github.com/hashicorp/boundary/sdk/pbs/plugin"
@ -503,7 +504,7 @@ func (r *Repository) UpdateCatalog(ctx context.Context, c *HostCatalog, version
}
if runSyncJob {
_ = r.scheduler.UpdateJobNextRunInAtLeast(ctx, setSyncJobName, 0)
_ = r.scheduler.UpdateJobNextRunInAtLeast(ctx, setSyncJobName, 0, scheduler.WithRunNow(true))
}
// Even if we didn't update any records, if we were able to find the record

@ -15,6 +15,7 @@ import (
"github.com/hashicorp/boundary/internal/libs/patchstruct"
"github.com/hashicorp/boundary/internal/oplog"
hostplugin "github.com/hashicorp/boundary/internal/plugin/host"
"github.com/hashicorp/boundary/internal/scheduler"
pb "github.com/hashicorp/boundary/sdk/pbs/controller/api/resources/hostsets"
plgpb "github.com/hashicorp/boundary/sdk/pbs/plugin"
"google.golang.org/grpc/codes"
@ -164,7 +165,7 @@ func (r *Repository) CreateSet(ctx context.Context, scopeId string, s *HostSet,
}
// The set now exists in the plugin, sync it immediately.
_ = r.scheduler.UpdateJobNextRunInAtLeast(ctx, setSyncJobName, 0)
_ = r.scheduler.UpdateJobNextRunInAtLeast(ctx, setSyncJobName, 0, scheduler.WithRunNow(true))
plg, err := r.getPlugin(ctx, c.GetPluginId())
if err != nil {
@ -522,13 +523,15 @@ func (r *Repository) UpdateSet(ctx context.Context, scopeId string, s *HostSet,
switch {
case updateAttributes:
// Request a host sync since we have updated attributes.
_ = r.scheduler.UpdateJobNextRunInAtLeast(ctx, setSyncJobName, 0)
_ = r.scheduler.UpdateJobNextRunInAtLeast(ctx, setSyncJobName, 0, scheduler.WithRunNow(true))
case updateSyncInterval:
var schOpt []scheduler.Option
tilNextSync := time.Until(returnedSet.LastSyncTime.AsTime().Add(time.Duration(returnedSet.SyncIntervalSeconds) * time.Second))
if tilNextSync < 0 {
if tilNextSync <= 0 {
tilNextSync = 0
schOpt = append(schOpt, scheduler.WithRunNow(true))
}
_ = r.scheduler.UpdateJobNextRunInAtLeast(ctx, setSyncJobName, tilNextSync)
_ = r.scheduler.UpdateJobNextRunInAtLeast(ctx, setSyncJobName, tilNextSync, schOpt...)
}
return returnedSet, hosts, plg, numUpdated, nil

@ -518,6 +518,106 @@ func TestSchedulerFinalStatusUpdate(t *testing.T) {
close(jobStatus)
}
// TestSchedulerRunNow verifies that, with a one hour scheduling interval, a
// job whose next run time was moved up is not started until the scheduling
// loop is triggered, either through an explicit call to RunNow or by passing
// WithRunNow to UpdateJobNextRunInAtLeast.
func TestSchedulerRunNow(t *testing.T) {
	// do not use t.Parallel() since it relies on the sys eventer
	require := require.New(t)
	conn, _ := db.TestSetup(t, "postgres")
	wrapper := db.TestWrapper(t)
	rw := db.New(conn)
	kmsCache := kms.TestKms(t, conn, wrapper)
	iam.TestRepo(t, conn, wrapper)

	event.TestEnableEventing(t, true)
	testConfig := event.DefaultEventerConfig()
	testLock := &sync.Mutex{}
	testLogger := hclog.New(&hclog.LoggerOptions{
		Mutex: testLock,
	})
	err := event.InitSysEventer(testLogger, testLock, "TestSchedulerWorkflow", event.WithEventerConfig(testConfig))
	require.NoError(err)

	// Create test scheduler that only runs jobs every hour
	sched := TestScheduler(t, conn, wrapper, WithRunJobsLimit(10), WithRunJobsInterval(time.Hour))

	jobCh := make(chan struct{})
	jobReady := make(chan struct{})
	testDone := make(chan struct{})
	// The job announces itself on jobReady and then blocks until the test
	// sends on jobCh. Once testDone is closed it returns without signalling.
	fn := func(_ context.Context) error {
		select {
		case <-testDone:
			return nil
		case jobReady <- struct{}{}:
		}
		<-jobCh
		return nil
	}
	tj := testJob{name: "name", description: "desc", fn: fn, nextRunIn: time.Hour}
	err = sched.RegisterJob(context.Background(), tj)
	require.NoError(err)

	baseCtx, baseCnl := context.WithCancel(context.Background())
	defer baseCnl()
	var wg sync.WaitGroup
	err = sched.Start(baseCtx, &wg)
	require.NoError(err)

	// Wait for scheduler to run job
	<-jobReady

	require.Equal(1, mapLen(sched.runningJobs))
	runJob, ok := sched.runningJobs.Load(tj.name)
	require.True(ok)
	runId := runJob.(*runningJob).runId

	// Complete job
	jobCh <- struct{}{}

	repo, err := job.NewRepository(rw, rw, kmsCache)
	require.NoError(err)
	waitForRunStatus(t, repo, runId, string(job.Completed))

	// Update job to run immediately once scheduling loop is called
	err = sched.UpdateJobNextRunInAtLeast(context.Background(), tj.name, 0)
	require.NoError(err)

	// Verify no job is running: without WithRunNow the scheduling loop has
	// not been triggered, so nothing should be waiting to send on jobReady.
	select {
	case <-jobReady:
		t.Fatal("expected no job to be running")
	default:
	}

	// Trigger scheduling loop
	sched.RunNow()

	// Wait for scheduler to run job
	<-jobReady

	require.Equal(1, mapLen(sched.runningJobs))
	runJob, ok = sched.runningJobs.Load(tj.name)
	require.True(ok)
	runId = runJob.(*runningJob).runId

	// Complete job
	jobCh <- struct{}{}
	waitForRunStatus(t, repo, runId, string(job.Completed))

	// Update job to run again with RunNow option
	err = sched.UpdateJobNextRunInAtLeast(context.Background(), tj.name, 0, WithRunNow(true))
	require.NoError(err)

	// Wait for scheduler to run job
	<-jobReady
	require.Equal(1, mapLen(sched.runningJobs))

	// Complete job
	jobCh <- struct{}{}

	// Cleanup tests
	close(testDone)
	close(jobCh)
}
func waitForRunStatus(t *testing.T, repo *job.Repository, runId, status string) *job.Run {
t.Helper()
var run *job.Run

@ -28,6 +28,7 @@ type options struct {
withRunJobInterval time.Duration
withMonitorInterval time.Duration
withInterruptThreshold time.Duration
withRunNow bool
}
func getDefaultOptions() options {
@ -95,3 +96,12 @@ func WithNextRunIn(d time.Duration) Option {
o.withNextRunIn = d
}
}
// WithRunNow returns an Option that records b in the withRunNow field of the
// options. When it is set, the scheduling loop is triggered right after the
// next run time of a specific job has been updated. Note that this does not
// guarantee the job will run on the scheduler that updated the job run time.
func WithRunNow(b bool) Option {
	apply := func(opts *options) {
		opts.withRunNow = b
	}
	return apply
}

@ -66,4 +66,12 @@ func Test_GetOpts(t *testing.T) {
testOpts := getDefaultOptions()
assert.Equal(opts, testOpts)
})
t.Run("WithRunNow", func(t *testing.T) {
assert := assert.New(t)
opts := getOpts(WithRunNow(true))
testOpts := getDefaultOptions()
assert.NotEqual(opts, testOpts)
testOpts.withRunNow = true
assert.Equal(opts, testOpts)
})
}

@ -33,6 +33,7 @@ type Scheduler struct {
runJobsInterval time.Duration
monitorInterval time.Duration
interruptThreshold time.Duration
runNow chan struct{}
}
// New creates a new Scheduler
@ -62,6 +63,7 @@ func New(serverId string, jobRepoFn jobRepoFactory, opt ...Option) (*Scheduler,
runJobsInterval: opts.withRunJobInterval,
monitorInterval: opts.withMonitorInterval,
interruptThreshold: opts.withInterruptThreshold,
runNow: make(chan struct{}, 1),
}, nil
}
@ -101,8 +103,8 @@ func (s *Scheduler) RegisterJob(ctx context.Context, j Job, opt ...Option) error
// the nextRunInAtLeast parameter or the current NextScheduledRun time value, whichever is sooner.
// If nextRunInAtLeast == 0 the job will be available to run immediately.
//
// All options are ignored.
func (s *Scheduler) UpdateJobNextRunInAtLeast(ctx context.Context, name string, nextRunInAtLeast time.Duration, _ ...Option) error {
// WithRunNow is the only supported option.
func (s *Scheduler) UpdateJobNextRunInAtLeast(ctx context.Context, name string, nextRunInAtLeast time.Duration, opt ...Option) error {
const op = "scheduler.(Scheduler).UpdateJobNextRunInAtLeast"
if name == "" {
return errors.New(ctx, errors.InvalidParameter, op, "missing name")
@ -116,6 +118,11 @@ func (s *Scheduler) UpdateJobNextRunInAtLeast(ctx context.Context, name string,
if err != nil {
return errors.Wrap(ctx, err, op)
}
opts := getOpts(opt...)
if opts.withRunNow {
s.RunNow()
}
return nil
}
@ -165,9 +172,19 @@ func (s *Scheduler) Start(ctx context.Context, wg *sync.WaitGroup) error {
return nil
}
// RunNow asks the scheduling loop to run without ever blocking the caller.
// If the scheduling loop is actively running, the request causes it to run
// again immediately after finishing. If the request cannot be handed off
// right away it is dropped.
func (s *Scheduler) RunNow() {
	var trigger struct{}
	select {
	case s.runNow <- trigger:
		// Request handed off to the scheduling loop.
	default:
		// The send would block; give up instead of waiting.
	}
}
func (s *Scheduler) start(ctx context.Context) {
const op = "scheduler.(Scheduler).start"
timer := time.NewTimer(s.runJobsInterval)
timer := time.NewTimer(0)
var wg sync.WaitGroup
for {
select {
@ -177,30 +194,36 @@ func (s *Scheduler) start(ctx context.Context) {
event.WriteSysEvent(ctx, op, "scheduling loop shutting down", "server id", s.serverId)
return
case <-timer.C:
repo, err := s.jobRepoFn()
if err != nil {
event.WriteError(ctx, op, err, event.WithInfoMsg("error creating job repo"))
break
}
case <-s.runNow:
}
runs, err := repo.RunJobs(ctx, s.serverId, job.WithRunJobsLimit(s.runJobsLimit))
if err != nil {
event.WriteError(ctx, op, err, event.WithInfoMsg("error getting jobs to run from repo"))
break
}
s.schedule(ctx, &wg)
timer.Reset(s.runJobsInterval)
}
}
for _, r := range runs {
err := s.runJob(ctx, &wg, r)
if err != nil {
event.WriteError(ctx, op, err, event.WithInfoMsg("error starting job"))
if _, inner := repo.FailRun(ctx, r.PrivateId, 0, 0); inner != nil {
event.WriteError(ctx, op, inner, event.WithInfoMsg("error updating failed job run"))
}
}
// schedule performs a single scheduling pass: it obtains a job repository from
// jobRepoFn, asks it for runs via RunJobs (limited by runJobsLimit) and starts
// each returned run with runJob. Errors are written as error events and are
// not returned to the caller.
func (s *Scheduler) schedule(ctx context.Context, wg *sync.WaitGroup) {
const op = "scheduler.(Scheduler).schedule"
repo, err := s.jobRepoFn()
if err != nil {
// Without a repository nothing can be scheduled in this pass.
event.WriteError(ctx, op, err, event.WithInfoMsg("error creating job repo"))
return
}
runs, err := repo.RunJobs(ctx, s.serverId, job.WithRunJobsLimit(s.runJobsLimit))
if err != nil {
event.WriteError(ctx, op, err, event.WithInfoMsg("error getting jobs to run from repo"))
return
}
for _, r := range runs {
err := s.runJob(ctx, wg, r)
if err != nil {
event.WriteError(ctx, op, err, event.WithInfoMsg("error starting job"))
// The run could not be started, so record it as failed in the repository.
if _, inner := repo.FailRun(ctx, r.PrivateId, 0, 0); inner != nil {
event.WriteError(ctx, op, inner, event.WithInfoMsg("error updating failed job run"))
}
}
// NOTE(review): timer is not declared anywhere in this function as shown;
// this line looks like a leftover from the loop body that previously lived
// inline in start. Confirm against the actual file.
timer.Reset(s.runJobsInterval)
}
}

@ -250,12 +250,12 @@ func (c *Controller) Start() error {
if err := c.registerJobs(); err != nil {
return fmt.Errorf("error registering jobs: %w", err)
}
if err := c.scheduler.Start(c.baseContext, c.schedulerWg); err != nil {
return fmt.Errorf("error starting scheduler: %w", err)
}
if err := c.startListeners(c.baseContext); err != nil {
return fmt.Errorf("error starting controller listeners: %w", err)
}
if err := c.scheduler.Start(c.baseContext, c.schedulerWg); err != nil {
return fmt.Errorf("error starting scheduler: %w", err)
}
c.tickerWg.Add(5)
go func() {

@ -84,7 +84,7 @@ func comparableHostSlice(in []*hosts.Host) []hosts.Host {
func TestPluginHosts(t *testing.T) {
assert, require := assert.New(t), require.New(t)
tc := controller.NewTestController(t, &controller.TestControllerOpts{SchedulerRunJobInterval: 100 * time.Millisecond})
tc := controller.NewTestController(t, nil)
defer tc.Shutdown()
client := tc.Client()
@ -94,6 +94,7 @@ func TestPluginHosts(t *testing.T) {
hc, err := hostcatalogs.NewClient(client).Create(tc.Context(), "plugin", proj.GetPublicId(),
hostcatalogs.WithPluginId("pl_1234567890"))
require.NoError(err)
require.NotNil(hc)
hset, err := hostsets.NewClient(client).Create(tc.Context(), hc.Item.Id, hostsets.WithAttributes(map[string]interface{}{

@ -3,8 +3,11 @@ package hostsets_test
import (
"fmt"
"net/http"
"strings"
"testing"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/hashicorp/boundary/api"
"github.com/hashicorp/boundary/api/hostcatalogs"
"github.com/hashicorp/boundary/api/hosts"
@ -143,7 +146,17 @@ func TestList_Plugin(t *testing.T) {
}
ul, err = hClient.List(tc.Context(), hc.Item.Id)
require.NoError(err)
assert.ElementsMatch(comparableSetSlice(expected), comparableSetSlice(ul.Items))
assert.Empty(
cmp.Diff(
expected,
ul.Items,
cmpopts.IgnoreUnexported(hostsets.HostSet{}),
cmpopts.IgnoreFields(hostsets.HostSet{}, "Version", "UpdatedTime"),
cmpopts.SortSlices(func(x, y *hostsets.HostSet) bool {
return x.Id < y.Id
}),
),
)
filterItem := ul.Items[3]
ul, err = hClient.List(tc.Context(), hc.Item.Id,
@ -183,31 +196,41 @@ func TestCrud(t *testing.T) {
require.NoError(err)
require.NotNil(hc)
checkHost := func(t *testing.T, step string, h *hostsets.HostSet, err error, wantedName string, wantVersion uint32) {
t.Helper()
require.NoError(err, step)
assert.NotNil(h, "returned no resource", step)
gotName := ""
if h.Name != "" {
gotName = h.Name
retryableUpdate := func(c *hostsets.Client, hcId string, version uint32, opts ...hostsets.Option) (*hostsets.HostSetUpdateResult, bool) {
var retried bool
h, err := c.Update(tc.Context(), hcId, version, opts...)
if err != nil && strings.Contains(err.Error(), "set version mismatch") {
// Got a version mismatch, this happens because the sync set job runs in the background
// and can increment the version between operations in this test, try again
retried = true
h, err = c.Update(tc.Context(), hcId, version+1, opts...)
}
assert.Equal(wantedName, gotName, step)
assert.Equal(wantVersion, h.Version)
require.NoError(err)
assert.NotNil(h)
return h, retried
}
hClient := hostsets.NewClient(client)
h, err := hClient.Create(tc.Context(), hc.Item.Id, hostsets.WithName("foo"))
checkHost(t, "create", h.Item, err, "foo", 1)
require.NoError(err)
assert.Equal("foo", h.Item.Name)
assert.Equal(uint32(1), h.Item.Version)
h, err = hClient.Read(tc.Context(), h.Item.Id)
checkHost(t, "read", h.Item, err, "foo", 1)
require.NoError(err)
assert.Equal("foo", h.Item.Name)
assert.Equal(uint32(1), h.Item.Version)
h, err = hClient.Update(tc.Context(), h.Item.Id, h.Item.Version, hostsets.WithName("bar"))
checkHost(t, "update", h.Item, err, "bar", 2)
require.NoError(err)
assert.Equal("bar", h.Item.Name)
assert.Equal(uint32(2), h.Item.Version)
h, err = hClient.Update(tc.Context(), h.Item.Id, h.Item.Version, hostsets.DefaultName())
checkHost(t, "update", h.Item, err, "", 3)
require.NoError(err)
assert.Equal("", h.Item.Name)
assert.Equal(uint32(3), h.Item.Version)
_, err = hClient.Delete(tc.Context(), h.Item.Id)
assert.NoError(err)
@ -224,20 +247,31 @@ func TestCrud(t *testing.T) {
h, err = hClient.Create(tc.Context(), c.Item.Id, hostsets.WithName("foo"),
hostsets.WithAttributes(map[string]interface{}{"foo": "bar"}), hostsets.WithPreferredEndpoints([]string{"dns:test"}))
checkHost(t, "create", h.Item, err, "foo", 1)
require.NoError(err)
assert.Equal("foo", h.Item.Name)
assert.Equal(uint32(1), h.Item.Version)
h, err = hClient.Read(tc.Context(), h.Item.Id)
checkHost(t, "read", h.Item, err, "foo", 1)
require.NoError(err)
assert.Equal("foo", h.Item.Name)
assert.Equal(uint32(1), h.Item.Version)
h, err = hClient.Update(tc.Context(), h.Item.Id, h.Item.Version, hostsets.WithName("bar"),
h, retried := retryableUpdate(hClient, h.Item.Id, h.Item.Version, hostsets.WithName("bar"),
hostsets.WithAttributes(map[string]interface{}{"foo": nil, "key": "val"}),
hostsets.WithPreferredEndpoints([]string{"dns:update"}))
checkHost(t, "update", h.Item, err, "bar", 2)
require.NoError(err)
assert.Equal("bar", h.Item.Name)
switch retried {
case true:
assert.Equal(uint32(3), h.Item.Version)
default:
assert.Equal(uint32(2), h.Item.Version)
}
assert.Equal(h.Item.Attributes, map[string]interface{}{"key": "val"})
assert.Equal(h.Item.PreferredEndpoints, []string{"dns:update"})
h, err = hClient.Update(tc.Context(), h.Item.Id, h.Item.Version, hostsets.WithSyncIntervalSeconds(42))
h, _ = retryableUpdate(hClient, h.Item.Id, h.Item.Version, hostsets.WithSyncIntervalSeconds(42))
require.NoError(err)
require.NotNil(h)
assert.Equal(int32(42), h.Item.SyncIntervalSeconds)

Loading…
Cancel
Save