update persisted storage bucket credential states from worker status

pull/4940/head
Damian Debkowski 2 years ago committed by Elim Tsiagbey
parent 4131c4b324
commit a20aae9879

@ -20,6 +20,7 @@ import (
"github.com/hashicorp/boundary/internal/session"
"github.com/hashicorp/boundary/internal/types/scope"
"github.com/hashicorp/boundary/sdk/pbs/controller/api/resources/targets"
"github.com/hashicorp/boundary/sdk/pbs/plugin"
"github.com/hashicorp/go-bexpr"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@ -58,6 +59,10 @@ var (
// available in OSS and are a straight forward proxy with no additional
// fields needed.
getProtocolContext = noProtocolContext
// updateWorkerStorageBucketCredentialStatesFn will update the worker storage bucket
// credential state.
updateWorkerStorageBucketCredentialStatesFn = updateWorkerStorageBucketCredentialStates
)
// singleHopConnectionRoute returns a route consisting of the singlehop worker (the root worker id)
@ -159,6 +164,12 @@ func (ws *workerServiceServer) Status(ctx context.Context, req *pbs.StatusReques
event.WriteError(ctx, op, err, event.WithInfoMsg("error storing worker status"))
return &pbs.StatusResponse{}, status.Errorf(codes.Internal, "Error storing worker status: %v", err)
}
// update storage states
if sbcStates := wStat.GetStorageBucketCredentialStates(); sbcStates != nil && wrk.GetPublicId() != "" {
updateWorkerStorageBucketCredentialStatesFn(ctx, serverRepo, wrk.GetPublicId(), sbcStates)
}
controllers, err := serverRepo.ListControllers(ctx, server.WithLiveness(time.Duration(ws.livenessTimeToStale.Load())))
if err != nil {
event.WriteError(ctx, op, err, event.WithInfoMsg("error getting current controllers"))
@ -746,3 +757,8 @@ func (ws *workerServiceServer) CloseConnection(ctx context.Context, req *pbs.Clo
return ret, nil
}
func updateWorkerStorageBucketCredentialStates(ctx context.Context, repo *server.Repository, workerId string, states map[string]*plugin.StorageBucketCredentialState) {
const op = "handlers.updateWorkerStorageBucketCredentialStates"
return
}

@ -6,6 +6,7 @@ package handlers
import (
"context"
"crypto/rand"
"fmt"
"sort"
"sync"
"sync/atomic"
@ -48,10 +49,11 @@ func TestStatus(t *testing.T) {
serverRepo, err := server.NewRepository(ctx, rw, rw, kms)
require.NoError(t, err)
_, err = serverRepo.UpsertController(ctx, &store.Controller{
c := &store.Controller{
PrivateId: "test_controller1",
Address: "127.0.0.1",
})
}
_, err = serverRepo.UpsertController(ctx, c)
require.NoError(t, err)
serversRepoFn := func() (*server.Repository, error) {
@ -114,7 +116,6 @@ func TestStatus(t *testing.T) {
tofu = session.TestTofu(t)
sess, _, err = repo.ActivateSession(ctx, sess.PublicId, sess.Version, tofu)
require.NoError(t, err)
require.NoError(t, err)
s := NewWorkerServiceServer(serversRepoFn, workerAuthRepoFn, sessionRepoFn, connRepoFn, nil, new(sync.Map), kms, new(atomic.Int64), fce)
require.NotNil(t, s)
@ -487,10 +488,11 @@ func TestStatusSessionClosed(t *testing.T) {
serverRepo, err := server.NewRepository(ctx, rw, rw, kms)
require.NoError(t, err)
_, err = serverRepo.UpsertController(ctx, &store.Controller{
c := &store.Controller{
PrivateId: "test_controller1",
Address: "127.0.0.1",
})
}
_, err = serverRepo.UpsertController(ctx, c)
require.NoError(t, err)
serversRepoFn := func() (*server.Repository, error) {
@ -646,6 +648,7 @@ func TestStatusSessionClosed(t *testing.T) {
assert.Equal(tc.wantErrMsg, err.Error())
return
}
fmt.Printf("want upstreams: %v, got upstreams: %v\n", tc.want.CalculatedUpstreams, got.CalculatedUpstreams)
assert.Empty(
cmp.Diff(
tc.want,
@ -677,11 +680,14 @@ func TestStatusDeadConnection(t *testing.T) {
kms := kms.TestKms(t, conn, wrapper)
org, prj := iam.TestScopes(t, iam.TestRepo(t, conn, wrapper))
serverRepo, _ := server.NewRepository(ctx, rw, rw, kms)
_, err := serverRepo.UpsertController(ctx, &store.Controller{
serverRepo, err := server.NewRepository(ctx, rw, rw, kms)
require.NoError(t, err)
c := &store.Controller{
PrivateId: "test_controller1",
Address: "127.0.0.1",
})
}
_, err = serverRepo.UpsertController(ctx, c)
require.NoError(t, err)
worker1 := server.TestKmsWorker(t, conn, wrapper)
@ -835,11 +841,14 @@ func TestStatusWorkerWithKeyId(t *testing.T) {
serverRepo, err := server.NewRepository(ctx, rw, rw, kms)
require.NoError(t, err)
_, err = serverRepo.UpsertController(ctx, &store.Controller{
c := &store.Controller{
PrivateId: "test_controller1",
Address: "127.0.0.1",
})
}
_, err = serverRepo.UpsertController(ctx, c)
require.NoError(t, err)
serversRepoFn := func() (*server.Repository, error) {
return serverRepo, nil
}
@ -1036,11 +1045,16 @@ func TestStatusAuthorizedWorkers(t *testing.T) {
err := kmsCache.CreateKeys(context.Background(), scope.Global.String(), kms.WithRandomReader(rand.Reader))
require.NoError(t, err)
serverRepo, _ := server.NewRepository(ctx, rw, rw, kmsCache)
serverRepo.UpsertController(ctx, &store.Controller{
serverRepo, err := server.NewRepository(ctx, rw, rw, kmsCache)
require.NoError(t, err)
c := &store.Controller{
PrivateId: "test_controller1",
Address: "127.0.0.1",
})
}
_, err = serverRepo.UpsertController(ctx, c)
require.NoError(t, err)
serversRepoFn := func() (*server.Repository, error) {
return serverRepo, nil
}
@ -1253,11 +1267,16 @@ func TestWorkerOperationalStatus(t *testing.T) {
wrapper := db.TestWrapper(t)
kms := kms.TestKms(t, conn, wrapper)
serverRepo, _ := server.NewRepository(ctx, rw, rw, kms)
serverRepo.UpsertController(ctx, &store.Controller{
serverRepo, err := server.NewRepository(ctx, rw, rw, kms)
require.NoError(t, err)
c := &store.Controller{
PrivateId: "test_controller1",
Address: "127.0.0.1",
})
}
_, err = serverRepo.UpsertController(ctx, c)
require.NoError(t, err)
serversRepoFn := func() (*server.Repository, error) {
return serverRepo, nil
}
@ -1365,12 +1384,16 @@ func TestWorkerLocalStorageStateStatus(t *testing.T) {
wrapper := db.TestWrapper(t)
kms := kms.TestKms(t, conn, wrapper)
serverRepo, _ := server.NewRepository(ctx, rw, rw, kms)
_, err := serverRepo.UpsertController(ctx, &store.Controller{
serverRepo, err := server.NewRepository(ctx, rw, rw, kms)
require.NoError(t, err)
c := &store.Controller{
PrivateId: "test_controller1",
Address: "127.0.0.1",
})
}
_, err = serverRepo.UpsertController(ctx, c)
require.NoError(t, err)
serversRepoFn := func() (*server.Repository, error) {
return serverRepo, nil
}

Loading…
Cancel
Save