From f9eab71a4a3badabf45a979ddb5a39fa24fd2863 Mon Sep 17 00:00:00 2001 From: Timothy Messier Date: Fri, 20 May 2022 19:55:45 +0000 Subject: [PATCH] feat(session): Add periodic job to delete terminated sessions --- internal/daemon/controller/controller.go | 2 +- .../session/job_delete_terminated_sessions.go | 75 +++++++++++++ .../job_delete_terminated_sessions_test.go | 106 ++++++++++++++++++ internal/session/jobs.go | 20 +++- 4 files changed, 200 insertions(+), 3 deletions(-) create mode 100644 internal/session/job_delete_terminated_sessions.go create mode 100644 internal/session/job_delete_terminated_sessions_test.go diff --git a/internal/daemon/controller/controller.go b/internal/daemon/controller/controller.go index c89386c92a..6c651a21f1 100644 --- a/internal/daemon/controller/controller.go +++ b/internal/daemon/controller/controller.go @@ -353,7 +353,7 @@ func (c *Controller) registerJobs() error { if err := pluginhost.RegisterJobs(c.baseContext, c.scheduler, rw, rw, c.kms, c.conf.HostPlugins); err != nil { return err } - if err := session.RegisterJobs(c.baseContext, c.scheduler, rw, c.conf.StatusGracePeriodDuration); err != nil { + if err := session.RegisterJobs(c.baseContext, c.scheduler, rw, rw, c.kms, c.conf.StatusGracePeriodDuration); err != nil { return err } diff --git a/internal/session/job_delete_terminated_sessions.go b/internal/session/job_delete_terminated_sessions.go new file mode 100644 index 0000000000..4318a9e12a --- /dev/null +++ b/internal/session/job_delete_terminated_sessions.go @@ -0,0 +1,75 @@ +package session + +import ( + "context" + "time" + + "github.com/hashicorp/boundary/internal/errors" + "github.com/hashicorp/boundary/internal/scheduler" +) + +type deleteTerminatedJob struct { + repo *Repository + + // the amount of time that a session must be in the terminated + // state for it to be deleted. + threshold time.Duration + + // the number of sessions deleted in the most recent run + deletedInRun int +} + +func newDeleteTerminatedJob(ctx context.Context, repo *Repository, threshold time.Duration) (*deleteTerminatedJob, error) { + const op = "session.newDeleteTerminatedJob" + switch { + case repo == nil: + return nil, errors.New(ctx, errors.InvalidParameter, op, "missing repository") + } + + return &deleteTerminatedJob{ + repo: repo, + threshold: threshold, + }, nil +} + +// Status reports the job’s current status. The status is periodically persisted by +// the scheduler when a job is running, and will be used to verify a job is making progress. +func (d *deleteTerminatedJob) Status() scheduler.JobStatus { + return scheduler.JobStatus{ + Completed: d.deletedInRun, + Total: d.deletedInRun, + } +} + +// Run performs the required work depending on the implementation. +// The context is used to notify the job that it should exit early. +func (d *deleteTerminatedJob) Run(ctx context.Context) error { + const op = "session.(deleteTerminatedJob).Run" + d.deletedInRun = 0 + var err error + + d.deletedInRun, err = d.repo.deleteSessionsTerminatedBefore(ctx, d.threshold) + if err != nil { + return errors.Wrap(ctx, err, op) + } + return nil +} + +// NextRunIn returns the duration until the next job run should be scheduled. This +// method is invoked after a run has successfully completed and the next run time +// is being persisted by the scheduler. If an error is returned, the error will be logged +// but the duration returned will still be used in scheduling. If a zero duration is returned +// the job will be scheduled to run again immediately. +func (d *deleteTerminatedJob) NextRunIn(_ context.Context) (time.Duration, error) { + return time.Minute * 30, nil +} + +// Name is the unique name of the job. +func (d *deleteTerminatedJob) Name() string { + return "delete_terminated_sessions" +} + +// Description is the human readable description of the job. +func (d *deleteTerminatedJob) Description() string { + return "Delete terminated sessions that were terminated " +} diff --git a/internal/session/job_delete_terminated_sessions_test.go b/internal/session/job_delete_terminated_sessions_test.go new file mode 100644 index 0000000000..40fc58d922 --- /dev/null +++ b/internal/session/job_delete_terminated_sessions_test.go @@ -0,0 +1,106 @@ +package session + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/hashicorp/boundary/internal/db" + "github.com/hashicorp/boundary/internal/iam" + "github.com/hashicorp/boundary/internal/kms" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestDeleteTermiantedSessionsJob(t *testing.T) { + ctx := context.Background() + conn, _ := db.TestSetup(t, "postgres") + rw := db.New(conn) + wrapper := db.TestWrapper(t) + iamRepo := iam.TestRepo(t, conn, wrapper) + kms := kms.TestKms(t, conn, wrapper) + repo, err := NewRepository(rw, rw, kms) + require.NoError(t, err) + composedOf := TestSessionParams(t, conn, wrapper, iamRepo) + require.NoError(t, err) + + cases := []struct { + sessionCount int + terminateCount int + threshold time.Duration + expected int + }{ + { + 0, + 0, + time.Nanosecond, + 0, + }, + { + 1, + 1, + time.Nanosecond, + 1, + }, + { + 1, + 1, + time.Hour, + 0, + }, + { + 10, + 10, + time.Nanosecond, + 10, + }, + { + 10, + 4, + time.Nanosecond, + 4, + }, + { + 10, + 0, + time.Nanosecond, + 0, + }, + { + 10, + 10, + time.Hour, + 0, + }, + } + + for _, tc := range cases { + t.Run(fmt.Sprintf("%d_%d_%s", tc.sessionCount, tc.terminateCount, tc.threshold), func(t *testing.T) { + t.Cleanup(func() { + sdb, err := conn.SqlDB(ctx) + require.NoError(t, err) + _, err = sdb.Exec(`delete from session;`) + require.NoError(t, err) + }) + + for i := 0; i < tc.sessionCount; i++ { + s := TestSession(t, conn, wrapper, composedOf) + if i < tc.terminateCount { + _, err = repo.CancelSession(ctx, s.PublicId, s.Version) + require.NoError(t, err) + } + + } + c, err := repo.TerminateCompletedSessions(ctx) + require.NoError(t, err) + assert.Equal(t, tc.terminateCount, c) + + job, err := newDeleteTerminatedJob(ctx, repo, tc.threshold) + require.NoError(t, err) + err = job.Run(ctx) + require.NoError(t, err) + assert.Equal(t, tc.expected, job.deletedInRun) + }) + } +} diff --git a/internal/session/jobs.go b/internal/session/jobs.go index f99d7ed06f..70f1e113b2 100644 --- a/internal/session/jobs.go +++ b/internal/session/jobs.go @@ -6,12 +6,16 @@ import ( "time" "github.com/hashicorp/boundary/internal/db" + "github.com/hashicorp/boundary/internal/kms" "github.com/hashicorp/boundary/internal/scheduler" ) -// RegisterJobs registers plugin host related jobs with the provided scheduler. -func RegisterJobs(ctx context.Context, scheduler *scheduler.Scheduler, w db.Writer, gracePeriod time.Duration) error { +const deleteTerminatedThreshold = time.Hour + +// RegisterJobs registers session related jobs with the provided scheduler. +func RegisterJobs(ctx context.Context, scheduler *scheduler.Scheduler, w db.Writer, r db.Reader, k *kms.Kms, gracePeriod time.Duration) error { const op = "session.RegisterJobs" + sessionConnectionCleanupJob, err := newSessionConnectionCleanupJob(w, gracePeriod) if err != nil { return fmt.Errorf("error creating session cleanup job: %w", err) @@ -20,5 +24,17 @@ func RegisterJobs(ctx context.Context, scheduler *scheduler.Scheduler, w db.Writ return fmt.Errorf("error registering session cleanup job: %w", err) } + repo, err := NewRepository(r, w, k) + if err != nil { + return fmt.Errorf("error creating repository: %w", err) + } + deleteTerminatedJob, err := newDeleteTerminatedJob(ctx, repo, deleteTerminatedThreshold) + if err != nil { + return fmt.Errorf("error creating delete terminated session job: %w", err) + } + if err = scheduler.RegisterJob(ctx, deleteTerminatedJob); err != nil { + return fmt.Errorf("error registering delete terminated session job: %w", err) + } + return nil }