diff --git a/internal/daemon/cluster/handlers/worker_service.go b/internal/daemon/cluster/handlers/worker_service.go index 98cfd4939e..e211922b60 100644 --- a/internal/daemon/cluster/handlers/worker_service.go +++ b/internal/daemon/cluster/handlers/worker_service.go @@ -134,13 +134,20 @@ func (ws *workerServiceServer) Status(ctx context.Context, req *pbs.StatusReques } } + if wStat.LocalStorageState == "" { + // If this is an older worker (pre 0.15), it will not have LocalStorageState as part of it's status + // so we'll default to unknown. + wStat.LocalStorageState = server.UnknownLocalStorageState.String() + } + wConf := server.NewWorker(scope.Global.String(), server.WithName(wStat.GetName()), server.WithDescription(wStat.GetDescription()), server.WithAddress(wStat.GetAddress()), server.WithWorkerTags(workerTags...), server.WithReleaseVersion(wStat.ReleaseVersion), - server.WithOperationalState(wStat.OperationalState)) + server.WithOperationalState(wStat.OperationalState), + server.WithLocalStorageState(wStat.LocalStorageState)) opts := []server.Option{server.WithUpdateTags(req.GetUpdateTags())} if wStat.GetPublicId() != "" { opts = append(opts, server.WithPublicId(wStat.GetPublicId())) diff --git a/internal/daemon/cluster/handlers/worker_service_status_test.go b/internal/daemon/cluster/handlers/worker_service_status_test.go index 08b7c05e07..8992b50336 100644 --- a/internal/daemon/cluster/handlers/worker_service_status_test.go +++ b/internal/daemon/cluster/handlers/worker_service_status_test.go @@ -1357,3 +1357,167 @@ func TestWorkerOperationalStatus(t *testing.T) { }) } } + +func TestWorkerLocalStorageStateStatus(t *testing.T) { + ctx := context.Background() + conn, _ := db.TestSetup(t, "postgres") + rw := db.New(conn) + wrapper := db.TestWrapper(t) + kms := kms.TestKms(t, conn, wrapper) + + serverRepo, _ := server.NewRepository(ctx, rw, rw, kms) + serverRepo.UpsertController(ctx, &store.Controller{ + PrivateId: "test_controller1", + Address: "127.0.0.1", + }) + serversRepoFn := func() (*server.Repository, error) { + return serverRepo, nil + } + workerAuthRepoFn := func() (*server.WorkerAuthRepositoryStorage, error) { + return server.NewRepositoryStorage(ctx, rw, rw, kms) + } + sessionRepoFn := func(opt ...session.Option) (*session.Repository, error) { + return session.NewRepository(ctx, rw, rw, kms) + } + connRepoFn := func() (*session.ConnectionRepository, error) { + return session.NewConnectionRepository(ctx, rw, rw, kms) + } + fce := &fakeControllerExtension{ + reader: rw, + writer: rw, + } + + worker1 := server.TestKmsWorker(t, conn, wrapper) + + s := NewWorkerServiceServer(serversRepoFn, workerAuthRepoFn, sessionRepoFn, connRepoFn, nil, new(sync.Map), kms, new(atomic.Int64), fce) + require.NotNil(t, s) + + cases := []struct { + name string + wantErr bool + wantErrContains string + req *pbs.StatusRequest + wantState string + }{ + { + name: "Available local storage worker", + wantErr: false, + req: &pbs.StatusRequest{ + WorkerStatus: &pb.ServerWorkerStatus{ + PublicId: worker1.GetPublicId(), + Name: worker1.GetName(), + Address: worker1.GetAddress(), + LocalStorageState: server.AvailableLocalStorageState.String(), + }, + }, + wantState: server.AvailableLocalStorageState.String(), + }, + { + name: "Worker in low local storage", + wantErr: false, + req: &pbs.StatusRequest{ + WorkerStatus: &pb.ServerWorkerStatus{ + PublicId: worker1.GetPublicId(), + Name: worker1.GetName(), + Address: worker1.GetAddress(), + LocalStorageState: server.LowStorageLocalStorageState.String(), + }, + }, + wantState: server.LowStorageLocalStorageState.String(), + }, + { + name: "Worker in critically low local storage", + wantErr: false, + req: &pbs.StatusRequest{ + WorkerStatus: &pb.ServerWorkerStatus{ + PublicId: worker1.GetPublicId(), + Name: worker1.GetName(), + Address: worker1.GetAddress(), + LocalStorageState: server.CriticallyLowStorageLocalStorageState.String(), + }, + }, + wantState: server.CriticallyLowStorageLocalStorageState.String(), + }, + { + name: "Worker in out of space local storage", + wantErr: false, + req: &pbs.StatusRequest{ + WorkerStatus: &pb.ServerWorkerStatus{ + PublicId: worker1.GetPublicId(), + Name: worker1.GetName(), + Address: worker1.GetAddress(), + LocalStorageState: server.OutOfStorageLocalStorageState.String(), + }, + }, + wantState: server.OutOfStorageLocalStorageState.String(), + }, + { + name: "Worker in not configured local storage", + wantErr: false, + req: &pbs.StatusRequest{ + WorkerStatus: &pb.ServerWorkerStatus{ + PublicId: worker1.GetPublicId(), + Name: worker1.GetName(), + Address: worker1.GetAddress(), + LocalStorageState: server.NotConfiguredLocalStorageState.String(), + }, + }, + wantState: server.NotConfiguredLocalStorageState.String(), + }, + { + name: "No local storage state - default to unknown", + wantErr: false, + req: &pbs.StatusRequest{ + WorkerStatus: &pb.ServerWorkerStatus{ + PublicId: worker1.GetPublicId(), + Name: worker1.GetName(), + Address: worker1.GetAddress(), + ReleaseVersion: "Boundary v0.11.0", + }, + }, + wantState: server.UnknownLocalStorageState.String(), + }, + { + name: "Old worker (empty release version) and no local storage state - default to active", + wantErr: false, + req: &pbs.StatusRequest{ + WorkerStatus: &pb.ServerWorkerStatus{ + PublicId: worker1.GetPublicId(), + Name: worker1.GetName(), + Address: worker1.GetAddress(), + }, + }, + wantState: server.UnknownLocalStorageState.String(), + }, + { + name: "Worker with invalid local storage type", + wantErr: true, + req: &pbs.StatusRequest{ + WorkerStatus: &pb.ServerWorkerStatus{ + PublicId: worker1.GetPublicId(), + Name: worker1.GetName(), + Address: worker1.GetAddress(), + LocalStorageState: "invalid", + }, + }, + wantErrContains: "foreign key constraint", + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + assert, require := assert.New(t), require.New(t) + got, err := s.Status(ctx, tc.req) + if tc.wantErr { + require.Error(err) + assert.Contains(err.Error(), tc.wantErrContains) + return + } + require.NoError(err) + require.NotNil(got) + repoWorker, err := serverRepo.LookupWorkerByName(ctx, worker1.Name) + require.NoError(err) + assert.Equal(tc.wantState, repoWorker.LocalStorageState) + }) + } +} diff --git a/internal/daemon/worker/status.go b/internal/daemon/worker/status.go index 8b809fe997..d39971a8e8 100644 --- a/internal/daemon/worker/status.go +++ b/internal/daemon/worker/status.go @@ -202,13 +202,14 @@ func (w *Worker) sendWorkerStatus(cancelCtx context.Context, sessionManager sess result, err := client.Status(statusCtx, &pbs.StatusRequest{ Jobs: activeJobs, WorkerStatus: &pb.ServerWorkerStatus{ - Name: w.conf.RawConfig.Worker.Name, - Description: w.conf.RawConfig.Worker.Description, - Address: w.conf.RawConfig.Worker.PublicAddr, - Tags: tags, - KeyId: keyId, - ReleaseVersion: versionInfo.FullVersionNumber(false), - OperationalState: w.operationalState.Load().(server.OperationalState).String(), + Name: w.conf.RawConfig.Worker.Name, + Description: w.conf.RawConfig.Worker.Description, + Address: w.conf.RawConfig.Worker.PublicAddr, + Tags: tags, + KeyId: keyId, + ReleaseVersion: versionInfo.FullVersionNumber(false), + OperationalState: w.operationalState.Load().(server.OperationalState).String(), + LocalStorageState: w.localStorageState.Load().(server.LocalStorageState).String(), }, ConnectedWorkerKeyIdentifiers: connectionState.AllKeyIds(), ConnectedUnmappedWorkerKeyIdentifiers: connectionState.UnmappedKeyIds(), diff --git a/internal/daemon/worker/worker.go b/internal/daemon/worker/worker.go index 4150ba92aa..a2b140c105 100644 --- a/internal/daemon/worker/worker.go +++ b/internal/daemon/worker/worker.go @@ -146,6 +146,7 @@ type Worker struct { lastStatusSuccess *atomic.Value workerStartTime time.Time operationalState *atomic.Value + localStorageState *atomic.Value upstreamConnectionState *atomic.Value controllerMultihopConn *atomic.Value @@ -221,6 +222,7 @@ func New(ctx context.Context, conf *Config) (*Worker, error) { WorkerAuthCurrentKeyId: new(ua.String), operationalState: new(atomic.Value), downstreamConnManager: cluster.NewDownstreamManager(), + localStorageState: new(atomic.Value), successfulStatusGracePeriod: new(atomic.Int64), statusCallTimeoutDuration: new(atomic.Int64), upstreamConnectionState: new(atomic.Value), @@ -228,6 +230,7 @@ func New(ctx context.Context, conf *Config) (*Worker, error) { } w.operationalState.Store(server.UnknownOperationalState) + w.localStorageState.Store(server.UnknownLocalStorageState) if reverseConnReceiverFactory != nil { w.downstreamReceiver = reverseConnReceiverFactory() @@ -242,6 +245,10 @@ func New(ctx context.Context, conf *Config) (*Worker, error) { conf.RawConfig.Worker = new(config.Worker) } + if w.conf.RawConfig.Worker.RecordingStoragePath == "" { + w.localStorageState.Store(server.NotConfiguredLocalStorageState) + } + if w.conf.RawConfig.Worker.RecordingStoragePath != "" && recordingStorageFactory != nil { pluginLogger, err := event.NewHclogLogger(ctx, w.conf.Server.Eventer) if err != nil { diff --git a/internal/daemon/worker/worker_test.go b/internal/daemon/worker/worker_test.go index 4abc1c140d..c623af6008 100644 --- a/internal/daemon/worker/worker_test.go +++ b/internal/daemon/worker/worker_test.go @@ -16,7 +16,9 @@ import ( "github.com/hashicorp/boundary/internal/cmd/config" "github.com/hashicorp/boundary/internal/daemon/worker/session" "github.com/hashicorp/boundary/internal/db" + "github.com/hashicorp/boundary/internal/event" "github.com/hashicorp/boundary/internal/gen/controller/servers/services" + "github.com/hashicorp/boundary/internal/server" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-secure-stdlib/configutil/v2" "github.com/hashicorp/go-secure-stdlib/listenerutil" @@ -121,13 +123,61 @@ func TestWorkerNew(t *testing.T) { require.NotNil(t, w.nonceFn) }, }, + { + name: "worker recording storage path is not set", + in: &Config{ + Server: &base.Server{ + Listeners: []*base.ServerListener{ + {Config: &listenerutil.ListenerConfig{Purpose: []string{"proxy"}}}, + }, + Eventer: &event.Eventer{}, + }, + RawConfig: &config.Config{ + Worker: &config.Worker{}, + SharedConfig: &configutil.SharedConfig{ + DisableMlock: true, + }, + }, + }, + expErr: false, + assertions: func(t *testing.T, w *Worker) { + assert.Equal(t, w.conf.RawConfig.Worker.RecordingStoragePath, "") + assert.Equal(t, w.localStorageState.Load().(server.LocalStorageState).String(), server.NotConfiguredLocalStorageState.String()) + }, + }, + { + name: "worker recording storage path is set", + in: &Config{ + Server: &base.Server{ + Listeners: []*base.ServerListener{ + {Config: &listenerutil.ListenerConfig{Purpose: []string{"proxy"}}}, + }, + Eventer: &event.Eventer{}, + }, + RawConfig: &config.Config{ + Worker: &config.Worker{ + RecordingStoragePath: "/tmp", + }, + SharedConfig: &configutil.SharedConfig{ + DisableMlock: true, + }, + }, + }, + expErr: false, + assertions: func(t *testing.T, w *Worker) { + assert.Equal(t, w.conf.RawConfig.Worker.RecordingStoragePath, "/tmp") + assert.Equal(t, w.localStorageState.Load().(server.LocalStorageState).String(), server.UnknownLocalStorageState.String()) + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { // New() panics if these aren't set tt.in.Logger = hclog.Default() - tt.in.RawConfig = &config.Config{SharedConfig: &configutil.SharedConfig{DisableMlock: true}} + if tt.in.RawConfig == nil { + tt.in.RawConfig = &config.Config{SharedConfig: &configutil.SharedConfig{DisableMlock: true}} + } w, err := New(context.Background(), tt.in) if tt.expErr { diff --git a/internal/server/repository_worker.go b/internal/server/repository_worker.go index 08f890dbcc..ba5725db33 100644 --- a/internal/server/repository_worker.go +++ b/internal/server/repository_worker.go @@ -305,6 +305,8 @@ func (r *Repository) UpsertWorkerStatus(ctx context.Context, worker *Worker, opt return nil, errors.New(ctx, errors.InvalidParameter, op, "worker keyId and reported name are both empty; one is required") case worker.OperationalState == "": return nil, errors.New(ctx, errors.InvalidParameter, op, "worker operational state is empty") + case worker.LocalStorageState == "": + return nil, errors.New(ctx, errors.InvalidParameter, op, "worker local storage state is empty") } var workerId string @@ -349,7 +351,7 @@ func (r *Repository) UpsertWorkerStatus(ctx context.Context, worker *Worker, opt // KMS-PKI) PKI-based workers to come via API only. We can't // really guard on this in the DB so we need to be sure to not // include it here. - n, err := w.Update(ctx, workerClone, []string{"address", "ReleaseVersion", "OperationalState"}, nil) + n, err := w.Update(ctx, workerClone, []string{"address", "ReleaseVersion", "OperationalState", "LocalStorageState"}, nil) if err != nil { return errors.Wrap(ctx, err, op, errors.WithMsg("unable to update status of pki worker")) } @@ -366,7 +368,7 @@ func (r *Repository) UpsertWorkerStatus(ctx context.Context, worker *Worker, opt workerClone.Type = KmsWorkerType.String() workerCreateConflict := &db.OnConflict{ Target: db.Columns{"public_id"}, - Action: append(db.SetColumns([]string{"address", "release_version", "operational_state"}), + Action: append(db.SetColumns([]string{"address", "release_version", "operational_state", "local_storage_state"}), db.SetColumnValues(map[string]any{"last_status_time": "now()"})...), } var withRowsAffected int64 diff --git a/internal/server/repository_worker_test.go b/internal/server/repository_worker_test.go index 7ec57b7176..89c69671fe 100644 --- a/internal/server/repository_worker_test.go +++ b/internal/server/repository_worker_test.go @@ -308,6 +308,19 @@ func TestUpsertWorkerStatus(t *testing.T) { // Version does not change for status updates assert.Equal(t, uint32(1), worker.Version) assert.Equal(t, "shutdown", worker.GetOperationalState()) + assert.Equal(t, server.UnknownLocalStorageState.String(), worker.GetLocalStorageState()) + + // update again with available local storage state + wStatus4 := server.NewWorker(scope.Global.String(), + server.WithAddress("new_address"), server.WithName("config_name1"), + server.WithOperationalState("shutdown"), server.WithReleaseVersion("Boundary v0.11.0"), + server.WithLocalStorageState("available")) + worker, err = repo.UpsertWorkerStatus(ctx, wStatus4) + require.NoError(t, err) + assert.Greater(t, worker.GetLastStatusTime().AsTime(), worker.GetCreateTime().AsTime()) + // Version does not change for status updates + assert.Equal(t, uint32(1), worker.Version) + assert.Equal(t, server.AvailableLocalStorageState.String(), worker.GetLocalStorageState()) // Should no longer see this worker in listing if we exclude shutdown workers workers, err = repo.ListWorkers(ctx, []string{scope.Global.String()}, server.WithActiveWorkers(true)) diff --git a/internal/storage/storage.go b/internal/storage/storage.go index fa6237646f..c1a5884490 100644 --- a/internal/storage/storage.go +++ b/internal/storage/storage.go @@ -78,3 +78,29 @@ type Writer interface { io.Writer WriteAndClose([]byte) (int, error) } + +// LocalStorageState is represents the state of local storage. +type LocalStorageState string + +const ( + // AvailableLocalStorageState indicates local storage is + // (minimumAvailableDiskSpace * 1.25) above the minimum disk space threshold. + // This could indicates recovery from a low, critical, or out of disk space + // local storage state. + AvailableLocalStorageState LocalStorageState = "available" + // LowStorageLocalStorageState indicates local storage is below the minimum + // disk space threshold. + LowStorageLocalStorageState LocalStorageState = "low storage" + // CriticallyLowStorageLocalStorageState indicates local storage is below the + // half the minimum disk space threshold. + CriticallyLowStorageLocalStorageState LocalStorageState = "critically low storage" + // OutOfStorageLocalStorageState indicates local storage is below 1MB. + OutOfStorageLocalStorageState LocalStorageState = "out of storage" + // NotConfiguredLocalStorageState indicates the local storage path is not + // configured. Intervention from an admin might be necessary to help resolve + // the issue. + NotConfiguredLocalStorageState LocalStorageState = "not configured" + // UnknownLocalStorageState is the default state for local storage. It indicates + // the local storage state is unknown. + UnknownLocalStorageState LocalStorageState = "unknown" +)