diff --git a/internal/daemon/cluster/handlers/worker_service.go b/internal/daemon/cluster/handlers/worker_service.go index e211922b60..aceecb9c83 100644 --- a/internal/daemon/cluster/handlers/worker_service.go +++ b/internal/daemon/cluster/handlers/worker_service.go @@ -10,7 +10,6 @@ import ( "sync/atomic" "time" - dcommon "github.com/hashicorp/boundary/internal/daemon/common" "github.com/hashicorp/boundary/internal/daemon/controller/common" "github.com/hashicorp/boundary/internal/daemon/controller/handlers" "github.com/hashicorp/boundary/internal/event" @@ -188,7 +187,7 @@ func (ws *workerServiceServer) Status(ctx context.Context, req *pbs.StatusReques event.WriteError(ctx, op, err, event.WithInfoMsg("error getting known connected worker ids")) return &pbs.StatusResponse{}, status.Errorf(codes.Internal, "Error getting known connected worker ids: %v", err) } - authorizedDownstreams.WorkerPublicIds = dcommon.WorkerList(knownConnectedWorkers).PublicIds() + authorizedDownstreams.WorkerPublicIds = server.WorkerList(knownConnectedWorkers).PublicIds() } if len(req.GetConnectedUnmappedWorkerKeyIdentifiers()) > 0 { @@ -360,7 +359,7 @@ func (ws *workerServiceServer) ListHcpbWorkers(ctx context.Context, req *pbs.Lis return nil, status.Errorf(codes.Internal, "Error looking up workers: %v", err) } - managed, _ := dcommon.SeparateManagedWorkers(workers) + managed, _ := server.SeparateManagedWorkers(workers) resp := &pbs.ListHcpbWorkersResponse{} if len(managed) == 0 { return resp, nil diff --git a/internal/daemon/cluster/handlers/worker_service_test.go b/internal/daemon/cluster/handlers/worker_service_test.go index b93717d341..d4325ea99f 100644 --- a/internal/daemon/cluster/handlers/worker_service_test.go +++ b/internal/daemon/cluster/handlers/worker_service_test.go @@ -15,7 +15,6 @@ import ( "github.com/google/go-cmp/cmp" "github.com/hashicorp/boundary/internal/authtoken" credstatic "github.com/hashicorp/boundary/internal/credential/static" - dcommon "github.com/hashicorp/boundary/internal/daemon/common" "github.com/hashicorp/boundary/internal/db" pbs "github.com/hashicorp/boundary/internal/gen/controller/servers/services" intglobals "github.com/hashicorp/boundary/internal/globals" @@ -587,28 +586,28 @@ func TestHcpbWorkers(t *testing.T) { } // Stale/unalive kms worker aren't expected... - server.TestKmsWorker(t, conn, wrapper, server.WithWorkerTags(&server.Tag{Key: dcommon.ManagedWorkerTag, Value: "true"}), + server.TestKmsWorker(t, conn, wrapper, server.WithWorkerTags(&server.Tag{Key: server.ManagedWorkerTag, Value: "true"}), server.WithAddress("old.kms.1")) // Sleep + 500ms longer than the liveness duration. time.Sleep(time.Duration(liveDur.Load()) + time.Second) - server.TestKmsWorker(t, conn, wrapper, server.WithWorkerTags(&server.Tag{Key: dcommon.ManagedWorkerTag, Value: "true"}), + server.TestKmsWorker(t, conn, wrapper, server.WithWorkerTags(&server.Tag{Key: server.ManagedWorkerTag, Value: "true"}), server.WithAddress("kms.1")) - server.TestKmsWorker(t, conn, wrapper, server.WithWorkerTags(&server.Tag{Key: dcommon.ManagedWorkerTag, Value: "true"}), + server.TestKmsWorker(t, conn, wrapper, server.WithWorkerTags(&server.Tag{Key: server.ManagedWorkerTag, Value: "true"}), server.WithAddress("kms.2")) server.TestKmsWorker(t, conn, wrapper, server.WithWorkerTags(&server.Tag{Key: "unrelated_tag", Value: "true"}), server.WithAddress("unrelated_tag.kms.1")) // Shutdown workers will be removed from routes and sessions, but still returned // to downstream workers - server.TestKmsWorker(t, conn, wrapper, server.WithWorkerTags(&server.Tag{Key: dcommon.ManagedWorkerTag, Value: "true"}), + server.TestKmsWorker(t, conn, wrapper, server.WithWorkerTags(&server.Tag{Key: server.ManagedWorkerTag, Value: "true"}), server.WithAddress("shutdown.kms.3"), server.WithOperationalState(server.ShutdownOperationalState.String())) // PKI workers are also expected, if they have the managed worker tag serverRepo, err := serversRepoFn() require.NoError(err) var keyId string - server.TestPkiWorker(t, conn, wrapper, server.WithWorkerTags(&server.Tag{Key: dcommon.ManagedWorkerTag, Value: "true"}), + server.TestPkiWorker(t, conn, wrapper, server.WithWorkerTags(&server.Tag{Key: server.ManagedWorkerTag, Value: "true"}), server.WithTestPkiWorkerAuthorizedKeyId(&keyId)) _, err = serverRepo.UpsertWorkerStatus(ctx, server.NewWorker(scope.Global.String(), server.WithAddress("pki.1")), server.WithKeyId(keyId)) require.NoError(err) diff --git a/internal/daemon/common/const.go b/internal/daemon/common/const.go index 9c3d41a916..b1df2ae0cc 100644 --- a/internal/daemon/common/const.go +++ b/internal/daemon/common/const.go @@ -6,5 +6,4 @@ package common const ( ReverseGrpcConnectionAlpnValue = "the-downstream-dialer-plays-an-uno-reverse-card" DataPlaneProxyAlpnValue = "i-herd-you-like-proxies-so-i-put-a-proxy-in-your-proxy" - ManagedWorkerTag = "boundary.cloud.hashicorp.com:managed" ) diff --git a/internal/daemon/controller/handler.go b/internal/daemon/controller/handler.go index 06c4efced9..a79ca11631 100644 --- a/internal/daemon/controller/handler.go +++ b/internal/daemon/controller/handler.go @@ -241,6 +241,7 @@ func (c *Controller) registerGrpcServices(s *grpc.Server) error { srs, err := session_recordings.NewServiceFn( c.baseContext, c.IamRepoFn, + c.ServersRepoFn, c.workerStatusGracePeriod, c.kms, c.conf.RawConfig.Controller.MaxPageSize, diff --git a/internal/daemon/controller/handlers/session_recordings/session_recording_service.go b/internal/daemon/controller/handlers/session_recordings/session_recording_service.go index f7b6c39c02..137b6faa1a 100644 --- a/internal/daemon/controller/handlers/session_recordings/session_recording_service.go +++ b/internal/daemon/controller/handlers/session_recordings/session_recording_service.go @@ -43,6 +43,7 @@ func init() { // NewServiceFn returns a storage bucket service which is not implemented in OSS var NewServiceFn = func(ctx context.Context, iamRepoFn common.IamRepoFactory, + serverRepoFn common.ServersRepoFactory, workerStatusGracePeriod *atomic.Int64, kms *kms.Kms, maxPageSize uint, diff --git a/internal/daemon/controller/handlers/targets/target_service.go b/internal/daemon/controller/handlers/targets/target_service.go index dab631a37d..48e5be7358 100644 --- a/internal/daemon/controller/handlers/targets/target_service.go +++ b/internal/daemon/controller/handlers/targets/target_service.go @@ -19,7 +19,6 @@ import ( "github.com/hashicorp/boundary/internal/alias" talias "github.com/hashicorp/boundary/internal/alias/target" "github.com/hashicorp/boundary/internal/credential" - wl "github.com/hashicorp/boundary/internal/daemon/common" "github.com/hashicorp/boundary/internal/daemon/controller/auth" "github.com/hashicorp/boundary/internal/daemon/controller/common" "github.com/hashicorp/boundary/internal/daemon/controller/handlers" @@ -749,12 +748,12 @@ func (s Service) RemoveTargetCredentialSources(ctx context.Context, req *pbs.Rem func AuthorizeSessionWithWorkerFilter( _ context.Context, t target.Target, - selectedWorkers wl.WorkerList, + selectedWorkers server.WorkerList, _ string, _ intglobals.ControllerExtension, _ common.Downstreamers, _ ...target.Option, -) (wl.WorkerList, *server.Worker, error) { +) (server.WorkerList, *server.Worker, error) { if len(selectedWorkers) > 0 { var eval *bexpr.Evaluator var err error @@ -1057,7 +1056,7 @@ func (s Service) AuthorizeSession(ctx context.Context, req *pbs.AuthorizeSession if err != nil { return nil, err } - sess, err = sessionRepo.CreateSession(ctx, wrapper, sess, wl.WorkerList(selectedWorkers).Addresses()) + sess, err = sessionRepo.CreateSession(ctx, wrapper, sess, server.WorkerList(selectedWorkers).Addresses()) if err != nil { return nil, err } @@ -1197,7 +1196,7 @@ func (s Service) AuthorizeSession(ctx context.Context, req *pbs.AuthorizeSession PrivateKey: sess.CertificatePrivateKey, HostId: hostId, Endpoint: endpointUrl.String(), - WorkerInfo: wl.WorkerList(selectedWorkers).WorkerInfos(), + WorkerInfo: server.WorkerList(selectedWorkers).WorkerInfos(), ConnectionLimit: t.GetSessionConnectionLimit(), DefaultClientPort: t.GetDefaultClientPort(), } diff --git a/internal/daemon/controller/handlers/targets/target_service_test.go b/internal/daemon/controller/handlers/targets/target_service_test.go index 61a4ad4257..37a95ffa15 100644 --- a/internal/daemon/controller/handlers/targets/target_service_test.go +++ b/internal/daemon/controller/handlers/targets/target_service_test.go @@ -9,7 +9,6 @@ import ( "fmt" "testing" - "github.com/hashicorp/boundary/internal/daemon/common" "github.com/hashicorp/boundary/internal/db" "github.com/hashicorp/boundary/internal/kms" "github.com/hashicorp/boundary/internal/server" @@ -29,7 +28,7 @@ func TestWorkerList_Addresses(t *testing.T) { "test4", } var workerInfos []*pb.WorkerInfo - var tested common.WorkerList + var tested server.WorkerList for _, a := range addresses { workerInfos = append(workerInfos, &pb.WorkerInfo{Address: a}) tested = append(tested, server.NewWorker(scope.Global.String(), diff --git a/internal/daemon/controller/handlers/workers/worker_service.go b/internal/daemon/controller/handlers/workers/worker_service.go index 5a6af04b3c..71703e5088 100644 --- a/internal/daemon/controller/handlers/workers/worker_service.go +++ b/internal/daemon/controller/handlers/workers/worker_service.go @@ -12,7 +12,6 @@ import ( "strings" "github.com/hashicorp/boundary/globals" - wl "github.com/hashicorp/boundary/internal/daemon/common" "github.com/hashicorp/boundary/internal/daemon/controller/auth" "github.com/hashicorp/boundary/internal/daemon/controller/common" "github.com/hashicorp/boundary/internal/daemon/controller/common/scopeids" @@ -331,7 +330,7 @@ func (s Service) DeleteWorker(ctx context.Context, req *pbs.DeleteWorkerRequest) return nil, err } - if wl.IsManagedWorker(w) { + if server.IsManagedWorker(w) { return nil, handlers.InvalidArgumentErrorf("Error in provided request.", map[string]string{"id": "Managed workers cannot be deleted."}) } @@ -368,7 +367,7 @@ func (s Service) UpdateWorker(ctx context.Context, req *pbs.UpdateWorkerRequest) // generated from the scope and name, it's functionally equivalent to // checking the type, but works for both KMS-PKI and old-style KMS workers. switch { - case wl.IsManagedWorker(w): + case server.IsManagedWorker(w): return nil, handlers.InvalidArgumentErrorf( "Error in provided request.", map[string]string{"id": "Managed workers cannot be updated."}, @@ -879,7 +878,7 @@ func (s Service) toProto(ctx context.Context, in *server.Worker, opt ...handlers } } // Managed workers cannot be deleted - if wl.IsManagedWorker(in) { + if server.IsManagedWorker(in) { allActions := out.AuthorizedActions out.AuthorizedActions = make([]string, 0, len(allActions)) for _, act := range allActions { @@ -1069,7 +1068,7 @@ func validateStringForDb(str string) string { return "must be non-empty." case len(str) > 512: return "must be within 512 characters." - case str == wl.ManagedWorkerTag: + case str == server.ManagedWorkerTag: return "cannot be the managed worker tag." default: return "" diff --git a/internal/daemon/controller/handlers/workers/worker_service_test.go b/internal/daemon/controller/handlers/workers/worker_service_test.go index a77d09a0c6..489daab099 100644 --- a/internal/daemon/controller/handlers/workers/worker_service_test.go +++ b/internal/daemon/controller/handlers/workers/worker_service_test.go @@ -13,7 +13,6 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "github.com/hashicorp/boundary/globals" - wl "github.com/hashicorp/boundary/internal/daemon/common" "github.com/hashicorp/boundary/internal/daemon/controller/auth" "github.com/hashicorp/boundary/internal/daemon/controller/common" "github.com/hashicorp/boundary/internal/daemon/controller/handlers" @@ -205,7 +204,7 @@ func TestGet(t *testing.T) { server.WithAddress("test managed pki worker address"), server.WithLocalStorageState(server.AvailableLocalStorageState.String()), server.WithWorkerTags(&server.Tag{ - Key: wl.ManagedWorkerTag, + Key: server.ManagedWorkerTag, Value: "true", })), server.WithUpdateTags(true), @@ -233,10 +232,10 @@ func TestGet(t *testing.T) { LastStatusTime: managedPkiWorker.GetLastStatusTime().GetTimestamp(), ReleaseVersion: managedPkiWorker.ReleaseVersion, CanonicalTags: map[string]*structpb.ListValue{ - wl.ManagedWorkerTag: structListValue(t, "true"), + server.ManagedWorkerTag: structListValue(t, "true"), }, ConfigTags: map[string]*structpb.ListValue{ - wl.ManagedWorkerTag: structListValue(t, "true"), + server.ManagedWorkerTag: structListValue(t, "true"), }, Type: PkiWorkerType, DirectlyConnectedDownstreamWorkers: connectedDownstreams, @@ -499,7 +498,7 @@ func TestDelete(t *testing.T) { Value: "bar", })) wManaged := server.TestKmsWorker(t, conn, wrap, server.WithWorkerTags(&server.Tag{ - Key: wl.ManagedWorkerTag, + Key: server.ManagedWorkerTag, Value: "bar", })) @@ -2059,7 +2058,7 @@ func TestService_AddWorkerTags(t *testing.T) { return &pbs.AddWorkerTagsRequest{ Id: worker.PublicId, Version: worker.Version, - ApiTags: map[string]*structpb.ListValue{wl.ManagedWorkerTag: {Values: []*structpb.Value{structpb.NewStringValue("value2")}}}, + ApiTags: map[string]*structpb.ListValue{server.ManagedWorkerTag: {Values: []*structpb.Value{structpb.NewStringValue("value2")}}}, } }(), wantErrContains: "Tag keys cannot be the managed worker tag.", @@ -2222,7 +2221,7 @@ func TestService_SetWorkerTags(t *testing.T) { return &pbs.SetWorkerTagsRequest{ Id: worker.PublicId, Version: worker.Version, - ApiTags: map[string]*structpb.ListValue{wl.ManagedWorkerTag: {Values: []*structpb.Value{structpb.NewStringValue("value2")}}}, + ApiTags: map[string]*structpb.ListValue{server.ManagedWorkerTag: {Values: []*structpb.Value{structpb.NewStringValue("value2")}}}, } }(), wantErrContains: "Tag keys cannot be the managed worker tag.", diff --git a/internal/daemon/controller/tickers.go b/internal/daemon/controller/tickers.go index ff7dcb5520..ebf5314d16 100644 --- a/internal/daemon/controller/tickers.go +++ b/internal/daemon/controller/tickers.go @@ -10,7 +10,6 @@ import ( "time" "github.com/hashicorp/boundary/internal/daemon/cluster" - "github.com/hashicorp/boundary/internal/daemon/common" "github.com/hashicorp/boundary/internal/errors" "github.com/hashicorp/boundary/internal/event" "github.com/hashicorp/boundary/internal/server" @@ -204,7 +203,7 @@ func (c *Controller) startWorkerConnectionMaintenanceTicking(cancelCtx context.C event.WriteError(cancelCtx, op, err, event.WithInfoMsg("couldn't get known workers from repo")) break } - connectionState.DisconnectMissingWorkers(common.WorkerList(knownWorker).PublicIds()) + connectionState.DisconnectMissingWorkers(server.WorkerList(knownWorker).PublicIds()) } if len(connectionState.UnmappedKeyIds()) > 0 { diff --git a/internal/daemon/common/worker_list.go b/internal/server/worker_list.go similarity index 65% rename from internal/daemon/common/worker_list.go rename to internal/server/worker_list.go index bf57b1492d..792b42fd58 100644 --- a/internal/daemon/common/worker_list.go +++ b/internal/server/worker_list.go @@ -1,23 +1,25 @@ // Copyright (c) HashiCorp, Inc. // SPDX-License-Identifier: BUSL-1.1 -package common +package server import ( stderrors "errors" "fmt" "github.com/hashicorp/boundary/internal/daemon/controller/handlers" - "github.com/hashicorp/boundary/internal/server" pb "github.com/hashicorp/boundary/sdk/pbs/controller/api/resources/targets" + "github.com/hashicorp/boundary/sdk/pbs/plugin" "github.com/hashicorp/boundary/version" "github.com/hashicorp/go-bexpr" "github.com/mitchellh/pointerstructure" "google.golang.org/grpc/codes" ) +const ManagedWorkerTag = "boundary.cloud.hashicorp.com:managed" + // WorkerList is a helper type to make the selection of workers clearer and more declarative. -type WorkerList []*server.Worker +type WorkerList []*Worker // addresses converts the slice of workers to a slice of their addresses func (w WorkerList) Addresses() []string { @@ -50,7 +52,7 @@ func (w WorkerList) WorkerInfos() []*pb.WorkerInfo { // SupportsFeature returns a new WorkerList composed of all workers in this // WorkerList which supports the provided feature. func (w WorkerList) SupportsFeature(f version.Feature) WorkerList { - var ret []*server.Worker + var ret []*Worker for _, worker := range w { sv := version.FromVersionString(worker.GetReleaseVersion()).Semver() if version.SupportsFeature(sv, f) { @@ -63,7 +65,7 @@ func (w WorkerList) SupportsFeature(f version.Feature) WorkerList { // filtered returns a new workerList where all elements contained in it are the // ones which from the original workerList that pass the evaluator's evaluation. func (w WorkerList) Filtered(eval *bexpr.Evaluator) (WorkerList, error) { - var ret []*server.Worker + var ret []*Worker for _, worker := range w { filterInput := map[string]any{ "name": worker.GetName(), @@ -86,8 +88,8 @@ func (w WorkerList) Filtered(eval *bexpr.Evaluator) (WorkerList, error) { // unmanaged workers, respectively func SeparateManagedWorkers(workers WorkerList) (managedWorkers, nonManagedWorkers WorkerList) { // Build a set of managed and unmanaged workers - managedWorkers = make([]*server.Worker, 0, len(workers)) - nonManagedWorkers = make([]*server.Worker, 0, len(workers)) + managedWorkers = make([]*Worker, 0, len(workers)) + nonManagedWorkers = make([]*Worker, 0, len(workers)) for _, worker := range workers { if IsManagedWorker(worker) { managedWorkers = append(managedWorkers, worker) @@ -99,7 +101,7 @@ func SeparateManagedWorkers(workers WorkerList) (managedWorkers, nonManagedWorke } // IsManagedWorker indicates whether the given worker is managed -func IsManagedWorker(worker *server.Worker) bool { +func IsManagedWorker(worker *Worker) bool { return len(worker.CanonicalTags()[ManagedWorkerTag]) != 0 } @@ -110,16 +112,16 @@ func IsManagedWorker(worker *server.Worker) bool { // will return workers with Unknown local storage state. // Workers that do not support local storage state will be considered healthy. func FilterWorkersByLocalStorageState(workers WorkerList) (healthyWorkers WorkerList) { - availableWorkers := make([]*server.Worker, 0, len(workers)) - unknownWorkers := make([]*server.Worker, 0, len(workers)) + availableWorkers := make([]*Worker, 0, len(workers)) + unknownWorkers := make([]*Worker, 0, len(workers)) for _, worker := range workers { sv := version.FromVersionString(worker.GetReleaseVersion()).Semver() if version.SupportsFeature(sv, version.LocalStorageState) { ls := worker.GetLocalStorageState() - if ls == server.AvailableLocalStorageState.String() { + if ls == AvailableLocalStorageState.String() { availableWorkers = append(availableWorkers, worker) - } else if ls == server.UnknownLocalStorageState.String() { + } else if ls == UnknownLocalStorageState.String() { unknownWorkers = append(unknownWorkers, worker) } } else { @@ -132,3 +134,35 @@ func FilterWorkersByLocalStorageState(workers WorkerList) (healthyWorkers Worker } return unknownWorkers } + +// FilterStorageBucketCredentialStateFn is a function definition that is used to filter +// out workers that are considered to be in a unhealthy state. The function should return +// true for healthy workers. +type FilterStorageBucketCredentialStateFn func(*plugin.StorageBucketCredentialState) bool + +// FilterStorageBucketCredentialByWriteAccess will return true if the write state is missing or +// if the state is in an OK or UNKNOWN state. +func FilterStorageBucketCredentialByWriteAccess(sbcState *plugin.StorageBucketCredentialState) bool { + if sbcState == nil || sbcState.State == nil || sbcState.State.Write == nil { + return true + } + return sbcState.State.Write.State != plugin.StateType_STATE_TYPE_ERROR +} + +// FilterStorageBucketCredentialByReadAccess will return true if the read state is missing or +// if the state is in an OK or UNKNOWN state. +func FilterStorageBucketCredentialByReadAccess(sbcState *plugin.StorageBucketCredentialState) bool { + if sbcState == nil || sbcState.State == nil || sbcState.State.Read == nil { + return true + } + return sbcState.State.Read.State != plugin.StateType_STATE_TYPE_ERROR +} + +// FilterStorageBucketCredentialByDeleteAccess will return true if the delete state is missing or +// if the state is in an OK or UNKNOWN state. +func FilterStorageBucketCredentialByDeleteAccess(sbcState *plugin.StorageBucketCredentialState) bool { + if sbcState == nil || sbcState.State == nil || sbcState.State.Delete == nil { + return true + } + return sbcState.State.Delete.State != plugin.StateType_STATE_TYPE_ERROR +}