diff --git a/internal/daemon/cluster/handlers/worker_service.go b/internal/daemon/cluster/handlers/worker_service.go index aceecb9c83..528cf341f9 100644 --- a/internal/daemon/cluster/handlers/worker_service.go +++ b/internal/daemon/cluster/handlers/worker_service.go @@ -20,6 +20,7 @@ import ( "github.com/hashicorp/boundary/internal/session" "github.com/hashicorp/boundary/internal/types/scope" "github.com/hashicorp/boundary/sdk/pbs/controller/api/resources/targets" + "github.com/hashicorp/boundary/sdk/pbs/plugin" "github.com/hashicorp/go-bexpr" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -58,6 +59,10 @@ var ( // available in OSS and are a straight forward proxy with no additional // fields needed. getProtocolContext = noProtocolContext + + // updateWorkerStorageBucketCredentialStatesFn will update the worker storage bucket + // credential state. + updateWorkerStorageBucketCredentialStatesFn = updateWorkerStorageBucketCredentialStates ) // singleHopConnectionRoute returns a route consisting of the singlehop worker (the root worker id) @@ -159,6 +164,12 @@ func (ws *workerServiceServer) Status(ctx context.Context, req *pbs.StatusReques event.WriteError(ctx, op, err, event.WithInfoMsg("error storing worker status")) return &pbs.StatusResponse{}, status.Errorf(codes.Internal, "Error storing worker status: %v", err) } + + // update storage states + if sbcStates := wStat.GetStorageBucketCredentialStates(); sbcStates != nil && wrk.GetPublicId() != "" { + updateWorkerStorageBucketCredentialStatesFn(ctx, serverRepo, wrk.GetPublicId(), sbcStates) + } + controllers, err := serverRepo.ListControllers(ctx, server.WithLiveness(time.Duration(ws.livenessTimeToStale.Load()))) if err != nil { event.WriteError(ctx, op, err, event.WithInfoMsg("error getting current controllers")) @@ -746,3 +757,8 @@ func (ws *workerServiceServer) CloseConnection(ctx context.Context, req *pbs.Clo return ret, nil } + +func updateWorkerStorageBucketCredentialStates(ctx context.Context, repo *server.Repository, workerId string, states map[string]*plugin.StorageBucketCredentialState) { + const op = "handlers.updateWorkerStorageBucketCredentialStates" + return +} diff --git a/internal/daemon/cluster/handlers/worker_service_status_test.go b/internal/daemon/cluster/handlers/worker_service_status_test.go index 96670b18c1..fdda29fbf9 100644 --- a/internal/daemon/cluster/handlers/worker_service_status_test.go +++ b/internal/daemon/cluster/handlers/worker_service_status_test.go @@ -6,6 +6,7 @@ package handlers import ( "context" "crypto/rand" + "fmt" "sort" "sync" "sync/atomic" @@ -48,10 +49,11 @@ func TestStatus(t *testing.T) { serverRepo, err := server.NewRepository(ctx, rw, rw, kms) require.NoError(t, err) - _, err = serverRepo.UpsertController(ctx, &store.Controller{ + c := &store.Controller{ PrivateId: "test_controller1", Address: "127.0.0.1", - }) + } + _, err = serverRepo.UpsertController(ctx, c) require.NoError(t, err) serversRepoFn := func() (*server.Repository, error) { @@ -114,7 +116,6 @@ func TestStatus(t *testing.T) { tofu = session.TestTofu(t) sess, _, err = repo.ActivateSession(ctx, sess.PublicId, sess.Version, tofu) require.NoError(t, err) - require.NoError(t, err) s := NewWorkerServiceServer(serversRepoFn, workerAuthRepoFn, sessionRepoFn, connRepoFn, nil, new(sync.Map), kms, new(atomic.Int64), fce) require.NotNil(t, s) @@ -487,10 +488,11 @@ func TestStatusSessionClosed(t *testing.T) { serverRepo, err := server.NewRepository(ctx, rw, rw, kms) require.NoError(t, err) - _, err = serverRepo.UpsertController(ctx, &store.Controller{ + c := &store.Controller{ PrivateId: "test_controller1", Address: "127.0.0.1", - }) + } + _, err = serverRepo.UpsertController(ctx, c) require.NoError(t, err) serversRepoFn := func() (*server.Repository, error) { @@ -646,6 +648,7 @@ func TestStatusSessionClosed(t *testing.T) { assert.Equal(tc.wantErrMsg, err.Error()) return } + fmt.Printf("want upstreams: %v, got upstreams: %v\n", tc.want.CalculatedUpstreams, got.CalculatedUpstreams) assert.Empty( cmp.Diff( tc.want, @@ -677,11 +680,14 @@ func TestStatusDeadConnection(t *testing.T) { kms := kms.TestKms(t, conn, wrapper) org, prj := iam.TestScopes(t, iam.TestRepo(t, conn, wrapper)) - serverRepo, _ := server.NewRepository(ctx, rw, rw, kms) - _, err := serverRepo.UpsertController(ctx, &store.Controller{ + serverRepo, err := server.NewRepository(ctx, rw, rw, kms) + require.NoError(t, err) + + c := &store.Controller{ PrivateId: "test_controller1", Address: "127.0.0.1", - }) + } + _, err = serverRepo.UpsertController(ctx, c) require.NoError(t, err) worker1 := server.TestKmsWorker(t, conn, wrapper) @@ -835,11 +841,14 @@ func TestStatusWorkerWithKeyId(t *testing.T) { serverRepo, err := server.NewRepository(ctx, rw, rw, kms) require.NoError(t, err) - _, err = serverRepo.UpsertController(ctx, &store.Controller{ + + c := &store.Controller{ PrivateId: "test_controller1", Address: "127.0.0.1", - }) + } + _, err = serverRepo.UpsertController(ctx, c) require.NoError(t, err) + serversRepoFn := func() (*server.Repository, error) { return serverRepo, nil } @@ -1036,11 +1045,16 @@ func TestStatusAuthorizedWorkers(t *testing.T) { err := kmsCache.CreateKeys(context.Background(), scope.Global.String(), kms.WithRandomReader(rand.Reader)) require.NoError(t, err) - serverRepo, _ := server.NewRepository(ctx, rw, rw, kmsCache) - serverRepo.UpsertController(ctx, &store.Controller{ + serverRepo, err := server.NewRepository(ctx, rw, rw, kmsCache) + require.NoError(t, err) + + c := &store.Controller{ PrivateId: "test_controller1", Address: "127.0.0.1", - }) + } + _, err = serverRepo.UpsertController(ctx, c) + require.NoError(t, err) + serversRepoFn := func() (*server.Repository, error) { return serverRepo, nil } @@ -1253,11 +1267,16 @@ func TestWorkerOperationalStatus(t *testing.T) { wrapper := db.TestWrapper(t) kms := kms.TestKms(t, conn, wrapper) - serverRepo, _ := server.NewRepository(ctx, rw, rw, kms) - serverRepo.UpsertController(ctx, &store.Controller{ + serverRepo, err := server.NewRepository(ctx, rw, rw, kms) + require.NoError(t, err) + + c := &store.Controller{ PrivateId: "test_controller1", Address: "127.0.0.1", - }) + } + _, err = serverRepo.UpsertController(ctx, c) + require.NoError(t, err) + serversRepoFn := func() (*server.Repository, error) { return serverRepo, nil } @@ -1365,12 +1384,16 @@ func TestWorkerLocalStorageStateStatus(t *testing.T) { wrapper := db.TestWrapper(t) kms := kms.TestKms(t, conn, wrapper) - serverRepo, _ := server.NewRepository(ctx, rw, rw, kms) - _, err := serverRepo.UpsertController(ctx, &store.Controller{ + serverRepo, err := server.NewRepository(ctx, rw, rw, kms) + require.NoError(t, err) + + c := &store.Controller{ PrivateId: "test_controller1", Address: "127.0.0.1", - }) + } + _, err = serverRepo.UpsertController(ctx, c) require.NoError(t, err) + serversRepoFn := func() (*server.Repository, error) { return serverRepo, nil }