From 59ae4b5901cd7df6fa8651696d6c05833822b3bb Mon Sep 17 00:00:00 2001 From: Todd Date: Fri, 28 Apr 2023 12:24:42 -0700 Subject: [PATCH] Expand the recorder manager interface --- .../daemon/worker/controller_connection.go | 1 - internal/daemon/worker/rpc_registration.go | 2 +- internal/daemon/worker/status.go | 46 +++++++++++++++++-- internal/daemon/worker/worker.go | 39 ++++++++++++---- .../daemon/worker/worker_proxy_service.go | 24 ++++------ 5 files changed, 82 insertions(+), 30 deletions(-) diff --git a/internal/daemon/worker/controller_connection.go b/internal/daemon/worker/controller_connection.go index 9da64036b6..f4921768ae 100644 --- a/internal/daemon/worker/controller_connection.go +++ b/internal/daemon/worker/controller_connection.go @@ -262,7 +262,6 @@ func (w *Worker) createClientConn(addr string) error { } w.GrpcClientConn = cc - w.controllerStatusConn.Store(pbs.NewServerCoordinationServiceClient(cc)) w.controllerMultihopConn.Store(multihop.NewMultihopServiceClient(cc)) var producer handlers.UpstreamMessageServiceClientProducer diff --git a/internal/daemon/worker/rpc_registration.go b/internal/daemon/worker/rpc_registration.go index ccee878615..1f8e2ecdb5 100644 --- a/internal/daemon/worker/rpc_registration.go +++ b/internal/daemon/worker/rpc_registration.go @@ -37,7 +37,7 @@ func registerWorkerStatusSessionService(ctx context.Context, w *Worker, server * return fmt.Errorf("%s: server is nil", op) } - statusSessionService := NewWorkerProxyServiceServer(w.GrpcClientConn, w.controllerStatusConn) + statusSessionService := NewWorkerProxyServiceServer(w.GrpcClientConn) pbs.RegisterServerCoordinationServiceServer(server, statusSessionService) pbs.RegisterSessionServiceServer(server, statusSessionService) return nil diff --git a/internal/daemon/worker/status.go b/internal/daemon/worker/status.go index 0433d96dd2..30a0d76145 100644 --- a/internal/daemon/worker/status.go +++ b/internal/daemon/worker/status.go @@ -33,7 +33,7 @@ type LastStatusInformation struct { LastCalculatedUpstreams []string } -func (w *Worker) startStatusTicking(cancelCtx context.Context, sessionManager session.Manager, addrReceivers *[]addressReceiver) { +func (w *Worker) startStatusTicking(cancelCtx context.Context, sessionManager session.Manager, addrReceivers *[]addressReceiver, recorderManager recorderManager) { const op = "worker.(Worker).startStatusTicking" r := rand.New(rand.NewSource(time.Now().UnixNano())) // This function exists to desynchronize calls to controllers from @@ -65,7 +65,7 @@ func (w *Worker) startStatusTicking(cancelCtx context.Context, sessionManager se continue } - w.sendWorkerStatus(cancelCtx, sessionManager, addrReceivers) + w.sendWorkerStatus(cancelCtx, sessionManager, addrReceivers, recorderManager) timer.Reset(getRandomInterval()) } } @@ -113,15 +113,41 @@ func (w *Worker) WaitForNextSuccessfulStatusUpdate() error { return nil } -func (w *Worker) sendWorkerStatus(cancelCtx context.Context, sessionManager session.Manager, addressReceivers *[]addressReceiver) { +func (w *Worker) sendWorkerStatus(cancelCtx context.Context, sessionManager session.Manager, addressReceivers *[]addressReceiver, recorderManager recorderManager) { const op = "worker.(Worker).sendWorkerStatus" w.statusLock.Lock() defer w.statusLock.Unlock() + // Collect the different session ids that are being monitored by this worker + var monitoredSessionIds []string + + if recorderManager != nil { + recSessIds, err := recorderManager.SessionsManaged(cancelCtx) + if err != nil { + event.WriteError(cancelCtx, op, err, event.WithInfoMsg("error getting session ids from recorderManager")) + } else { + monitoredSessionIds = append(monitoredSessionIds, recSessIds...) + } + } + // First send info as-is. We'll perform cleanup duties after we // get cancel/job change info back. var activeJobs []*pbs.JobStatus + for _, sid := range monitoredSessionIds { + activeJobs = append(activeJobs, &pbs.JobStatus{ + Job: &pbs.Job{ + Type: pbs.JOBTYPE_JOBTYPE_MONITOR_SESSION, + JobInfo: &pbs.Job_MonitorSessionInfo{ + MonitorSessionInfo: &pbs.MonitorSessionJobInfo{ + SessionId: sid, + Status: pbs.SESSIONSTATUS_SESSIONSTATUS_ACTIVE, + }, + }, + }, + }) + } + // Range over known sessions and collect info sessionManager.ForEachLocalSession(func(s session.Session) bool { var jobInfo pbs.SessionJobInfo @@ -154,7 +180,7 @@ func (w *Worker) sendWorkerStatus(cancelCtx context.Context, sessionManager sess }) // Send status information - client := w.controllerStatusConn.Load().(pbs.ServerCoordinationServiceClient) + client := pbs.NewServerCoordinationServiceClient(w.GrpcClientConn) var tags []*pb.TagPair // If we're not going to request a tag update, no reason to have these // marshaled on every status call. @@ -283,10 +309,17 @@ func (w *Worker) sendWorkerStatus(cancelCtx context.Context, sessionManager sess w.lastStatusSuccess.Store(&LastStatusInformation{StatusResponse: result, StatusTime: time.Now(), LastCalculatedUpstreams: addrs}) + var nonActiveMonitoredSessionIds []string + for _, request := range result.GetJobsRequests() { switch request.GetRequestType() { case pbs.CHANGETYPE_CHANGETYPE_UPDATE_STATE: switch request.GetJob().GetType() { + case pbs.JOBTYPE_JOBTYPE_MONITOR_SESSION: + si := request.GetJob().GetMonitorSessionInfo() + if si != nil && si.Status != pbs.SESSIONSTATUS_SESSIONSTATUS_ACTIVE { + nonActiveMonitoredSessionIds = append(nonActiveMonitoredSessionIds, si.GetSessionId()) + } case pbs.JOBTYPE_JOBTYPE_SESSION: sessInfo := request.GetJob().GetSessionInfo() sessionId := sessInfo.GetSessionId() @@ -307,6 +340,11 @@ func (w *Worker) sendWorkerStatus(cancelCtx context.Context, sessionManager sess } } } + if recorderManager != nil { + if err := recorderManager.ReauthorizeAllExcept(cancelCtx, nonActiveMonitoredSessionIds); err != nil { + event.WriteError(cancelCtx, op, err) + } + } // Standard cleanup: Run through current jobs. Cancel connections // for any canceling session or any session that is expired. diff --git a/internal/daemon/worker/worker.go b/internal/daemon/worker/worker.go index 1d2ca0a20f..1fcf4e2b14 100644 --- a/internal/daemon/worker/worker.go +++ b/internal/daemon/worker/worker.go @@ -77,7 +77,15 @@ type downstreamers interface { // recorderManager updates the status updates with relevant recording // information -type recorderManager any +type recorderManager interface { + // ReauthorizeAllExcept should be called with the result of the status update + // to reauthorize all recorders for the relevant sessions except the ones provided + ReauthorizeAllExcept(ctx context.Context, closedSessions []string) error + // SessionsManaged gets the list of session ids managed by this recorderManager + SessionsManaged(ctx context.Context) ([]string, error) + // Shutdown must be called prior to exiting the process + Shutdown(ctx context.Context) +} // reverseConnReceiverFactory provides a simple factory which a Worker can use to // create its reverseConnReceiver @@ -119,11 +127,10 @@ type Worker struct { recorderManager recorderManager - controllerStatusConn *atomic.Value - everAuthenticated *ua.Uint32 - lastStatusSuccess *atomic.Value - workerStartTime time.Time - operationalState *atomic.Value + everAuthenticated *ua.Uint32 + lastStatusSuccess *atomic.Value + workerStartTime time.Time + operationalState *atomic.Value controllerMultihopConn *atomic.Value @@ -185,7 +192,6 @@ func New(ctx context.Context, conf *Config) (*Worker, error) { conf: conf, logger: conf.Logger.Named("worker"), started: ua.NewBool(false), - controllerStatusConn: new(atomic.Value), everAuthenticated: ua.NewUint32(authenticationStatusNeverAuthenticated), lastStatusSuccess: new(atomic.Value), controllerMultihopConn: new(atomic.Value), @@ -200,6 +206,8 @@ func New(ctx context.Context, conf *Config) (*Worker, error) { statusCallTimeoutDuration: new(atomic.Int64), } + w.operationalState.Store(server.UnknownOperationalState) + if reverseConnReceiverFactory != nil { w.downstreamReceiver = reverseConnReceiverFactory() } @@ -358,8 +366,6 @@ func (w *Worker) Start() error { return nil } - w.operationalState.Store(server.UnknownOperationalState) - if !w.conf.RawConfig.Worker.UseDeprecatedKmsAuthMethod { // In this section, we look for existing worker credentials. The two // variables below store whether to create new credentials and whether to @@ -521,7 +527,7 @@ func (w *Worker) Start() error { w.tickerWg.Add(2) go func() { defer w.tickerWg.Done() - w.startStatusTicking(w.baseContext, w.sessionManager, &w.addressReceivers) + w.startStatusTicking(w.baseContext, w.sessionManager, &w.addressReceivers, w.recorderManager) }() go func() { defer w.tickerWg.Done() @@ -614,6 +620,18 @@ func (w *Worker) Shutdown() error { return fmt.Errorf("error stopping worker servers and listeners: %w", err) } + var recManWg sync.WaitGroup + if w.recorderManager != nil { + recManWg.Add(1) + go func() { + // Shutdown recorder manager to close all recorders, done in a go routine + // since it will not force shutdown of channels until the passed in context + // is Done. + defer recManWg.Done() + w.recorderManager.Shutdown(w.baseContext) + }() + } + // Shut down all connections. w.cleanupConnections(w.baseContext, true, w.sessionManager) @@ -644,6 +662,7 @@ func (w *Worker) Shutdown() error { w.started.Store(false) w.tickerWg.Wait() + recManWg.Wait() if w.conf.Eventer != nil { if err := w.conf.Eventer.FlushNodes(context.Background()); err != nil { return fmt.Errorf("error flushing worker eventer nodes: %w", err) diff --git a/internal/daemon/worker/worker_proxy_service.go b/internal/daemon/worker/worker_proxy_service.go index 18ca4a6078..63fb8cd0a8 100644 --- a/internal/daemon/worker/worker_proxy_service.go +++ b/internal/daemon/worker/worker_proxy_service.go @@ -5,7 +5,6 @@ package worker import ( "context" - "sync/atomic" pbs "github.com/hashicorp/boundary/internal/gen/controller/servers/services" "google.golang.org/grpc" @@ -15,8 +14,7 @@ type workerProxyServiceServer struct { pbs.UnsafeServerCoordinationServiceServer pbs.UnsafeSessionServiceServer - scsClient *atomic.Value - ssClient pbs.SessionServiceClient + cc *grpc.ClientConn } var ( @@ -26,16 +24,14 @@ var ( func NewWorkerProxyServiceServer( cc *grpc.ClientConn, - scsClient *atomic.Value, ) *workerProxyServiceServer { return &workerProxyServiceServer{ - scsClient: scsClient, - ssClient: pbs.NewSessionServiceClient(cc), + cc: cc, } } func (ws *workerProxyServiceServer) Status(ctx context.Context, req *pbs.StatusRequest) (*pbs.StatusResponse, error) { - resp, err := ws.scsClient.Load().(pbs.ServerCoordinationServiceClient).Status(ctx, req) + resp, err := pbs.NewServerCoordinationServiceClient(ws.cc).Status(ctx, req) if resp != nil { // We don't currently support distributing new addreses to workers @@ -47,29 +43,29 @@ func (ws *workerProxyServiceServer) Status(ctx context.Context, req *pbs.StatusR } func (ws *workerProxyServiceServer) ListHcpbWorkers(ctx context.Context, req *pbs.ListHcpbWorkersRequest) (*pbs.ListHcpbWorkersResponse, error) { - return ws.scsClient.Load().(pbs.ServerCoordinationServiceClient).ListHcpbWorkers(ctx, req) + return pbs.NewServerCoordinationServiceClient(ws.cc).ListHcpbWorkers(ctx, req) } func (ws *workerProxyServiceServer) LookupSession(ctx context.Context, req *pbs.LookupSessionRequest) (*pbs.LookupSessionResponse, error) { - return ws.ssClient.LookupSession(ctx, req) + return pbs.NewSessionServiceClient(ws.cc).LookupSession(ctx, req) } func (ws *workerProxyServiceServer) CancelSession(ctx context.Context, req *pbs.CancelSessionRequest) (*pbs.CancelSessionResponse, error) { - return ws.ssClient.CancelSession(ctx, req) + return pbs.NewSessionServiceClient(ws.cc).CancelSession(ctx, req) } func (ws *workerProxyServiceServer) ActivateSession(ctx context.Context, req *pbs.ActivateSessionRequest) (*pbs.ActivateSessionResponse, error) { - return ws.ssClient.ActivateSession(ctx, req) + return pbs.NewSessionServiceClient(ws.cc).ActivateSession(ctx, req) } func (ws *workerProxyServiceServer) AuthorizeConnection(ctx context.Context, req *pbs.AuthorizeConnectionRequest) (*pbs.AuthorizeConnectionResponse, error) { - return ws.ssClient.AuthorizeConnection(ctx, req) + return pbs.NewSessionServiceClient(ws.cc).AuthorizeConnection(ctx, req) } func (ws *workerProxyServiceServer) ConnectConnection(ctx context.Context, req *pbs.ConnectConnectionRequest) (*pbs.ConnectConnectionResponse, error) { - return ws.ssClient.ConnectConnection(ctx, req) + return pbs.NewSessionServiceClient(ws.cc).ConnectConnection(ctx, req) } func (ws *workerProxyServiceServer) CloseConnection(ctx context.Context, req *pbs.CloseConnectionRequest) (*pbs.CloseConnectionResponse, error) { - return ws.ssClient.CloseConnection(ctx, req) + return pbs.NewSessionServiceClient(ws.cc).CloseConnection(ctx, req) }