feat: Report Local Storage State Status

- add `LocalStorageState` to worker status reports
- update worker service handler to process `LocalStorageState`. If is empty for workers older workers, we set the `LocalStorageState` to `unknown`
- update field mask paths to support `LocalStorageState`
pull/4607/head
Elim Tsiagbey 2 years ago
parent aea473c77b
commit 8506ed3e2c

@ -134,13 +134,20 @@ func (ws *workerServiceServer) Status(ctx context.Context, req *pbs.StatusReques
}
}
if wStat.LocalStorageState == "" {
// If this is an older worker (pre 0.15), it will not have LocalStorageState as part of it's status
// so we'll default to unknown.
wStat.LocalStorageState = server.UnknownLocalStorageState.String()
}
wConf := server.NewWorker(scope.Global.String(),
server.WithName(wStat.GetName()),
server.WithDescription(wStat.GetDescription()),
server.WithAddress(wStat.GetAddress()),
server.WithWorkerTags(workerTags...),
server.WithReleaseVersion(wStat.ReleaseVersion),
server.WithOperationalState(wStat.OperationalState))
server.WithOperationalState(wStat.OperationalState),
server.WithLocalStorageState(wStat.LocalStorageState))
opts := []server.Option{server.WithUpdateTags(req.GetUpdateTags())}
if wStat.GetPublicId() != "" {
opts = append(opts, server.WithPublicId(wStat.GetPublicId()))

@ -1357,3 +1357,167 @@ func TestWorkerOperationalStatus(t *testing.T) {
})
}
}
func TestWorkerLocalStorageStateStatus(t *testing.T) {
ctx := context.Background()
conn, _ := db.TestSetup(t, "postgres")
rw := db.New(conn)
wrapper := db.TestWrapper(t)
kms := kms.TestKms(t, conn, wrapper)
serverRepo, _ := server.NewRepository(ctx, rw, rw, kms)
serverRepo.UpsertController(ctx, &store.Controller{
PrivateId: "test_controller1",
Address: "127.0.0.1",
})
serversRepoFn := func() (*server.Repository, error) {
return serverRepo, nil
}
workerAuthRepoFn := func() (*server.WorkerAuthRepositoryStorage, error) {
return server.NewRepositoryStorage(ctx, rw, rw, kms)
}
sessionRepoFn := func(opt ...session.Option) (*session.Repository, error) {
return session.NewRepository(ctx, rw, rw, kms)
}
connRepoFn := func() (*session.ConnectionRepository, error) {
return session.NewConnectionRepository(ctx, rw, rw, kms)
}
fce := &fakeControllerExtension{
reader: rw,
writer: rw,
}
worker1 := server.TestKmsWorker(t, conn, wrapper)
s := NewWorkerServiceServer(serversRepoFn, workerAuthRepoFn, sessionRepoFn, connRepoFn, nil, new(sync.Map), kms, new(atomic.Int64), fce)
require.NotNil(t, s)
cases := []struct {
name string
wantErr bool
wantErrContains string
req *pbs.StatusRequest
wantState string
}{
{
name: "Available local storage worker",
wantErr: false,
req: &pbs.StatusRequest{
WorkerStatus: &pb.ServerWorkerStatus{
PublicId: worker1.GetPublicId(),
Name: worker1.GetName(),
Address: worker1.GetAddress(),
LocalStorageState: server.AvailableLocalStorageState.String(),
},
},
wantState: server.AvailableLocalStorageState.String(),
},
{
name: "Worker in low local storage",
wantErr: false,
req: &pbs.StatusRequest{
WorkerStatus: &pb.ServerWorkerStatus{
PublicId: worker1.GetPublicId(),
Name: worker1.GetName(),
Address: worker1.GetAddress(),
LocalStorageState: server.LowStorageLocalStorageState.String(),
},
},
wantState: server.LowStorageLocalStorageState.String(),
},
{
name: "Worker in critically low local storage",
wantErr: false,
req: &pbs.StatusRequest{
WorkerStatus: &pb.ServerWorkerStatus{
PublicId: worker1.GetPublicId(),
Name: worker1.GetName(),
Address: worker1.GetAddress(),
LocalStorageState: server.CriticallyLowStorageLocalStorageState.String(),
},
},
wantState: server.CriticallyLowStorageLocalStorageState.String(),
},
{
name: "Worker in out of space local storage",
wantErr: false,
req: &pbs.StatusRequest{
WorkerStatus: &pb.ServerWorkerStatus{
PublicId: worker1.GetPublicId(),
Name: worker1.GetName(),
Address: worker1.GetAddress(),
LocalStorageState: server.OutOfStorageLocalStorageState.String(),
},
},
wantState: server.OutOfStorageLocalStorageState.String(),
},
{
name: "Worker in not configured local storage",
wantErr: false,
req: &pbs.StatusRequest{
WorkerStatus: &pb.ServerWorkerStatus{
PublicId: worker1.GetPublicId(),
Name: worker1.GetName(),
Address: worker1.GetAddress(),
LocalStorageState: server.NotConfiguredLocalStorageState.String(),
},
},
wantState: server.NotConfiguredLocalStorageState.String(),
},
{
name: "No local storage state - default to unknown",
wantErr: false,
req: &pbs.StatusRequest{
WorkerStatus: &pb.ServerWorkerStatus{
PublicId: worker1.GetPublicId(),
Name: worker1.GetName(),
Address: worker1.GetAddress(),
ReleaseVersion: "Boundary v0.11.0",
},
},
wantState: server.UnknownLocalStorageState.String(),
},
{
name: "Old worker (empty release version) and no local storage state - default to active",
wantErr: false,
req: &pbs.StatusRequest{
WorkerStatus: &pb.ServerWorkerStatus{
PublicId: worker1.GetPublicId(),
Name: worker1.GetName(),
Address: worker1.GetAddress(),
},
},
wantState: server.UnknownLocalStorageState.String(),
},
{
name: "Worker with invalid local storage type",
wantErr: true,
req: &pbs.StatusRequest{
WorkerStatus: &pb.ServerWorkerStatus{
PublicId: worker1.GetPublicId(),
Name: worker1.GetName(),
Address: worker1.GetAddress(),
LocalStorageState: "invalid",
},
},
wantErrContains: "foreign key constraint",
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
assert, require := assert.New(t), require.New(t)
got, err := s.Status(ctx, tc.req)
if tc.wantErr {
require.Error(err)
assert.Contains(err.Error(), tc.wantErrContains)
return
}
require.NoError(err)
require.NotNil(got)
repoWorker, err := serverRepo.LookupWorkerByName(ctx, worker1.Name)
require.NoError(err)
assert.Equal(tc.wantState, repoWorker.LocalStorageState)
})
}
}

