feat(session): Add periodic job to delete terminated sessions

pull/2160/head
Timothy Messier 4 years ago
parent de2421cc25
commit f9eab71a4a
No known key found for this signature in database
GPG Key ID: EFD2F184F7600572

@ -353,7 +353,7 @@ func (c *Controller) registerJobs() error {
if err := pluginhost.RegisterJobs(c.baseContext, c.scheduler, rw, rw, c.kms, c.conf.HostPlugins); err != nil {
return err
}
if err := session.RegisterJobs(c.baseContext, c.scheduler, rw, c.conf.StatusGracePeriodDuration); err != nil {
if err := session.RegisterJobs(c.baseContext, c.scheduler, rw, rw, c.kms, c.conf.StatusGracePeriodDuration); err != nil {
return err
}

@ -0,0 +1,75 @@
package session
import (
"context"
"time"
"github.com/hashicorp/boundary/internal/errors"
"github.com/hashicorp/boundary/internal/scheduler"
)
type deleteTerminatedJob struct {
repo *Repository
// the amount of time that a session must be in the terminated
// state for it to be deleted.
threshold time.Duration
// the number of sessions deleted in the most recent run
deletedInRun int
}
func newDeleteTerminatedJob(ctx context.Context, repo *Repository, threshold time.Duration) (*deleteTerminatedJob, error) {
const op = "session.newDeleteTerminatedJob"
switch {
case repo == nil:
return nil, errors.New(ctx, errors.InvalidParameter, op, "missing repository")
}
return &deleteTerminatedJob{
repo: repo,
threshold: threshold,
}, nil
}
// Status reports the jobs current status. The status is periodically persisted by
// the scheduler when a job is running, and will be used to verify a job is making progress.
func (d *deleteTerminatedJob) Status() scheduler.JobStatus {
return scheduler.JobStatus{
Completed: d.deletedInRun,
Total: d.deletedInRun,
}
}
// Run performs the required work depending on the implementation.
// The context is used to notify the job that it should exit early.
func (d *deleteTerminatedJob) Run(ctx context.Context) error {
const op = "session.(deleteTerminatedJob).Run"
d.deletedInRun = 0
var err error
d.deletedInRun, err = d.repo.deleteSessionsTerminatedBefore(ctx, d.threshold)
if err != nil {
return errors.Wrap(ctx, err, op)
}
return nil
}
// NextRunIn returns the duration until the next job run should be scheduled. This
// method is invoked after a run has successfully completed and the next run time
// is being persisted by the scheduler. If an error is returned, the error will be logged
// but the duration returned will still be used in scheduling. If a zero duration is returned
// the job will be scheduled to run again immediately.
func (d *deleteTerminatedJob) NextRunIn(_ context.Context) (time.Duration, error) {
return time.Minute * 30, nil
}
// Name is the unique name of the job.
func (d *deleteTerminatedJob) Name() string {
return "delete_terminated_sessions"
}
// Description is the human readable description of the job.
func (d *deleteTerminatedJob) Description() string {
return "Delete terminated sessions that were terminated "
}

@ -0,0 +1,106 @@
package session
import (
"context"
"fmt"
"testing"
"time"
"github.com/hashicorp/boundary/internal/db"
"github.com/hashicorp/boundary/internal/iam"
"github.com/hashicorp/boundary/internal/kms"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestDeleteTermiantedSessionsJob(t *testing.T) {
ctx := context.Background()
conn, _ := db.TestSetup(t, "postgres")
rw := db.New(conn)
wrapper := db.TestWrapper(t)
iamRepo := iam.TestRepo(t, conn, wrapper)
kms := kms.TestKms(t, conn, wrapper)
repo, err := NewRepository(rw, rw, kms)
require.NoError(t, err)
composedOf := TestSessionParams(t, conn, wrapper, iamRepo)
require.NoError(t, err)
cases := []struct {
sessionCount int
terminateCount int
threshold time.Duration
expected int
}{
{
0,
0,
time.Nanosecond,
0,
},
{
1,
1,
time.Nanosecond,
1,
},
{
1,
1,
time.Hour,
0,
},
{
10,
10,
time.Nanosecond,
10,
},
{
10,
4,
time.Nanosecond,
4,
},
{
10,
0,
time.Nanosecond,
0,
},
{
10,
10,
time.Hour,
0,
},
}
for _, tc := range cases {
t.Run(fmt.Sprintf("%d_%d_%s", tc.sessionCount, tc.terminateCount, tc.threshold), func(t *testing.T) {
t.Cleanup(func() {
sdb, err := conn.SqlDB(ctx)
require.NoError(t, err)
_, err = sdb.Exec(`delete from session;`)
require.NoError(t, err)
})
for i := 0; i < tc.sessionCount; i++ {
s := TestSession(t, conn, wrapper, composedOf)
if i < tc.terminateCount {
_, err = repo.CancelSession(ctx, s.PublicId, s.Version)
require.NoError(t, err)
}
}
c, err := repo.TerminateCompletedSessions(ctx)
require.NoError(t, err)
assert.Equal(t, tc.terminateCount, c)
job, err := newDeleteTerminatedJob(ctx, repo, tc.threshold)
require.NoError(t, err)
err = job.Run(ctx)
require.NoError(t, err)
assert.Equal(t, tc.expected, job.deletedInRun)
})
}
}

@ -6,12 +6,16 @@ import (
"time"
"github.com/hashicorp/boundary/internal/db"
"github.com/hashicorp/boundary/internal/kms"
"github.com/hashicorp/boundary/internal/scheduler"
)
// RegisterJobs registers plugin host related jobs with the provided scheduler.
func RegisterJobs(ctx context.Context, scheduler *scheduler.Scheduler, w db.Writer, gracePeriod time.Duration) error {
const deleteTerminatedThreshold = time.Hour
// RegisterJobs registers session related jobs with the provided scheduler.
func RegisterJobs(ctx context.Context, scheduler *scheduler.Scheduler, w db.Writer, r db.Reader, k *kms.Kms, gracePeriod time.Duration) error {
const op = "session.RegisterJobs"
sessionConnectionCleanupJob, err := newSessionConnectionCleanupJob(w, gracePeriod)
if err != nil {
return fmt.Errorf("error creating session cleanup job: %w", err)
@ -20,5 +24,17 @@ func RegisterJobs(ctx context.Context, scheduler *scheduler.Scheduler, w db.Writ
return fmt.Errorf("error registering session cleanup job: %w", err)
}
repo, err := NewRepository(r, w, k)
if err != nil {
return fmt.Errorf("error creating repository: %w", err)
}
deleteTerminatedJob, err := newDeleteTerminatedJob(ctx, repo, deleteTerminatedThreshold)
if err != nil {
return fmt.Errorf("error creating delete terminated session job: %w", err)
}
if err = scheduler.RegisterJob(ctx, deleteTerminatedJob); err != nil {
return fmt.Errorf("error registering delete terminated session job: %w", err)
}
return nil
}

Loading…
Cancel
Save