internal/pagination: add new purge package

The purge package contains a job that is used to purge
records from the foo_deleted tables when they become
older than 30 days.
pull/4202/head
Johan Brandhorst-Satzkorn 3 years ago
parent 78541ad05e
commit 7c996d0e02

@ -33,6 +33,7 @@ import (
"github.com/hashicorp/boundary/internal/iam"
"github.com/hashicorp/boundary/internal/kms"
kmsjob "github.com/hashicorp/boundary/internal/kms/job"
"github.com/hashicorp/boundary/internal/pagination/purge"
"github.com/hashicorp/boundary/internal/plugin"
"github.com/hashicorp/boundary/internal/plugin/loopback"
"github.com/hashicorp/boundary/internal/ratelimit"
@ -596,6 +597,9 @@ func (c *Controller) registerJobs() error {
if err := census.RegisterJob(c.baseContext, c.scheduler, c.conf.RawConfig.Reporting.License.Enabled, rw, rw); err != nil {
return err
}
if err := purge.RegisterJobs(c.baseContext, c.scheduler, rw, rw); err != nil {
return err
}
return nil
}

@ -0,0 +1,53 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
// Package purge implements a scheduler job used to purge old
// rows from the deleted IDs tables kept for pagination purposes.
package purge
import (
"context"
"fmt"
"github.com/hashicorp/boundary/internal/db"
"github.com/hashicorp/boundary/internal/errors"
"github.com/hashicorp/boundary/internal/scheduler"
"github.com/hashicorp/boundary/internal/util"
)
// RegisterJobs registers the purge job for each deletion table with the provided scheduler.
func RegisterJobs(ctx context.Context, s *scheduler.Scheduler, r db.Reader, w db.Writer) error {
const op = "purge.RegisterJobs"
if s == nil {
return errors.New(ctx, errors.InvalidParameter, "nil scheduler", op, errors.WithoutEvent())
}
if util.IsNil(r) {
return errors.New(ctx, errors.Internal, "nil DB reader", op, errors.WithoutEvent())
}
if util.IsNil(w) {
return errors.New(ctx, errors.Internal, "nil DB writer", op, errors.WithoutEvent())
}
rows, err := r.Query(ctx, getDeletionTablesQuery, nil)
if err != nil {
return errors.Wrap(ctx, err, op, errors.WithMsg("unable to query for deletion tables"))
}
defer rows.Close()
for rows.Next() {
var table string
err = rows.Scan(&table)
if err != nil {
return errors.Wrap(ctx, err, op, errors.WithMsg("unable to scan rows for deletion tables"))
}
purgeJob, err := newPurgeJob(ctx, w, table)
if err != nil {
return fmt.Errorf("error creating purge job: %w", err)
}
if err := s.RegisterJob(ctx, purgeJob); err != nil {
return errors.Wrap(ctx, err, op)
}
}
return nil
}

@ -0,0 +1,69 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package purge
import (
"context"
"fmt"
"time"
"github.com/hashicorp/boundary/internal/db"
"github.com/hashicorp/boundary/internal/errors"
"github.com/hashicorp/boundary/internal/scheduler"
)
type purgeJob struct {
w db.Writer
table string
query string
}
func newPurgeJob(ctx context.Context, w db.Writer, table string) (*purgeJob, error) {
const op = "purgeJob.newPurgeJob"
switch {
case w == nil:
return nil, errors.New(ctx, errors.InvalidParameter, op, "missing db.Writer")
case table == "":
return nil, errors.New(ctx, errors.InvalidParameter, op, "missing table")
}
query := fmt.Sprintf(deleteQueryTemplate, table)
return &purgeJob{
w: w,
table: table,
query: query,
}, nil
}
// Status reports the jobs current status.
func (c *purgeJob) Status() scheduler.JobStatus {
return scheduler.JobStatus{}
}
// Run performs the required work depending on the implementation.
// The context is used to notify the job that it should exit early.
func (c *purgeJob) Run(ctx context.Context) error {
const op = "purge.(purgeJob).Run"
_, err := c.w.Exec(ctx, c.query, nil)
if err != nil {
return errors.Wrap(ctx, err, op, errors.WithMsg(fmt.Sprintf("unable to prune table %s", c.query)))
}
return nil
}
// NextRunIn returns the duration until the next job run should be scheduled.
// This job runs about once a day.
func (c *purgeJob) NextRunIn(_ context.Context) (time.Duration, error) {
return 24 * time.Hour, nil
}
// Name is the unique name of the job.
func (c *purgeJob) Name() string {
return fmt.Sprintf("%s_items_table_purge", c.table)
}
// Description is the human-readable description of the job.
func (c *purgeJob) Description() string {
return "Automatically deletes rows from the deleted items DB tables after 30 days, to ensure the tables dont build up forever"
}

@ -0,0 +1,126 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package purge
import (
"context"
"fmt"
"testing"
"time"
"github.com/hashicorp/boundary/internal/db"
"github.com/hashicorp/boundary/internal/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestPurgeTables(t *testing.T) {
ctx := context.Background()
conn, _ := db.TestSetup(t, "postgres")
rw := db.New(conn)
db, err := conn.SqlDB(ctx)
if err != nil {
t.Errorf("error getting db connection %s", err)
}
rows, err := db.Query("select get_deletion_tables()")
if err != nil {
t.Errorf("unable to query for deletion tables %s", err)
}
defer rows.Close()
for rows.Next() {
var table string
err = rows.Scan(&table)
if err != nil {
t.Errorf("unable to scan rows for deletion tables %s", err)
}
_, err = db.Exec(fmt.Sprintf("insert into %s (public_id, delete_time) values ('p1234567890', $1)", table), time.Now())
if err != nil {
t.Errorf("error updating %s %s", table, err)
}
_, err = db.Exec(fmt.Sprintf("insert into %s (public_id, delete_time) values ('p9876543210', $1)", table), time.Now().AddDate(0, -2, 0))
if err != nil {
t.Errorf("error updating %s %s", table, err)
}
query := fmt.Sprintf("delete from %s where delete_time < now() - interval '30 days'", table)
sJob := purgeJob{
w: rw,
table: table,
query: query,
}
err = sJob.Run(ctx)
require.NoError(t, err)
var count int
err = db.QueryRowContext(ctx, fmt.Sprintf("select count(public_id) from %s", table)).Scan(&count)
if err != nil {
t.Errorf("error checking %s table %s", table, err)
}
require.Equal(t, 1, count)
}
}
func TestNewPurgeJob(t *testing.T) {
ctx := context.Background()
conn, _ := db.TestSetup(t, "postgres")
rw := db.New(conn)
type args struct {
w db.Writer
table string
}
tests := []struct {
name string
args args
wantIsErr errors.Code
wantErrMsg string
}{
{
name: "valid",
args: args{
w: rw,
table: "valid-table",
},
},
{
name: "nil-writer",
args: args{
w: nil,
table: "valid-table",
},
wantIsErr: errors.InvalidParameter,
wantErrMsg: "purgeJob.newPurgeJob: missing db.Writer: parameter violation: error #100",
},
{
name: "no table",
args: args{
w: rw,
table: "",
},
wantIsErr: errors.InvalidParameter,
wantErrMsg: "purgeJob.newPurgeJob: missing table: parameter violation: error #100",
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
assert, require := assert.New(t), require.New(t)
got, err := newPurgeJob(ctx, tt.args.w, tt.args.table)
if tt.wantIsErr != 0 {
assert.Truef(errors.Match(errors.T(tt.wantIsErr), err), "Unexpected error %s", err)
assert.Equal(tt.wantErrMsg, err.Error())
return
}
assert.NoError(err)
require.NotNil(got)
})
}
}

@ -0,0 +1,13 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package purge
const (
getDeletionTablesQuery = `
select get_deletion_tables();
`
deleteQueryTemplate = `
delete from %s where delete_time < now() - interval '30 days'
`
)
Loading…
Cancel
Save