From 7c996d0e0256fa183d09a8a63a9e32e9ad2f0e18 Mon Sep 17 00:00:00 2001 From: Johan Brandhorst-Satzkorn Date: Mon, 23 Oct 2023 15:27:29 -0700 Subject: [PATCH] internal/pagination: add new purge package The purge package contains a job that is used to purge records from the foo_deleted tables when they become older than 30 days. --- internal/daemon/controller/controller.go | 4 + internal/pagination/purge/purge.go | 53 ++++++++++ internal/pagination/purge/purge_job.go | 69 +++++++++++++ internal/pagination/purge/purge_test.go | 126 +++++++++++++++++++++++ internal/pagination/purge/query.go | 13 +++ 5 files changed, 265 insertions(+) create mode 100644 internal/pagination/purge/purge.go create mode 100644 internal/pagination/purge/purge_job.go create mode 100644 internal/pagination/purge/purge_test.go create mode 100644 internal/pagination/purge/query.go diff --git a/internal/daemon/controller/controller.go b/internal/daemon/controller/controller.go index e38dc889b3..9145f11a99 100644 --- a/internal/daemon/controller/controller.go +++ b/internal/daemon/controller/controller.go @@ -33,6 +33,7 @@ import ( "github.com/hashicorp/boundary/internal/iam" "github.com/hashicorp/boundary/internal/kms" kmsjob "github.com/hashicorp/boundary/internal/kms/job" + "github.com/hashicorp/boundary/internal/pagination/purge" "github.com/hashicorp/boundary/internal/plugin" "github.com/hashicorp/boundary/internal/plugin/loopback" "github.com/hashicorp/boundary/internal/ratelimit" @@ -596,6 +597,9 @@ func (c *Controller) registerJobs() error { if err := census.RegisterJob(c.baseContext, c.scheduler, c.conf.RawConfig.Reporting.License.Enabled, rw, rw); err != nil { return err } + if err := purge.RegisterJobs(c.baseContext, c.scheduler, rw, rw); err != nil { + return err + } return nil } diff --git a/internal/pagination/purge/purge.go b/internal/pagination/purge/purge.go new file mode 100644 index 0000000000..d02819d86a --- /dev/null +++ b/internal/pagination/purge/purge.go @@ -0,0 +1,53 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +// Package purge implements a scheduler job used to purge old +// rows from the deleted IDs tables kept for pagination purposes. +package purge + +import ( + "context" + "fmt" + + "github.com/hashicorp/boundary/internal/db" + "github.com/hashicorp/boundary/internal/errors" + "github.com/hashicorp/boundary/internal/scheduler" + "github.com/hashicorp/boundary/internal/util" +) + +// RegisterJobs registers the purge job for each deletion table with the provided scheduler. +func RegisterJobs(ctx context.Context, s *scheduler.Scheduler, r db.Reader, w db.Writer) error { + const op = "purge.RegisterJobs" + if s == nil { + return errors.New(ctx, errors.InvalidParameter, "nil scheduler", op, errors.WithoutEvent()) + } + if util.IsNil(r) { + return errors.New(ctx, errors.Internal, "nil DB reader", op, errors.WithoutEvent()) + } + if util.IsNil(w) { + return errors.New(ctx, errors.Internal, "nil DB writer", op, errors.WithoutEvent()) + } + + rows, err := r.Query(ctx, getDeletionTablesQuery, nil) + if err != nil { + return errors.Wrap(ctx, err, op, errors.WithMsg("unable to query for deletion tables")) + } + defer rows.Close() + + for rows.Next() { + var table string + err = rows.Scan(&table) + if err != nil { + return errors.Wrap(ctx, err, op, errors.WithMsg("unable to scan rows for deletion tables")) + } + + purgeJob, err := newPurgeJob(ctx, w, table) + if err != nil { + return fmt.Errorf("error creating purge job: %w", err) + } + if err := s.RegisterJob(ctx, purgeJob); err != nil { + return errors.Wrap(ctx, err, op) + } + } + return nil +} diff --git a/internal/pagination/purge/purge_job.go b/internal/pagination/purge/purge_job.go new file mode 100644 index 0000000000..89e3e7e328 --- /dev/null +++ b/internal/pagination/purge/purge_job.go @@ -0,0 +1,69 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package purge + +import ( + "context" + "fmt" + "time" + + "github.com/hashicorp/boundary/internal/db" + "github.com/hashicorp/boundary/internal/errors" + "github.com/hashicorp/boundary/internal/scheduler" +) + +type purgeJob struct { + w db.Writer + table string + query string +} + +func newPurgeJob(ctx context.Context, w db.Writer, table string) (*purgeJob, error) { + const op = "purgeJob.newPurgeJob" + switch { + case w == nil: + return nil, errors.New(ctx, errors.InvalidParameter, op, "missing db.Writer") + case table == "": + return nil, errors.New(ctx, errors.InvalidParameter, op, "missing table") + } + + query := fmt.Sprintf(deleteQueryTemplate, table) + return &purgeJob{ + w: w, + table: table, + query: query, + }, nil +} + +// Status reports the job’s current status. +func (c *purgeJob) Status() scheduler.JobStatus { + return scheduler.JobStatus{} +} + +// Run performs the required work depending on the implementation. +// The context is used to notify the job that it should exit early. +func (c *purgeJob) Run(ctx context.Context) error { + const op = "purge.(purgeJob).Run" + _, err := c.w.Exec(ctx, c.query, nil) + if err != nil { + return errors.Wrap(ctx, err, op, errors.WithMsg(fmt.Sprintf("unable to prune table %s", c.query))) + } + return nil +} + +// NextRunIn returns the duration until the next job run should be scheduled. +// This job runs about once a day. +func (c *purgeJob) NextRunIn(_ context.Context) (time.Duration, error) { + return 24 * time.Hour, nil +} + +// Name is the unique name of the job. +func (c *purgeJob) Name() string { + return fmt.Sprintf("%s_items_table_purge", c.table) +} + +// Description is the human-readable description of the job. +func (c *purgeJob) Description() string { + return "Automatically deletes rows from the deleted items DB tables after 30 days, to ensure the tables don’t build up forever" +} diff --git a/internal/pagination/purge/purge_test.go b/internal/pagination/purge/purge_test.go new file mode 100644 index 0000000000..481779c07a --- /dev/null +++ b/internal/pagination/purge/purge_test.go @@ -0,0 +1,126 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package purge + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/hashicorp/boundary/internal/db" + "github.com/hashicorp/boundary/internal/errors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestPurgeTables(t *testing.T) { + ctx := context.Background() + + conn, _ := db.TestSetup(t, "postgres") + rw := db.New(conn) + + db, err := conn.SqlDB(ctx) + if err != nil { + t.Errorf("error getting db connection %s", err) + } + + rows, err := db.Query("select get_deletion_tables()") + if err != nil { + t.Errorf("unable to query for deletion tables %s", err) + } + defer rows.Close() + + for rows.Next() { + var table string + err = rows.Scan(&table) + if err != nil { + t.Errorf("unable to scan rows for deletion tables %s", err) + } + _, err = db.Exec(fmt.Sprintf("insert into %s (public_id, delete_time) values ('p1234567890', $1)", table), time.Now()) + if err != nil { + t.Errorf("error updating %s %s", table, err) + } + _, err = db.Exec(fmt.Sprintf("insert into %s (public_id, delete_time) values ('p9876543210', $1)", table), time.Now().AddDate(0, -2, 0)) + if err != nil { + t.Errorf("error updating %s %s", table, err) + } + + query := fmt.Sprintf("delete from %s where delete_time < now() - interval '30 days'", table) + sJob := purgeJob{ + w: rw, + table: table, + query: query, + } + + err = sJob.Run(ctx) + require.NoError(t, err) + + var count int + err = db.QueryRowContext(ctx, fmt.Sprintf("select count(public_id) from %s", table)).Scan(&count) + if err != nil { + t.Errorf("error checking %s table %s", table, err) + } + require.Equal(t, 1, count) + } +} + +func TestNewPurgeJob(t *testing.T) { + ctx := context.Background() + + conn, _ := db.TestSetup(t, "postgres") + rw := db.New(conn) + + type args struct { + w db.Writer + table string + } + + tests := []struct { + name string + args args + wantIsErr errors.Code + wantErrMsg string + }{ + { + name: "valid", + args: args{ + w: rw, + table: "valid-table", + }, + }, + { + name: "nil-writer", + args: args{ + w: nil, + table: "valid-table", + }, + wantIsErr: errors.InvalidParameter, + wantErrMsg: "purgeJob.newPurgeJob: missing db.Writer: parameter violation: error #100", + }, + { + name: "no table", + args: args{ + w: rw, + table: "", + }, + wantIsErr: errors.InvalidParameter, + wantErrMsg: "purgeJob.newPurgeJob: missing table: parameter violation: error #100", + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + assert, require := assert.New(t), require.New(t) + got, err := newPurgeJob(ctx, tt.args.w, tt.args.table) + if tt.wantIsErr != 0 { + assert.Truef(errors.Match(errors.T(tt.wantIsErr), err), "Unexpected error %s", err) + assert.Equal(tt.wantErrMsg, err.Error()) + return + } + assert.NoError(err) + require.NotNil(got) + }) + } +} diff --git a/internal/pagination/purge/query.go b/internal/pagination/purge/query.go new file mode 100644 index 0000000000..966016ccd1 --- /dev/null +++ b/internal/pagination/purge/query.go @@ -0,0 +1,13 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package purge + +const ( + getDeletionTablesQuery = ` +select get_deletion_tables(); +` + deleteQueryTemplate = ` +delete from %s where delete_time < now() - interval '30 days' +` +)