@ -202,13 +202,14 @@ func (w *Worker) sendWorkerStatus(cancelCtx context.Context, sessionManager sess
result, err := client.Status(statusCtx, &pbs.StatusRequest{
Jobs: activeJobs,
WorkerStatus: &pb.ServerWorkerStatus{
Name: w.conf.RawConfig.Worker.Name,
Description: w.conf.RawConfig.Worker.Description,
Address: w.conf.RawConfig.Worker.PublicAddr,
Tags: tags,
KeyId: keyId,
ReleaseVersion: versionInfo.FullVersionNumber(false),
OperationalState: w.operationalState.Load().(server.OperationalState).String(),
Name: w.conf.RawConfig.Worker.Name,
Description: w.conf.RawConfig.Worker.Description,
Address: w.conf.RawConfig.Worker.PublicAddr,
Tags: tags,
KeyId: keyId,
ReleaseVersion: versionInfo.FullVersionNumber(false),
OperationalState: w.operationalState.Load().(server.OperationalState).String(),
LocalStorageState: w.localStorageState.Load().(server.LocalStorageState).String(),
},
ConnectedWorkerKeyIdentifiers: connectionState.AllKeyIds(),
ConnectedUnmappedWorkerKeyIdentifiers: connectionState.UnmappedKeyIds(),

@ -146,6 +146,7 @@ type Worker struct {
lastStatusSuccess *atomic.Value
workerStartTime time.Time
operationalState *atomic.Value
localStorageState *atomic.Value
upstreamConnectionState *atomic.Value
controllerMultihopConn *atomic.Value
@ -221,6 +222,7 @@ func New(ctx context.Context, conf *Config) (*Worker, error) {
WorkerAuthCurrentKeyId: new(ua.String),
operationalState: new(atomic.Value),
downstreamConnManager: cluster.NewDownstreamManager(),
localStorageState: new(atomic.Value),
successfulStatusGracePeriod: new(atomic.Int64),
statusCallTimeoutDuration: new(atomic.Int64),
upstreamConnectionState: new(atomic.Value),
@ -228,6 +230,7 @@ func New(ctx context.Context, conf *Config) (*Worker, error) {
}
w.operationalState.Store(server.UnknownOperationalState)
w.localStorageState.Store(server.UnknownLocalStorageState)
if reverseConnReceiverFactory != nil {
w.downstreamReceiver = reverseConnReceiverFactory()
@ -242,6 +245,10 @@ func New(ctx context.Context, conf *Config) (*Worker, error) {
conf.RawConfig.Worker = new(config.Worker)
}
if w.conf.RawConfig.Worker.RecordingStoragePath == "" {
w.localStorageState.Store(server.NotConfiguredLocalStorageState)
}
if w.conf.RawConfig.Worker.RecordingStoragePath != "" && recordingStorageFactory != nil {
pluginLogger, err := event.NewHclogLogger(ctx, w.conf.Server.Eventer)
if err != nil {

@ -16,7 +16,9 @@ import (
"github.com/hashicorp/boundary/internal/cmd/config"
"github.com/hashicorp/boundary/internal/daemon/worker/session"
"github.com/hashicorp/boundary/internal/db"
"github.com/hashicorp/boundary/internal/event"
"github.com/hashicorp/boundary/internal/gen/controller/servers/services"
"github.com/hashicorp/boundary/internal/server"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-secure-stdlib/configutil/v2"
"github.com/hashicorp/go-secure-stdlib/listenerutil"
@ -121,13 +123,61 @@ func TestWorkerNew(t *testing.T) {
require.NotNil(t, w.nonceFn)
},
},
{
name: "worker recording storage path is not set",
in: &Config{
Server: &base.Server{
Listeners: []*base.ServerListener{
{Config: &listenerutil.ListenerConfig{Purpose: []string{"proxy"}}},
},
Eventer: &event.Eventer{},
},
RawConfig: &config.Config{
Worker: &config.Worker{},
SharedConfig: &configutil.SharedConfig{
DisableMlock: true,
},
},
},
expErr: false,
assertions: func(t *testing.T, w *Worker) {
assert.Equal(t, w.conf.RawConfig.Worker.RecordingStoragePath, "")
assert.Equal(t, w.localStorageState.Load().(server.LocalStorageState).String(), server.NotConfiguredLocalStorageState.String())
},
},
{
name: "worker recording storage path is set",
in: &Config{
Server: &base.Server{
Listeners: []*base.ServerListener{
{Config: &listenerutil.ListenerConfig{Purpose: []string{"proxy"}}},
},
Eventer: &event.Eventer{},
},
RawConfig: &config.Config{
Worker: &config.Worker{
RecordingStoragePath: "/tmp",
},
SharedConfig: &configutil.SharedConfig{
DisableMlock: true,
},
},
},
expErr: false,
assertions: func(t *testing.T, w *Worker) {
assert.Equal(t, w.conf.RawConfig.Worker.RecordingStoragePath, "/tmp")
assert.Equal(t, w.localStorageState.Load().(server.LocalStorageState).String(), server.UnknownLocalStorageState.String())
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// New() panics if these aren't set
tt.in.Logger = hclog.Default()
tt.in.RawConfig = &config.Config{SharedConfig: &configutil.SharedConfig{DisableMlock: true}}
if tt.in.RawConfig == nil {
tt.in.RawConfig = &config.Config{SharedConfig: &configutil.SharedConfig{DisableMlock: true}}
}
w, err := New(context.Background(), tt.in)
if tt.expErr {

@ -305,6 +305,8 @@ func (r *Repository) UpsertWorkerStatus(ctx context.Context, worker *Worker, opt
return nil, errors.New(ctx, errors.InvalidParameter, op, "worker keyId and reported name are both empty; one is required")
case worker.OperationalState == "":
return nil, errors.New(ctx, errors.InvalidParameter, op, "worker operational state is empty")
case worker.LocalStorageState == "":
return nil, errors.New(ctx, errors.InvalidParameter, op, "worker local storage state is empty")
}
var workerId string
@ -349,7 +351,7 @@ func (r *Repository) UpsertWorkerStatus(ctx context.Context, worker *Worker, opt
// KMS-PKI) PKI-based workers to come via API only. We can't
// really guard on this in the DB so we need to be sure to not
// include it here.
n, err := w.Update(ctx, workerClone, []string{"address", "ReleaseVersion", "OperationalState"}, nil)
n, err := w.Update(ctx, workerClone, []string{"address", "ReleaseVersion", "OperationalState", "LocalStorageState"}, nil)
if err != nil {
return errors.Wrap(ctx, err, op, errors.WithMsg("unable to update status of pki worker"))
}
@ -366,7 +368,7 @@ func (r *Repository) UpsertWorkerStatus(ctx context.Context, worker *Worker, opt
workerClone.Type = KmsWorkerType.String()
workerCreateConflict := &db.OnConflict{
Target: db.Columns{"public_id"},
Action: append(db.SetColumns([]string{"address", "release_version", "operational_state"}),
Action: append(db.SetColumns([]string{"address", "release_version", "operational_state", "local_storage_state"}),
db.SetColumnValues(map[string]any{"last_status_time": "now()"})...),
}
var withRowsAffected int64

@ -308,6 +308,19 @@ func TestUpsertWorkerStatus(t *testing.T) {
// Version does not change for status updates
assert.Equal(t, uint32(1), worker.Version)
assert.Equal(t, "shutdown", worker.GetOperationalState())
assert.Equal(t, server.UnknownLocalStorageState.String(), worker.GetLocalStorageState())
// update again with available local storage state
wStatus4 := server.NewWorker(scope.Global.String(),
server.WithAddress("new_address"), server.WithName("config_name1"),
server.WithOperationalState("shutdown"), server.WithReleaseVersion("Boundary v0.11.0"),
server.WithLocalStorageState("available"))
worker, err = repo.UpsertWorkerStatus(ctx, wStatus4)
require.NoError(t, err)
assert.Greater(t, worker.GetLastStatusTime().AsTime(), worker.GetCreateTime().AsTime())
// Version does not change for status updates
assert.Equal(t, uint32(1), worker.Version)
assert.Equal(t, server.AvailableLocalStorageState.String(), worker.GetLocalStorageState())
// Should no longer see this worker in listing if we exclude shutdown workers
workers, err = repo.ListWorkers(ctx, []string{scope.Global.String()}, server.WithActiveWorkers(true))

@ -78,3 +78,29 @@ type Writer interface {
io.Writer
WriteAndClose([]byte) (int, error)
}
// LocalStorageState is represents the state of local storage.
type LocalStorageState string
const (
// AvailableLocalStorageState indicates local storage is
// (minimumAvailableDiskSpace * 1.25) above the minimum disk space threshold.
// This could indicates recovery from a low, critical, or out of disk space
// local storage state.
AvailableLocalStorageState LocalStorageState = "available"
// LowStorageLocalStorageState indicates local storage is below the minimum
// disk space threshold.
LowStorageLocalStorageState LocalStorageState = "low storage"
// CriticallyLowStorageLocalStorageState indicates local storage is below the
// half the minimum disk space threshold.
CriticallyLowStorageLocalStorageState LocalStorageState = "critically low storage"
// OutOfStorageLocalStorageState indicates local storage is below 1MB.
OutOfStorageLocalStorageState LocalStorageState = "out of storage"
// NotConfiguredLocalStorageState indicates the local storage path is not
// configured. Intervention from an admin might be necessary to help resolve
// the issue.
NotConfiguredLocalStorageState LocalStorageState = "not configured"
// UnknownLocalStorageState is the default state for local storage. It indicates
// the local storage state is unknown.
UnknownLocalStorageState LocalStorageState = "unknown"
)

Loading…
Cancel
Save