Disconnect PKI workers from upstreams when they are no longer authorized (#2515)

pull/2596/head
Todd 4 years ago committed by GitHub
parent 80cdbc305d
commit 2a4ce02de6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -0,0 +1,67 @@
package cluster
import (
"net"
"sync"
)
// DownstreamManager associates downstream worker key identifiers with the
// connections to a specific server.
// It is safe to access DownstreamManager concurrently.
type DownstreamManager struct {
workerConnections map[string][]net.Conn
l sync.RWMutex
}
func NewDownstreamManager() *DownstreamManager {
return &DownstreamManager{
workerConnections: make(map[string][]net.Conn),
}
}
// addConnection adds a connection associated with the provided downstream
// worker key identifier.
func (m *DownstreamManager) addConnection(id string, c net.Conn) {
m.l.Lock()
defer m.l.Unlock()
m.workerConnections[id] = append(m.workerConnections[id], c)
}
// Disconnect closes all connections associated with the provided worker key
// identifier.
func (m *DownstreamManager) Disconnect(id string) {
m.l.Lock()
defer m.l.Unlock()
for _, c := range m.workerConnections[id] {
c.Close()
}
delete(m.workerConnections, id)
}
// Connected returns a slice of worker key identifiers for all workers that are
// currently being tracked by this downstream manager.
func (m *DownstreamManager) Connected() []string {
m.l.RLock()
defer m.l.RUnlock()
var r []string
for k, v := range m.workerConnections {
if len(v) > 0 {
r = append(r, k)
}
}
return r
}
// DisconnectUnauthorized calls disconnects for all ids which are present in
// the connected but not in the authorized slice of key ids.
func DisconnectUnauthorized(dm *DownstreamManager, connected, authorized []string) {
am := make(map[string]struct{}, len(authorized))
for _, i := range authorized {
am[i] = struct{}{}
}
for _, i := range connected {
if _, found := am[i]; !found {
dm.Disconnect(i)
}
}
}

@ -0,0 +1,97 @@
package cluster
import (
"io"
"net"
"sort"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestNewDownstreamManager(t *testing.T) {
dm := NewDownstreamManager()
assert.Empty(t, dm.Connected())
t.Run("single connection", func(t *testing.T) {
w1, g1 := net.Pipe()
dm.addConnection("1", w1)
assert.Equal(t, []string{"1"}, dm.Connected())
dm.Disconnect("1")
assert.ErrorIs(t, w1.SetReadDeadline(time.Now()), io.ErrClosedPipe)
assert.ErrorIs(t, g1.SetReadDeadline(time.Now()), io.ErrClosedPipe)
assert.Empty(t, dm.Connected())
})
t.Run("multiple connection with single name", func(t *testing.T) {
w1, g1 := net.Pipe()
dm.addConnection("1", w1)
assert.Equal(t, []string{"1"}, dm.Connected())
w2, g2 := net.Pipe()
dm.addConnection("1", w2)
assert.Equal(t, []string{"1"}, dm.Connected())
assert.NoError(t, w1.SetReadDeadline(time.Now()))
assert.NoError(t, g1.SetReadDeadline(time.Now()))
assert.NoError(t, w2.SetReadDeadline(time.Now()))
assert.NoError(t, g2.SetReadDeadline(time.Now()))
dm.Disconnect("1")
assert.ErrorIs(t, w1.SetReadDeadline(time.Now()), io.ErrClosedPipe)
assert.ErrorIs(t, g1.SetReadDeadline(time.Now()), io.ErrClosedPipe)
assert.ErrorIs(t, w2.SetReadDeadline(time.Now()), io.ErrClosedPipe)
assert.ErrorIs(t, g2.SetReadDeadline(time.Now()), io.ErrClosedPipe)
assert.Empty(t, dm.Connected())
})
t.Run("multiple connection with different names", func(t *testing.T) {
w1, g1 := net.Pipe()
dm.addConnection("1", w1)
assert.Equal(t, []string{"1"}, dm.Connected())
w2, g2 := net.Pipe()
dm.addConnection("2", w2)
got := dm.Connected()
sort.Strings(got)
assert.Equal(t, []string{"1", "2"}, got)
dm.Disconnect("1")
assert.ErrorIs(t, w1.SetReadDeadline(time.Now()), io.ErrClosedPipe)
assert.ErrorIs(t, g1.SetReadDeadline(time.Now()), io.ErrClosedPipe)
assert.NoError(t, w2.SetReadDeadline(time.Now()))
assert.NoError(t, g2.SetReadDeadline(time.Now()))
assert.Equal(t, []string{"2"}, dm.Connected())
dm.Disconnect("2")
assert.ErrorIs(t, w2.SetReadDeadline(time.Now()), io.ErrClosedPipe)
assert.ErrorIs(t, g2.SetReadDeadline(time.Now()), io.ErrClosedPipe)
})
}
func TestDisconnectUnauthorized(t *testing.T) {
dm := NewDownstreamManager()
w1, _ := net.Pipe()
dm.addConnection("w1", w1)
w2a, _ := net.Pipe()
dm.addConnection("w2", w2a)
w2b, _ := net.Pipe()
dm.addConnection("w2", w2b)
w3, _ := net.Pipe()
dm.addConnection("w3", w3)
assert.NoError(t, w1.SetReadDeadline(time.Now()))
assert.NoError(t, w2a.SetReadDeadline(time.Now()))
assert.NoError(t, w2b.SetReadDeadline(time.Now()))
assert.NoError(t, w3.SetReadDeadline(time.Now()))
DisconnectUnauthorized(dm, dm.Connected(), []string{"w3"})
assert.ErrorIs(t, w1.SetReadDeadline(time.Now()), io.ErrClosedPipe)
assert.ErrorIs(t, w2a.SetReadDeadline(time.Now()), io.ErrClosedPipe)
assert.ErrorIs(t, w2b.SetReadDeadline(time.Now()), io.ErrClosedPipe)
assert.NoError(t, w3.SetReadDeadline(time.Now()))
}

@ -29,6 +29,7 @@ type workerServiceServer struct {
pbs.UnsafeSessionServiceServer
serversRepoFn common.ServersRepoFactory
workerAuthRepoFn common.WorkerAuthRepoStorageFactory
sessionRepoFn session.RepositoryFactory
connectionRepoFn common.ConnectionRepoFactory
updateTimes *sync.Map
@ -42,6 +43,7 @@ var (
func NewWorkerServiceServer(
serversRepoFn common.ServersRepoFactory,
workerAuthRepoFn common.WorkerAuthRepoStorageFactory,
sessionRepoFn session.RepositoryFactory,
connectionRepoFn common.ConnectionRepoFactory,
updateTimes *sync.Map,
@ -49,6 +51,7 @@ func NewWorkerServiceServer(
) *workerServiceServer {
return &workerServiceServer{
serversRepoFn: serversRepoFn,
workerAuthRepoFn: workerAuthRepoFn,
sessionRepoFn: sessionRepoFn,
connectionRepoFn: connectionRepoFn,
updateTimes: updateTimes,
@ -135,9 +138,22 @@ func (ws *workerServiceServer) Status(ctx context.Context, req *pbs.StatusReques
}
responseControllers = append(responseControllers, thisController)
}
workerAuthRepo, err := ws.workerAuthRepoFn()
if err != nil {
event.WriteError(ctx, op, err, event.WithInfoMsg("error getting worker auth repo"))
return &pbs.StatusResponse{}, status.Errorf(codes.Internal, "Error acquiring repo to lookup worker auth info: %v", err)
}
authorizedWorkers, err := workerAuthRepo.FilterToAuthorizedWorkerKeyIds(ctx, req.GetConnectedWorkerKeyIdentifiers())
if err != nil {
event.WriteError(ctx, op, err, event.WithInfoMsg("error getting authorizable worker key ids"))
return &pbs.StatusResponse{}, status.Errorf(codes.Internal, "Error getting authorized worker key ids: %v", err)
}
ret := &pbs.StatusResponse{
CalculatedUpstreams: responseControllers,
WorkerId: wrk.GetPublicId(),
AuthorizedWorkers: &pbs.AuthorizedWorkerList{WorkerKeyIdentifiers: authorizedWorkers},
}
stateReport := make([]*session.StateReport, 0, len(req.GetJobs()))

@ -2,6 +2,8 @@ package handlers_test
import (
"context"
"crypto/rand"
"sort"
"sync"
"testing"
@ -20,6 +22,7 @@ import (
"github.com/hashicorp/boundary/internal/session"
"github.com/hashicorp/boundary/internal/target"
"github.com/hashicorp/boundary/internal/target/tcp"
"github.com/hashicorp/boundary/internal/types/scope"
"github.com/hashicorp/nodeenrollment"
"github.com/hashicorp/nodeenrollment/registration"
"github.com/hashicorp/nodeenrollment/rotation"
@ -47,6 +50,9 @@ func TestStatus(t *testing.T) {
serversRepoFn := func() (*server.Repository, error) {
return serverRepo, nil
}
workerAuthRepoFn := func() (*server.WorkerAuthRepositoryStorage, error) {
return server.NewRepositoryStorage(ctx, rw, rw, kms)
}
sessionRepoFn := func(opts ...session.Option) (*session.Repository, error) {
return session.NewRepository(ctx, rw, rw, kms, opts...)
}
@ -89,7 +95,7 @@ func TestStatus(t *testing.T) {
require.NoError(t, err)
require.NoError(t, err)
s := handlers.NewWorkerServiceServer(serversRepoFn, sessionRepoFn, connRepoFn, new(sync.Map), kms)
s := handlers.NewWorkerServiceServer(serversRepoFn, workerAuthRepoFn, sessionRepoFn, connRepoFn, new(sync.Map), kms)
require.NotNil(t, s)
connection, _, err := connRepo.AuthorizeConnection(ctx, sess.PublicId, worker1.PublicId)
@ -119,7 +125,8 @@ func TestStatus(t *testing.T) {
Address: "127.0.0.1",
},
},
WorkerId: worker1.PublicId,
WorkerId: worker1.PublicId,
AuthorizedWorkers: &pbs.AuthorizedWorkerList{},
},
},
{
@ -158,7 +165,8 @@ func TestStatus(t *testing.T) {
Address: "127.0.0.1",
},
},
WorkerId: worker1.PublicId,
WorkerId: worker1.PublicId,
AuthorizedWorkers: &pbs.AuthorizedWorkerList{},
},
},
{
@ -209,6 +217,7 @@ func TestStatus(t *testing.T) {
pbs.Job_SessionInfo{},
pbs.SessionJobInfo{},
pbs.Connection{},
pbs.AuthorizedWorkerList{},
),
cmpopts.IgnoreFields(pb.ServerWorkerStatus{}, "Tags"),
),
@ -233,6 +242,9 @@ func TestStatusSessionClosed(t *testing.T) {
serversRepoFn := func() (*server.Repository, error) {
return serverRepo, nil
}
workerAuthRepoFn := func() (*server.WorkerAuthRepositoryStorage, error) {
return server.NewRepositoryStorage(ctx, rw, rw, kms)
}
sessionRepoFn := func(opts ...session.Option) (*session.Repository, error) {
return session.NewRepository(ctx, rw, rw, kms, opts...)
}
@ -287,7 +299,7 @@ func TestStatusSessionClosed(t *testing.T) {
sess2, _, err = repo.ActivateSession(ctx, sess2.PublicId, sess2.Version, tofu2)
require.NoError(t, err)
s := handlers.NewWorkerServiceServer(serversRepoFn, sessionRepoFn, connRepoFn, new(sync.Map), kms)
s := handlers.NewWorkerServiceServer(serversRepoFn, workerAuthRepoFn, sessionRepoFn, connRepoFn, new(sync.Map), kms)
require.NotNil(t, s)
connection, _, err := connRepo.AuthorizeConnection(ctx, sess.PublicId, worker1.PublicId)
@ -355,7 +367,8 @@ func TestStatusSessionClosed(t *testing.T) {
RequestType: pbs.CHANGETYPE_CHANGETYPE_UPDATE_STATE,
},
},
WorkerId: worker1.PublicId,
WorkerId: worker1.PublicId,
AuthorizedWorkers: &pbs.AuthorizedWorkerList{},
},
},
}
@ -387,6 +400,7 @@ func TestStatusSessionClosed(t *testing.T) {
pbs.Job_SessionInfo{},
pbs.SessionJobInfo{},
pbs.Connection{},
pbs.AuthorizedWorkerList{},
),
cmpopts.IgnoreFields(pb.ServerWorkerStatus{}, "Tags"),
),
@ -414,6 +428,9 @@ func TestStatusDeadConnection(t *testing.T) {
serversRepoFn := func() (*server.Repository, error) {
return serverRepo, nil
}
workerAuthRepoFn := func() (*server.WorkerAuthRepositoryStorage, error) {
return server.NewRepositoryStorage(ctx, rw, rw, kms)
}
sessionRepoFn := func(opts ...session.Option) (*session.Repository, error) {
return session.NewRepository(ctx, rw, rw, kms, opts...)
}
@ -466,7 +483,7 @@ func TestStatusDeadConnection(t *testing.T) {
sess2, _, err = repo.ActivateSession(ctx, sess2.PublicId, sess2.Version, tofu2)
require.NoError(t, err)
s := handlers.NewWorkerServiceServer(serversRepoFn, sessionRepoFn, connRepoFn, new(sync.Map), kms)
s := handlers.NewWorkerServiceServer(serversRepoFn, workerAuthRepoFn, sessionRepoFn, connRepoFn, new(sync.Map), kms)
require.NotNil(t, s)
connection, _, err := connRepo.AuthorizeConnection(ctx, sess.PublicId, worker1.PublicId)
@ -508,7 +525,8 @@ func TestStatusDeadConnection(t *testing.T) {
Address: "127.0.0.1",
},
},
WorkerId: worker1.PublicId,
WorkerId: worker1.PublicId,
AuthorizedWorkers: &pbs.AuthorizedWorkerList{},
}
got, err := s.Status(ctx, req)
@ -525,6 +543,7 @@ func TestStatusDeadConnection(t *testing.T) {
pbs.Job_SessionInfo{},
pbs.SessionJobInfo{},
pbs.Connection{},
pbs.AuthorizedWorkerList{},
),
cmpopts.IgnoreFields(pb.ServerWorkerStatus{}, "Tags"),
),
@ -554,6 +573,9 @@ func TestStatusWorkerWithKeyId(t *testing.T) {
serversRepoFn := func() (*server.Repository, error) {
return serverRepo, nil
}
workerAuthRepoFn := func() (*server.WorkerAuthRepositoryStorage, error) {
return server.NewRepositoryStorage(ctx, rw, rw, kms)
}
sessionRepoFn := func(opts ...session.Option) (*session.Repository, error) {
return session.NewRepository(ctx, rw, rw, kms, opts...)
}
@ -619,7 +641,7 @@ func TestStatusWorkerWithKeyId(t *testing.T) {
require.NoError(t, err)
require.NoError(t, err)
s := handlers.NewWorkerServiceServer(serversRepoFn, sessionRepoFn, connRepoFn, new(sync.Map), kms)
s := handlers.NewWorkerServiceServer(serversRepoFn, workerAuthRepoFn, sessionRepoFn, connRepoFn, new(sync.Map), kms)
require.NotNil(t, s)
connection, _, err := connRepo.AuthorizeConnection(ctx, sess.PublicId, worker1.PublicId)
@ -648,7 +670,8 @@ func TestStatusWorkerWithKeyId(t *testing.T) {
Address: "127.0.0.1",
},
},
WorkerId: worker1.PublicId,
WorkerId: worker1.PublicId,
AuthorizedWorkers: &pbs.AuthorizedWorkerList{},
},
},
{
@ -686,7 +709,8 @@ func TestStatusWorkerWithKeyId(t *testing.T) {
Address: "127.0.0.1",
},
},
WorkerId: worker1.PublicId,
WorkerId: worker1.PublicId,
AuthorizedWorkers: &pbs.AuthorizedWorkerList{},
},
},
}
@ -716,6 +740,154 @@ func TestStatusWorkerWithKeyId(t *testing.T) {
pbs.Job_SessionInfo{},
pbs.SessionJobInfo{},
pbs.Connection{},
pbs.AuthorizedWorkerList{},
),
cmpopts.IgnoreFields(pb.ServerWorkerStatus{}, "Tags"),
),
)
})
}
}
func TestStatusAuthorizedWorkers(t *testing.T) {
ctx := context.Background()
conn, _ := db.TestSetup(t, "postgres")
rw := db.New(conn)
wrapper := db.TestWrapper(t)
kmsCache := kms.TestKms(t, conn, wrapper)
err := kmsCache.CreateKeys(context.Background(), scope.Global.String(), kms.WithRandomReader(rand.Reader))
require.NoError(t, err)
serverRepo, _ := server.NewRepository(rw, rw, kmsCache)
serverRepo.UpsertController(ctx, &store.Controller{
PrivateId: "test_controller1",
Address: "127.0.0.1",
})
serversRepoFn := func() (*server.Repository, error) {
return serverRepo, nil
}
workerAuthRepoFn := func() (*server.WorkerAuthRepositoryStorage, error) {
return server.NewRepositoryStorage(ctx, rw, rw, kmsCache)
}
sessionRepoFn := func(opts ...session.Option) (*session.Repository, error) {
return session.NewRepository(ctx, rw, rw, kmsCache, opts...)
}
connRepoFn := func() (*session.ConnectionRepository, error) {
return session.NewConnectionRepository(ctx, rw, rw, kmsCache)
}
worker1 := server.TestKmsWorker(t, conn, wrapper)
var w1KeyId, w2KeyId string
_ = server.TestPkiWorker(t, conn, wrapper, server.WithTestPkiWorkerAuthorizedKeyId(&w1KeyId))
_ = server.TestPkiWorker(t, conn, wrapper, server.WithTestPkiWorkerAuthorizedKeyId(&w2KeyId))
s := handlers.NewWorkerServiceServer(serversRepoFn, workerAuthRepoFn, sessionRepoFn, connRepoFn, new(sync.Map), kmsCache)
require.NotNil(t, s)
cases := []struct {
name string
wantErr bool
wantErrMsg string
req *pbs.StatusRequest
want *pbs.StatusResponse
}{
{
name: "No downstreams",
wantErr: false,
req: &pbs.StatusRequest{
WorkerStatus: &pb.ServerWorkerStatus{
PublicId: worker1.GetPublicId(),
Name: worker1.GetName(),
Address: worker1.GetAddress(),
},
ConnectedWorkerKeyIdentifiers: []string{},
},
want: &pbs.StatusResponse{
CalculatedUpstreams: []*pbs.UpstreamServer{
{
Type: pbs.UpstreamServer_TYPE_CONTROLLER,
Address: "127.0.0.1",
},
},
WorkerId: worker1.PublicId,
AuthorizedWorkers: &pbs.AuthorizedWorkerList{},
},
},
{
name: "Unauthorized ConnectedWorkers",
wantErr: false,
req: &pbs.StatusRequest{
WorkerStatus: &pb.ServerWorkerStatus{
PublicId: worker1.GetPublicId(),
Name: worker1.GetName(),
Address: worker1.GetAddress(),
},
},
want: &pbs.StatusResponse{
CalculatedUpstreams: []*pbs.UpstreamServer{
{
Type: pbs.UpstreamServer_TYPE_CONTROLLER,
Address: "127.0.0.1",
},
},
WorkerId: worker1.PublicId,
AuthorizedWorkers: &pbs.AuthorizedWorkerList{},
},
},
{
name: "Some authorized connected downstreams",
wantErr: false,
req: &pbs.StatusRequest{
WorkerStatus: &pb.ServerWorkerStatus{
PublicId: worker1.GetPublicId(),
Name: worker1.GetName(),
Address: worker1.GetAddress(),
},
ConnectedWorkerKeyIdentifiers: []string{w1KeyId, w2KeyId, "unknown"},
},
want: &pbs.StatusResponse{
CalculatedUpstreams: []*pbs.UpstreamServer{
{
Type: pbs.UpstreamServer_TYPE_CONTROLLER,
Address: "127.0.0.1",
},
},
WorkerId: worker1.PublicId,
AuthorizedWorkers: &pbs.AuthorizedWorkerList{
WorkerKeyIdentifiers: []string{w1KeyId, w2KeyId},
},
},
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
assert, require := assert.New(t), require.New(t)
got, err := s.Status(ctx, tc.req)
if tc.wantErr {
require.Error(err)
assert.Equal(got, &pbs.StatusResponse{})
assert.Equal(tc.wantErrMsg, err.Error())
return
}
sort.Strings(got.GetAuthorizedWorkers().GetWorkerKeyIdentifiers())
sort.Strings(tc.want.GetAuthorizedWorkers().GetWorkerKeyIdentifiers())
assert.Empty(
cmp.Diff(
tc.want,
got,
cmpopts.IgnoreUnexported(
pbs.StatusResponse{},
pb.ServerWorkerStatus{},
pbs.UpstreamServer{},
pbs.JobChangeRequest{},
pbs.Job{},
pbs.Job_SessionInfo{},
pbs.SessionJobInfo{},
pbs.Connection{},
pbs.AuthorizedWorkerList{},
),
cmpopts.IgnoreFields(pb.ServerWorkerStatus{}, "Tags"),
),
@ -739,6 +911,9 @@ func TestWorkerOperationalStatus(t *testing.T) {
serversRepoFn := func() (*server.Repository, error) {
return serverRepo, nil
}
workerAuthRepoFn := func() (*server.WorkerAuthRepositoryStorage, error) {
return server.NewRepositoryStorage(ctx, rw, rw, kms)
}
sessionRepoFn := func(opt ...session.Option) (*session.Repository, error) {
return session.NewRepository(ctx, rw, rw, kms)
}
@ -748,7 +923,7 @@ func TestWorkerOperationalStatus(t *testing.T) {
worker1 := server.TestKmsWorker(t, conn, wrapper)
s := handlers.NewWorkerServiceServer(serversRepoFn, sessionRepoFn, connRepoFn, new(sync.Map), kms)
s := handlers.NewWorkerServiceServer(serversRepoFn, workerAuthRepoFn, sessionRepoFn, connRepoFn, new(sync.Map), kms)
require.NotNil(t, s)
cases := []struct {

@ -39,6 +39,9 @@ func TestLookupSession(t *testing.T) {
serversRepoFn := func() (*server.Repository, error) {
return server.NewRepository(rw, rw, kms)
}
workerAuthRepoFn := func() (*server.WorkerAuthRepositoryStorage, error) {
return server.NewRepositoryStorage(ctx, rw, rw, kms)
}
sessionRepoFn := func(opts ...session.Option) (*session.Repository, error) {
return session.NewRepository(ctx, rw, rw, kms, opts...)
}
@ -122,7 +125,7 @@ func TestLookupSession(t *testing.T) {
err = repo.AddSessionCredentials(ctx, sessWithCreds.ProjectId, sessWithCreds.GetPublicId(), workerCreds)
require.NoError(t, err)
s := handlers.NewWorkerServiceServer(serversRepoFn, sessionRepoFn, connectionRepoFn, new(sync.Map), kms)
s := handlers.NewWorkerServiceServer(serversRepoFn, workerAuthRepoFn, sessionRepoFn, connectionRepoFn, new(sync.Map), kms)
require.NotNil(t, s)
cases := []struct {
@ -214,6 +217,9 @@ func TestHcpbWorkers(t *testing.T) {
serversRepoFn := func() (*server.Repository, error) {
return server.NewRepository(rw, rw, kmsCache)
}
workerAuthRepoFn := func() (*server.WorkerAuthRepositoryStorage, error) {
return server.NewRepositoryStorage(ctx, rw, rw, kmsCache)
}
sessionRepoFn := func(opts ...session.Option) (*session.Repository, error) {
return session.NewRepository(ctx, rw, rw, kmsCache, opts...)
}
@ -231,7 +237,7 @@ func TestHcpbWorkers(t *testing.T) {
server.TestPkiWorker(t, conn, wrapper, opt...)
}
s := handlers.NewWorkerServiceServer(serversRepoFn, sessionRepoFn, connectionRepoFn, new(sync.Map), kmsCache)
s := handlers.NewWorkerServiceServer(serversRepoFn, workerAuthRepoFn, sessionRepoFn, connectionRepoFn, new(sync.Map), kmsCache)
require.NotNil(t, s)
res, err := s.ListHcpbWorkers(ctx, &pbs.ListHcpbWorkersRequest{})

@ -0,0 +1,81 @@
package cluster
import (
"context"
"crypto/tls"
"net"
"github.com/hashicorp/boundary/internal/errors"
"github.com/hashicorp/boundary/internal/observability/event"
nodee "github.com/hashicorp/nodeenrollment"
)
// trackingListener tracks a nodee connection in a DownstreamManager when a
// worker has connected successfully.
type trackingListener struct {
ctx context.Context
ln net.Listener
dsm *DownstreamManager
}
// NewTrackingListener returns a listener which adds all connections made
// through it to the provided DownstreamManager. The net.Conn returned by
// Accept is only tracked if it is a *tls.Conn and was created by the
// nodeenrollment library.
func NewTrackingListener(ctx context.Context, l net.Listener, m *DownstreamManager) (net.Listener, error) {
const op = "common.NewTrackingListener"
switch {
case m == nil:
return nil, errors.New(ctx, errors.InvalidParameter, op, "nil DownstreamManager")
case l == nil:
return nil, errors.New(ctx, errors.InvalidParameter, op, "nil base listener")
}
return &trackingListener{
ctx: ctx,
ln: l,
dsm: m,
}, nil
}
// Accept satisfies the net.Listener interface. If the the wrapped listener
// must return a tls.Conn or an error is returned. If the tls.Conn has no
// PeerCertificates then no error is returned and the Conn is not added to the
// DownstreamManager.
func (e *trackingListener) Accept() (net.Conn, error) {
const op = "cluster.(trackingListener).Accept"
conn, err := e.ln.Accept()
if err != nil || conn == nil {
return conn, err
}
tlsConn, ok := conn.(*tls.Conn)
if !ok {
return conn, err
}
if tlsConn == nil || len(tlsConn.ConnectionState().PeerCertificates) == 0 {
return conn, nil
}
keyId, err := nodee.KeyIdFromPkix(tlsConn.ConnectionState().PeerCertificates[0].SubjectKeyId)
if err != nil {
// Create an error so it gets written out to the event log
_ = errors.Wrap(e.ctx, err, op)
}
if keyId == "" {
// No key id means there is nothing to track.
return conn, nil
}
e.dsm.addConnection(keyId, conn)
event.WriteSysEvent(e.ctx, op, "tracking worker connection", "key_id", keyId)
return conn, nil
}
func (e *trackingListener) Close() error {
return e.ln.Close()
}
func (e *trackingListener) Addr() net.Addr {
return e.ln.Addr()
}

@ -14,6 +14,7 @@ import (
"github.com/hashicorp/boundary/internal/cmd/config"
credstatic "github.com/hashicorp/boundary/internal/credential/static"
"github.com/hashicorp/boundary/internal/credential/vault"
"github.com/hashicorp/boundary/internal/daemon/cluster"
"github.com/hashicorp/boundary/internal/daemon/controller/common"
"github.com/hashicorp/boundary/internal/daemon/controller/handlers/health"
"github.com/hashicorp/boundary/internal/daemon/controller/internal/metric"
@ -132,6 +133,8 @@ type Controller struct {
// Used to signal the Health Service to start
// replying to queries with "503 Service Unavailable".
HealthService *health.Service
pkiConnManager *cluster.DownstreamManager
}
func New(ctx context.Context, conf *Config) (*Controller, error) {
@ -146,6 +149,7 @@ func New(ctx context.Context, conf *Config) (*Controller, error) {
workerStatusUpdateTimes: new(sync.Map),
enabledPlugins: conf.Server.EnabledPlugins,
apiListeners: make([]*base.ServerListener, 0),
pkiConnManager: cluster.NewDownstreamManager(),
}
if downstreamRouterFactory != nil {
@ -385,7 +389,7 @@ func New(ctx context.Context, conf *Config) (*Controller, error) {
func (c *Controller) Start() error {
const op = "controller.(Controller).Start"
if c.started.Load() {
if c.started.Swap(true) {
event.WriteSysEvent(context.TODO(), op, "already started, skipping")
return nil
}
@ -424,10 +428,9 @@ func (c *Controller) Start() error {
defer c.tickerWg.Done()
c.startCloseExpiredPendingTokens(c.baseContext)
}()
go func() {
defer c.tickerWg.Done()
c.started.Store(true)
}()
if err := c.startWorkerConnectionMaintenanceTicking(c.baseContext, c.tickerWg, c.pkiConnManager); err != nil {
return errors.Wrap(c.baseContext, err, op)
}
if c.downstreamRoutes != nil {
c.tickerWg.Add(2)

@ -12,6 +12,7 @@ import (
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
"github.com/hashicorp/boundary/internal/cmd/base"
"github.com/hashicorp/boundary/internal/daemon/cluster"
"github.com/hashicorp/boundary/internal/daemon/common"
"github.com/hashicorp/boundary/internal/daemon/controller/internal/metric"
"github.com/hashicorp/boundary/internal/errors"
@ -157,12 +158,20 @@ func (c *Controller) configureForCluster(ln *base.ServerListener) (func(), error
if err != nil {
return nil, fmt.Errorf("%s: error creating eventing listener: %w", op, err)
}
// This wraps the normal pki worker connections with a listener which adds
// the worker key id of the connections to the controller's pkiConnManager.
pkiWorkerTrackingListener, err := cluster.NewTrackingListener(c.baseContext, eventingAuthedListener, c.pkiConnManager)
if err != nil {
return nil, fmt.Errorf("%s: error creating pki worker tracking listener: %w", op, err)
}
// Create a multiplexer to unify connections between PKI and KMS
multiplexingAuthedListener, err := nodeenet.NewMultiplexingListener(c.baseContext, nodeeAuthedListener.Addr())
if err != nil {
return nil, fmt.Errorf("error instantiating authed multiplexed listener: %w", err)
}
if err := multiplexingAuthedListener.IngressListener(eventingAuthedListener); err != nil {
if err := multiplexingAuthedListener.IngressListener(pkiWorkerTrackingListener); err != nil {
return nil, fmt.Errorf("error adding authed evented listener to multiplexed listener: %w", err)
}
// Connections that came in on the cluster listener that are not authed via
@ -178,12 +187,20 @@ func (c *Controller) configureForCluster(ln *base.ServerListener) (func(), error
if err != nil {
return nil, fmt.Errorf("error instantiating reverse gprc connection split listener: %w", err)
}
// This wraps the reverse grpc pki worker connections with a listener which
// adds the worker key id of the connections to the pkiConnManager.
revPkiWorkerTrackingListener, err := cluster.NewTrackingListener(c.baseContext, reverseGrpcListener, c.pkiConnManager)
if err != nil {
return nil, fmt.Errorf("%s: error creating reverse grpc pki worker tracking listener: %w", op, err)
}
// Create a multiplexer to unify reverse grpc connections between PKI and KMS
multiplexingReverseGrpcListener, err := nodeenet.NewMultiplexingListener(c.baseContext, reverseGrpcListener.Addr())
if err != nil {
return nil, fmt.Errorf("error instantiating reverse grpc multiplexed listener: %w", err)
}
if err := multiplexingReverseGrpcListener.IngressListener(reverseGrpcListener); err != nil {
if err := multiplexingReverseGrpcListener.IngressListener(revPkiWorkerTrackingListener); err != nil {
return nil, fmt.Errorf("error adding reverse grpc listener to multiplexed listener: %w", err)
}

@ -33,8 +33,8 @@ func registerControllerServerCoordinationService(ctx context.Context, c *Control
return fmt.Errorf("%s: server is nil", op)
}
workerService := handlers.NewWorkerServiceServer(c.ServersRepoFn, c.SessionRepoFn, c.ConnectionRepoFn,
c.workerStatusUpdateTimes, c.kms)
workerService := handlers.NewWorkerServiceServer(c.ServersRepoFn, c.WorkerAuthRepoStorageFn,
c.SessionRepoFn, c.ConnectionRepoFn, c.workerStatusUpdateTimes, c.kms)
pbs.RegisterServerCoordinationServiceServer(server, workerService)
return nil
}
@ -51,8 +51,8 @@ func registerControllerSessionService(ctx context.Context, c *Controller, server
return fmt.Errorf("%s: server is nil", op)
}
workerService := handlers.NewWorkerServiceServer(c.ServersRepoFn, c.SessionRepoFn, c.ConnectionRepoFn,
c.workerStatusUpdateTimes, c.kms)
workerService := handlers.NewWorkerServiceServer(c.ServersRepoFn, c.WorkerAuthRepoStorageFn,
c.SessionRepoFn, c.ConnectionRepoFn, c.workerStatusUpdateTimes, c.kms)
pbs.RegisterSessionServiceServer(server, workerService)
return nil
}

@ -3,8 +3,10 @@ package controller
import (
"context"
"math/rand"
"sync"
"time"
"github.com/hashicorp/boundary/internal/daemon/cluster"
"github.com/hashicorp/boundary/internal/server/store"
"github.com/hashicorp/boundary/internal/errors"
@ -13,8 +15,9 @@ import (
// In the future we could make this configurable
const (
statusInterval = 10 * time.Second
terminationInterval = 1 * time.Minute
workerConnectionMaintenanceInterval = 3 * time.Second
statusInterval = 10 * time.Second
terminationInterval = 1 * time.Minute
)
// This is exported so it can be tweaked in tests
@ -155,3 +158,51 @@ func (c *Controller) startCloseExpiredPendingTokens(cancelCtx context.Context) {
}
}
}
func (c *Controller) startWorkerConnectionMaintenanceTicking(cancelCtx context.Context, wg *sync.WaitGroup, m *cluster.DownstreamManager) error {
const op = "controller.(Controller).startWorkerConnectionMaintenanceTicking"
switch {
case m == nil:
return errors.New(cancelCtx, errors.InvalidParameter, op, "DownstreamManager is nil")
case wg == nil:
return errors.New(cancelCtx, errors.InvalidParameter, op, "wait group is nil")
}
go func() {
defer wg.Done()
r := rand.New(rand.NewSource(time.Now().UnixNano()))
getRandomInterval := func() time.Duration {
// 0 to 0.5 adjustment to the base
f := r.Float64() / 2
// Half a chance to be faster, not slower
if r.Float32() > 0.5 {
f = -1 * f
}
return workerConnectionMaintenanceInterval + time.Duration(f*float64(workerConnectionMaintenanceInterval))
}
timer := time.NewTimer(0)
for {
select {
case <-cancelCtx.Done():
event.WriteSysEvent(cancelCtx, op, "context done, shutting down")
return
case <-timer.C:
repo, err := c.WorkerAuthRepoStorageFn()
if err != nil {
event.WriteError(cancelCtx, op, err, event.WithInfoMsg("error fetching repository for cluster connection maintenance"))
break
}
wKeyIds := m.Connected()
authorized, err := repo.FilterToAuthorizedWorkerKeyIds(cancelCtx, wKeyIds)
if err != nil {
event.WriteError(cancelCtx, op, err, event.WithInfoMsg("couldn't get authorized workers from repo"))
break
}
cluster.DisconnectUnauthorized(m, wKeyIds, authorized)
}
timer.Reset(getRandomInterval())
}
}()
return nil
}

@ -13,6 +13,7 @@ import (
"time"
"github.com/hashicorp/boundary/internal/cmd/base"
"github.com/hashicorp/boundary/internal/daemon/cluster"
"github.com/hashicorp/boundary/internal/daemon/common"
"github.com/hashicorp/boundary/internal/daemon/worker/internal/metric"
"github.com/hashicorp/boundary/internal/daemon/worker/session"
@ -172,6 +173,13 @@ func (w *Worker) configureForWorker(ln *base.ServerListener, logger *log.Logger,
return nil, fmt.Errorf("error instantiating non-worker split listener: %w", err)
}
// This wraps the reverse grpc pki worker connections with a listener which
// adds the worker key id of the connections to the worker's pkiConnManager.
revPkiWorkerTrackingListener, err := cluster.NewTrackingListener(w.baseContext, reverseGrpcListener, w.pkiConnManager)
if err != nil {
return nil, fmt.Errorf("%s: error creating reverse grpc pki worker tracking listener: %w", op, err)
}
statsHandler, err := metric.InstrumentClusterStatsHandler(w.baseContext)
if err != nil {
return nil, errors.Wrap(w.baseContext, err, op)
@ -197,11 +205,18 @@ func (w *Worker) configureForWorker(ln *base.ServerListener, logger *log.Logger,
return nil, fmt.Errorf("%s: error creating eventing listener: %w", op, err)
}
// This wraps the normal pki worker connections with a listener which adds
// the worker key id of the connections to the worker's pkiConnManager.
pkiWorkerTrackingListener, err := cluster.NewTrackingListener(cancelCtx, eventingListener, w.pkiConnManager)
if err != nil {
return nil, fmt.Errorf("%s: error creating pki worker tracking listener: %w", op, err)
}
return func() {
go w.workerAuthSplitListener.Start()
go httpServer.Serve(proxyListener)
go ln.GrpcServer.Serve(eventingListener)
go handleSecondaryConnection(cancelCtx, reverseGrpcListener, w.downstreamRoutes, -1)
go ln.GrpcServer.Serve(pkiWorkerTrackingListener)
go handleSecondaryConnection(cancelCtx, revPkiWorkerTrackingListener, w.downstreamRoutes, -1)
}, nil
}

@ -7,6 +7,7 @@ import (
"math/rand"
"time"
"github.com/hashicorp/boundary/internal/daemon/cluster"
"github.com/hashicorp/boundary/internal/daemon/worker/common"
"github.com/hashicorp/boundary/internal/daemon/worker/session"
pb "github.com/hashicorp/boundary/internal/gen/controller/servers"
@ -164,6 +165,7 @@ func (w *Worker) sendWorkerStatus(cancelCtx context.Context, sessionManager sess
event.WithInfoMsg("error making status request to controller"))
}
versionInfo := version.Get()
connectedWorkerKeyIds := w.pkiConnManager.Connected()
result, err := client.Status(statusCtx, &pbs.StatusRequest{
Jobs: activeJobs,
WorkerStatus: &pb.ServerWorkerStatus{
@ -175,7 +177,8 @@ func (w *Worker) sendWorkerStatus(cancelCtx context.Context, sessionManager sess
ReleaseVersion: versionInfo.FullVersionNumber(false),
OperationalState: w.operationalState.Load().(server.OperationalState).String(),
},
UpdateTags: w.updateTags.Load(),
ConnectedWorkerKeyIdentifiers: connectedWorkerKeyIds,
UpdateTags: w.updateTags.Load(),
})
if err != nil {
event.WriteError(cancelCtx, op, err, event.WithInfoMsg("error making status request to controller"))
@ -213,6 +216,10 @@ func (w *Worker) sendWorkerStatus(cancelCtx context.Context, sessionManager sess
}
w.updateTags.Store(false)
if authorized := result.GetAuthorizedWorkers(); authorized != nil {
cluster.DisconnectUnauthorized(w.pkiConnManager, connectedWorkerKeyIds, authorized.GetWorkerKeyIdentifiers())
}
var addrs []string
// This may be empty if we are in a multiple hop scenario
if len(result.CalculatedUpstreams) > 0 {

@ -16,6 +16,7 @@ import (
"github.com/hashicorp/boundary/globals"
"github.com/hashicorp/boundary/internal/cmd/base"
"github.com/hashicorp/boundary/internal/cmd/config"
"github.com/hashicorp/boundary/internal/daemon/cluster"
"github.com/hashicorp/boundary/internal/daemon/worker/internal/metric"
"github.com/hashicorp/boundary/internal/daemon/worker/session"
"github.com/hashicorp/boundary/internal/errors"
@ -133,6 +134,8 @@ type Worker struct {
TestOverrideAuthRotationPeriod time.Duration
statusLock sync.Mutex
pkiConnManager *cluster.DownstreamManager
}
func New(conf *Config) (*Worker, error) {
@ -155,6 +158,7 @@ func New(conf *Config) (*Worker, error) {
nonceFn: base62.Random,
WorkerAuthCurrentKeyId: new(ua.String),
operationalState: new(atomic.Value),
pkiConnManager: cluster.NewDownstreamManager(),
}
if downstreamRouterFactory != nil {

@ -0,0 +1,90 @@
package worker
import (
"context"
"testing"
"time"
"github.com/hashicorp/boundary/internal/cmd/config"
"github.com/hashicorp/boundary/internal/daemon/controller"
"github.com/hashicorp/boundary/internal/server"
"github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/connectivity"
)
func TestDeleteConnectedWorkers(t *testing.T) {
ctx := context.Background()
logger := hclog.New(&hclog.LoggerOptions{
Level: hclog.Trace,
})
conf, err := config.DevController()
require.NoError(t, err)
c := controller.NewTestController(t, &controller.TestControllerOpts{
Config: conf,
Logger: logger.Named("controller"),
})
t.Cleanup(c.Shutdown)
_, directPkiWorker, multiHoppedPkiWorker := NewTestMultihopWorkers(t, logger, c.Context(), c.ClusterAddrs(),
c.Config().WorkerAuthKms, c.Controller().ServersRepoFn, nil, nil)
serverRepo, err := c.Controller().ServersRepoFn()
require.NoError(t, err)
workers, err := serverRepo.ListWorkers(ctx, []string{"global"}, server.WithLiveness(-1))
require.NoError(t, err)
var childWorker *server.Worker
var pkiWorker *server.Worker
for _, w := range workers {
if w.Type == "pki" && w.GetAddress() == multiHoppedPkiWorker.ProxyAddrs()[0] {
childWorker = w
}
if w.Type == "pki" && w.GetAddress() == directPkiWorker.ProxyAddrs()[0] {
pkiWorker = w
}
}
require.NotNil(t, childWorker)
require.NotNil(t, pkiWorker)
cases := []struct {
name string
workerId string
testWorker *TestWorker
}{
{
name: "multi hop worker",
workerId: childWorker.GetPublicId(),
testWorker: multiHoppedPkiWorker,
},
{
name: "directly connected worker",
workerId: pkiWorker.GetPublicId(),
testWorker: directPkiWorker,
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
prevState := tc.testWorker.Worker().GrpcClientConn.GetState()
require.NotEqual(t, connectivity.TransientFailure, prevState)
require.NotEqual(t, connectivity.Shutdown, prevState)
_, err = serverRepo.DeleteWorker(ctx, tc.workerId)
require.NoError(t, err)
stateChangeCtx, cancel := context.WithTimeout(ctx, 4*time.Second)
defer cancel()
for {
tc.testWorker.Worker().GrpcClientConn.ResetConnectBackoff()
if !tc.testWorker.Worker().GrpcClientConn.WaitForStateChange(stateChangeCtx, prevState) {
assert.Fail(t, "State didn't change before context timed out")
break
}
newState := tc.testWorker.Worker().GrpcClientConn.GetState()
t.Logf("Changed from previous state: %s to new state: %s", prevState, newState)
if newState == connectivity.Shutdown || newState == connectivity.TransientFailure {
break
}
prevState = newState
}
})
}
}

@ -604,6 +604,9 @@ type StatusRequest struct {
// is easier and going the other route doesn't provide much benefit -- if you
// get access to the key and spoof the connection, you're already compromised.
WorkerStatus *servers.ServerWorkerStatus `protobuf:"bytes,40,opt,name=worker_status,json=workerStatus,proto3" json:"worker_status,omitempty"`
// The worker key identifiers presented by all downstreams connected to this
// worker
ConnectedWorkerKeyIdentifiers []string `protobuf:"bytes,50,rep,name=connected_worker_key_identifiers,json=connectedWorkerKeyIdentifiers,proto3" json:"connected_worker_key_identifiers,omitempty"`
}
func (x *StatusRequest) Reset() {
@ -659,6 +662,13 @@ func (x *StatusRequest) GetWorkerStatus() *servers.ServerWorkerStatus {
return nil
}
func (x *StatusRequest) GetConnectedWorkerKeyIdentifiers() []string {
if x != nil {
return x.ConnectedWorkerKeyIdentifiers
}
return nil
}
type JobChangeRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
@ -714,6 +724,53 @@ func (x *JobChangeRequest) GetRequestType() CHANGETYPE {
return CHANGETYPE_CHANGETYPE_UNSPECIFIED
}
type AuthorizedWorkerList struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
WorkerKeyIdentifiers []string `protobuf:"bytes,1,rep,name=worker_key_identifiers,json=workerKeyIdentifiers,proto3" json:"worker_key_identifiers,omitempty"`
}
func (x *AuthorizedWorkerList) Reset() {
*x = AuthorizedWorkerList{}
if protoimpl.UnsafeEnabled {
mi := &file_controller_servers_services_v1_server_coordination_service_proto_msgTypes[7]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *AuthorizedWorkerList) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*AuthorizedWorkerList) ProtoMessage() {}
func (x *AuthorizedWorkerList) ProtoReflect() protoreflect.Message {
mi := &file_controller_servers_services_v1_server_coordination_service_proto_msgTypes[7]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use AuthorizedWorkerList.ProtoReflect.Descriptor instead.
func (*AuthorizedWorkerList) Descriptor() ([]byte, []int) {
return file_controller_servers_services_v1_server_coordination_service_proto_rawDescGZIP(), []int{7}
}
func (x *AuthorizedWorkerList) GetWorkerKeyIdentifiers() []string {
if x != nil {
return x.WorkerKeyIdentifiers
}
return nil
}
type StatusResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
@ -730,12 +787,15 @@ type StatusResponse struct {
// The ID of the worker which made the request. The worker can send this value in subsequent requests so the
// controller does not need to do a database lookup for the id using the name field.
WorkerId string `protobuf:"bytes,40,opt,name=worker_id,json=workerId,proto3" json:"worker_id,omitempty" class:"public"` // @gotags: `class:"public"`
// Of the worker key identifiers provided in the request, these are the ones
// which are authorized to remain connected.
AuthorizedWorkers *AuthorizedWorkerList `protobuf:"bytes,50,opt,name=authorized_workers,json=authorizedWorkers,proto3" json:"authorized_workers,omitempty"`
}
func (x *StatusResponse) Reset() {
*x = StatusResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_controller_servers_services_v1_server_coordination_service_proto_msgTypes[7]
mi := &file_controller_servers_services_v1_server_coordination_service_proto_msgTypes[8]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@ -748,7 +808,7 @@ func (x *StatusResponse) String() string {
func (*StatusResponse) ProtoMessage() {}
func (x *StatusResponse) ProtoReflect() protoreflect.Message {
mi := &file_controller_servers_services_v1_server_coordination_service_proto_msgTypes[7]
mi := &file_controller_servers_services_v1_server_coordination_service_proto_msgTypes[8]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@ -761,7 +821,7 @@ func (x *StatusResponse) ProtoReflect() protoreflect.Message {
// Deprecated: Use StatusResponse.ProtoReflect.Descriptor instead.
func (*StatusResponse) Descriptor() ([]byte, []int) {
return file_controller_servers_services_v1_server_coordination_service_proto_rawDescGZIP(), []int{7}
return file_controller_servers_services_v1_server_coordination_service_proto_rawDescGZIP(), []int{8}
}
func (x *StatusResponse) GetJobsRequests() []*JobChangeRequest {
@ -785,6 +845,13 @@ func (x *StatusResponse) GetWorkerId() string {
return ""
}
func (x *StatusResponse) GetAuthorizedWorkers() *AuthorizedWorkerList {
if x != nil {
return x.AuthorizedWorkers
}
return nil
}
// WorkerInfo contains information about workers for the HcpbWorkerResponse message
type WorkerInfo struct {
state protoimpl.MessageState
@ -800,7 +867,7 @@ type WorkerInfo struct {
func (x *WorkerInfo) Reset() {
*x = WorkerInfo{}
if protoimpl.UnsafeEnabled {
mi := &file_controller_servers_services_v1_server_coordination_service_proto_msgTypes[8]
mi := &file_controller_servers_services_v1_server_coordination_service_proto_msgTypes[9]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@ -813,7 +880,7 @@ func (x *WorkerInfo) String() string {
func (*WorkerInfo) ProtoMessage() {}
func (x *WorkerInfo) ProtoReflect() protoreflect.Message {
mi := &file_controller_servers_services_v1_server_coordination_service_proto_msgTypes[8]
mi := &file_controller_servers_services_v1_server_coordination_service_proto_msgTypes[9]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@ -826,7 +893,7 @@ func (x *WorkerInfo) ProtoReflect() protoreflect.Message {
// Deprecated: Use WorkerInfo.ProtoReflect.Descriptor instead.
func (*WorkerInfo) Descriptor() ([]byte, []int) {
return file_controller_servers_services_v1_server_coordination_service_proto_rawDescGZIP(), []int{8}
return file_controller_servers_services_v1_server_coordination_service_proto_rawDescGZIP(), []int{9}
}
func (x *WorkerInfo) GetId() string {
@ -853,7 +920,7 @@ type ListHcpbWorkersRequest struct {
func (x *ListHcpbWorkersRequest) Reset() {
*x = ListHcpbWorkersRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_controller_servers_services_v1_server_coordination_service_proto_msgTypes[9]
mi := &file_controller_servers_services_v1_server_coordination_service_proto_msgTypes[10]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@ -866,7 +933,7 @@ func (x *ListHcpbWorkersRequest) String() string {
func (*ListHcpbWorkersRequest) ProtoMessage() {}
func (x *ListHcpbWorkersRequest) ProtoReflect() protoreflect.Message {
mi := &file_controller_servers_services_v1_server_coordination_service_proto_msgTypes[9]
mi := &file_controller_servers_services_v1_server_coordination_service_proto_msgTypes[10]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@ -879,7 +946,7 @@ func (x *ListHcpbWorkersRequest) ProtoReflect() protoreflect.Message {
// Deprecated: Use ListHcpbWorkersRequest.ProtoReflect.Descriptor instead.
func (*ListHcpbWorkersRequest) Descriptor() ([]byte, []int) {
return file_controller_servers_services_v1_server_coordination_service_proto_rawDescGZIP(), []int{9}
return file_controller_servers_services_v1_server_coordination_service_proto_rawDescGZIP(), []int{10}
}
// A response containing worker information
@ -894,7 +961,7 @@ type ListHcpbWorkersResponse struct {
func (x *ListHcpbWorkersResponse) Reset() {
*x = ListHcpbWorkersResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_controller_servers_services_v1_server_coordination_service_proto_msgTypes[10]
mi := &file_controller_servers_services_v1_server_coordination_service_proto_msgTypes[11]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@ -907,7 +974,7 @@ func (x *ListHcpbWorkersResponse) String() string {
func (*ListHcpbWorkersResponse) ProtoMessage() {}
func (x *ListHcpbWorkersResponse) ProtoReflect() protoreflect.Message {
mi := &file_controller_servers_services_v1_server_coordination_service_proto_msgTypes[10]
mi := &file_controller_servers_services_v1_server_coordination_service_proto_msgTypes[11]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@ -920,7 +987,7 @@ func (x *ListHcpbWorkersResponse) ProtoReflect() protoreflect.Message {
// Deprecated: Use ListHcpbWorkersResponse.ProtoReflect.Descriptor instead.
func (*ListHcpbWorkersResponse) Descriptor() ([]byte, []int) {
return file_controller_servers_services_v1_server_coordination_service_proto_rawDescGZIP(), []int{10}
return file_controller_servers_services_v1_server_coordination_service_proto_rawDescGZIP(), []int{11}
}
func (x *ListHcpbWorkersResponse) GetWorkers() []*WorkerInfo {
@ -991,7 +1058,7 @@ var file_controller_servers_services_v1_server_coordination_service_proto_rawDes
0x0a, 0x10, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49,
0x45, 0x44, 0x10, 0x00, 0x12, 0x13, 0x0a, 0x0f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x43, 0x4f, 0x4e,
0x54, 0x52, 0x4f, 0x4c, 0x4c, 0x45, 0x52, 0x10, 0x01, 0x12, 0x0f, 0x0a, 0x0b, 0x54, 0x59, 0x50,
0x45, 0x5f, 0x57, 0x4f, 0x52, 0x4b, 0x45, 0x52, 0x10, 0x02, 0x22, 0xcd, 0x01, 0x0a, 0x0d, 0x53,
0x45, 0x5f, 0x57, 0x4f, 0x52, 0x4b, 0x45, 0x52, 0x10, 0x02, 0x22, 0x96, 0x02, 0x0a, 0x0d, 0x53,
0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x3d, 0x0a, 0x04,
0x6a, 0x6f, 0x62, 0x73, 0x18, 0x14, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x29, 0x2e, 0x63, 0x6f, 0x6e,
0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x73, 0x2e,
@ -1003,95 +1070,111 @@ var file_controller_servers_services_v1_server_coordination_service_proto_rawDes
0x01, 0x28, 0x0b, 0x32, 0x29, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72,
0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x73, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x65, 0x72, 0x76,
0x65, 0x72, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x0c,
0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x4a, 0x04, 0x08, 0x0a,
0x10, 0x0b, 0x52, 0x06, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x22, 0x98, 0x01, 0x0a, 0x10, 0x4a,
0x6f, 0x62, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12,
0x35, 0x0a, 0x03, 0x6a, 0x6f, 0x62, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x23, 0x2e, 0x63,
0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72,
0x73, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2e, 0x76, 0x31, 0x2e, 0x4a, 0x6f,
0x62, 0x52, 0x03, 0x6a, 0x6f, 0x62, 0x12, 0x4d, 0x0a, 0x0c, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73,
0x74, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x2a, 0x2e, 0x63,
0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72,
0x73, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x48,
0x41, 0x4e, 0x47, 0x45, 0x54, 0x59, 0x50, 0x45, 0x52, 0x0b, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73,
0x74, 0x54, 0x79, 0x70, 0x65, 0x22, 0xfa, 0x01, 0x0a, 0x0e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73,
0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x55, 0x0a, 0x0d, 0x6a, 0x6f, 0x62, 0x73,
0x5f, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x73, 0x18, 0x14, 0x20, 0x03, 0x28, 0x0b, 0x32,
0x30, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72, 0x2e, 0x73, 0x65, 0x72,
0x76, 0x65, 0x72, 0x73, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2e, 0x76, 0x31,
0x2e, 0x4a, 0x6f, 0x62, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
0x74, 0x52, 0x0c, 0x6a, 0x6f, 0x62, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x73, 0x12,
0x61, 0x0a, 0x14, 0x63, 0x61, 0x6c, 0x63, 0x75, 0x6c, 0x61, 0x74, 0x65, 0x64, 0x5f, 0x75, 0x70,
0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x18, 0x1e, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x2e, 0x2e,
0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65,
0x72, 0x73, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2e, 0x76, 0x31, 0x2e, 0x55,
0x70, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x52, 0x13, 0x63,
0x61, 0x6c, 0x63, 0x75, 0x6c, 0x61, 0x74, 0x65, 0x64, 0x55, 0x70, 0x73, 0x74, 0x72, 0x65, 0x61,
0x6d, 0x73, 0x12, 0x1b, 0x0a, 0x09, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18,
0x28, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x49, 0x64, 0x4a,
0x04, 0x08, 0x0a, 0x10, 0x0b, 0x52, 0x0b, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65,
0x72, 0x73, 0x22, 0x36, 0x0a, 0x0a, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f,
0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64,
0x12, 0x18, 0x0a, 0x07, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28,
0x09, 0x52, 0x07, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x22, 0x18, 0x0a, 0x16, 0x4c, 0x69,
0x73, 0x74, 0x48, 0x63, 0x70, 0x62, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71,
0x75, 0x65, 0x73, 0x74, 0x22, 0x5f, 0x0a, 0x17, 0x4c, 0x69, 0x73, 0x74, 0x48, 0x63, 0x70, 0x62,
0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12,
0x44, 0x0a, 0x07, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b,
0x32, 0x2a, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72, 0x2e, 0x73, 0x65,
0x72, 0x76, 0x65, 0x72, 0x73, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2e, 0x76,
0x31, 0x2e, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x07, 0x77, 0x6f,
0x72, 0x6b, 0x65, 0x72, 0x73, 0x2a, 0x92, 0x01, 0x0a, 0x10, 0x43, 0x4f, 0x4e, 0x4e, 0x45, 0x43,
0x54, 0x49, 0x4f, 0x4e, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x12, 0x20, 0x0a, 0x1c, 0x43, 0x4f,
0x4e, 0x4e, 0x45, 0x43, 0x54, 0x49, 0x4f, 0x4e, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x55,
0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x1f, 0x0a, 0x1b,
0x43, 0x4f, 0x4e, 0x4e, 0x45, 0x43, 0x54, 0x49, 0x4f, 0x4e, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53,
0x5f, 0x41, 0x55, 0x54, 0x48, 0x4f, 0x52, 0x49, 0x5a, 0x45, 0x44, 0x10, 0x01, 0x12, 0x1e, 0x0a,
0x1a, 0x43, 0x4f, 0x4e, 0x4e, 0x45, 0x43, 0x54, 0x49, 0x4f, 0x4e, 0x53, 0x54, 0x41, 0x54, 0x55,
0x53, 0x5f, 0x43, 0x4f, 0x4e, 0x4e, 0x45, 0x43, 0x54, 0x45, 0x44, 0x10, 0x02, 0x12, 0x1b, 0x0a,
0x17, 0x43, 0x4f, 0x4e, 0x4e, 0x45, 0x43, 0x54, 0x49, 0x4f, 0x4e, 0x53, 0x54, 0x41, 0x54, 0x55,
0x53, 0x5f, 0x43, 0x4c, 0x4f, 0x53, 0x45, 0x44, 0x10, 0x03, 0x2a, 0x9e, 0x01, 0x0a, 0x0d, 0x53,
0x45, 0x53, 0x53, 0x49, 0x4f, 0x4e, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x12, 0x1d, 0x0a, 0x19,
0x53, 0x45, 0x53, 0x53, 0x49, 0x4f, 0x4e, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x55, 0x4e,
0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x19, 0x0a, 0x15, 0x53,
0x45, 0x53, 0x53, 0x49, 0x4f, 0x4e, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x50, 0x45, 0x4e,
0x44, 0x49, 0x4e, 0x47, 0x10, 0x01, 0x12, 0x18, 0x0a, 0x14, 0x53, 0x45, 0x53, 0x53, 0x49, 0x4f,
0x4e, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x41, 0x43, 0x54, 0x49, 0x56, 0x45, 0x10, 0x02,
0x12, 0x1b, 0x0a, 0x17, 0x53, 0x45, 0x53, 0x53, 0x49, 0x4f, 0x4e, 0x53, 0x54, 0x41, 0x54, 0x55,
0x53, 0x5f, 0x43, 0x41, 0x4e, 0x43, 0x45, 0x4c, 0x49, 0x4e, 0x47, 0x10, 0x03, 0x12, 0x1c, 0x0a,
0x18, 0x53, 0x45, 0x53, 0x53, 0x49, 0x4f, 0x4e, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x54,
0x45, 0x52, 0x4d, 0x49, 0x4e, 0x41, 0x54, 0x45, 0x44, 0x10, 0x04, 0x2a, 0x37, 0x0a, 0x07, 0x4a,
0x4f, 0x42, 0x54, 0x59, 0x50, 0x45, 0x12, 0x17, 0x0a, 0x13, 0x4a, 0x4f, 0x42, 0x54, 0x59, 0x50,
0x45, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12,
0x13, 0x0a, 0x0f, 0x4a, 0x4f, 0x42, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x53, 0x45, 0x53, 0x53, 0x49,
0x4f, 0x4e, 0x10, 0x01, 0x2a, 0x45, 0x0a, 0x0a, 0x43, 0x48, 0x41, 0x4e, 0x47, 0x45, 0x54, 0x59,
0x50, 0x45, 0x12, 0x1a, 0x0a, 0x16, 0x43, 0x48, 0x41, 0x4e, 0x47, 0x45, 0x54, 0x59, 0x50, 0x45,
0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x1b,
0x0a, 0x17, 0x43, 0x48, 0x41, 0x4e, 0x47, 0x45, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, 0x50, 0x44,
0x41, 0x54, 0x45, 0x5f, 0x53, 0x54, 0x41, 0x54, 0x45, 0x10, 0x01, 0x32, 0x8d, 0x02, 0x0a, 0x19,
0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x43, 0x6f, 0x6f, 0x72, 0x64, 0x69, 0x6e, 0x61, 0x74, 0x69,
0x6f, 0x6e, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x69, 0x0a, 0x06, 0x53, 0x74, 0x61,
0x74, 0x75, 0x73, 0x12, 0x2d, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72,
0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x73, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65,
0x73, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65,
0x73, 0x74, 0x1a, 0x2e, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72, 0x2e,
0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x47, 0x0a, 0x20,
0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x65, 0x64, 0x5f, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72,
0x5f, 0x6b, 0x65, 0x79, 0x5f, 0x69, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x66, 0x69, 0x65, 0x72, 0x73,
0x18, 0x32, 0x20, 0x03, 0x28, 0x09, 0x52, 0x1d, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x65,
0x64, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x4b, 0x65, 0x79, 0x49, 0x64, 0x65, 0x6e, 0x74, 0x69,
0x66, 0x69, 0x65, 0x72, 0x73, 0x4a, 0x04, 0x08, 0x0a, 0x10, 0x0b, 0x52, 0x06, 0x77, 0x6f, 0x72,
0x6b, 0x65, 0x72, 0x22, 0x98, 0x01, 0x0a, 0x10, 0x4a, 0x6f, 0x62, 0x43, 0x68, 0x61, 0x6e, 0x67,
0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x35, 0x0a, 0x03, 0x6a, 0x6f, 0x62, 0x18,
0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x23, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c,
0x65, 0x72, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x73, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69,
0x63, 0x65, 0x73, 0x2e, 0x76, 0x31, 0x2e, 0x4a, 0x6f, 0x62, 0x52, 0x03, 0x6a, 0x6f, 0x62, 0x12,
0x4d, 0x0a, 0x0c, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18,
0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x2a, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c,
0x65, 0x72, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x73, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69,
0x63, 0x65, 0x73, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x48, 0x41, 0x4e, 0x47, 0x45, 0x54, 0x59, 0x50,
0x45, 0x52, 0x0b, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x54, 0x79, 0x70, 0x65, 0x22, 0x4c,
0x0a, 0x14, 0x41, 0x75, 0x74, 0x68, 0x6f, 0x72, 0x69, 0x7a, 0x65, 0x64, 0x57, 0x6f, 0x72, 0x6b,
0x65, 0x72, 0x4c, 0x69, 0x73, 0x74, 0x12, 0x34, 0x0a, 0x16, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72,
0x5f, 0x6b, 0x65, 0x79, 0x5f, 0x69, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x66, 0x69, 0x65, 0x72, 0x73,
0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x14, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x4b, 0x65,
0x79, 0x49, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x66, 0x69, 0x65, 0x72, 0x73, 0x22, 0xdf, 0x02, 0x0a,
0x0e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12,
0x55, 0x0a, 0x0d, 0x6a, 0x6f, 0x62, 0x73, 0x5f, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x73,
0x18, 0x14, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x30, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c,
0x6c, 0x65, 0x72, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x73, 0x2e, 0x73, 0x65, 0x72, 0x76,
0x69, 0x63, 0x65, 0x73, 0x2e, 0x76, 0x31, 0x2e, 0x4a, 0x6f, 0x62, 0x43, 0x68, 0x61, 0x6e, 0x67,
0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x52, 0x0c, 0x6a, 0x6f, 0x62, 0x73, 0x52, 0x65,
0x71, 0x75, 0x65, 0x73, 0x74, 0x73, 0x12, 0x61, 0x0a, 0x14, 0x63, 0x61, 0x6c, 0x63, 0x75, 0x6c,
0x61, 0x74, 0x65, 0x64, 0x5f, 0x75, 0x70, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x18, 0x1e,
0x20, 0x03, 0x28, 0x0b, 0x32, 0x2e, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65,
0x72, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x73, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63,
0x65, 0x73, 0x2e, 0x76, 0x31, 0x2e, 0x55, 0x70, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x53, 0x65,
0x72, 0x76, 0x65, 0x72, 0x52, 0x13, 0x63, 0x61, 0x6c, 0x63, 0x75, 0x6c, 0x61, 0x74, 0x65, 0x64,
0x55, 0x70, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x12, 0x1b, 0x0a, 0x09, 0x77, 0x6f, 0x72,
0x6b, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x28, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x77, 0x6f,
0x72, 0x6b, 0x65, 0x72, 0x49, 0x64, 0x12, 0x63, 0x0a, 0x12, 0x61, 0x75, 0x74, 0x68, 0x6f, 0x72,
0x69, 0x7a, 0x65, 0x64, 0x5f, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x18, 0x32, 0x20, 0x01,
0x28, 0x0b, 0x32, 0x34, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72, 0x2e,
0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x73, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73,
0x2e, 0x76, 0x31, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e,
0x73, 0x65, 0x22, 0x00, 0x12, 0x84, 0x01, 0x0a, 0x0f, 0x4c, 0x69, 0x73, 0x74, 0x48, 0x63, 0x70,
0x62, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x12, 0x36, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72,
0x6f, 0x6c, 0x6c, 0x65, 0x72, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x73, 0x2e, 0x73, 0x65,
0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2e, 0x76, 0x31, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x48, 0x63,
0x2e, 0x76, 0x31, 0x2e, 0x41, 0x75, 0x74, 0x68, 0x6f, 0x72, 0x69, 0x7a, 0x65, 0x64, 0x57, 0x6f,
0x72, 0x6b, 0x65, 0x72, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x11, 0x61, 0x75, 0x74, 0x68, 0x6f, 0x72,
0x69, 0x7a, 0x65, 0x64, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x4a, 0x04, 0x08, 0x0a, 0x10,
0x0b, 0x52, 0x0b, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72, 0x73, 0x22, 0x36,
0x0a, 0x0a, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x0e, 0x0a, 0x02,
0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x18, 0x0a, 0x07,
0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x61,
0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x22, 0x18, 0x0a, 0x16, 0x4c, 0x69, 0x73, 0x74, 0x48, 0x63,
0x70, 0x62, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
0x1a, 0x37, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72, 0x2e, 0x73, 0x65,
0x72, 0x76, 0x65, 0x72, 0x73, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2e, 0x76,
0x31, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x48, 0x63, 0x70, 0x62, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72,
0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x51, 0x5a, 0x4f, 0x67,
0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63,
0x6f, 0x72, 0x70, 0x2f, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x61, 0x72, 0x79, 0x2f, 0x69, 0x6e, 0x74,
0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f,
0x6c, 0x6c, 0x65, 0x72, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x73, 0x2f, 0x73, 0x65, 0x72,
0x76, 0x69, 0x63, 0x65, 0x73, 0x3b, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x62, 0x06,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x22, 0x5f, 0x0a, 0x17, 0x4c, 0x69, 0x73, 0x74, 0x48, 0x63, 0x70, 0x62, 0x57, 0x6f, 0x72, 0x6b,
0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x44, 0x0a, 0x07, 0x77,
0x6f, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x2a, 0x2e, 0x63,
0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72,
0x73, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2e, 0x76, 0x31, 0x2e, 0x57, 0x6f,
0x72, 0x6b, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x07, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72,
0x73, 0x2a, 0x92, 0x01, 0x0a, 0x10, 0x43, 0x4f, 0x4e, 0x4e, 0x45, 0x43, 0x54, 0x49, 0x4f, 0x4e,
0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x12, 0x20, 0x0a, 0x1c, 0x43, 0x4f, 0x4e, 0x4e, 0x45, 0x43,
0x54, 0x49, 0x4f, 0x4e, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45,
0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x1f, 0x0a, 0x1b, 0x43, 0x4f, 0x4e, 0x4e,
0x45, 0x43, 0x54, 0x49, 0x4f, 0x4e, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x41, 0x55, 0x54,
0x48, 0x4f, 0x52, 0x49, 0x5a, 0x45, 0x44, 0x10, 0x01, 0x12, 0x1e, 0x0a, 0x1a, 0x43, 0x4f, 0x4e,
0x4e, 0x45, 0x43, 0x54, 0x49, 0x4f, 0x4e, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x43, 0x4f,
0x4e, 0x4e, 0x45, 0x43, 0x54, 0x45, 0x44, 0x10, 0x02, 0x12, 0x1b, 0x0a, 0x17, 0x43, 0x4f, 0x4e,
0x4e, 0x45, 0x43, 0x54, 0x49, 0x4f, 0x4e, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x43, 0x4c,
0x4f, 0x53, 0x45, 0x44, 0x10, 0x03, 0x2a, 0x9e, 0x01, 0x0a, 0x0d, 0x53, 0x45, 0x53, 0x53, 0x49,
0x4f, 0x4e, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x12, 0x1d, 0x0a, 0x19, 0x53, 0x45, 0x53, 0x53,
0x49, 0x4f, 0x4e, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43,
0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x19, 0x0a, 0x15, 0x53, 0x45, 0x53, 0x53, 0x49,
0x4f, 0x4e, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x50, 0x45, 0x4e, 0x44, 0x49, 0x4e, 0x47,
0x10, 0x01, 0x12, 0x18, 0x0a, 0x14, 0x53, 0x45, 0x53, 0x53, 0x49, 0x4f, 0x4e, 0x53, 0x54, 0x41,
0x54, 0x55, 0x53, 0x5f, 0x41, 0x43, 0x54, 0x49, 0x56, 0x45, 0x10, 0x02, 0x12, 0x1b, 0x0a, 0x17,
0x53, 0x45, 0x53, 0x53, 0x49, 0x4f, 0x4e, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x43, 0x41,
0x4e, 0x43, 0x45, 0x4c, 0x49, 0x4e, 0x47, 0x10, 0x03, 0x12, 0x1c, 0x0a, 0x18, 0x53, 0x45, 0x53,
0x53, 0x49, 0x4f, 0x4e, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x54, 0x45, 0x52, 0x4d, 0x49,
0x4e, 0x41, 0x54, 0x45, 0x44, 0x10, 0x04, 0x2a, 0x37, 0x0a, 0x07, 0x4a, 0x4f, 0x42, 0x54, 0x59,
0x50, 0x45, 0x12, 0x17, 0x0a, 0x13, 0x4a, 0x4f, 0x42, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, 0x4e,
0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x13, 0x0a, 0x0f, 0x4a,
0x4f, 0x42, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x53, 0x45, 0x53, 0x53, 0x49, 0x4f, 0x4e, 0x10, 0x01,
0x2a, 0x45, 0x0a, 0x0a, 0x43, 0x48, 0x41, 0x4e, 0x47, 0x45, 0x54, 0x59, 0x50, 0x45, 0x12, 0x1a,
0x0a, 0x16, 0x43, 0x48, 0x41, 0x4e, 0x47, 0x45, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, 0x4e, 0x53,
0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x1b, 0x0a, 0x17, 0x43, 0x48,
0x41, 0x4e, 0x47, 0x45, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, 0x50, 0x44, 0x41, 0x54, 0x45, 0x5f,
0x53, 0x54, 0x41, 0x54, 0x45, 0x10, 0x01, 0x32, 0x8d, 0x02, 0x0a, 0x19, 0x53, 0x65, 0x72, 0x76,
0x65, 0x72, 0x43, 0x6f, 0x6f, 0x72, 0x64, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x65,
0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x69, 0x0a, 0x06, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12,
0x2d, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72, 0x2e, 0x73, 0x65, 0x72,
0x76, 0x65, 0x72, 0x73, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2e, 0x76, 0x31,
0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2e,
0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72, 0x2e, 0x73, 0x65, 0x72, 0x76,
0x65, 0x72, 0x73, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2e, 0x76, 0x31, 0x2e,
0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00,
0x12, 0x84, 0x01, 0x0a, 0x0f, 0x4c, 0x69, 0x73, 0x74, 0x48, 0x63, 0x70, 0x62, 0x57, 0x6f, 0x72,
0x6b, 0x65, 0x72, 0x73, 0x12, 0x36, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65,
0x72, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x73, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63,
0x65, 0x73, 0x2e, 0x76, 0x31, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x48, 0x63, 0x70, 0x62, 0x57, 0x6f,
0x72, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x37, 0x2e, 0x63,
0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72,
0x73, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2e, 0x76, 0x31, 0x2e, 0x4c, 0x69,
0x73, 0x74, 0x48, 0x63, 0x70, 0x62, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73,
0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x51, 0x5a, 0x4f, 0x67, 0x69, 0x74, 0x68, 0x75,
0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2f,
0x62, 0x6f, 0x75, 0x6e, 0x64, 0x61, 0x72, 0x79, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61,
0x6c, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72,
0x2f, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x73, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65,
0x73, 0x3b, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x33,
}
var (
@ -1107,7 +1190,7 @@ func file_controller_servers_services_v1_server_coordination_service_proto_rawDe
}
var file_controller_servers_services_v1_server_coordination_service_proto_enumTypes = make([]protoimpl.EnumInfo, 5)
var file_controller_servers_services_v1_server_coordination_service_proto_msgTypes = make([]protoimpl.MessageInfo, 11)
var file_controller_servers_services_v1_server_coordination_service_proto_msgTypes = make([]protoimpl.MessageInfo, 12)
var file_controller_servers_services_v1_server_coordination_service_proto_goTypes = []interface{}{
(CONNECTIONSTATUS)(0), // 0: controller.servers.services.v1.CONNECTIONSTATUS
(SESSIONSTATUS)(0), // 1: controller.servers.services.v1.SESSIONSTATUS
@ -1121,11 +1204,12 @@ var file_controller_servers_services_v1_server_coordination_service_proto_goType
(*UpstreamServer)(nil), // 9: controller.servers.services.v1.UpstreamServer
(*StatusRequest)(nil), // 10: controller.servers.services.v1.StatusRequest
(*JobChangeRequest)(nil), // 11: controller.servers.services.v1.JobChangeRequest
(*StatusResponse)(nil), // 12: controller.servers.services.v1.StatusResponse
(*WorkerInfo)(nil), // 13: controller.servers.services.v1.WorkerInfo
(*ListHcpbWorkersRequest)(nil), // 14: controller.servers.services.v1.ListHcpbWorkersRequest
(*ListHcpbWorkersResponse)(nil), // 15: controller.servers.services.v1.ListHcpbWorkersResponse
(*servers.ServerWorkerStatus)(nil), // 16: controller.servers.v1.ServerWorkerStatus
(*AuthorizedWorkerList)(nil), // 12: controller.servers.services.v1.AuthorizedWorkerList
(*StatusResponse)(nil), // 13: controller.servers.services.v1.StatusResponse
(*WorkerInfo)(nil), // 14: controller.servers.services.v1.WorkerInfo
(*ListHcpbWorkersRequest)(nil), // 15: controller.servers.services.v1.ListHcpbWorkersRequest
(*ListHcpbWorkersResponse)(nil), // 16: controller.servers.services.v1.ListHcpbWorkersResponse
(*servers.ServerWorkerStatus)(nil), // 17: controller.servers.v1.ServerWorkerStatus
}
var file_controller_servers_services_v1_server_coordination_service_proto_depIdxs = []int32{
0, // 0: controller.servers.services.v1.Connection.status:type_name -> controller.servers.services.v1.CONNECTIONSTATUS
@ -1136,21 +1220,22 @@ var file_controller_servers_services_v1_server_coordination_service_proto_depIdx
7, // 5: controller.servers.services.v1.JobStatus.job:type_name -> controller.servers.services.v1.Job
4, // 6: controller.servers.services.v1.UpstreamServer.type:type_name -> controller.servers.services.v1.UpstreamServer.TYPE
8, // 7: controller.servers.services.v1.StatusRequest.jobs:type_name -> controller.servers.services.v1.JobStatus
16, // 8: controller.servers.services.v1.StatusRequest.worker_status:type_name -> controller.servers.v1.ServerWorkerStatus
17, // 8: controller.servers.services.v1.StatusRequest.worker_status:type_name -> controller.servers.v1.ServerWorkerStatus
7, // 9: controller.servers.services.v1.JobChangeRequest.job:type_name -> controller.servers.services.v1.Job
3, // 10: controller.servers.services.v1.JobChangeRequest.request_type:type_name -> controller.servers.services.v1.CHANGETYPE
11, // 11: controller.servers.services.v1.StatusResponse.jobs_requests:type_name -> controller.servers.services.v1.JobChangeRequest
9, // 12: controller.servers.services.v1.StatusResponse.calculated_upstreams:type_name -> controller.servers.services.v1.UpstreamServer
13, // 13: controller.servers.services.v1.ListHcpbWorkersResponse.workers:type_name -> controller.servers.services.v1.WorkerInfo
10, // 14: controller.servers.services.v1.ServerCoordinationService.Status:input_type -> controller.servers.services.v1.StatusRequest
14, // 15: controller.servers.services.v1.ServerCoordinationService.ListHcpbWorkers:input_type -> controller.servers.services.v1.ListHcpbWorkersRequest
12, // 16: controller.servers.services.v1.ServerCoordinationService.Status:output_type -> controller.servers.services.v1.StatusResponse
15, // 17: controller.servers.services.v1.ServerCoordinationService.ListHcpbWorkers:output_type -> controller.servers.services.v1.ListHcpbWorkersResponse
16, // [16:18] is the sub-list for method output_type
14, // [14:16] is the sub-list for method input_type
14, // [14:14] is the sub-list for extension type_name
14, // [14:14] is the sub-list for extension extendee
0, // [0:14] is the sub-list for field type_name
12, // 13: controller.servers.services.v1.StatusResponse.authorized_workers:type_name -> controller.servers.services.v1.AuthorizedWorkerList
14, // 14: controller.servers.services.v1.ListHcpbWorkersResponse.workers:type_name -> controller.servers.services.v1.WorkerInfo
10, // 15: controller.servers.services.v1.ServerCoordinationService.Status:input_type -> controller.servers.services.v1.StatusRequest
15, // 16: controller.servers.services.v1.ServerCoordinationService.ListHcpbWorkers:input_type -> controller.servers.services.v1.ListHcpbWorkersRequest
13, // 17: controller.servers.services.v1.ServerCoordinationService.Status:output_type -> controller.servers.services.v1.StatusResponse
16, // 18: controller.servers.services.v1.ServerCoordinationService.ListHcpbWorkers:output_type -> controller.servers.services.v1.ListHcpbWorkersResponse
17, // [17:19] is the sub-list for method output_type
15, // [15:17] is the sub-list for method input_type
15, // [15:15] is the sub-list for extension type_name
15, // [15:15] is the sub-list for extension extendee
0, // [0:15] is the sub-list for field type_name
}
func init() { file_controller_servers_services_v1_server_coordination_service_proto_init() }
@ -1244,7 +1329,7 @@ func file_controller_servers_services_v1_server_coordination_service_proto_init(
}
}
file_controller_servers_services_v1_server_coordination_service_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*StatusResponse); i {
switch v := v.(*AuthorizedWorkerList); i {
case 0:
return &v.state
case 1:
@ -1256,7 +1341,7 @@ func file_controller_servers_services_v1_server_coordination_service_proto_init(
}
}
file_controller_servers_services_v1_server_coordination_service_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*WorkerInfo); i {
switch v := v.(*StatusResponse); i {
case 0:
return &v.state
case 1:
@ -1268,7 +1353,7 @@ func file_controller_servers_services_v1_server_coordination_service_proto_init(
}
}
file_controller_servers_services_v1_server_coordination_service_proto_msgTypes[9].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ListHcpbWorkersRequest); i {
switch v := v.(*WorkerInfo); i {
case 0:
return &v.state
case 1:
@ -1280,6 +1365,18 @@ func file_controller_servers_services_v1_server_coordination_service_proto_init(
}
}
file_controller_servers_services_v1_server_coordination_service_proto_msgTypes[10].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ListHcpbWorkersRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_controller_servers_services_v1_server_coordination_service_proto_msgTypes[11].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ListHcpbWorkersResponse); i {
case 0:
return &v.state
@ -1301,7 +1398,7 @@ func file_controller_servers_services_v1_server_coordination_service_proto_init(
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_controller_servers_services_v1_server_coordination_service_proto_rawDesc,
NumEnums: 5,
NumMessages: 11,
NumMessages: 12,
NumExtensions: 0,
NumServices: 1,
},

@ -95,6 +95,10 @@ message StatusRequest {
// is easier and going the other route doesn't provide much benefit -- if you
// get access to the key and spoof the connection, you're already compromised.
servers.v1.ServerWorkerStatus worker_status = 40;
// The worker key identifiers presented by all downstreams connected to this
// worker
repeated string connected_worker_key_identifiers = 50;
}
enum CHANGETYPE {
@ -109,6 +113,10 @@ message JobChangeRequest {
CHANGETYPE request_type = 2;
}
message AuthorizedWorkerList {
repeated string worker_key_identifiers = 1;
}
message StatusResponse {
reserved 10;
reserved "controllers";
@ -126,6 +134,10 @@ message StatusResponse {
// The ID of the worker which made the request. The worker can send this value in subsequent requests so the
// controller does not need to do a database lookup for the id using the name field.
string worker_id = 40; // @gotags: `class:"public"`
// Of the worker key identifiers provided in the request, these are the ones
// which are authorized to remain connected.
AuthorizedWorkerList authorized_workers = 50;
}
// WorkerInfo contains information about workers for the HcpbWorkerResponse message

@ -30,4 +30,12 @@ const (
)
select * from worker_auth_authorized where worker_id in (select * from key_id_to_worker_id)
`
authorizedWorkerQuery = `
select distinct w.worker_key_identifier
from
worker_auth_certificate_bundle as w
where
w.worker_key_identifier in (?)
`
)

@ -520,6 +520,33 @@ func (r *WorkerAuthRepositoryStorage) findCertBundles(ctx context.Context, worke
return certBundle, nil
}
// FilterToAuthorizedWorkerKeyIds returns all the worker key identifiers that
// are authorizable from the slice of key identifiers provided to the function.
func (r *WorkerAuthRepositoryStorage) FilterToAuthorizedWorkerKeyIds(ctx context.Context, workerKeyIds []string) ([]string, error) {
const op = "server.(WorkerAuthRepositoryStorage).FilterToAuthorizedWorkerKeyIds"
if len(workerKeyIds) == 0 {
return nil, nil
}
rows, err := r.reader.Query(ctx, authorizedWorkerQuery, []interface{}{workerKeyIds})
if err != nil {
}
defer rows.Close()
type rowsResult struct {
WorkerKeyIdentifier string
}
var ret []string
for rows.Next() {
var result rowsResult
err = r.reader.ScanRows(ctx, rows, &result)
if err != nil {
return nil, errors.Wrap(ctx, err, op)
}
ret = append(ret, result.WorkerKeyIdentifier)
}
return ret, nil
}
func (r *WorkerAuthRepositoryStorage) loadRootCertificates(ctx context.Context, cert *types.RootCertificates) error {
const op = "server.(WorkerAuthRepositoryStorage).loadRootCertificates"
if cert == nil {

@ -450,6 +450,49 @@ func TestStoreNodeInformationTx(t *testing.T) {
}
}
func TestFilterToAuthorizedWorkerKeyIds(t *testing.T) {
ctx := context.Background()
rootWrapper := db.TestWrapper(t)
conn, _ := db.TestSetup(t, "postgres")
kmsCache := kms.TestKms(t, conn, rootWrapper)
// Ensures the global scope contains a valid root key
require.NoError(t, kmsCache.CreateKeys(context.Background(), scope.Global.String(), kms.WithRandomReader(rand.Reader)))
rw := db.New(conn)
repo, err := NewRepositoryStorage(ctx, rw, rw, kmsCache)
require.NoError(t, err)
got, err := repo.FilterToAuthorizedWorkerKeyIds(ctx, []string{})
require.NoError(t, err)
assert.Empty(t, got)
var keyId1 string
w1 := TestPkiWorker(t, conn, rootWrapper, WithTestPkiWorkerAuthorizedKeyId(&keyId1))
var keyId2 string
_ = TestPkiWorker(t, conn, rootWrapper, WithTestPkiWorkerAuthorizedKeyId(&keyId2))
got, err = repo.FilterToAuthorizedWorkerKeyIds(ctx, []string{"not-found-key-id", keyId1})
assert.NoError(t, err)
assert.Equal(t, []string{keyId1}, got)
got, err = repo.FilterToAuthorizedWorkerKeyIds(ctx, []string{keyId2, "not-found-key-id"})
assert.NoError(t, err)
assert.Equal(t, []string{keyId2}, got)
got, err = repo.FilterToAuthorizedWorkerKeyIds(ctx, []string{keyId1, keyId2, "unfound-key"})
assert.NoError(t, err)
assert.ElementsMatch(t, []string{keyId1, keyId2}, got)
workerRepo, err := NewRepository(rw, rw, kmsCache)
require.NoError(t, err)
_, err = workerRepo.DeleteWorker(ctx, w1.GetPublicId())
require.NoError(t, err)
got, err = repo.FilterToAuthorizedWorkerKeyIds(ctx, []string{keyId1, keyId2, "unfound-key"})
assert.NoError(t, err)
assert.Equal(t, []string{keyId2}, got)
}
type mockTestWrapper struct {
wrapping.Wrapper
decryptError bool

Loading…
Cancel
Save