From 9bc5e492f698ab242dbbcdfa76ed3f93ff2b65f7 Mon Sep 17 00:00:00 2001 From: Damian Debkowski Date: Wed, 17 Jul 2024 01:07:44 -0700 Subject: [PATCH] feat(sbc): add upsert worker storage bucket job --- internal/daemon/controller/controller.go | 4 +- internal/server/job/jobs.go | 24 ++++- internal/server/job/rotate_roots_job_test.go | 92 ------------------ .../job/upsert_worker_storage_bucket_job.go | 56 +++++++++++ internal/server/query.go | 10 ++ ..._worker_storage_bucket_credential_state.go | 70 ++++++++++++++ internal/server/worker_storage_bucket.go | 96 +++++++++++++++++++ internal/storage/plugin/repository.go | 19 ++-- internal/storage/plugin/repository_test.go | 4 +- .../storage_bucket_credential_state.go | 33 ------- 10 files changed, 272 insertions(+), 136 deletions(-) create mode 100644 internal/server/job/upsert_worker_storage_bucket_job.go create mode 100644 internal/server/repository_worker_storage_bucket_credential_state.go create mode 100644 internal/server/worker_storage_bucket.go delete mode 100644 internal/storage/storagebucketcredential/storage_bucket_credential_state.go diff --git a/internal/daemon/controller/controller.go b/internal/daemon/controller/controller.go index c15e9f2603..5dc3b3fb16 100644 --- a/internal/daemon/controller/controller.go +++ b/internal/daemon/controller/controller.go @@ -410,7 +410,7 @@ func New(ctx context.Context, conf *Config) (*Controller, error) { return plugin.NewRepository(ctx, dbase, dbase, c.kms) } c.PluginStorageBucketRepoFn = func() (*pluginstorage.Repository, error) { - return pluginstorage.NewRepository(ctx, dbase, dbase, c.kms) + return pluginstorage.NewRepository(ctx, dbase, dbase, c.kms, c.scheduler) } c.AuthTokenRepoFn = func() (*authtoken.Repository, error) { return authtoken.NewRepository(ctx, dbase, dbase, c.kms, @@ -618,7 +618,7 @@ func (c *Controller) registerJobs() error { serversjob.WithRotationFrequency(c.conf.TestOverrideWorkerAuthCaCertificateLifetime/2), ) } - if err := serversjob.RegisterJobs(c.baseContext, c.scheduler, rw, rw, c.kms, serverJobOpts...); err != nil { + if err := serversjob.RegisterJobs(c.baseContext, c.scheduler, rw, rw, c.kms, c.ControllerExtension, c.workerStatusGracePeriod, serverJobOpts...); err != nil { return err } if err := kmsjob.RegisterJobs(c.baseContext, c.scheduler, c.kms); err != nil { diff --git a/internal/server/job/jobs.go b/internal/server/job/jobs.go index c5c0720171..8815076a60 100644 --- a/internal/server/job/jobs.go +++ b/internal/server/job/jobs.go @@ -6,15 +6,26 @@ package servers import ( "context" "reflect" + "sync/atomic" "github.com/hashicorp/boundary/internal/db" "github.com/hashicorp/boundary/internal/errors" + "github.com/hashicorp/boundary/internal/globals" "github.com/hashicorp/boundary/internal/kms" "github.com/hashicorp/boundary/internal/scheduler" ) // RegisterJobs registers the rotate roots job with the provided scheduler. -func RegisterJobs(ctx context.Context, scheduler *scheduler.Scheduler, r db.Reader, w db.Writer, kms *kms.Kms, opt ...Option) error { +func RegisterJobs( + ctx context.Context, + scheduler *scheduler.Scheduler, + r db.Reader, + w db.Writer, + kms *kms.Kms, + controllerExt globals.ControllerExtension, + workerStatusGracePeriod *atomic.Int64, + opt ...Option, +) error { const op = "server.(Jobs).RegisterJobs" if isNil(scheduler) { @@ -29,6 +40,9 @@ func RegisterJobs(ctx context.Context, scheduler *scheduler.Scheduler, r db.Read if kms == nil { return errors.New(ctx, errors.InvalidParameter, op, "missing kms") } + if isNil(workerStatusGracePeriod) { + return errors.New(ctx, errors.InvalidParameter, op, "missing worker status grace period") + } rotateRootsJob, err := newRotateRootsJob(ctx, r, w, kms, opt...) if err != nil { @@ -38,6 +52,14 @@ func RegisterJobs(ctx context.Context, scheduler *scheduler.Scheduler, r db.Read return errors.Wrap(ctx, err, op) } + usbJob, err := NewUpsertWorkerStorageBucketJobFn(ctx, r, w, kms, controllerExt, workerStatusGracePeriod, scheduler) + if err != nil { + return errors.Wrap(ctx, err, op, errors.WithMsg("error creating upsert worker storage bucket job")) + } + if err := scheduler.RegisterJob(ctx, usbJob); err != nil { + return errors.Wrap(ctx, err, op) + } + return nil } diff --git a/internal/server/job/rotate_roots_job_test.go b/internal/server/job/rotate_roots_job_test.go index fb61ca9d83..cbf2154078 100644 --- a/internal/server/job/rotate_roots_job_test.go +++ b/internal/server/job/rotate_roots_job_test.go @@ -12,7 +12,6 @@ import ( "github.com/hashicorp/boundary/internal/db" "github.com/hashicorp/boundary/internal/errors" "github.com/hashicorp/boundary/internal/kms" - "github.com/hashicorp/boundary/internal/scheduler" "github.com/hashicorp/boundary/internal/server" "github.com/hashicorp/boundary/internal/types/scope" "github.com/hashicorp/nodeenrollment/types" @@ -131,94 +130,3 @@ func TestRotateRootsJobFailure(t *testing.T) { err = got.Run(ctx) require.Error(err) } - -func TestRegisterRotateRootsJob(t *testing.T) { - require, assert := require.New(t), assert.New(t) - ctx := context.Background() - wrapper := db.TestWrapper(t) - conn, _ := db.TestSetup(t, "postgres") - rw := db.New(conn) - kmsCache := kms.TestKms(t, conn, wrapper) - err := kmsCache.CreateKeys(context.Background(), scope.Global.String(), kms.WithRandomReader(rand.Reader)) - require.NoError(err) - - sched := scheduler.TestScheduler(t, conn, wrapper) - - type args struct { - s *scheduler.Scheduler - w db.Writer - r db.Reader - kms *kms.Kms - } - tests := []struct { - name string - args args - options []server.Option - wantLimit int - wantErr bool - wantErrCode errors.Code - }{ - { - name: "nil scheduler", - args: args{ - w: rw, - r: rw, - kms: kmsCache, - }, - wantErr: true, - wantErrCode: errors.InvalidParameter, - }, - { - name: "nil writer", - args: args{ - s: sched, - r: rw, - kms: kmsCache, - }, - wantErr: true, - wantErrCode: errors.InvalidParameter, - }, - { - name: "nil reader", - args: args{ - s: sched, - w: rw, - kms: kmsCache, - }, - wantErr: true, - wantErrCode: errors.InvalidParameter, - }, - { - name: "nil kms", - args: args{ - s: sched, - w: rw, - r: rw, - }, - wantErr: true, - wantErrCode: errors.InvalidParameter, - }, - { - name: "valid", - args: args{ - s: sched, - w: rw, - r: rw, - kms: kmsCache, - }, - wantLimit: db.DefaultLimit, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - err := RegisterJobs(ctx, tt.args.s, tt.args.r, tt.args.w, tt.args.kms) - if tt.wantErr { - require.Error(err) - assert.Truef(errors.Match(errors.T(tt.wantErrCode), err), "Unexpected error %s", err) - return - } - require.NoError(err) - }) - } -} diff --git a/internal/server/job/upsert_worker_storage_bucket_job.go b/internal/server/job/upsert_worker_storage_bucket_job.go new file mode 100644 index 0000000000..efc8a293f7 --- /dev/null +++ b/internal/server/job/upsert_worker_storage_bucket_job.go @@ -0,0 +1,56 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package servers + +import ( + "context" + "sync/atomic" + "time" + + "github.com/hashicorp/boundary/internal/db" + "github.com/hashicorp/boundary/internal/globals" + "github.com/hashicorp/boundary/internal/kms" + "github.com/hashicorp/boundary/internal/scheduler" + "github.com/hashicorp/boundary/internal/server" +) + +var NewUpsertWorkerStorageBucketJobFn = newUpsertWorkerStorageBucketJob + +type upsertWorkerStorageBucketJob struct{} + +func newUpsertWorkerStorageBucketJob(_ context.Context, + _ db.Reader, + _ db.Writer, + _ *kms.Kms, + _ globals.ControllerExtension, + _ *atomic.Int64, + _ *scheduler.Scheduler, +) (scheduler.Job, error) { + return &upsertWorkerStorageBucketJob{}, nil +} + +// Status reports the job’s current status. +func (usb *upsertWorkerStorageBucketJob) Status() scheduler.JobStatus { return scheduler.JobStatus{} } + +// Run performs the required work depending on the implementation. +// The context is used to notify the job that it should exit early. +func (usb *upsertWorkerStorageBucketJob) Run(_ context.Context) error { return nil } + +// NextRunIn returns the duration until the next job run should be scheduled. +// Upsert Worker Storage Bucket will run every 24 hours unless we know there are +// more to storage buckets to upserted, then sooner +func (usb *upsertWorkerStorageBucketJob) NextRunIn(_ context.Context) (time.Duration, error) { + return 24 * time.Hour, nil +} + +// Name is the unique name of the job. +func (usb *upsertWorkerStorageBucketJob) Name() string { + return server.UpsertWorkerStorageBucketJobName +} + +// Description is the human-readable description of the job. +func (usb *upsertWorkerStorageBucketJob) Description() string { + return "Upserts storage buckets of workers that are out of date with the latest storage bucket version. " + + "This ensures active workers that are using outdated storage buckets are updated to the latest version." +} diff --git a/internal/server/query.go b/internal/server/query.go index 05a3acc276..d78953b0e5 100644 --- a/internal/server/query.go +++ b/internal/server/query.go @@ -4,6 +4,16 @@ package server const ( + getStorageBucketCredentialStatesByWorkerId = ` + select spsb.public_id as storage_bucket_id, + wsbcs.permission_type, wsbcs.state, + wsbcs.checked_at, wsbcs.error_details + from worker_storage_bucket_credential_state wsbcs + join storage_plugin_storage_bucket spsb + on spsb.storage_bucket_credential_id = wsbcs.storage_bucket_credential_id + where wsbcs.worker_id = @worker_id; + ` + deleteWhereCreateTimeSql = `create_time < ?` deleteTagsByWorkerIdSql = ` diff --git a/internal/server/repository_worker_storage_bucket_credential_state.go b/internal/server/repository_worker_storage_bucket_credential_state.go new file mode 100644 index 0000000000..cf14e4fe4c --- /dev/null +++ b/internal/server/repository_worker_storage_bucket_credential_state.go @@ -0,0 +1,70 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package server + +import ( + "context" + "database/sql" + + "github.com/hashicorp/boundary/internal/db/timestamp" + "github.com/hashicorp/boundary/internal/errors" + plgpb "github.com/hashicorp/boundary/sdk/pbs/plugin" +) + +// ListWorkerStorageBucketCredentialState returns a list of storage bucket credential states for the given worker. +func (r *Repository) ListWorkerStorageBucketCredentialState(ctx context.Context, workerId string) (map[string]*plgpb.StorageBucketCredentialState, error) { + const op = "server.(Repository).ListWorkerStorageBucketCredentialState" + if workerId == "" { + return nil, errors.New(ctx, errors.InvalidParameter, op, "empty worker id") + } + type remoteStorageState struct { + StorageBucketId string + PermissionType string + State string + CheckedAt *timestamp.Timestamp + ErrorDetails string + } + rows, err := r.reader.Query(ctx, getStorageBucketCredentialStatesByWorkerId, []any{sql.Named("worker_id", workerId)}) + if err != nil && !errors.Match(errors.T(errors.RecordNotFound), err) { + return nil, errors.Wrap(ctx, err, op) + } + defer rows.Close() + remoteStorageStates := map[string]*plgpb.StorageBucketCredentialState{} + for rows.Next() { + if err := rows.Err(); err != nil { + return nil, errors.Wrap(ctx, err, op) + } + var row remoteStorageState + if err := r.reader.ScanRows(ctx, rows, &row); err != nil { + return nil, errors.Wrap(ctx, err, op, errors.WithMsg("failed to fetch remote storage state")) + } + s, ok := remoteStorageStates[row.StorageBucketId] + if !ok { + s = &plgpb.StorageBucketCredentialState{ + State: &plgpb.Permissions{}, + } + } + state, err := ParseStateType(row.State) + if err != nil { + return nil, errors.Wrap(ctx, err, op) + } + permissionState := &plgpb.Permission{ + State: state, + ErrorDetails: row.ErrorDetails, + CheckedAt: row.CheckedAt.GetTimestamp(), + } + switch row.PermissionType { + case PermissionTypeWrite.String(): + s.State.Write = permissionState + case PermissionTypeRead.String(): + s.State.Read = permissionState + case PermissionTypeDelete.String(): + s.State.Delete = permissionState + default: + return nil, errors.New(ctx, errors.Internal, op, "unknown permission type") + } + remoteStorageStates[row.StorageBucketId] = s + } + return remoteStorageStates, nil +} diff --git a/internal/server/worker_storage_bucket.go b/internal/server/worker_storage_bucket.go new file mode 100644 index 0000000000..51064aaeea --- /dev/null +++ b/internal/server/worker_storage_bucket.go @@ -0,0 +1,96 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package server + +import ( + "context" + + "github.com/hashicorp/boundary/internal/errors" + "github.com/hashicorp/boundary/internal/storage/storagebucketcredential" + "github.com/hashicorp/boundary/sdk/pbs/controller/api/resources/plugins" + "github.com/hashicorp/boundary/sdk/pbs/controller/api/resources/storagebuckets" + wrapping "github.com/hashicorp/go-kms-wrapping/v2" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/structpb" + "google.golang.org/protobuf/types/known/wrapperspb" +) + +const UpsertWorkerStorageBucketJobName = "upsert_worker_storage_bucket" + +type UpdateStorageBucketCredential struct { + StorageBucketId string `gorm:"primary_key"` + Version int32 + CtSecrets []byte + KeyId string + StorageBucketScopeId string + StorageBucketName string + StorageBucketDescription string + StorageBucketBucketName string + StorageBucketBucketPrefix string + StorageBucketWorkerFilter string + StorageBucketAttributes []byte + PluginId string + PluginName string + PluginDescription string +} + +// TableName returns the table name for gorm +func (owsbc *UpdateStorageBucketCredential) TableName() string { + return "update_worker_storage_bucket_credential" +} + +// ToPluginStorageBucket re-formats an storage bucket into the proto used for storage plugin requests +func ToPluginStorageBucket(ctx context.Context, usb *UpdateStorageBucketCredential, wrapper wrapping.Wrapper) (*storagebuckets.StorageBucket, error) { + const op = "server.ToPluginStorageBucket" + switch { + case usb == nil: + return nil, errors.New(ctx, errors.InvalidParameter, op, "nil update storage bucket credential") + } + + sb := &storagebuckets.StorageBucket{ + Id: usb.StorageBucketId, + ScopeId: usb.StorageBucketScopeId, + PluginId: usb.PluginId, + Name: wrapperspb.String(usb.StorageBucketBucketName), + Description: wrapperspb.String(usb.StorageBucketDescription), + BucketName: usb.StorageBucketBucketName, + BucketPrefix: usb.StorageBucketBucketPrefix, + WorkerFilter: usb.StorageBucketWorkerFilter, + Plugin: &plugins.PluginInfo{ + Id: usb.PluginId, + Name: usb.PluginName, + Description: usb.PluginDescription, + }, + } + if usb.StorageBucketAttributes != nil { + attrs := &structpb.Struct{} + if err := proto.Unmarshal(usb.StorageBucketAttributes, attrs); err != nil { + return nil, errors.Wrap(ctx, err, op, errors.WithMsg("unable to unmarshal attributes")) + } + sb.Attributes = attrs + } + if usb.CtSecrets != nil { + allocFn, ok := storagebucketcredential.SubtypeRegistry.AllocFunc(storagebucketcredential.ManagedSecretSubtype) + if !ok { + return nil, errors.New(ctx, errors.InvalidParameter, op, "unable to allocate storage bucket credential") + } + sbc := allocFn() + + sbc.SetKeyId(usb.KeyId) + sbc.SetStorageBucketId(usb.StorageBucketId) + sbc.SetCtSecrets(usb.CtSecrets) + + if sbc.Decrypt(ctx, wrapper) != nil { + return nil, errors.New(ctx, errors.Decrypt, op, "error decrypting secrets") + } + + secrets := &structpb.Struct{} + if err := proto.Unmarshal(sbc.GetSecrets(), secrets); err != nil { + return nil, errors.Wrap(ctx, err, op, errors.WithMsg("unable to unmarshal secrets")) + } + + sb.Secrets = secrets + } + return sb, nil +} diff --git a/internal/storage/plugin/repository.go b/internal/storage/plugin/repository.go index c20ab8c720..854b58ff94 100644 --- a/internal/storage/plugin/repository.go +++ b/internal/storage/plugin/repository.go @@ -9,17 +9,19 @@ import ( "github.com/hashicorp/boundary/internal/db" "github.com/hashicorp/boundary/internal/errors" "github.com/hashicorp/boundary/internal/kms" + "github.com/hashicorp/boundary/internal/scheduler" ) type Repository struct { - reader db.Reader - writer db.Writer - kms *kms.Kms + reader db.Reader + writer db.Writer + kms *kms.Kms + scheduler *scheduler.Scheduler } // NewRepository creates a new Repository. The returned repository is not // safe for concurrent go routines to access it -func NewRepository(ctx context.Context, r db.Reader, w db.Writer, kms *kms.Kms) (*Repository, error) { +func NewRepository(ctx context.Context, r db.Reader, w db.Writer, kms *kms.Kms, scheduler *scheduler.Scheduler) (*Repository, error) { const op = "plugin.NewRepository" switch { case r == nil: @@ -28,11 +30,14 @@ func NewRepository(ctx context.Context, r db.Reader, w db.Writer, kms *kms.Kms) return nil, errors.New(ctx, errors.InvalidParameter, op, "nil db.Writer") case kms == nil: return nil, errors.New(ctx, errors.InvalidParameter, op, "nil kms") + case scheduler == nil: + return nil, errors.New(ctx, errors.InvalidParameter, op, "scheduler") } return &Repository{ - reader: r, - writer: w, - kms: kms, + reader: r, + writer: w, + kms: kms, + scheduler: scheduler, }, nil } diff --git a/internal/storage/plugin/repository_test.go b/internal/storage/plugin/repository_test.go index 3e32730ce6..ee0a69985d 100644 --- a/internal/storage/plugin/repository_test.go +++ b/internal/storage/plugin/repository_test.go @@ -10,6 +10,7 @@ import ( "github.com/hashicorp/boundary/internal/db" "github.com/hashicorp/boundary/internal/errors" "github.com/hashicorp/boundary/internal/kms" + "github.com/hashicorp/boundary/internal/scheduler" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -19,6 +20,7 @@ func TestRepository_NewRepository(t *testing.T) { rw := db.New(conn) wrapper := db.TestWrapper(t) kmsCache := kms.TestKms(t, conn, wrapper) + sche := scheduler.TestScheduler(t, conn, wrapper) tests := []struct { name string @@ -74,7 +76,7 @@ func TestRepository_NewRepository(t *testing.T) { require, assert := require.New(t), assert.New(t) ctx := context.Background() - repo, err := NewRepository(ctx, tt.in.reader, tt.in.writer, tt.in.kms) + repo, err := NewRepository(ctx, tt.in.reader, tt.in.writer, tt.in.kms, sche) if tt.wantErrContains != "" { require.ErrorContains(err, tt.wantErrContains) return diff --git a/internal/storage/storagebucketcredential/storage_bucket_credential_state.go b/internal/storage/storagebucketcredential/storage_bucket_credential_state.go deleted file mode 100644 index 7c2270fec2..0000000000 --- a/internal/storage/storagebucketcredential/storage_bucket_credential_state.go +++ /dev/null @@ -1,33 +0,0 @@ -// Copyright (c) HashiCorp, Inc. -// SPDX-License-Identifier: BUSL-1.1 - -package storagebucketcredential - -import ( - "github.com/hashicorp/boundary/internal/server/store" -) - -// NewWorkerStorageBucketCredentialState returns a new WorkerStorageBucketCredentialState. -func NewWorkerStorageBucketCredentialState() *WorkerStorageBucketCredentialState { - return &WorkerStorageBucketCredentialState{ - WorkerStorageBucketCredentialState: &store.WorkerStorageBucketCredentialState{}, - } -} - -type WorkerStorageBucketCredentialState struct { - *store.WorkerStorageBucketCredentialState - tableName string `gorm:"-"` -} - -// TableName returns the table name. -func (sbc *WorkerStorageBucketCredentialState) TableName() string { - if sbc.tableName != "" { - return sbc.tableName - } - return "worker_storage_bucket_credential_state" -} - -// SetTableName sets the table name. -func (sbc *WorkerStorageBucketCredentialState) SetTableName(n string) { - sbc.tableName = n -}