From fbf56793c61f320eb66f21624ff7a12866d410e2 Mon Sep 17 00:00:00 2001 From: Michael Milton Date: Tue, 6 Jun 2023 14:26:01 -0400 Subject: [PATCH] Add snapshot job (#3282) * snapshot job * update census to include context, controller to include snapshot registry * no need to export runFn in snapshot --- internal/census/census_job.go | 18 ++++--- internal/daemon/controller/controller.go | 4 ++ internal/snapshot/snapshot.go | 38 +++++++++++++ internal/snapshot/snapshot_job.go | 69 ++++++++++++++++++++++++ 4 files changed, 121 insertions(+), 8 deletions(-) create mode 100644 internal/snapshot/snapshot.go create mode 100644 internal/snapshot/snapshot_job.go diff --git a/internal/census/census_job.go b/internal/census/census_job.go index edaae70a61..55f62b1a79 100644 --- a/internal/census/census_job.go +++ b/internal/census/census_job.go @@ -18,10 +18,11 @@ var ( ) type censusJob struct { - r db.Reader - w db.Writer - optOut bool - agent any + r db.Reader + w db.Writer + optOut bool + agent any + eventCtx context.Context } func newCensusJob(ctx context.Context, optOut bool, r db.Reader, w db.Writer) (*censusJob, error) { @@ -34,10 +35,11 @@ func newCensusJob(ctx context.Context, optOut bool, r db.Reader, w db.Writer) (* } return &censusJob{ - r: r, - w: w, - optOut: optOut, - agent: nil, + r: r, + w: w, + optOut: optOut, + agent: nil, + eventCtx: ctx, }, nil } diff --git a/internal/daemon/controller/controller.go b/internal/daemon/controller/controller.go index a085aae888..7e6b2d09ae 100644 --- a/internal/daemon/controller/controller.go +++ b/internal/daemon/controller/controller.go @@ -41,6 +41,7 @@ import ( "github.com/hashicorp/boundary/internal/server" serversjob "github.com/hashicorp/boundary/internal/server/job" "github.com/hashicorp/boundary/internal/session" + "github.com/hashicorp/boundary/internal/snapshot" pluginstorage "github.com/hashicorp/boundary/internal/storage/plugin" "github.com/hashicorp/boundary/internal/target" "github.com/hashicorp/boundary/internal/types/scope" @@ -580,6 +581,9 @@ func (c *Controller) registerJobs() error { if err := cleaner.RegisterJob(c.baseContext, c.scheduler, rw); err != nil { return err } + if err := snapshot.RegisterJob(c.baseContext, c.scheduler, rw, rw); err != nil { + return err + } if err := census.RegisterJob(c.baseContext, c.scheduler, c.conf.RawConfig.Reporting.License.Enabled, rw, rw); err != nil { return err } diff --git a/internal/snapshot/snapshot.go b/internal/snapshot/snapshot.go new file mode 100644 index 0000000000..077cab857d --- /dev/null +++ b/internal/snapshot/snapshot.go @@ -0,0 +1,38 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + +package snapshot + +import ( + "context" + "fmt" + + "github.com/hashicorp/boundary/internal/db" + "github.com/hashicorp/boundary/internal/errors" + "github.com/hashicorp/boundary/internal/scheduler" + "github.com/hashicorp/boundary/internal/util" +) + +// RegisterJob registers the snapshot job with the provided scheduler. +func RegisterJob(ctx context.Context, s *scheduler.Scheduler, r db.Reader, w db.Writer) error { + const op = "snapshot.RegisterJob" + if s == nil { + return errors.New(ctx, errors.InvalidParameter, "nil scheduler", op, errors.WithoutEvent()) + } + if util.IsNil(r) { + return errors.New(ctx, errors.Internal, "nil DB reader", op, errors.WithoutEvent()) + } + if util.IsNil(w) { + return errors.New(ctx, errors.Internal, "nil DB writer", op, errors.WithoutEvent()) + } + + snapshotJob, err := newSnapshotJob(ctx, r, w) + if err != nil { + return fmt.Errorf("error creating snapshot job: %w", err) + } + if err := s.RegisterJob(ctx, snapshotJob); err != nil { + return errors.Wrap(ctx, err, op) + } + + return nil +} diff --git a/internal/snapshot/snapshot_job.go b/internal/snapshot/snapshot_job.go new file mode 100644 index 0000000000..5ec0641cd5 --- /dev/null +++ b/internal/snapshot/snapshot_job.go @@ -0,0 +1,69 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + +package snapshot + +import ( + "context" + "time" + + "github.com/hashicorp/boundary/internal/db" + "github.com/hashicorp/boundary/internal/errors" + "github.com/hashicorp/boundary/internal/scheduler" +) + +var runFn = runInternal + +type snapshotJob struct { + r db.Reader + w db.Writer +} + +func newSnapshotJob(ctx context.Context, r db.Reader, w db.Writer) (*snapshotJob, error) { + const op = "snapshotJob.newSnapshotJob" + switch { + case r == nil: + return nil, errors.New(ctx, errors.InvalidParameter, op, "missing db.Reader") + case w == nil: + return nil, errors.New(ctx, errors.InvalidParameter, op, "missing db.Writer") + } + + return &snapshotJob{ + r: r, + w: w, + }, nil +} + +// Status reports the job’s current status. +func (c *snapshotJob) Status() scheduler.JobStatus { + return scheduler.JobStatus{} +} + +// Run performs the required work depending on the implementation. +// The context is used to notify the job that it should exit early. +func (c *snapshotJob) Run(ctx context.Context) error { + const op = "snapshot.(snapshotJob).Run" + err := runFn(ctx, c) + return err +} + +func runInternal(ctx context.Context, c *snapshotJob) error { + return nil +} + +// NextRunIn returns the duration until the next job run should be scheduled. +// We report as ready immediately after a successful run. This doesn't mean that +// this job will run immediately, only about as often as the configured scheduler interval. +func (c *snapshotJob) NextRunIn(_ context.Context) (time.Duration, error) { + return 0, nil +} + +// Name is the unique name of the job. +func (c *snapshotJob) Name() string { + return "job_run_snapshot" +} + +// Description is the human-readable description of the job. +func (c *snapshotJob) Description() string { + return "Creates and stores a snapshot from sessions warehouse to snapshot table" +}