diff --git a/internal/daemon/cluster/downstreammanager.go b/internal/daemon/cluster/downstreammanager.go new file mode 100644 index 0000000000..3c63970d07 --- /dev/null +++ b/internal/daemon/cluster/downstreammanager.go @@ -0,0 +1,67 @@ +package cluster + +import ( + "net" + "sync" +) + +// DownstreamManager associates downstream worker key identifiers with the +// connections to a specific server. +// It is safe to access DownstreamManager concurrently. +type DownstreamManager struct { + workerConnections map[string][]net.Conn + l sync.RWMutex +} + +func NewDownstreamManager() *DownstreamManager { + return &DownstreamManager{ + workerConnections: make(map[string][]net.Conn), + } +} + +// addConnection adds a connection associated with the provided downstream +// worker key identifier. +func (m *DownstreamManager) addConnection(id string, c net.Conn) { + m.l.Lock() + defer m.l.Unlock() + m.workerConnections[id] = append(m.workerConnections[id], c) +} + +// Disconnect closes all connections associated with the provided worker key +// identifier. +func (m *DownstreamManager) Disconnect(id string) { + m.l.Lock() + defer m.l.Unlock() + for _, c := range m.workerConnections[id] { + c.Close() + } + delete(m.workerConnections, id) +} + +// Connected returns a slice of worker key identifiers for all workers that are +// currently being tracked by this downstream manager. +func (m *DownstreamManager) Connected() []string { + m.l.RLock() + defer m.l.RUnlock() + var r []string + for k, v := range m.workerConnections { + if len(v) > 0 { + r = append(r, k) + } + } + return r +} + +// DisconnectUnauthorized calls disconnects for all ids which are present in +// the connected but not in the authorized slice of key ids. +func DisconnectUnauthorized(dm *DownstreamManager, connected, authorized []string) { + am := make(map[string]struct{}, len(authorized)) + for _, i := range authorized { + am[i] = struct{}{} + } + for _, i := range connected { + if _, found := am[i]; !found { + dm.Disconnect(i) + } + } +} diff --git a/internal/daemon/cluster/downstreammanager_test.go b/internal/daemon/cluster/downstreammanager_test.go new file mode 100644 index 0000000000..8ed767e54b --- /dev/null +++ b/internal/daemon/cluster/downstreammanager_test.go @@ -0,0 +1,97 @@ +package cluster + +import ( + "io" + "net" + "sort" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestNewDownstreamManager(t *testing.T) { + dm := NewDownstreamManager() + assert.Empty(t, dm.Connected()) + + t.Run("single connection", func(t *testing.T) { + w1, g1 := net.Pipe() + dm.addConnection("1", w1) + assert.Equal(t, []string{"1"}, dm.Connected()) + + dm.Disconnect("1") + assert.ErrorIs(t, w1.SetReadDeadline(time.Now()), io.ErrClosedPipe) + assert.ErrorIs(t, g1.SetReadDeadline(time.Now()), io.ErrClosedPipe) + + assert.Empty(t, dm.Connected()) + }) + + t.Run("multiple connection with single name", func(t *testing.T) { + w1, g1 := net.Pipe() + dm.addConnection("1", w1) + assert.Equal(t, []string{"1"}, dm.Connected()) + w2, g2 := net.Pipe() + dm.addConnection("1", w2) + assert.Equal(t, []string{"1"}, dm.Connected()) + + assert.NoError(t, w1.SetReadDeadline(time.Now())) + assert.NoError(t, g1.SetReadDeadline(time.Now())) + assert.NoError(t, w2.SetReadDeadline(time.Now())) + assert.NoError(t, g2.SetReadDeadline(time.Now())) + + dm.Disconnect("1") + assert.ErrorIs(t, w1.SetReadDeadline(time.Now()), io.ErrClosedPipe) + assert.ErrorIs(t, g1.SetReadDeadline(time.Now()), io.ErrClosedPipe) + assert.ErrorIs(t, w2.SetReadDeadline(time.Now()), io.ErrClosedPipe) + assert.ErrorIs(t, g2.SetReadDeadline(time.Now()), io.ErrClosedPipe) + + assert.Empty(t, dm.Connected()) + }) + + t.Run("multiple connection with different names", func(t *testing.T) { + w1, g1 := net.Pipe() + dm.addConnection("1", w1) + assert.Equal(t, []string{"1"}, dm.Connected()) + w2, g2 := net.Pipe() + dm.addConnection("2", w2) + got := dm.Connected() + sort.Strings(got) + assert.Equal(t, []string{"1", "2"}, got) + + dm.Disconnect("1") + assert.ErrorIs(t, w1.SetReadDeadline(time.Now()), io.ErrClosedPipe) + assert.ErrorIs(t, g1.SetReadDeadline(time.Now()), io.ErrClosedPipe) + + assert.NoError(t, w2.SetReadDeadline(time.Now())) + assert.NoError(t, g2.SetReadDeadline(time.Now())) + + assert.Equal(t, []string{"2"}, dm.Connected()) + + dm.Disconnect("2") + assert.ErrorIs(t, w2.SetReadDeadline(time.Now()), io.ErrClosedPipe) + assert.ErrorIs(t, g2.SetReadDeadline(time.Now()), io.ErrClosedPipe) + }) +} + +func TestDisconnectUnauthorized(t *testing.T) { + dm := NewDownstreamManager() + w1, _ := net.Pipe() + dm.addConnection("w1", w1) + w2a, _ := net.Pipe() + dm.addConnection("w2", w2a) + w2b, _ := net.Pipe() + dm.addConnection("w2", w2b) + w3, _ := net.Pipe() + dm.addConnection("w3", w3) + + assert.NoError(t, w1.SetReadDeadline(time.Now())) + assert.NoError(t, w2a.SetReadDeadline(time.Now())) + assert.NoError(t, w2b.SetReadDeadline(time.Now())) + assert.NoError(t, w3.SetReadDeadline(time.Now())) + + DisconnectUnauthorized(dm, dm.Connected(), []string{"w3"}) + assert.ErrorIs(t, w1.SetReadDeadline(time.Now()), io.ErrClosedPipe) + assert.ErrorIs(t, w2a.SetReadDeadline(time.Now()), io.ErrClosedPipe) + assert.ErrorIs(t, w2b.SetReadDeadline(time.Now()), io.ErrClosedPipe) + assert.NoError(t, w3.SetReadDeadline(time.Now())) +} diff --git a/internal/daemon/cluster/handlers/worker_service.go b/internal/daemon/cluster/handlers/worker_service.go index ec43ab51a9..6aceba5cbc 100644 --- a/internal/daemon/cluster/handlers/worker_service.go +++ b/internal/daemon/cluster/handlers/worker_service.go @@ -29,6 +29,7 @@ type workerServiceServer struct { pbs.UnsafeSessionServiceServer serversRepoFn common.ServersRepoFactory + workerAuthRepoFn common.WorkerAuthRepoStorageFactory sessionRepoFn session.RepositoryFactory connectionRepoFn common.ConnectionRepoFactory updateTimes *sync.Map @@ -42,6 +43,7 @@ var ( func NewWorkerServiceServer( serversRepoFn common.ServersRepoFactory, + workerAuthRepoFn common.WorkerAuthRepoStorageFactory, sessionRepoFn session.RepositoryFactory, connectionRepoFn common.ConnectionRepoFactory, updateTimes *sync.Map, @@ -49,6 +51,7 @@ func NewWorkerServiceServer( ) *workerServiceServer { return &workerServiceServer{ serversRepoFn: serversRepoFn, + workerAuthRepoFn: workerAuthRepoFn, sessionRepoFn: sessionRepoFn, connectionRepoFn: connectionRepoFn, updateTimes: updateTimes, @@ -135,9 +138,22 @@ func (ws *workerServiceServer) Status(ctx context.Context, req *pbs.StatusReques } responseControllers = append(responseControllers, thisController) } + + workerAuthRepo, err := ws.workerAuthRepoFn() + if err != nil { + event.WriteError(ctx, op, err, event.WithInfoMsg("error getting worker auth repo")) + return &pbs.StatusResponse{}, status.Errorf(codes.Internal, "Error acquiring repo to lookup worker auth info: %v", err) + } + authorizedWorkers, err := workerAuthRepo.FilterToAuthorizedWorkerKeyIds(ctx, req.GetConnectedWorkerKeyIdentifiers()) + if err != nil { + event.WriteError(ctx, op, err, event.WithInfoMsg("error getting authorizable worker key ids")) + return &pbs.StatusResponse{}, status.Errorf(codes.Internal, "Error getting authorized worker key ids: %v", err) + } + ret := &pbs.StatusResponse{ CalculatedUpstreams: responseControllers, WorkerId: wrk.GetPublicId(), + AuthorizedWorkers: &pbs.AuthorizedWorkerList{WorkerKeyIdentifiers: authorizedWorkers}, } stateReport := make([]*session.StateReport, 0, len(req.GetJobs())) diff --git a/internal/daemon/cluster/handlers/worker_service_status_test.go b/internal/daemon/cluster/handlers/worker_service_status_test.go index 72c97172cd..249c7bccc9 100644 --- a/internal/daemon/cluster/handlers/worker_service_status_test.go +++ b/internal/daemon/cluster/handlers/worker_service_status_test.go @@ -2,6 +2,8 @@ package handlers_test import ( "context" + "crypto/rand" + "sort" "sync" "testing" @@ -20,6 +22,7 @@ import ( "github.com/hashicorp/boundary/internal/session" "github.com/hashicorp/boundary/internal/target" "github.com/hashicorp/boundary/internal/target/tcp" + "github.com/hashicorp/boundary/internal/types/scope" "github.com/hashicorp/nodeenrollment" "github.com/hashicorp/nodeenrollment/registration" "github.com/hashicorp/nodeenrollment/rotation" @@ -47,6 +50,9 @@ func TestStatus(t *testing.T) { serversRepoFn := func() (*server.Repository, error) { return serverRepo, nil } + workerAuthRepoFn := func() (*server.WorkerAuthRepositoryStorage, error) { + return server.NewRepositoryStorage(ctx, rw, rw, kms) + } sessionRepoFn := func(opts ...session.Option) (*session.Repository, error) { return session.NewRepository(ctx, rw, rw, kms, opts...) } @@ -89,7 +95,7 @@ func TestStatus(t *testing.T) { require.NoError(t, err) require.NoError(t, err) - s := handlers.NewWorkerServiceServer(serversRepoFn, sessionRepoFn, connRepoFn, new(sync.Map), kms) + s := handlers.NewWorkerServiceServer(serversRepoFn, workerAuthRepoFn, sessionRepoFn, connRepoFn, new(sync.Map), kms) require.NotNil(t, s) connection, _, err := connRepo.AuthorizeConnection(ctx, sess.PublicId, worker1.PublicId) @@ -119,7 +125,8 @@ func TestStatus(t *testing.T) { Address: "127.0.0.1", }, }, - WorkerId: worker1.PublicId, + WorkerId: worker1.PublicId, + AuthorizedWorkers: &pbs.AuthorizedWorkerList{}, }, }, { @@ -158,7 +165,8 @@ func TestStatus(t *testing.T) { Address: "127.0.0.1", }, }, - WorkerId: worker1.PublicId, + WorkerId: worker1.PublicId, + AuthorizedWorkers: &pbs.AuthorizedWorkerList{}, }, }, { @@ -209,6 +217,7 @@ func TestStatus(t *testing.T) { pbs.Job_SessionInfo{}, pbs.SessionJobInfo{}, pbs.Connection{}, + pbs.AuthorizedWorkerList{}, ), cmpopts.IgnoreFields(pb.ServerWorkerStatus{}, "Tags"), ), @@ -233,6 +242,9 @@ func TestStatusSessionClosed(t *testing.T) { serversRepoFn := func() (*server.Repository, error) { return serverRepo, nil } + workerAuthRepoFn := func() (*server.WorkerAuthRepositoryStorage, error) { + return server.NewRepositoryStorage(ctx, rw, rw, kms) + } sessionRepoFn := func(opts ...session.Option) (*session.Repository, error) { return session.NewRepository(ctx, rw, rw, kms, opts...) } @@ -287,7 +299,7 @@ func TestStatusSessionClosed(t *testing.T) { sess2, _, err = repo.ActivateSession(ctx, sess2.PublicId, sess2.Version, tofu2) require.NoError(t, err) - s := handlers.NewWorkerServiceServer(serversRepoFn, sessionRepoFn, connRepoFn, new(sync.Map), kms) + s := handlers.NewWorkerServiceServer(serversRepoFn, workerAuthRepoFn, sessionRepoFn, connRepoFn, new(sync.Map), kms) require.NotNil(t, s) connection, _, err := connRepo.AuthorizeConnection(ctx, sess.PublicId, worker1.PublicId) @@ -355,7 +367,8 @@ func TestStatusSessionClosed(t *testing.T) { RequestType: pbs.CHANGETYPE_CHANGETYPE_UPDATE_STATE, }, }, - WorkerId: worker1.PublicId, + WorkerId: worker1.PublicId, + AuthorizedWorkers: &pbs.AuthorizedWorkerList{}, }, }, } @@ -387,6 +400,7 @@ func TestStatusSessionClosed(t *testing.T) { pbs.Job_SessionInfo{}, pbs.SessionJobInfo{}, pbs.Connection{}, + pbs.AuthorizedWorkerList{}, ), cmpopts.IgnoreFields(pb.ServerWorkerStatus{}, "Tags"), ), @@ -414,6 +428,9 @@ func TestStatusDeadConnection(t *testing.T) { serversRepoFn := func() (*server.Repository, error) { return serverRepo, nil } + workerAuthRepoFn := func() (*server.WorkerAuthRepositoryStorage, error) { + return server.NewRepositoryStorage(ctx, rw, rw, kms) + } sessionRepoFn := func(opts ...session.Option) (*session.Repository, error) { return session.NewRepository(ctx, rw, rw, kms, opts...) } @@ -466,7 +483,7 @@ func TestStatusDeadConnection(t *testing.T) { sess2, _, err = repo.ActivateSession(ctx, sess2.PublicId, sess2.Version, tofu2) require.NoError(t, err) - s := handlers.NewWorkerServiceServer(serversRepoFn, sessionRepoFn, connRepoFn, new(sync.Map), kms) + s := handlers.NewWorkerServiceServer(serversRepoFn, workerAuthRepoFn, sessionRepoFn, connRepoFn, new(sync.Map), kms) require.NotNil(t, s) connection, _, err := connRepo.AuthorizeConnection(ctx, sess.PublicId, worker1.PublicId) @@ -508,7 +525,8 @@ func TestStatusDeadConnection(t *testing.T) { Address: "127.0.0.1", }, }, - WorkerId: worker1.PublicId, + WorkerId: worker1.PublicId, + AuthorizedWorkers: &pbs.AuthorizedWorkerList{}, } got, err := s.Status(ctx, req) @@ -525,6 +543,7 @@ func TestStatusDeadConnection(t *testing.T) { pbs.Job_SessionInfo{}, pbs.SessionJobInfo{}, pbs.Connection{}, + pbs.AuthorizedWorkerList{}, ), cmpopts.IgnoreFields(pb.ServerWorkerStatus{}, "Tags"), ), @@ -554,6 +573,9 @@ func TestStatusWorkerWithKeyId(t *testing.T) { serversRepoFn := func() (*server.Repository, error) { return serverRepo, nil } + workerAuthRepoFn := func() (*server.WorkerAuthRepositoryStorage, error) { + return server.NewRepositoryStorage(ctx, rw, rw, kms) + } sessionRepoFn := func(opts ...session.Option) (*session.Repository, error) { return session.NewRepository(ctx, rw, rw, kms, opts...) } @@ -619,7 +641,7 @@ func TestStatusWorkerWithKeyId(t *testing.T) { require.NoError(t, err) require.NoError(t, err) - s := handlers.NewWorkerServiceServer(serversRepoFn, sessionRepoFn, connRepoFn, new(sync.Map), kms) + s := handlers.NewWorkerServiceServer(serversRepoFn, workerAuthRepoFn, sessionRepoFn, connRepoFn, new(sync.Map), kms) require.NotNil(t, s) connection, _, err := connRepo.AuthorizeConnection(ctx, sess.PublicId, worker1.PublicId) @@ -648,7 +670,8 @@ func TestStatusWorkerWithKeyId(t *testing.T) { Address: "127.0.0.1", }, }, - WorkerId: worker1.PublicId, + WorkerId: worker1.PublicId, + AuthorizedWorkers: &pbs.AuthorizedWorkerList{}, }, }, { @@ -686,7 +709,8 @@ func TestStatusWorkerWithKeyId(t *testing.T) { Address: "127.0.0.1", }, }, - WorkerId: worker1.PublicId, + WorkerId: worker1.PublicId, + AuthorizedWorkers: &pbs.AuthorizedWorkerList{}, }, }, } @@ -716,6 +740,154 @@ func TestStatusWorkerWithKeyId(t *testing.T) { pbs.Job_SessionInfo{}, pbs.SessionJobInfo{}, pbs.Connection{}, + pbs.AuthorizedWorkerList{}, + ), + cmpopts.IgnoreFields(pb.ServerWorkerStatus{}, "Tags"), + ), + ) + }) + } +} + +func TestStatusAuthorizedWorkers(t *testing.T) { + ctx := context.Background() + conn, _ := db.TestSetup(t, "postgres") + rw := db.New(conn) + wrapper := db.TestWrapper(t) + kmsCache := kms.TestKms(t, conn, wrapper) + + err := kmsCache.CreateKeys(context.Background(), scope.Global.String(), kms.WithRandomReader(rand.Reader)) + require.NoError(t, err) + + serverRepo, _ := server.NewRepository(rw, rw, kmsCache) + serverRepo.UpsertController(ctx, &store.Controller{ + PrivateId: "test_controller1", + Address: "127.0.0.1", + }) + serversRepoFn := func() (*server.Repository, error) { + return serverRepo, nil + } + workerAuthRepoFn := func() (*server.WorkerAuthRepositoryStorage, error) { + return server.NewRepositoryStorage(ctx, rw, rw, kmsCache) + } + sessionRepoFn := func(opts ...session.Option) (*session.Repository, error) { + return session.NewRepository(ctx, rw, rw, kmsCache, opts...) + } + connRepoFn := func() (*session.ConnectionRepository, error) { + return session.NewConnectionRepository(ctx, rw, rw, kmsCache) + } + + worker1 := server.TestKmsWorker(t, conn, wrapper) + var w1KeyId, w2KeyId string + _ = server.TestPkiWorker(t, conn, wrapper, server.WithTestPkiWorkerAuthorizedKeyId(&w1KeyId)) + _ = server.TestPkiWorker(t, conn, wrapper, server.WithTestPkiWorkerAuthorizedKeyId(&w2KeyId)) + + s := handlers.NewWorkerServiceServer(serversRepoFn, workerAuthRepoFn, sessionRepoFn, connRepoFn, new(sync.Map), kmsCache) + require.NotNil(t, s) + + cases := []struct { + name string + wantErr bool + wantErrMsg string + req *pbs.StatusRequest + want *pbs.StatusResponse + }{ + { + name: "No downstreams", + wantErr: false, + req: &pbs.StatusRequest{ + WorkerStatus: &pb.ServerWorkerStatus{ + PublicId: worker1.GetPublicId(), + Name: worker1.GetName(), + Address: worker1.GetAddress(), + }, + ConnectedWorkerKeyIdentifiers: []string{}, + }, + want: &pbs.StatusResponse{ + CalculatedUpstreams: []*pbs.UpstreamServer{ + { + Type: pbs.UpstreamServer_TYPE_CONTROLLER, + Address: "127.0.0.1", + }, + }, + WorkerId: worker1.PublicId, + AuthorizedWorkers: &pbs.AuthorizedWorkerList{}, + }, + }, + { + name: "Unauthorized ConnectedWorkers", + wantErr: false, + req: &pbs.StatusRequest{ + WorkerStatus: &pb.ServerWorkerStatus{ + PublicId: worker1.GetPublicId(), + Name: worker1.GetName(), + Address: worker1.GetAddress(), + }, + }, + want: &pbs.StatusResponse{ + CalculatedUpstreams: []*pbs.UpstreamServer{ + { + Type: pbs.UpstreamServer_TYPE_CONTROLLER, + Address: "127.0.0.1", + }, + }, + WorkerId: worker1.PublicId, + AuthorizedWorkers: &pbs.AuthorizedWorkerList{}, + }, + }, + { + name: "Some authorized connected downstreams", + wantErr: false, + req: &pbs.StatusRequest{ + WorkerStatus: &pb.ServerWorkerStatus{ + PublicId: worker1.GetPublicId(), + Name: worker1.GetName(), + Address: worker1.GetAddress(), + }, + ConnectedWorkerKeyIdentifiers: []string{w1KeyId, w2KeyId, "unknown"}, + }, + want: &pbs.StatusResponse{ + CalculatedUpstreams: []*pbs.UpstreamServer{ + { + Type: pbs.UpstreamServer_TYPE_CONTROLLER, + Address: "127.0.0.1", + }, + }, + WorkerId: worker1.PublicId, + AuthorizedWorkers: &pbs.AuthorizedWorkerList{ + WorkerKeyIdentifiers: []string{w1KeyId, w2KeyId}, + }, + }, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + assert, require := assert.New(t), require.New(t) + + got, err := s.Status(ctx, tc.req) + if tc.wantErr { + require.Error(err) + assert.Equal(got, &pbs.StatusResponse{}) + assert.Equal(tc.wantErrMsg, err.Error()) + return + } + sort.Strings(got.GetAuthorizedWorkers().GetWorkerKeyIdentifiers()) + sort.Strings(tc.want.GetAuthorizedWorkers().GetWorkerKeyIdentifiers()) + assert.Empty( + cmp.Diff( + tc.want, + got, + cmpopts.IgnoreUnexported( + pbs.StatusResponse{}, + pb.ServerWorkerStatus{}, + pbs.UpstreamServer{}, + pbs.JobChangeRequest{}, + pbs.Job{}, + pbs.Job_SessionInfo{}, + pbs.SessionJobInfo{}, + pbs.Connection{}, + pbs.AuthorizedWorkerList{}, ), cmpopts.IgnoreFields(pb.ServerWorkerStatus{}, "Tags"), ), @@ -739,6 +911,9 @@ func TestWorkerOperationalStatus(t *testing.T) { serversRepoFn := func() (*server.Repository, error) { return serverRepo, nil } + workerAuthRepoFn := func() (*server.WorkerAuthRepositoryStorage, error) { + return server.NewRepositoryStorage(ctx, rw, rw, kms) + } sessionRepoFn := func(opt ...session.Option) (*session.Repository, error) { return session.NewRepository(ctx, rw, rw, kms) } @@ -748,7 +923,7 @@ func TestWorkerOperationalStatus(t *testing.T) { worker1 := server.TestKmsWorker(t, conn, wrapper) - s := handlers.NewWorkerServiceServer(serversRepoFn, sessionRepoFn, connRepoFn, new(sync.Map), kms) + s := handlers.NewWorkerServiceServer(serversRepoFn, workerAuthRepoFn, sessionRepoFn, connRepoFn, new(sync.Map), kms) require.NotNil(t, s) cases := []struct { diff --git a/internal/daemon/cluster/handlers/worker_service_test.go b/internal/daemon/cluster/handlers/worker_service_test.go index 44382f4cce..a02f99af99 100644 --- a/internal/daemon/cluster/handlers/worker_service_test.go +++ b/internal/daemon/cluster/handlers/worker_service_test.go @@ -39,6 +39,9 @@ func TestLookupSession(t *testing.T) { serversRepoFn := func() (*server.Repository, error) { return server.NewRepository(rw, rw, kms) } + workerAuthRepoFn := func() (*server.WorkerAuthRepositoryStorage, error) { + return server.NewRepositoryStorage(ctx, rw, rw, kms) + } sessionRepoFn := func(opts ...session.Option) (*session.Repository, error) { return session.NewRepository(ctx, rw, rw, kms, opts...) } @@ -122,7 +125,7 @@ func TestLookupSession(t *testing.T) { err = repo.AddSessionCredentials(ctx, sessWithCreds.ProjectId, sessWithCreds.GetPublicId(), workerCreds) require.NoError(t, err) - s := handlers.NewWorkerServiceServer(serversRepoFn, sessionRepoFn, connectionRepoFn, new(sync.Map), kms) + s := handlers.NewWorkerServiceServer(serversRepoFn, workerAuthRepoFn, sessionRepoFn, connectionRepoFn, new(sync.Map), kms) require.NotNil(t, s) cases := []struct { @@ -214,6 +217,9 @@ func TestHcpbWorkers(t *testing.T) { serversRepoFn := func() (*server.Repository, error) { return server.NewRepository(rw, rw, kmsCache) } + workerAuthRepoFn := func() (*server.WorkerAuthRepositoryStorage, error) { + return server.NewRepositoryStorage(ctx, rw, rw, kmsCache) + } sessionRepoFn := func(opts ...session.Option) (*session.Repository, error) { return session.NewRepository(ctx, rw, rw, kmsCache, opts...) } @@ -231,7 +237,7 @@ func TestHcpbWorkers(t *testing.T) { server.TestPkiWorker(t, conn, wrapper, opt...) } - s := handlers.NewWorkerServiceServer(serversRepoFn, sessionRepoFn, connectionRepoFn, new(sync.Map), kmsCache) + s := handlers.NewWorkerServiceServer(serversRepoFn, workerAuthRepoFn, sessionRepoFn, connectionRepoFn, new(sync.Map), kmsCache) require.NotNil(t, s) res, err := s.ListHcpbWorkers(ctx, &pbs.ListHcpbWorkersRequest{}) diff --git a/internal/daemon/cluster/tracking_listener.go b/internal/daemon/cluster/tracking_listener.go new file mode 100644 index 0000000000..2edb08ec74 --- /dev/null +++ b/internal/daemon/cluster/tracking_listener.go @@ -0,0 +1,81 @@ +package cluster + +import ( + "context" + "crypto/tls" + "net" + + "github.com/hashicorp/boundary/internal/errors" + "github.com/hashicorp/boundary/internal/observability/event" + nodee "github.com/hashicorp/nodeenrollment" +) + +// trackingListener tracks a nodee connection in a DownstreamManager when a +// worker has connected successfully. +type trackingListener struct { + ctx context.Context + ln net.Listener + dsm *DownstreamManager +} + +// NewTrackingListener returns a listener which adds all connections made +// through it to the provided DownstreamManager. The net.Conn returned by +// Accept is only tracked if it is a *tls.Conn and was created by the +// nodeenrollment library. +func NewTrackingListener(ctx context.Context, l net.Listener, m *DownstreamManager) (net.Listener, error) { + const op = "common.NewTrackingListener" + switch { + case m == nil: + return nil, errors.New(ctx, errors.InvalidParameter, op, "nil DownstreamManager") + case l == nil: + return nil, errors.New(ctx, errors.InvalidParameter, op, "nil base listener") + } + return &trackingListener{ + ctx: ctx, + ln: l, + dsm: m, + }, nil +} + +// Accept satisfies the net.Listener interface. If the the wrapped listener +// must return a tls.Conn or an error is returned. If the tls.Conn has no +// PeerCertificates then no error is returned and the Conn is not added to the +// DownstreamManager. +func (e *trackingListener) Accept() (net.Conn, error) { + const op = "cluster.(trackingListener).Accept" + conn, err := e.ln.Accept() + if err != nil || conn == nil { + return conn, err + } + + tlsConn, ok := conn.(*tls.Conn) + if !ok { + return conn, err + } + + if tlsConn == nil || len(tlsConn.ConnectionState().PeerCertificates) == 0 { + return conn, nil + } + + keyId, err := nodee.KeyIdFromPkix(tlsConn.ConnectionState().PeerCertificates[0].SubjectKeyId) + if err != nil { + // Create an error so it gets written out to the event log + _ = errors.Wrap(e.ctx, err, op) + } + if keyId == "" { + // No key id means there is nothing to track. + return conn, nil + } + e.dsm.addConnection(keyId, conn) + event.WriteSysEvent(e.ctx, op, "tracking worker connection", "key_id", keyId) + + return conn, nil +} + +func (e *trackingListener) Close() error { + return e.ln.Close() +} + +func (e *trackingListener) Addr() net.Addr { + return e.ln.Addr() +} diff --git a/internal/daemon/controller/controller.go b/internal/daemon/controller/controller.go index 361a20aea9..8b342a6acc 100644 --- a/internal/daemon/controller/controller.go +++ b/internal/daemon/controller/controller.go @@ -14,6 +14,7 @@ import ( "github.com/hashicorp/boundary/internal/cmd/config" credstatic "github.com/hashicorp/boundary/internal/credential/static" "github.com/hashicorp/boundary/internal/credential/vault" + "github.com/hashicorp/boundary/internal/daemon/cluster" "github.com/hashicorp/boundary/internal/daemon/controller/common" "github.com/hashicorp/boundary/internal/daemon/controller/handlers/health" "github.com/hashicorp/boundary/internal/daemon/controller/internal/metric" @@ -132,6 +133,8 @@ type Controller struct { // Used to signal the Health Service to start // replying to queries with "503 Service Unavailable". HealthService *health.Service + + pkiConnManager *cluster.DownstreamManager } func New(ctx context.Context, conf *Config) (*Controller, error) { @@ -146,6 +149,7 @@ func New(ctx context.Context, conf *Config) (*Controller, error) { workerStatusUpdateTimes: new(sync.Map), enabledPlugins: conf.Server.EnabledPlugins, apiListeners: make([]*base.ServerListener, 0), + pkiConnManager: cluster.NewDownstreamManager(), } if downstreamRouterFactory != nil { @@ -385,7 +389,7 @@ func New(ctx context.Context, conf *Config) (*Controller, error) { func (c *Controller) Start() error { const op = "controller.(Controller).Start" - if c.started.Load() { + if c.started.Swap(true) { event.WriteSysEvent(context.TODO(), op, "already started, skipping") return nil } @@ -424,10 +428,9 @@ func (c *Controller) Start() error { defer c.tickerWg.Done() c.startCloseExpiredPendingTokens(c.baseContext) }() - go func() { - defer c.tickerWg.Done() - c.started.Store(true) - }() + if err := c.startWorkerConnectionMaintenanceTicking(c.baseContext, c.tickerWg, c.pkiConnManager); err != nil { + return errors.Wrap(c.baseContext, err, op) + } if c.downstreamRoutes != nil { c.tickerWg.Add(2) diff --git a/internal/daemon/controller/listeners.go b/internal/daemon/controller/listeners.go index 1895c7a631..32b6af46ce 100644 --- a/internal/daemon/controller/listeners.go +++ b/internal/daemon/controller/listeners.go @@ -12,6 +12,7 @@ import ( grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" "github.com/hashicorp/boundary/internal/cmd/base" + "github.com/hashicorp/boundary/internal/daemon/cluster" "github.com/hashicorp/boundary/internal/daemon/common" "github.com/hashicorp/boundary/internal/daemon/controller/internal/metric" "github.com/hashicorp/boundary/internal/errors" @@ -157,12 +158,20 @@ func (c *Controller) configureForCluster(ln *base.ServerListener) (func(), error if err != nil { return nil, fmt.Errorf("%s: error creating eventing listener: %w", op, err) } + + // This wraps the normal pki worker connections with a listener which adds + // the worker key id of the connections to the controller's pkiConnManager. + pkiWorkerTrackingListener, err := cluster.NewTrackingListener(c.baseContext, eventingAuthedListener, c.pkiConnManager) + if err != nil { + return nil, fmt.Errorf("%s: error creating pki worker tracking listener: %w", op, err) + } + // Create a multiplexer to unify connections between PKI and KMS multiplexingAuthedListener, err := nodeenet.NewMultiplexingListener(c.baseContext, nodeeAuthedListener.Addr()) if err != nil { return nil, fmt.Errorf("error instantiating authed multiplexed listener: %w", err) } - if err := multiplexingAuthedListener.IngressListener(eventingAuthedListener); err != nil { + if err := multiplexingAuthedListener.IngressListener(pkiWorkerTrackingListener); err != nil { return nil, fmt.Errorf("error adding authed evented listener to multiplexed listener: %w", err) } // Connections that came in on the cluster listener that are not authed via @@ -178,12 +187,20 @@ func (c *Controller) configureForCluster(ln *base.ServerListener) (func(), error if err != nil { return nil, fmt.Errorf("error instantiating reverse gprc connection split listener: %w", err) } + + // This wraps the reverse grpc pki worker connections with a listener which + // adds the worker key id of the connections to the pkiConnManager. + revPkiWorkerTrackingListener, err := cluster.NewTrackingListener(c.baseContext, reverseGrpcListener, c.pkiConnManager) + if err != nil { + return nil, fmt.Errorf("%s: error creating reverse grpc pki worker tracking listener: %w", op, err) + } + // Create a multiplexer to unify reverse grpc connections between PKI and KMS multiplexingReverseGrpcListener, err := nodeenet.NewMultiplexingListener(c.baseContext, reverseGrpcListener.Addr()) if err != nil { return nil, fmt.Errorf("error instantiating reverse grpc multiplexed listener: %w", err) } - if err := multiplexingReverseGrpcListener.IngressListener(reverseGrpcListener); err != nil { + if err := multiplexingReverseGrpcListener.IngressListener(revPkiWorkerTrackingListener); err != nil { return nil, fmt.Errorf("error adding reverse grpc listener to multiplexed listener: %w", err) } diff --git a/internal/daemon/controller/rpc_registration.go b/internal/daemon/controller/rpc_registration.go index 055f6c0a51..0910fdbd1f 100644 --- a/internal/daemon/controller/rpc_registration.go +++ b/internal/daemon/controller/rpc_registration.go @@ -33,8 +33,8 @@ func registerControllerServerCoordinationService(ctx context.Context, c *Control return fmt.Errorf("%s: server is nil", op) } - workerService := handlers.NewWorkerServiceServer(c.ServersRepoFn, c.SessionRepoFn, c.ConnectionRepoFn, - c.workerStatusUpdateTimes, c.kms) + workerService := handlers.NewWorkerServiceServer(c.ServersRepoFn, c.WorkerAuthRepoStorageFn, + c.SessionRepoFn, c.ConnectionRepoFn, c.workerStatusUpdateTimes, c.kms) pbs.RegisterServerCoordinationServiceServer(server, workerService) return nil } @@ -51,8 +51,8 @@ func registerControllerSessionService(ctx context.Context, c *Controller, server return fmt.Errorf("%s: server is nil", op) } - workerService := handlers.NewWorkerServiceServer(c.ServersRepoFn, c.SessionRepoFn, c.ConnectionRepoFn, - c.workerStatusUpdateTimes, c.kms) + workerService := handlers.NewWorkerServiceServer(c.ServersRepoFn, c.WorkerAuthRepoStorageFn, + c.SessionRepoFn, c.ConnectionRepoFn, c.workerStatusUpdateTimes, c.kms) pbs.RegisterSessionServiceServer(server, workerService) return nil } diff --git a/internal/daemon/controller/tickers.go b/internal/daemon/controller/tickers.go index 5ccc02b61f..af663b23c5 100644 --- a/internal/daemon/controller/tickers.go +++ b/internal/daemon/controller/tickers.go @@ -3,8 +3,10 @@ package controller import ( "context" "math/rand" + "sync" "time" + "github.com/hashicorp/boundary/internal/daemon/cluster" "github.com/hashicorp/boundary/internal/server/store" "github.com/hashicorp/boundary/internal/errors" @@ -13,8 +15,9 @@ import ( // In the future we could make this configurable const ( - statusInterval = 10 * time.Second - terminationInterval = 1 * time.Minute + workerConnectionMaintenanceInterval = 3 * time.Second + statusInterval = 10 * time.Second + terminationInterval = 1 * time.Minute ) // This is exported so it can be tweaked in tests @@ -155,3 +158,51 @@ func (c *Controller) startCloseExpiredPendingTokens(cancelCtx context.Context) { } } } + +func (c *Controller) startWorkerConnectionMaintenanceTicking(cancelCtx context.Context, wg *sync.WaitGroup, m *cluster.DownstreamManager) error { + const op = "controller.(Controller).startWorkerConnectionMaintenanceTicking" + switch { + case m == nil: + return errors.New(cancelCtx, errors.InvalidParameter, op, "DownstreamManager is nil") + case wg == nil: + return errors.New(cancelCtx, errors.InvalidParameter, op, "wait group is nil") + } + go func() { + defer wg.Done() + r := rand.New(rand.NewSource(time.Now().UnixNano())) + getRandomInterval := func() time.Duration { + // 0 to 0.5 adjustment to the base + f := r.Float64() / 2 + // Half a chance to be faster, not slower + if r.Float32() > 0.5 { + f = -1 * f + } + return workerConnectionMaintenanceInterval + time.Duration(f*float64(workerConnectionMaintenanceInterval)) + } + timer := time.NewTimer(0) + for { + select { + case <-cancelCtx.Done(): + event.WriteSysEvent(cancelCtx, op, "context done, shutting down") + return + + case <-timer.C: + repo, err := c.WorkerAuthRepoStorageFn() + if err != nil { + event.WriteError(cancelCtx, op, err, event.WithInfoMsg("error fetching repository for cluster connection maintenance")) + break + } + wKeyIds := m.Connected() + authorized, err := repo.FilterToAuthorizedWorkerKeyIds(cancelCtx, wKeyIds) + if err != nil { + event.WriteError(cancelCtx, op, err, event.WithInfoMsg("couldn't get authorized workers from repo")) + break + } + cluster.DisconnectUnauthorized(m, wKeyIds, authorized) + } + + timer.Reset(getRandomInterval()) + } + }() + return nil +} diff --git a/internal/daemon/worker/listeners.go b/internal/daemon/worker/listeners.go index 5db68d6ec5..e091f738ef 100644 --- a/internal/daemon/worker/listeners.go +++ b/internal/daemon/worker/listeners.go @@ -13,6 +13,7 @@ import ( "time" "github.com/hashicorp/boundary/internal/cmd/base" + "github.com/hashicorp/boundary/internal/daemon/cluster" "github.com/hashicorp/boundary/internal/daemon/common" "github.com/hashicorp/boundary/internal/daemon/worker/internal/metric" "github.com/hashicorp/boundary/internal/daemon/worker/session" @@ -172,6 +173,13 @@ func (w *Worker) configureForWorker(ln *base.ServerListener, logger *log.Logger, return nil, fmt.Errorf("error instantiating non-worker split listener: %w", err) } + // This wraps the reverse grpc pki worker connections with a listener which + // adds the worker key id of the connections to the worker's pkiConnManager. + revPkiWorkerTrackingListener, err := cluster.NewTrackingListener(w.baseContext, reverseGrpcListener, w.pkiConnManager) + if err != nil { + return nil, fmt.Errorf("%s: error creating reverse grpc pki worker tracking listener: %w", op, err) + } + statsHandler, err := metric.InstrumentClusterStatsHandler(w.baseContext) if err != nil { return nil, errors.Wrap(w.baseContext, err, op) @@ -197,11 +205,18 @@ func (w *Worker) configureForWorker(ln *base.ServerListener, logger *log.Logger, return nil, fmt.Errorf("%s: error creating eventing listener: %w", op, err) } + // This wraps the normal pki worker connections with a listener which adds + // the worker key id of the connections to the worker's pkiConnManager. + pkiWorkerTrackingListener, err := cluster.NewTrackingListener(cancelCtx, eventingListener, w.pkiConnManager) + if err != nil { + return nil, fmt.Errorf("%s: error creating pki worker tracking listener: %w", op, err) + } + return func() { go w.workerAuthSplitListener.Start() go httpServer.Serve(proxyListener) - go ln.GrpcServer.Serve(eventingListener) - go handleSecondaryConnection(cancelCtx, reverseGrpcListener, w.downstreamRoutes, -1) + go ln.GrpcServer.Serve(pkiWorkerTrackingListener) + go handleSecondaryConnection(cancelCtx, revPkiWorkerTrackingListener, w.downstreamRoutes, -1) }, nil } diff --git a/internal/daemon/worker/status.go b/internal/daemon/worker/status.go index b8eeb10327..97ed25470e 100644 --- a/internal/daemon/worker/status.go +++ b/internal/daemon/worker/status.go @@ -7,6 +7,7 @@ import ( "math/rand" "time" + "github.com/hashicorp/boundary/internal/daemon/cluster" "github.com/hashicorp/boundary/internal/daemon/worker/common" "github.com/hashicorp/boundary/internal/daemon/worker/session" pb "github.com/hashicorp/boundary/internal/gen/controller/servers" @@ -164,6 +165,7 @@ func (w *Worker) sendWorkerStatus(cancelCtx context.Context, sessionManager sess event.WithInfoMsg("error making status request to controller")) } versionInfo := version.Get() + connectedWorkerKeyIds := w.pkiConnManager.Connected() result, err := client.Status(statusCtx, &pbs.StatusRequest{ Jobs: activeJobs, WorkerStatus: &pb.ServerWorkerStatus{ @@ -175,7 +177,8 @@ func (w *Worker) sendWorkerStatus(cancelCtx context.Context, sessionManager sess ReleaseVersion: versionInfo.FullVersionNumber(false), OperationalState: w.operationalState.Load().(server.OperationalState).String(), }, - UpdateTags: w.updateTags.Load(), + ConnectedWorkerKeyIdentifiers: connectedWorkerKeyIds, + UpdateTags: w.updateTags.Load(), }) if err != nil { event.WriteError(cancelCtx, op, err, event.WithInfoMsg("error making status request to controller")) @@ -213,6 +216,10 @@ func (w *Worker) sendWorkerStatus(cancelCtx context.Context, sessionManager sess } w.updateTags.Store(false) + + if authorized := result.GetAuthorizedWorkers(); authorized != nil { + cluster.DisconnectUnauthorized(w.pkiConnManager, connectedWorkerKeyIds, authorized.GetWorkerKeyIdentifiers()) + } var addrs []string // This may be empty if we are in a multiple hop scenario if len(result.CalculatedUpstreams) > 0 { diff --git a/internal/daemon/worker/worker.go b/internal/daemon/worker/worker.go index 9b000bcfad..effa043bcd 100644 --- a/internal/daemon/worker/worker.go +++ b/internal/daemon/worker/worker.go @@ -16,6 +16,7 @@ import ( "github.com/hashicorp/boundary/globals" "github.com/hashicorp/boundary/internal/cmd/base" "github.com/hashicorp/boundary/internal/cmd/config" + "github.com/hashicorp/boundary/internal/daemon/cluster" "github.com/hashicorp/boundary/internal/daemon/worker/internal/metric" "github.com/hashicorp/boundary/internal/daemon/worker/session" "github.com/hashicorp/boundary/internal/errors" @@ -133,6 +134,8 @@ type Worker struct { TestOverrideAuthRotationPeriod time.Duration statusLock sync.Mutex + + pkiConnManager *cluster.DownstreamManager } func New(conf *Config) (*Worker, error) { @@ -155,6 +158,7 @@ func New(conf *Config) (*Worker, error) { nonceFn: base62.Random, WorkerAuthCurrentKeyId: new(ua.String), operationalState: new(atomic.Value), + pkiConnManager: cluster.NewDownstreamManager(), } if downstreamRouterFactory != nil { diff --git a/internal/daemon/worker/workerdisconnect_test.go b/internal/daemon/worker/workerdisconnect_test.go new file mode 100644 index 0000000000..4199046776 --- /dev/null +++ b/internal/daemon/worker/workerdisconnect_test.go @@ -0,0 +1,90 @@ +package worker + +import ( + "context" + "testing" + "time" + + "github.com/hashicorp/boundary/internal/cmd/config" + "github.com/hashicorp/boundary/internal/daemon/controller" + "github.com/hashicorp/boundary/internal/server" + "github.com/hashicorp/go-hclog" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc/connectivity" +) + +func TestDeleteConnectedWorkers(t *testing.T) { + ctx := context.Background() + logger := hclog.New(&hclog.LoggerOptions{ + Level: hclog.Trace, + }) + conf, err := config.DevController() + require.NoError(t, err) + c := controller.NewTestController(t, &controller.TestControllerOpts{ + Config: conf, + Logger: logger.Named("controller"), + }) + t.Cleanup(c.Shutdown) + _, directPkiWorker, multiHoppedPkiWorker := NewTestMultihopWorkers(t, logger, c.Context(), c.ClusterAddrs(), + c.Config().WorkerAuthKms, c.Controller().ServersRepoFn, nil, nil) + + serverRepo, err := c.Controller().ServersRepoFn() + require.NoError(t, err) + workers, err := serverRepo.ListWorkers(ctx, []string{"global"}, server.WithLiveness(-1)) + require.NoError(t, err) + var childWorker *server.Worker + var pkiWorker *server.Worker + for _, w := range workers { + if w.Type == "pki" && w.GetAddress() == multiHoppedPkiWorker.ProxyAddrs()[0] { + childWorker = w + } + if w.Type == "pki" && w.GetAddress() == directPkiWorker.ProxyAddrs()[0] { + pkiWorker = w + } + } + require.NotNil(t, childWorker) + require.NotNil(t, pkiWorker) + + cases := []struct { + name string + workerId string + testWorker *TestWorker + }{ + { + name: "multi hop worker", + workerId: childWorker.GetPublicId(), + testWorker: multiHoppedPkiWorker, + }, + { + name: "directly connected worker", + workerId: pkiWorker.GetPublicId(), + testWorker: directPkiWorker, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + prevState := tc.testWorker.Worker().GrpcClientConn.GetState() + require.NotEqual(t, connectivity.TransientFailure, prevState) + require.NotEqual(t, connectivity.Shutdown, prevState) + _, err = serverRepo.DeleteWorker(ctx, tc.workerId) + require.NoError(t, err) + stateChangeCtx, cancel := context.WithTimeout(ctx, 4*time.Second) + defer cancel() + for { + tc.testWorker.Worker().GrpcClientConn.ResetConnectBackoff() + if !tc.testWorker.Worker().GrpcClientConn.WaitForStateChange(stateChangeCtx, prevState) { + assert.Fail(t, "State didn't change before context timed out") + break + } + newState := tc.testWorker.Worker().GrpcClientConn.GetState() + t.Logf("Changed from previous state: %s to new state: %s", prevState, newState) + if newState == connectivity.Shutdown || newState == connectivity.TransientFailure { + break + } + prevState = newState + } + }) + } +} diff --git a/internal/gen/controller/servers/services/server_coordination_service.pb.go b/internal/gen/controller/servers/services/server_coordination_service.pb.go index 07735d1bb6..19d3b27b40 100644 --- a/internal/gen/controller/servers/services/server_coordination_service.pb.go +++ b/internal/gen/controller/servers/services/server_coordination_service.pb.go @@ -604,6 +604,9 @@ type StatusRequest struct { // is easier and going the other route doesn't provide much benefit -- if you // get access to the key and spoof the connection, you're already compromised. WorkerStatus *servers.ServerWorkerStatus `protobuf:"bytes,40,opt,name=worker_status,json=workerStatus,proto3" json:"worker_status,omitempty"` + // The worker key identifiers presented by all downstreams connected to this + // worker + ConnectedWorkerKeyIdentifiers []string `protobuf:"bytes,50,rep,name=connected_worker_key_identifiers,json=connectedWorkerKeyIdentifiers,proto3" json:"connected_worker_key_identifiers,omitempty"` } func (x *StatusRequest) Reset() { @@ -659,6 +662,13 @@ func (x *StatusRequest) GetWorkerStatus() *servers.ServerWorkerStatus { return nil } +func (x *StatusRequest) GetConnectedWorkerKeyIdentifiers() []string { + if x != nil { + return x.ConnectedWorkerKeyIdentifiers + } + return nil +} + type JobChangeRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -714,6 +724,53 @@ func (x *JobChangeRequest) GetRequestType() CHANGETYPE { return CHANGETYPE_CHANGETYPE_UNSPECIFIED } +type AuthorizedWorkerList struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + WorkerKeyIdentifiers []string `protobuf:"bytes,1,rep,name=worker_key_identifiers,json=workerKeyIdentifiers,proto3" json:"worker_key_identifiers,omitempty"` +} + +func (x *AuthorizedWorkerList) Reset() { + *x = AuthorizedWorkerList{} + if protoimpl.UnsafeEnabled { + mi := &file_controller_servers_services_v1_server_coordination_service_proto_msgTypes[7] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *AuthorizedWorkerList) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*AuthorizedWorkerList) ProtoMessage() {} + +func (x *AuthorizedWorkerList) ProtoReflect() protoreflect.Message { + mi := &file_controller_servers_services_v1_server_coordination_service_proto_msgTypes[7] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use AuthorizedWorkerList.ProtoReflect.Descriptor instead. +func (*AuthorizedWorkerList) Descriptor() ([]byte, []int) { + return file_controller_servers_services_v1_server_coordination_service_proto_rawDescGZIP(), []int{7} +} + +func (x *AuthorizedWorkerList) GetWorkerKeyIdentifiers() []string { + if x != nil { + return x.WorkerKeyIdentifiers + } + return nil +} + type StatusResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -730,12 +787,15 @@ type StatusResponse struct { // The ID of the worker which made the request. The worker can send this value in subsequent requests so the // controller does not need to do a database lookup for the id using the name field. WorkerId string `protobuf:"bytes,40,opt,name=worker_id,json=workerId,proto3" json:"worker_id,omitempty" class:"public"` // @gotags: `class:"public"` + // Of the worker key identifiers provided in the request, these are the ones + // which are authorized to remain connected. + AuthorizedWorkers *AuthorizedWorkerList `protobuf:"bytes,50,opt,name=authorized_workers,json=authorizedWorkers,proto3" json:"authorized_workers,omitempty"` } func (x *StatusResponse) Reset() { *x = StatusResponse{} if protoimpl.UnsafeEnabled { - mi := &file_controller_servers_services_v1_server_coordination_service_proto_msgTypes[7] + mi := &file_controller_servers_services_v1_server_coordination_service_proto_msgTypes[8] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -748,7 +808,7 @@ func (x *StatusResponse) String() string { func (*StatusResponse) ProtoMessage() {} func (x *StatusResponse) ProtoReflect() protoreflect.Message { - mi := &file_controller_servers_services_v1_server_coordination_service_proto_msgTypes[7] + mi := &file_controller_servers_services_v1_server_coordination_service_proto_msgTypes[8] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -761,7 +821,7 @@ func (x *StatusResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use StatusResponse.ProtoReflect.Descriptor instead. func (*StatusResponse) Descriptor() ([]byte, []int) { - return file_controller_servers_services_v1_server_coordination_service_proto_rawDescGZIP(), []int{7} + return file_controller_servers_services_v1_server_coordination_service_proto_rawDescGZIP(), []int{8} } func (x *StatusResponse) GetJobsRequests() []*JobChangeRequest { @@ -785,6 +845,13 @@ func (x *StatusResponse) GetWorkerId() string { return "" } +func (x *StatusResponse) GetAuthorizedWorkers() *AuthorizedWorkerList { + if x != nil { + return x.AuthorizedWorkers + } + return nil +} + // WorkerInfo contains information about workers for the HcpbWorkerResponse message type WorkerInfo struct { state protoimpl.MessageState @@ -800,7 +867,7 @@ type WorkerInfo struct { func (x *WorkerInfo) Reset() { *x = WorkerInfo{} if protoimpl.UnsafeEnabled { - mi := &file_controller_servers_services_v1_server_coordination_service_proto_msgTypes[8] + mi := &file_controller_servers_services_v1_server_coordination_service_proto_msgTypes[9] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -813,7 +880,7 @@ func (x *WorkerInfo) String() string { func (*WorkerInfo) ProtoMessage() {} func (x *WorkerInfo) ProtoReflect() protoreflect.Message { - mi := &file_controller_servers_services_v1_server_coordination_service_proto_msgTypes[8] + mi := &file_controller_servers_services_v1_server_coordination_service_proto_msgTypes[9] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -826,7 +893,7 @@ func (x *WorkerInfo) ProtoReflect() protoreflect.Message { // Deprecated: Use WorkerInfo.ProtoReflect.Descriptor instead. func (*WorkerInfo) Descriptor() ([]byte, []int) { - return file_controller_servers_services_v1_server_coordination_service_proto_rawDescGZIP(), []int{8} + return file_controller_servers_services_v1_server_coordination_service_proto_rawDescGZIP(), []int{9} } func (x *WorkerInfo) GetId() string { @@ -853,7 +920,7 @@ type ListHcpbWorkersRequest struct { func (x *ListHcpbWorkersRequest) Reset() { *x = ListHcpbWorkersRequest{} if protoimpl.UnsafeEnabled { - mi := &file_controller_servers_services_v1_server_coordination_service_proto_msgTypes[9] + mi := &file_controller_servers_services_v1_server_coordination_service_proto_msgTypes[10] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -866,7 +933,7 @@ func (x *ListHcpbWorkersRequest) String() string { func (*ListHcpbWorkersRequest) ProtoMessage() {} func (x *ListHcpbWorkersRequest) ProtoReflect() protoreflect.Message { - mi := &file_controller_servers_services_v1_server_coordination_service_proto_msgTypes[9] + mi := &file_controller_servers_services_v1_server_coordination_service_proto_msgTypes[10] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -879,7 +946,7 @@ func (x *ListHcpbWorkersRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use ListHcpbWorkersRequest.ProtoReflect.Descriptor instead. func (*ListHcpbWorkersRequest) Descriptor() ([]byte, []int) { - return file_controller_servers_services_v1_server_coordination_service_proto_rawDescGZIP(), []int{9} + return file_controller_servers_services_v1_server_coordination_service_proto_rawDescGZIP(), []int{10} } // A response containing worker information @@ -894,7 +961,7 @@ type ListHcpbWorkersResponse struct { func (x *ListHcpbWorkersResponse) Reset() { *x = ListHcpbWorkersResponse{} if protoimpl.UnsafeEnabled { - mi := &file_controller_servers_services_v1_server_coordination_service_proto_msgTypes[10] + mi := &file_controller_servers_services_v1_server_coordination_service_proto_msgTypes[11] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -907,7 +974,7 @@ func (x *ListHcpbWorkersResponse) String() string { func (*ListHcpbWorkersResponse) ProtoMessage() {} func (x *ListHcpbWorkersResponse) ProtoReflect() protoreflect.Message { - mi := &file_controller_servers_services_v1_server_coordination_service_proto_msgTypes[10] + mi := &file_controller_servers_services_v1_server_coordination_service_proto_msgTypes[11] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -920,7 +987,7 @@ func (x *ListHcpbWorkersResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use ListHcpbWorkersResponse.ProtoReflect.Descriptor instead. func (*ListHcpbWorkersResponse) Descriptor() ([]byte, []int) { - return file_controller_servers_services_v1_server_coordination_service_proto_rawDescGZIP(), []int{10} + return file_controller_servers_services_v1_server_coordination_service_proto_rawDescGZIP(), []int{11} } func (x *ListHcpbWorkersResponse) GetWorkers() []*WorkerInfo { @@ -991,7 +1058,7 @@ var file_controller_servers_services_v1_server_coordination_service_proto_rawDes 0x0a, 0x10, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x13, 0x0a, 0x0f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x43, 0x4f, 0x4e, 0x54, 0x52, 0x4f, 0x4c, 0x4c, 0x45, 0x52, 0x10, 0x01, 0x12, 0x0f, 0x0a, 0x0b, 0x54, 0x59, 0x50, - 0x45, 0x5f, 0x57, 0x4f, 0x52, 0x4b, 0x45, 0x52, 0x10, 0x02, 0x22, 0xcd, 0x01, 0x0a, 0x0d, 0x53, + 0x45, 0x5f, 0x57, 0x4f, 0x52, 0x4b, 0x45, 0x52, 0x10, 0x02, 0x22, 0x96, 0x02, 0x0a, 0x0d, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x3d, 0x0a, 0x04, 0x6a, 0x6f, 0x62, 0x73, 0x18, 0x14, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x29, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x73, 0x2e, @@ -1003,95 +1070,111 @@ var file_controller_servers_services_v1_server_coordination_service_proto_rawDes 0x01, 0x28, 0x0b, 0x32, 0x29, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x73, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x0c, - 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x4a, 0x04, 0x08, 0x0a, - 0x10, 0x0b, 0x52, 0x06, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x22, 0x98, 0x01, 0x0a, 0x10, 0x4a, - 0x6f, 0x62, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, - 0x35, 0x0a, 0x03, 0x6a, 0x6f, 0x62, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x23, 0x2e, 0x63, - 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, - 0x73, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2e, 0x76, 0x31, 0x2e, 0x4a, 0x6f, - 0x62, 0x52, 0x03, 0x6a, 0x6f, 0x62, 0x12, 0x4d, 0x0a, 0x0c, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x2a, 0x2e, 0x63, - 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, - 0x73, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x48, - 0x41, 0x4e, 0x47, 0x45, 0x54, 0x59, 0x50, 0x45, 0x52, 0x0b, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x54, 0x79, 0x70, 0x65, 0x22, 0xfa, 0x01, 0x0a, 0x0e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, - 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x55, 0x0a, 0x0d, 0x6a, 0x6f, 0x62, 0x73, - 0x5f, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x73, 0x18, 0x14, 0x20, 0x03, 0x28, 0x0b, 0x32, - 0x30, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72, 0x2e, 0x73, 0x65, 0x72, - 0x76, 0x65, 0x72, 0x73, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2e, 0x76, 0x31, - 0x2e, 0x4a, 0x6f, 0x62, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x52, 0x0c, 0x6a, 0x6f, 0x62, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x73, 0x12, - 0x61, 0x0a, 0x14, 0x63, 0x61, 0x6c, 0x63, 0x75, 0x6c, 0x61, 0x74, 0x65, 0x64, 0x5f, 0x75, 0x70, - 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x18, 0x1e, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x2e, 0x2e, - 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, - 0x72, 0x73, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2e, 0x76, 0x31, 0x2e, 0x55, - 0x70, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x52, 0x13, 0x63, - 0x61, 0x6c, 0x63, 0x75, 0x6c, 0x61, 0x74, 0x65, 0x64, 0x55, 0x70, 0x73, 0x74, 0x72, 0x65, 0x61, - 0x6d, 0x73, 0x12, 0x1b, 0x0a, 0x09, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, - 0x28, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x49, 0x64, 0x4a, - 0x04, 0x08, 0x0a, 0x10, 0x0b, 0x52, 0x0b, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, - 0x72, 0x73, 0x22, 0x36, 0x0a, 0x0a, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, - 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, - 0x12, 0x18, 0x0a, 0x07, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x07, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x22, 0x18, 0x0a, 0x16, 0x4c, 0x69, - 0x73, 0x74, 0x48, 0x63, 0x70, 0x62, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x22, 0x5f, 0x0a, 0x17, 0x4c, 0x69, 0x73, 0x74, 0x48, 0x63, 0x70, 0x62, - 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, - 0x44, 0x0a, 0x07, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, - 0x32, 0x2a, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72, 0x2e, 0x73, 0x65, - 0x72, 0x76, 0x65, 0x72, 0x73, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2e, 0x76, - 0x31, 0x2e, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x07, 0x77, 0x6f, - 0x72, 0x6b, 0x65, 0x72, 0x73, 0x2a, 0x92, 0x01, 0x0a, 0x10, 0x43, 0x4f, 0x4e, 0x4e, 0x45, 0x43, - 0x54, 0x49, 0x4f, 0x4e, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x12, 0x20, 0x0a, 0x1c, 0x43, 0x4f, - 0x4e, 0x4e, 0x45, 0x43, 0x54, 0x49, 0x4f, 0x4e, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x55, - 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x1f, 0x0a, 0x1b, - 0x43, 0x4f, 0x4e, 0x4e, 0x45, 0x43, 0x54, 0x49, 0x4f, 0x4e, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, - 0x5f, 0x41, 0x55, 0x54, 0x48, 0x4f, 0x52, 0x49, 0x5a, 0x45, 0x44, 0x10, 0x01, 0x12, 0x1e, 0x0a, - 0x1a, 0x43, 0x4f, 0x4e, 0x4e, 0x45, 0x43, 0x54, 0x49, 0x4f, 0x4e, 0x53, 0x54, 0x41, 0x54, 0x55, - 0x53, 0x5f, 0x43, 0x4f, 0x4e, 0x4e, 0x45, 0x43, 0x54, 0x45, 0x44, 0x10, 0x02, 0x12, 0x1b, 0x0a, - 0x17, 0x43, 0x4f, 0x4e, 0x4e, 0x45, 0x43, 0x54, 0x49, 0x4f, 0x4e, 0x53, 0x54, 0x41, 0x54, 0x55, - 0x53, 0x5f, 0x43, 0x4c, 0x4f, 0x53, 0x45, 0x44, 0x10, 0x03, 0x2a, 0x9e, 0x01, 0x0a, 0x0d, 0x53, - 0x45, 0x53, 0x53, 0x49, 0x4f, 0x4e, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x12, 0x1d, 0x0a, 0x19, - 0x53, 0x45, 0x53, 0x53, 0x49, 0x4f, 0x4e, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x55, 0x4e, - 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x19, 0x0a, 0x15, 0x53, - 0x45, 0x53, 0x53, 0x49, 0x4f, 0x4e, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x50, 0x45, 0x4e, - 0x44, 0x49, 0x4e, 0x47, 0x10, 0x01, 0x12, 0x18, 0x0a, 0x14, 0x53, 0x45, 0x53, 0x53, 0x49, 0x4f, - 0x4e, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x41, 0x43, 0x54, 0x49, 0x56, 0x45, 0x10, 0x02, - 0x12, 0x1b, 0x0a, 0x17, 0x53, 0x45, 0x53, 0x53, 0x49, 0x4f, 0x4e, 0x53, 0x54, 0x41, 0x54, 0x55, - 0x53, 0x5f, 0x43, 0x41, 0x4e, 0x43, 0x45, 0x4c, 0x49, 0x4e, 0x47, 0x10, 0x03, 0x12, 0x1c, 0x0a, - 0x18, 0x53, 0x45, 0x53, 0x53, 0x49, 0x4f, 0x4e, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x54, - 0x45, 0x52, 0x4d, 0x49, 0x4e, 0x41, 0x54, 0x45, 0x44, 0x10, 0x04, 0x2a, 0x37, 0x0a, 0x07, 0x4a, - 0x4f, 0x42, 0x54, 0x59, 0x50, 0x45, 0x12, 0x17, 0x0a, 0x13, 0x4a, 0x4f, 0x42, 0x54, 0x59, 0x50, - 0x45, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, - 0x13, 0x0a, 0x0f, 0x4a, 0x4f, 0x42, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x53, 0x45, 0x53, 0x53, 0x49, - 0x4f, 0x4e, 0x10, 0x01, 0x2a, 0x45, 0x0a, 0x0a, 0x43, 0x48, 0x41, 0x4e, 0x47, 0x45, 0x54, 0x59, - 0x50, 0x45, 0x12, 0x1a, 0x0a, 0x16, 0x43, 0x48, 0x41, 0x4e, 0x47, 0x45, 0x54, 0x59, 0x50, 0x45, - 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x1b, - 0x0a, 0x17, 0x43, 0x48, 0x41, 0x4e, 0x47, 0x45, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, 0x50, 0x44, - 0x41, 0x54, 0x45, 0x5f, 0x53, 0x54, 0x41, 0x54, 0x45, 0x10, 0x01, 0x32, 0x8d, 0x02, 0x0a, 0x19, - 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x43, 0x6f, 0x6f, 0x72, 0x64, 0x69, 0x6e, 0x61, 0x74, 0x69, - 0x6f, 0x6e, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x69, 0x0a, 0x06, 0x53, 0x74, 0x61, - 0x74, 0x75, 0x73, 0x12, 0x2d, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72, - 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x73, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, - 0x73, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x1a, 0x2e, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72, 0x2e, + 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x47, 0x0a, 0x20, + 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x65, 0x64, 0x5f, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, + 0x5f, 0x6b, 0x65, 0x79, 0x5f, 0x69, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x66, 0x69, 0x65, 0x72, 0x73, + 0x18, 0x32, 0x20, 0x03, 0x28, 0x09, 0x52, 0x1d, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x65, + 0x64, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x4b, 0x65, 0x79, 0x49, 0x64, 0x65, 0x6e, 0x74, 0x69, + 0x66, 0x69, 0x65, 0x72, 0x73, 0x4a, 0x04, 0x08, 0x0a, 0x10, 0x0b, 0x52, 0x06, 0x77, 0x6f, 0x72, + 0x6b, 0x65, 0x72, 0x22, 0x98, 0x01, 0x0a, 0x10, 0x4a, 0x6f, 0x62, 0x43, 0x68, 0x61, 0x6e, 0x67, + 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x35, 0x0a, 0x03, 0x6a, 0x6f, 0x62, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x23, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, + 0x65, 0x72, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x73, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, + 0x63, 0x65, 0x73, 0x2e, 0x76, 0x31, 0x2e, 0x4a, 0x6f, 0x62, 0x52, 0x03, 0x6a, 0x6f, 0x62, 0x12, + 0x4d, 0x0a, 0x0c, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x2a, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, + 0x65, 0x72, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x73, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, + 0x63, 0x65, 0x73, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x48, 0x41, 0x4e, 0x47, 0x45, 0x54, 0x59, 0x50, + 0x45, 0x52, 0x0b, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x54, 0x79, 0x70, 0x65, 0x22, 0x4c, + 0x0a, 0x14, 0x41, 0x75, 0x74, 0x68, 0x6f, 0x72, 0x69, 0x7a, 0x65, 0x64, 0x57, 0x6f, 0x72, 0x6b, + 0x65, 0x72, 0x4c, 0x69, 0x73, 0x74, 0x12, 0x34, 0x0a, 0x16, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, + 0x5f, 0x6b, 0x65, 0x79, 0x5f, 0x69, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x66, 0x69, 0x65, 0x72, 0x73, + 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x14, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x4b, 0x65, + 0x79, 0x49, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x66, 0x69, 0x65, 0x72, 0x73, 0x22, 0xdf, 0x02, 0x0a, + 0x0e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, + 0x55, 0x0a, 0x0d, 0x6a, 0x6f, 0x62, 0x73, 0x5f, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x73, + 0x18, 0x14, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x30, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, + 0x6c, 0x65, 0x72, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x73, 0x2e, 0x73, 0x65, 0x72, 0x76, + 0x69, 0x63, 0x65, 0x73, 0x2e, 0x76, 0x31, 0x2e, 0x4a, 0x6f, 0x62, 0x43, 0x68, 0x61, 0x6e, 0x67, + 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x52, 0x0c, 0x6a, 0x6f, 0x62, 0x73, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x73, 0x12, 0x61, 0x0a, 0x14, 0x63, 0x61, 0x6c, 0x63, 0x75, 0x6c, + 0x61, 0x74, 0x65, 0x64, 0x5f, 0x75, 0x70, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x18, 0x1e, + 0x20, 0x03, 0x28, 0x0b, 0x32, 0x2e, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, + 0x72, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x73, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, + 0x65, 0x73, 0x2e, 0x76, 0x31, 0x2e, 0x55, 0x70, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x53, 0x65, + 0x72, 0x76, 0x65, 0x72, 0x52, 0x13, 0x63, 0x61, 0x6c, 0x63, 0x75, 0x6c, 0x61, 0x74, 0x65, 0x64, + 0x55, 0x70, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x12, 0x1b, 0x0a, 0x09, 0x77, 0x6f, 0x72, + 0x6b, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x28, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x77, 0x6f, + 0x72, 0x6b, 0x65, 0x72, 0x49, 0x64, 0x12, 0x63, 0x0a, 0x12, 0x61, 0x75, 0x74, 0x68, 0x6f, 0x72, + 0x69, 0x7a, 0x65, 0x64, 0x5f, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x18, 0x32, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x34, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x73, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, - 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x22, 0x00, 0x12, 0x84, 0x01, 0x0a, 0x0f, 0x4c, 0x69, 0x73, 0x74, 0x48, 0x63, 0x70, - 0x62, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x12, 0x36, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, - 0x6f, 0x6c, 0x6c, 0x65, 0x72, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x73, 0x2e, 0x73, 0x65, - 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2e, 0x76, 0x31, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x48, 0x63, + 0x2e, 0x76, 0x31, 0x2e, 0x41, 0x75, 0x74, 0x68, 0x6f, 0x72, 0x69, 0x7a, 0x65, 0x64, 0x57, 0x6f, + 0x72, 0x6b, 0x65, 0x72, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x11, 0x61, 0x75, 0x74, 0x68, 0x6f, 0x72, + 0x69, 0x7a, 0x65, 0x64, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x4a, 0x04, 0x08, 0x0a, 0x10, + 0x0b, 0x52, 0x0b, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72, 0x73, 0x22, 0x36, + 0x0a, 0x0a, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x0e, 0x0a, 0x02, + 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x18, 0x0a, 0x07, + 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x61, + 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x22, 0x18, 0x0a, 0x16, 0x4c, 0x69, 0x73, 0x74, 0x48, 0x63, 0x70, 0x62, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x1a, 0x37, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72, 0x2e, 0x73, 0x65, - 0x72, 0x76, 0x65, 0x72, 0x73, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2e, 0x76, - 0x31, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x48, 0x63, 0x70, 0x62, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, - 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x51, 0x5a, 0x4f, 0x67, - 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, - 0x6f, 0x72, 0x70, 0x2f, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x61, 0x72, 0x79, 0x2f, 0x69, 0x6e, 0x74, - 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, - 0x6c, 0x6c, 0x65, 0x72, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x73, 0x2f, 0x73, 0x65, 0x72, - 0x76, 0x69, 0x63, 0x65, 0x73, 0x3b, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x62, 0x06, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x22, 0x5f, 0x0a, 0x17, 0x4c, 0x69, 0x73, 0x74, 0x48, 0x63, 0x70, 0x62, 0x57, 0x6f, 0x72, 0x6b, + 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x44, 0x0a, 0x07, 0x77, + 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x2a, 0x2e, 0x63, + 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, + 0x73, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2e, 0x76, 0x31, 0x2e, 0x57, 0x6f, + 0x72, 0x6b, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x07, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, + 0x73, 0x2a, 0x92, 0x01, 0x0a, 0x10, 0x43, 0x4f, 0x4e, 0x4e, 0x45, 0x43, 0x54, 0x49, 0x4f, 0x4e, + 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x12, 0x20, 0x0a, 0x1c, 0x43, 0x4f, 0x4e, 0x4e, 0x45, 0x43, + 0x54, 0x49, 0x4f, 0x4e, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, + 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x1f, 0x0a, 0x1b, 0x43, 0x4f, 0x4e, 0x4e, + 0x45, 0x43, 0x54, 0x49, 0x4f, 0x4e, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x41, 0x55, 0x54, + 0x48, 0x4f, 0x52, 0x49, 0x5a, 0x45, 0x44, 0x10, 0x01, 0x12, 0x1e, 0x0a, 0x1a, 0x43, 0x4f, 0x4e, + 0x4e, 0x45, 0x43, 0x54, 0x49, 0x4f, 0x4e, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x43, 0x4f, + 0x4e, 0x4e, 0x45, 0x43, 0x54, 0x45, 0x44, 0x10, 0x02, 0x12, 0x1b, 0x0a, 0x17, 0x43, 0x4f, 0x4e, + 0x4e, 0x45, 0x43, 0x54, 0x49, 0x4f, 0x4e, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x43, 0x4c, + 0x4f, 0x53, 0x45, 0x44, 0x10, 0x03, 0x2a, 0x9e, 0x01, 0x0a, 0x0d, 0x53, 0x45, 0x53, 0x53, 0x49, + 0x4f, 0x4e, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x12, 0x1d, 0x0a, 0x19, 0x53, 0x45, 0x53, 0x53, + 0x49, 0x4f, 0x4e, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, + 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x19, 0x0a, 0x15, 0x53, 0x45, 0x53, 0x53, 0x49, + 0x4f, 0x4e, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x50, 0x45, 0x4e, 0x44, 0x49, 0x4e, 0x47, + 0x10, 0x01, 0x12, 0x18, 0x0a, 0x14, 0x53, 0x45, 0x53, 0x53, 0x49, 0x4f, 0x4e, 0x53, 0x54, 0x41, + 0x54, 0x55, 0x53, 0x5f, 0x41, 0x43, 0x54, 0x49, 0x56, 0x45, 0x10, 0x02, 0x12, 0x1b, 0x0a, 0x17, + 0x53, 0x45, 0x53, 0x53, 0x49, 0x4f, 0x4e, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x43, 0x41, + 0x4e, 0x43, 0x45, 0x4c, 0x49, 0x4e, 0x47, 0x10, 0x03, 0x12, 0x1c, 0x0a, 0x18, 0x53, 0x45, 0x53, + 0x53, 0x49, 0x4f, 0x4e, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x54, 0x45, 0x52, 0x4d, 0x49, + 0x4e, 0x41, 0x54, 0x45, 0x44, 0x10, 0x04, 0x2a, 0x37, 0x0a, 0x07, 0x4a, 0x4f, 0x42, 0x54, 0x59, + 0x50, 0x45, 0x12, 0x17, 0x0a, 0x13, 0x4a, 0x4f, 0x42, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, 0x4e, + 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x13, 0x0a, 0x0f, 0x4a, + 0x4f, 0x42, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x53, 0x45, 0x53, 0x53, 0x49, 0x4f, 0x4e, 0x10, 0x01, + 0x2a, 0x45, 0x0a, 0x0a, 0x43, 0x48, 0x41, 0x4e, 0x47, 0x45, 0x54, 0x59, 0x50, 0x45, 0x12, 0x1a, + 0x0a, 0x16, 0x43, 0x48, 0x41, 0x4e, 0x47, 0x45, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, 0x4e, 0x53, + 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x1b, 0x0a, 0x17, 0x43, 0x48, + 0x41, 0x4e, 0x47, 0x45, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, 0x50, 0x44, 0x41, 0x54, 0x45, 0x5f, + 0x53, 0x54, 0x41, 0x54, 0x45, 0x10, 0x01, 0x32, 0x8d, 0x02, 0x0a, 0x19, 0x53, 0x65, 0x72, 0x76, + 0x65, 0x72, 0x43, 0x6f, 0x6f, 0x72, 0x64, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x65, + 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x69, 0x0a, 0x06, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, + 0x2d, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72, 0x2e, 0x73, 0x65, 0x72, + 0x76, 0x65, 0x72, 0x73, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2e, 0x76, 0x31, + 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2e, + 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72, 0x2e, 0x73, 0x65, 0x72, 0x76, + 0x65, 0x72, 0x73, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2e, 0x76, 0x31, 0x2e, + 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, + 0x12, 0x84, 0x01, 0x0a, 0x0f, 0x4c, 0x69, 0x73, 0x74, 0x48, 0x63, 0x70, 0x62, 0x57, 0x6f, 0x72, + 0x6b, 0x65, 0x72, 0x73, 0x12, 0x36, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, + 0x72, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x73, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, + 0x65, 0x73, 0x2e, 0x76, 0x31, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x48, 0x63, 0x70, 0x62, 0x57, 0x6f, + 0x72, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x37, 0x2e, 0x63, + 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, + 0x73, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2e, 0x76, 0x31, 0x2e, 0x4c, 0x69, + 0x73, 0x74, 0x48, 0x63, 0x70, 0x62, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x51, 0x5a, 0x4f, 0x67, 0x69, 0x74, 0x68, 0x75, + 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2f, + 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x61, 0x72, 0x79, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, + 0x6c, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72, + 0x2f, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x73, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, + 0x73, 0x3b, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x33, } var ( @@ -1107,7 +1190,7 @@ func file_controller_servers_services_v1_server_coordination_service_proto_rawDe } var file_controller_servers_services_v1_server_coordination_service_proto_enumTypes = make([]protoimpl.EnumInfo, 5) -var file_controller_servers_services_v1_server_coordination_service_proto_msgTypes = make([]protoimpl.MessageInfo, 11) +var file_controller_servers_services_v1_server_coordination_service_proto_msgTypes = make([]protoimpl.MessageInfo, 12) var file_controller_servers_services_v1_server_coordination_service_proto_goTypes = []interface{}{ (CONNECTIONSTATUS)(0), // 0: controller.servers.services.v1.CONNECTIONSTATUS (SESSIONSTATUS)(0), // 1: controller.servers.services.v1.SESSIONSTATUS @@ -1121,11 +1204,12 @@ var file_controller_servers_services_v1_server_coordination_service_proto_goType (*UpstreamServer)(nil), // 9: controller.servers.services.v1.UpstreamServer (*StatusRequest)(nil), // 10: controller.servers.services.v1.StatusRequest (*JobChangeRequest)(nil), // 11: controller.servers.services.v1.JobChangeRequest - (*StatusResponse)(nil), // 12: controller.servers.services.v1.StatusResponse - (*WorkerInfo)(nil), // 13: controller.servers.services.v1.WorkerInfo - (*ListHcpbWorkersRequest)(nil), // 14: controller.servers.services.v1.ListHcpbWorkersRequest - (*ListHcpbWorkersResponse)(nil), // 15: controller.servers.services.v1.ListHcpbWorkersResponse - (*servers.ServerWorkerStatus)(nil), // 16: controller.servers.v1.ServerWorkerStatus + (*AuthorizedWorkerList)(nil), // 12: controller.servers.services.v1.AuthorizedWorkerList + (*StatusResponse)(nil), // 13: controller.servers.services.v1.StatusResponse + (*WorkerInfo)(nil), // 14: controller.servers.services.v1.WorkerInfo + (*ListHcpbWorkersRequest)(nil), // 15: controller.servers.services.v1.ListHcpbWorkersRequest + (*ListHcpbWorkersResponse)(nil), // 16: controller.servers.services.v1.ListHcpbWorkersResponse + (*servers.ServerWorkerStatus)(nil), // 17: controller.servers.v1.ServerWorkerStatus } var file_controller_servers_services_v1_server_coordination_service_proto_depIdxs = []int32{ 0, // 0: controller.servers.services.v1.Connection.status:type_name -> controller.servers.services.v1.CONNECTIONSTATUS @@ -1136,21 +1220,22 @@ var file_controller_servers_services_v1_server_coordination_service_proto_depIdx 7, // 5: controller.servers.services.v1.JobStatus.job:type_name -> controller.servers.services.v1.Job 4, // 6: controller.servers.services.v1.UpstreamServer.type:type_name -> controller.servers.services.v1.UpstreamServer.TYPE 8, // 7: controller.servers.services.v1.StatusRequest.jobs:type_name -> controller.servers.services.v1.JobStatus - 16, // 8: controller.servers.services.v1.StatusRequest.worker_status:type_name -> controller.servers.v1.ServerWorkerStatus + 17, // 8: controller.servers.services.v1.StatusRequest.worker_status:type_name -> controller.servers.v1.ServerWorkerStatus 7, // 9: controller.servers.services.v1.JobChangeRequest.job:type_name -> controller.servers.services.v1.Job 3, // 10: controller.servers.services.v1.JobChangeRequest.request_type:type_name -> controller.servers.services.v1.CHANGETYPE 11, // 11: controller.servers.services.v1.StatusResponse.jobs_requests:type_name -> controller.servers.services.v1.JobChangeRequest 9, // 12: controller.servers.services.v1.StatusResponse.calculated_upstreams:type_name -> controller.servers.services.v1.UpstreamServer - 13, // 13: controller.servers.services.v1.ListHcpbWorkersResponse.workers:type_name -> controller.servers.services.v1.WorkerInfo - 10, // 14: controller.servers.services.v1.ServerCoordinationService.Status:input_type -> controller.servers.services.v1.StatusRequest - 14, // 15: controller.servers.services.v1.ServerCoordinationService.ListHcpbWorkers:input_type -> controller.servers.services.v1.ListHcpbWorkersRequest - 12, // 16: controller.servers.services.v1.ServerCoordinationService.Status:output_type -> controller.servers.services.v1.StatusResponse - 15, // 17: controller.servers.services.v1.ServerCoordinationService.ListHcpbWorkers:output_type -> controller.servers.services.v1.ListHcpbWorkersResponse - 16, // [16:18] is the sub-list for method output_type - 14, // [14:16] is the sub-list for method input_type - 14, // [14:14] is the sub-list for extension type_name - 14, // [14:14] is the sub-list for extension extendee - 0, // [0:14] is the sub-list for field type_name + 12, // 13: controller.servers.services.v1.StatusResponse.authorized_workers:type_name -> controller.servers.services.v1.AuthorizedWorkerList + 14, // 14: controller.servers.services.v1.ListHcpbWorkersResponse.workers:type_name -> controller.servers.services.v1.WorkerInfo + 10, // 15: controller.servers.services.v1.ServerCoordinationService.Status:input_type -> controller.servers.services.v1.StatusRequest + 15, // 16: controller.servers.services.v1.ServerCoordinationService.ListHcpbWorkers:input_type -> controller.servers.services.v1.ListHcpbWorkersRequest + 13, // 17: controller.servers.services.v1.ServerCoordinationService.Status:output_type -> controller.servers.services.v1.StatusResponse + 16, // 18: controller.servers.services.v1.ServerCoordinationService.ListHcpbWorkers:output_type -> controller.servers.services.v1.ListHcpbWorkersResponse + 17, // [17:19] is the sub-list for method output_type + 15, // [15:17] is the sub-list for method input_type + 15, // [15:15] is the sub-list for extension type_name + 15, // [15:15] is the sub-list for extension extendee + 0, // [0:15] is the sub-list for field type_name } func init() { file_controller_servers_services_v1_server_coordination_service_proto_init() } @@ -1244,7 +1329,7 @@ func file_controller_servers_services_v1_server_coordination_service_proto_init( } } file_controller_servers_services_v1_server_coordination_service_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*StatusResponse); i { + switch v := v.(*AuthorizedWorkerList); i { case 0: return &v.state case 1: @@ -1256,7 +1341,7 @@ func file_controller_servers_services_v1_server_coordination_service_proto_init( } } file_controller_servers_services_v1_server_coordination_service_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*WorkerInfo); i { + switch v := v.(*StatusResponse); i { case 0: return &v.state case 1: @@ -1268,7 +1353,7 @@ func file_controller_servers_services_v1_server_coordination_service_proto_init( } } file_controller_servers_services_v1_server_coordination_service_proto_msgTypes[9].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ListHcpbWorkersRequest); i { + switch v := v.(*WorkerInfo); i { case 0: return &v.state case 1: @@ -1280,6 +1365,18 @@ func file_controller_servers_services_v1_server_coordination_service_proto_init( } } file_controller_servers_services_v1_server_coordination_service_proto_msgTypes[10].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ListHcpbWorkersRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_controller_servers_services_v1_server_coordination_service_proto_msgTypes[11].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*ListHcpbWorkersResponse); i { case 0: return &v.state @@ -1301,7 +1398,7 @@ func file_controller_servers_services_v1_server_coordination_service_proto_init( GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_controller_servers_services_v1_server_coordination_service_proto_rawDesc, NumEnums: 5, - NumMessages: 11, + NumMessages: 12, NumExtensions: 0, NumServices: 1, }, diff --git a/internal/proto/controller/servers/services/v1/server_coordination_service.proto b/internal/proto/controller/servers/services/v1/server_coordination_service.proto index 456d9fce5a..f26a6152dc 100644 --- a/internal/proto/controller/servers/services/v1/server_coordination_service.proto +++ b/internal/proto/controller/servers/services/v1/server_coordination_service.proto @@ -95,6 +95,10 @@ message StatusRequest { // is easier and going the other route doesn't provide much benefit -- if you // get access to the key and spoof the connection, you're already compromised. servers.v1.ServerWorkerStatus worker_status = 40; + + // The worker key identifiers presented by all downstreams connected to this + // worker + repeated string connected_worker_key_identifiers = 50; } enum CHANGETYPE { @@ -109,6 +113,10 @@ message JobChangeRequest { CHANGETYPE request_type = 2; } +message AuthorizedWorkerList { + repeated string worker_key_identifiers = 1; +} + message StatusResponse { reserved 10; reserved "controllers"; @@ -126,6 +134,10 @@ message StatusResponse { // The ID of the worker which made the request. The worker can send this value in subsequent requests so the // controller does not need to do a database lookup for the id using the name field. string worker_id = 40; // @gotags: `class:"public"` + + // Of the worker key identifiers provided in the request, these are the ones + // which are authorized to remain connected. + AuthorizedWorkerList authorized_workers = 50; } // WorkerInfo contains information about workers for the HcpbWorkerResponse message diff --git a/internal/server/query.go b/internal/server/query.go index a9ae233d9e..5be4ab5a59 100644 --- a/internal/server/query.go +++ b/internal/server/query.go @@ -30,4 +30,12 @@ const ( ) select * from worker_auth_authorized where worker_id in (select * from key_id_to_worker_id) ` + + authorizedWorkerQuery = ` + select distinct w.worker_key_identifier + from + worker_auth_certificate_bundle as w + where + w.worker_key_identifier in (?) + ` ) diff --git a/internal/server/repository_workerauth.go b/internal/server/repository_workerauth.go index 0443a779ad..a93a5e3521 100644 --- a/internal/server/repository_workerauth.go +++ b/internal/server/repository_workerauth.go @@ -520,6 +520,33 @@ func (r *WorkerAuthRepositoryStorage) findCertBundles(ctx context.Context, worke return certBundle, nil } +// FilterToAuthorizedWorkerKeyIds returns all the worker key identifiers that +// are authorizable from the slice of key identifiers provided to the function. +func (r *WorkerAuthRepositoryStorage) FilterToAuthorizedWorkerKeyIds(ctx context.Context, workerKeyIds []string) ([]string, error) { + const op = "server.(WorkerAuthRepositoryStorage).FilterToAuthorizedWorkerKeyIds" + if len(workerKeyIds) == 0 { + return nil, nil + } + rows, err := r.reader.Query(ctx, authorizedWorkerQuery, []interface{}{workerKeyIds}) + if err != nil { + } + defer rows.Close() + + type rowsResult struct { + WorkerKeyIdentifier string + } + var ret []string + for rows.Next() { + var result rowsResult + err = r.reader.ScanRows(ctx, rows, &result) + if err != nil { + return nil, errors.Wrap(ctx, err, op) + } + ret = append(ret, result.WorkerKeyIdentifier) + } + return ret, nil +} + func (r *WorkerAuthRepositoryStorage) loadRootCertificates(ctx context.Context, cert *types.RootCertificates) error { const op = "server.(WorkerAuthRepositoryStorage).loadRootCertificates" if cert == nil { diff --git a/internal/server/repository_workerauth_test.go b/internal/server/repository_workerauth_test.go index e3f1f0bc4e..1028c1eabf 100644 --- a/internal/server/repository_workerauth_test.go +++ b/internal/server/repository_workerauth_test.go @@ -450,6 +450,49 @@ func TestStoreNodeInformationTx(t *testing.T) { } } +func TestFilterToAuthorizedWorkerKeyIds(t *testing.T) { + ctx := context.Background() + rootWrapper := db.TestWrapper(t) + conn, _ := db.TestSetup(t, "postgres") + kmsCache := kms.TestKms(t, conn, rootWrapper) + + // Ensures the global scope contains a valid root key + require.NoError(t, kmsCache.CreateKeys(context.Background(), scope.Global.String(), kms.WithRandomReader(rand.Reader))) + + rw := db.New(conn) + repo, err := NewRepositoryStorage(ctx, rw, rw, kmsCache) + require.NoError(t, err) + got, err := repo.FilterToAuthorizedWorkerKeyIds(ctx, []string{}) + require.NoError(t, err) + assert.Empty(t, got) + + var keyId1 string + w1 := TestPkiWorker(t, conn, rootWrapper, WithTestPkiWorkerAuthorizedKeyId(&keyId1)) + var keyId2 string + _ = TestPkiWorker(t, conn, rootWrapper, WithTestPkiWorkerAuthorizedKeyId(&keyId2)) + + got, err = repo.FilterToAuthorizedWorkerKeyIds(ctx, []string{"not-found-key-id", keyId1}) + assert.NoError(t, err) + assert.Equal(t, []string{keyId1}, got) + + got, err = repo.FilterToAuthorizedWorkerKeyIds(ctx, []string{keyId2, "not-found-key-id"}) + assert.NoError(t, err) + assert.Equal(t, []string{keyId2}, got) + + got, err = repo.FilterToAuthorizedWorkerKeyIds(ctx, []string{keyId1, keyId2, "unfound-key"}) + assert.NoError(t, err) + assert.ElementsMatch(t, []string{keyId1, keyId2}, got) + + workerRepo, err := NewRepository(rw, rw, kmsCache) + require.NoError(t, err) + _, err = workerRepo.DeleteWorker(ctx, w1.GetPublicId()) + require.NoError(t, err) + + got, err = repo.FilterToAuthorizedWorkerKeyIds(ctx, []string{keyId1, keyId2, "unfound-key"}) + assert.NoError(t, err) + assert.Equal(t, []string{keyId2}, got) +} + type mockTestWrapper struct { wrapping.Wrapper decryptError bool