feat: workers filtered by sbc state

pull/4940/head
Damian Debkowski 2 years ago committed by Elim Tsiagbey
parent 130010a2ca
commit 00e4c5448b

@ -10,7 +10,6 @@ import (
"sync/atomic"
"time"
dcommon "github.com/hashicorp/boundary/internal/daemon/common"
"github.com/hashicorp/boundary/internal/daemon/controller/common"
"github.com/hashicorp/boundary/internal/daemon/controller/handlers"
"github.com/hashicorp/boundary/internal/event"
@ -188,7 +187,7 @@ func (ws *workerServiceServer) Status(ctx context.Context, req *pbs.StatusReques
event.WriteError(ctx, op, err, event.WithInfoMsg("error getting known connected worker ids"))
return &pbs.StatusResponse{}, status.Errorf(codes.Internal, "Error getting known connected worker ids: %v", err)
}
authorizedDownstreams.WorkerPublicIds = dcommon.WorkerList(knownConnectedWorkers).PublicIds()
authorizedDownstreams.WorkerPublicIds = server.WorkerList(knownConnectedWorkers).PublicIds()
}
if len(req.GetConnectedUnmappedWorkerKeyIdentifiers()) > 0 {
@ -360,7 +359,7 @@ func (ws *workerServiceServer) ListHcpbWorkers(ctx context.Context, req *pbs.Lis
return nil, status.Errorf(codes.Internal, "Error looking up workers: %v", err)
}
managed, _ := dcommon.SeparateManagedWorkers(workers)
managed, _ := server.SeparateManagedWorkers(workers)
resp := &pbs.ListHcpbWorkersResponse{}
if len(managed) == 0 {
return resp, nil

@ -15,7 +15,6 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/hashicorp/boundary/internal/authtoken"
credstatic "github.com/hashicorp/boundary/internal/credential/static"
dcommon "github.com/hashicorp/boundary/internal/daemon/common"
"github.com/hashicorp/boundary/internal/db"
pbs "github.com/hashicorp/boundary/internal/gen/controller/servers/services"
intglobals "github.com/hashicorp/boundary/internal/globals"
@ -587,28 +586,28 @@ func TestHcpbWorkers(t *testing.T) {
}
// Stale/unalive kms worker aren't expected...
server.TestKmsWorker(t, conn, wrapper, server.WithWorkerTags(&server.Tag{Key: dcommon.ManagedWorkerTag, Value: "true"}),
server.TestKmsWorker(t, conn, wrapper, server.WithWorkerTags(&server.Tag{Key: server.ManagedWorkerTag, Value: "true"}),
server.WithAddress("old.kms.1"))
// Sleep + 500ms longer than the liveness duration.
time.Sleep(time.Duration(liveDur.Load()) + time.Second)
server.TestKmsWorker(t, conn, wrapper, server.WithWorkerTags(&server.Tag{Key: dcommon.ManagedWorkerTag, Value: "true"}),
server.TestKmsWorker(t, conn, wrapper, server.WithWorkerTags(&server.Tag{Key: server.ManagedWorkerTag, Value: "true"}),
server.WithAddress("kms.1"))
server.TestKmsWorker(t, conn, wrapper, server.WithWorkerTags(&server.Tag{Key: dcommon.ManagedWorkerTag, Value: "true"}),
server.TestKmsWorker(t, conn, wrapper, server.WithWorkerTags(&server.Tag{Key: server.ManagedWorkerTag, Value: "true"}),
server.WithAddress("kms.2"))
server.TestKmsWorker(t, conn, wrapper, server.WithWorkerTags(&server.Tag{Key: "unrelated_tag", Value: "true"}),
server.WithAddress("unrelated_tag.kms.1"))
// Shutdown workers will be removed from routes and sessions, but still returned
// to downstream workers
server.TestKmsWorker(t, conn, wrapper, server.WithWorkerTags(&server.Tag{Key: dcommon.ManagedWorkerTag, Value: "true"}),
server.TestKmsWorker(t, conn, wrapper, server.WithWorkerTags(&server.Tag{Key: server.ManagedWorkerTag, Value: "true"}),
server.WithAddress("shutdown.kms.3"), server.WithOperationalState(server.ShutdownOperationalState.String()))
// PKI workers are also expected, if they have the managed worker tag
serverRepo, err := serversRepoFn()
require.NoError(err)
var keyId string
server.TestPkiWorker(t, conn, wrapper, server.WithWorkerTags(&server.Tag{Key: dcommon.ManagedWorkerTag, Value: "true"}),
server.TestPkiWorker(t, conn, wrapper, server.WithWorkerTags(&server.Tag{Key: server.ManagedWorkerTag, Value: "true"}),
server.WithTestPkiWorkerAuthorizedKeyId(&keyId))
_, err = serverRepo.UpsertWorkerStatus(ctx, server.NewWorker(scope.Global.String(), server.WithAddress("pki.1")), server.WithKeyId(keyId))
require.NoError(err)

@ -6,5 +6,4 @@ package common
const (
ReverseGrpcConnectionAlpnValue = "the-downstream-dialer-plays-an-uno-reverse-card"
DataPlaneProxyAlpnValue = "i-herd-you-like-proxies-so-i-put-a-proxy-in-your-proxy"
ManagedWorkerTag = "boundary.cloud.hashicorp.com:managed"
)

@ -241,6 +241,7 @@ func (c *Controller) registerGrpcServices(s *grpc.Server) error {
srs, err := session_recordings.NewServiceFn(
c.baseContext,
c.IamRepoFn,
c.ServersRepoFn,
c.workerStatusGracePeriod,
c.kms,
c.conf.RawConfig.Controller.MaxPageSize,

@ -43,6 +43,7 @@ func init() {
// NewServiceFn returns a storage bucket service which is not implemented in OSS
var NewServiceFn = func(ctx context.Context,
iamRepoFn common.IamRepoFactory,
serverRepoFn common.ServersRepoFactory,
workerStatusGracePeriod *atomic.Int64,
kms *kms.Kms,
maxPageSize uint,

@ -19,7 +19,6 @@ import (
"github.com/hashicorp/boundary/internal/alias"
talias "github.com/hashicorp/boundary/internal/alias/target"
"github.com/hashicorp/boundary/internal/credential"
wl "github.com/hashicorp/boundary/internal/daemon/common"
"github.com/hashicorp/boundary/internal/daemon/controller/auth"
"github.com/hashicorp/boundary/internal/daemon/controller/common"
"github.com/hashicorp/boundary/internal/daemon/controller/handlers"
@ -749,12 +748,12 @@ func (s Service) RemoveTargetCredentialSources(ctx context.Context, req *pbs.Rem
func AuthorizeSessionWithWorkerFilter(
_ context.Context,
t target.Target,
selectedWorkers wl.WorkerList,
selectedWorkers server.WorkerList,
_ string,
_ intglobals.ControllerExtension,
_ common.Downstreamers,
_ ...target.Option,
) (wl.WorkerList, *server.Worker, error) {
) (server.WorkerList, *server.Worker, error) {
if len(selectedWorkers) > 0 {
var eval *bexpr.Evaluator
var err error
@ -1057,7 +1056,7 @@ func (s Service) AuthorizeSession(ctx context.Context, req *pbs.AuthorizeSession
if err != nil {
return nil, err
}
sess, err = sessionRepo.CreateSession(ctx, wrapper, sess, wl.WorkerList(selectedWorkers).Addresses())
sess, err = sessionRepo.CreateSession(ctx, wrapper, sess, server.WorkerList(selectedWorkers).Addresses())
if err != nil {
return nil, err
}
@ -1197,7 +1196,7 @@ func (s Service) AuthorizeSession(ctx context.Context, req *pbs.AuthorizeSession
PrivateKey: sess.CertificatePrivateKey,
HostId: hostId,
Endpoint: endpointUrl.String(),
WorkerInfo: wl.WorkerList(selectedWorkers).WorkerInfos(),
WorkerInfo: server.WorkerList(selectedWorkers).WorkerInfos(),
ConnectionLimit: t.GetSessionConnectionLimit(),
DefaultClientPort: t.GetDefaultClientPort(),
}

@ -9,7 +9,6 @@ import (
"fmt"
"testing"
"github.com/hashicorp/boundary/internal/daemon/common"
"github.com/hashicorp/boundary/internal/db"
"github.com/hashicorp/boundary/internal/kms"
"github.com/hashicorp/boundary/internal/server"
@ -29,7 +28,7 @@ func TestWorkerList_Addresses(t *testing.T) {
"test4",
}
var workerInfos []*pb.WorkerInfo
var tested common.WorkerList
var tested server.WorkerList
for _, a := range addresses {
workerInfos = append(workerInfos, &pb.WorkerInfo{Address: a})
tested = append(tested, server.NewWorker(scope.Global.String(),

@ -12,7 +12,6 @@ import (
"strings"
"github.com/hashicorp/boundary/globals"
wl "github.com/hashicorp/boundary/internal/daemon/common"
"github.com/hashicorp/boundary/internal/daemon/controller/auth"
"github.com/hashicorp/boundary/internal/daemon/controller/common"
"github.com/hashicorp/boundary/internal/daemon/controller/common/scopeids"
@ -331,7 +330,7 @@ func (s Service) DeleteWorker(ctx context.Context, req *pbs.DeleteWorkerRequest)
return nil, err
}
if wl.IsManagedWorker(w) {
if server.IsManagedWorker(w) {
return nil, handlers.InvalidArgumentErrorf("Error in provided request.", map[string]string{"id": "Managed workers cannot be deleted."})
}
@ -368,7 +367,7 @@ func (s Service) UpdateWorker(ctx context.Context, req *pbs.UpdateWorkerRequest)
// generated from the scope and name, it's functionally equivalent to
// checking the type, but works for both KMS-PKI and old-style KMS workers.
switch {
case wl.IsManagedWorker(w):
case server.IsManagedWorker(w):
return nil, handlers.InvalidArgumentErrorf(
"Error in provided request.",
map[string]string{"id": "Managed workers cannot be updated."},
@ -879,7 +878,7 @@ func (s Service) toProto(ctx context.Context, in *server.Worker, opt ...handlers
}
}
// Managed workers cannot be deleted
if wl.IsManagedWorker(in) {
if server.IsManagedWorker(in) {
allActions := out.AuthorizedActions
out.AuthorizedActions = make([]string, 0, len(allActions))
for _, act := range allActions {
@ -1069,7 +1068,7 @@ func validateStringForDb(str string) string {
return "must be non-empty."
case len(str) > 512:
return "must be within 512 characters."
case str == wl.ManagedWorkerTag:
case str == server.ManagedWorkerTag:
return "cannot be the managed worker tag."
default:
return ""

@ -13,7 +13,6 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/hashicorp/boundary/globals"
wl "github.com/hashicorp/boundary/internal/daemon/common"
"github.com/hashicorp/boundary/internal/daemon/controller/auth"
"github.com/hashicorp/boundary/internal/daemon/controller/common"
"github.com/hashicorp/boundary/internal/daemon/controller/handlers"
@ -205,7 +204,7 @@ func TestGet(t *testing.T) {
server.WithAddress("test managed pki worker address"),
server.WithLocalStorageState(server.AvailableLocalStorageState.String()),
server.WithWorkerTags(&server.Tag{
Key: wl.ManagedWorkerTag,
Key: server.ManagedWorkerTag,
Value: "true",
})),
server.WithUpdateTags(true),
@ -233,10 +232,10 @@ func TestGet(t *testing.T) {
LastStatusTime: managedPkiWorker.GetLastStatusTime().GetTimestamp(),
ReleaseVersion: managedPkiWorker.ReleaseVersion,
CanonicalTags: map[string]*structpb.ListValue{
wl.ManagedWorkerTag: structListValue(t, "true"),
server.ManagedWorkerTag: structListValue(t, "true"),
},
ConfigTags: map[string]*structpb.ListValue{
wl.ManagedWorkerTag: structListValue(t, "true"),
server.ManagedWorkerTag: structListValue(t, "true"),
},
Type: PkiWorkerType,
DirectlyConnectedDownstreamWorkers: connectedDownstreams,
@ -499,7 +498,7 @@ func TestDelete(t *testing.T) {
Value: "bar",
}))
wManaged := server.TestKmsWorker(t, conn, wrap, server.WithWorkerTags(&server.Tag{
Key: wl.ManagedWorkerTag,
Key: server.ManagedWorkerTag,
Value: "bar",
}))
@ -2059,7 +2058,7 @@ func TestService_AddWorkerTags(t *testing.T) {
return &pbs.AddWorkerTagsRequest{
Id: worker.PublicId,
Version: worker.Version,
ApiTags: map[string]*structpb.ListValue{wl.ManagedWorkerTag: {Values: []*structpb.Value{structpb.NewStringValue("value2")}}},
ApiTags: map[string]*structpb.ListValue{server.ManagedWorkerTag: {Values: []*structpb.Value{structpb.NewStringValue("value2")}}},
}
}(),
wantErrContains: "Tag keys cannot be the managed worker tag.",
@ -2222,7 +2221,7 @@ func TestService_SetWorkerTags(t *testing.T) {
return &pbs.SetWorkerTagsRequest{
Id: worker.PublicId,
Version: worker.Version,
ApiTags: map[string]*structpb.ListValue{wl.ManagedWorkerTag: {Values: []*structpb.Value{structpb.NewStringValue("value2")}}},
ApiTags: map[string]*structpb.ListValue{server.ManagedWorkerTag: {Values: []*structpb.Value{structpb.NewStringValue("value2")}}},
}
}(),
wantErrContains: "Tag keys cannot be the managed worker tag.",

@ -10,7 +10,6 @@ import (
"time"
"github.com/hashicorp/boundary/internal/daemon/cluster"
"github.com/hashicorp/boundary/internal/daemon/common"
"github.com/hashicorp/boundary/internal/errors"
"github.com/hashicorp/boundary/internal/event"
"github.com/hashicorp/boundary/internal/server"
@ -204,7 +203,7 @@ func (c *Controller) startWorkerConnectionMaintenanceTicking(cancelCtx context.C
event.WriteError(cancelCtx, op, err, event.WithInfoMsg("couldn't get known workers from repo"))
break
}
connectionState.DisconnectMissingWorkers(common.WorkerList(knownWorker).PublicIds())
connectionState.DisconnectMissingWorkers(server.WorkerList(knownWorker).PublicIds())
}
if len(connectionState.UnmappedKeyIds()) > 0 {

@ -1,23 +1,25 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package common
package server
import (
stderrors "errors"
"fmt"
"github.com/hashicorp/boundary/internal/daemon/controller/handlers"
"github.com/hashicorp/boundary/internal/server"
pb "github.com/hashicorp/boundary/sdk/pbs/controller/api/resources/targets"
"github.com/hashicorp/boundary/sdk/pbs/plugin"
"github.com/hashicorp/boundary/version"
"github.com/hashicorp/go-bexpr"
"github.com/mitchellh/pointerstructure"
"google.golang.org/grpc/codes"
)
const ManagedWorkerTag = "boundary.cloud.hashicorp.com:managed"
// WorkerList is a helper type to make the selection of workers clearer and more declarative.
type WorkerList []*server.Worker
type WorkerList []*Worker
// addresses converts the slice of workers to a slice of their addresses
func (w WorkerList) Addresses() []string {
@ -50,7 +52,7 @@ func (w WorkerList) WorkerInfos() []*pb.WorkerInfo {
// SupportsFeature returns a new WorkerList composed of all workers in this
// WorkerList which supports the provided feature.
func (w WorkerList) SupportsFeature(f version.Feature) WorkerList {
var ret []*server.Worker
var ret []*Worker
for _, worker := range w {
sv := version.FromVersionString(worker.GetReleaseVersion()).Semver()
if version.SupportsFeature(sv, f) {
@ -63,7 +65,7 @@ func (w WorkerList) SupportsFeature(f version.Feature) WorkerList {
// filtered returns a new workerList where all elements contained in it are the
// ones which from the original workerList that pass the evaluator's evaluation.
func (w WorkerList) Filtered(eval *bexpr.Evaluator) (WorkerList, error) {
var ret []*server.Worker
var ret []*Worker
for _, worker := range w {
filterInput := map[string]any{
"name": worker.GetName(),
@ -86,8 +88,8 @@ func (w WorkerList) Filtered(eval *bexpr.Evaluator) (WorkerList, error) {
// unmanaged workers, respectively
func SeparateManagedWorkers(workers WorkerList) (managedWorkers, nonManagedWorkers WorkerList) {
// Build a set of managed and unmanaged workers
managedWorkers = make([]*server.Worker, 0, len(workers))
nonManagedWorkers = make([]*server.Worker, 0, len(workers))
managedWorkers = make([]*Worker, 0, len(workers))
nonManagedWorkers = make([]*Worker, 0, len(workers))
for _, worker := range workers {
if IsManagedWorker(worker) {
managedWorkers = append(managedWorkers, worker)
@ -99,7 +101,7 @@ func SeparateManagedWorkers(workers WorkerList) (managedWorkers, nonManagedWorke
}
// IsManagedWorker indicates whether the given worker is managed
func IsManagedWorker(worker *server.Worker) bool {
func IsManagedWorker(worker *Worker) bool {
return len(worker.CanonicalTags()[ManagedWorkerTag]) != 0
}
@ -110,16 +112,16 @@ func IsManagedWorker(worker *server.Worker) bool {
// will return workers with Unknown local storage state.
// Workers that do not support local storage state will be considered healthy.
func FilterWorkersByLocalStorageState(workers WorkerList) (healthyWorkers WorkerList) {
availableWorkers := make([]*server.Worker, 0, len(workers))
unknownWorkers := make([]*server.Worker, 0, len(workers))
availableWorkers := make([]*Worker, 0, len(workers))
unknownWorkers := make([]*Worker, 0, len(workers))
for _, worker := range workers {
sv := version.FromVersionString(worker.GetReleaseVersion()).Semver()
if version.SupportsFeature(sv, version.LocalStorageState) {
ls := worker.GetLocalStorageState()
if ls == server.AvailableLocalStorageState.String() {
if ls == AvailableLocalStorageState.String() {
availableWorkers = append(availableWorkers, worker)
} else if ls == server.UnknownLocalStorageState.String() {
} else if ls == UnknownLocalStorageState.String() {
unknownWorkers = append(unknownWorkers, worker)
}
} else {
@ -132,3 +134,35 @@ func FilterWorkersByLocalStorageState(workers WorkerList) (healthyWorkers Worker
}
return unknownWorkers
}
// FilterStorageBucketCredentialStateFn is a function definition that is used to filter
// out workers that are considered to be in a unhealthy state. The function should return
// true for healthy workers.
type FilterStorageBucketCredentialStateFn func(*plugin.StorageBucketCredentialState) bool
// FilterStorageBucketCredentialByWriteAccess will return true if the write state is missing or
// if the state is in an OK or UNKNOWN state.
func FilterStorageBucketCredentialByWriteAccess(sbcState *plugin.StorageBucketCredentialState) bool {
if sbcState == nil || sbcState.State == nil || sbcState.State.Write == nil {
return true
}
return sbcState.State.Write.State != plugin.StateType_STATE_TYPE_ERROR
}
// FilterStorageBucketCredentialByReadAccess will return true if the read state is missing or
// if the state is in an OK or UNKNOWN state.
func FilterStorageBucketCredentialByReadAccess(sbcState *plugin.StorageBucketCredentialState) bool {
if sbcState == nil || sbcState.State == nil || sbcState.State.Read == nil {
return true
}
return sbcState.State.Read.State != plugin.StateType_STATE_TYPE_ERROR
}
// FilterStorageBucketCredentialByDeleteAccess will return true if the delete state is missing or
// if the state is in an OK or UNKNOWN state.
func FilterStorageBucketCredentialByDeleteAccess(sbcState *plugin.StorageBucketCredentialState) bool {
if sbcState == nil || sbcState.State == nil || sbcState.State.Delete == nil {
return true
}
return sbcState.State.Delete.State != plugin.StateType_STATE_TYPE_ERROR
}
Loading…
Cancel
Save