Expand the recorder manager interface

pull/3251/head
Todd 3 years ago committed by Timothy Messier
parent ba0dd857f3
commit 59ae4b5901
No known key found for this signature in database
GPG Key ID: EFD2F184F7600572

@ -262,7 +262,6 @@ func (w *Worker) createClientConn(addr string) error {
}
w.GrpcClientConn = cc
w.controllerStatusConn.Store(pbs.NewServerCoordinationServiceClient(cc))
w.controllerMultihopConn.Store(multihop.NewMultihopServiceClient(cc))
var producer handlers.UpstreamMessageServiceClientProducer

@ -37,7 +37,7 @@ func registerWorkerStatusSessionService(ctx context.Context, w *Worker, server *
return fmt.Errorf("%s: server is nil", op)
}
statusSessionService := NewWorkerProxyServiceServer(w.GrpcClientConn, w.controllerStatusConn)
statusSessionService := NewWorkerProxyServiceServer(w.GrpcClientConn)
pbs.RegisterServerCoordinationServiceServer(server, statusSessionService)
pbs.RegisterSessionServiceServer(server, statusSessionService)
return nil

@ -33,7 +33,7 @@ type LastStatusInformation struct {
LastCalculatedUpstreams []string
}
func (w *Worker) startStatusTicking(cancelCtx context.Context, sessionManager session.Manager, addrReceivers *[]addressReceiver) {
func (w *Worker) startStatusTicking(cancelCtx context.Context, sessionManager session.Manager, addrReceivers *[]addressReceiver, recorderManager recorderManager) {
const op = "worker.(Worker).startStatusTicking"
r := rand.New(rand.NewSource(time.Now().UnixNano()))
// This function exists to desynchronize calls to controllers from
@ -65,7 +65,7 @@ func (w *Worker) startStatusTicking(cancelCtx context.Context, sessionManager se
continue
}
w.sendWorkerStatus(cancelCtx, sessionManager, addrReceivers)
w.sendWorkerStatus(cancelCtx, sessionManager, addrReceivers, recorderManager)
timer.Reset(getRandomInterval())
}
}
@ -113,15 +113,41 @@ func (w *Worker) WaitForNextSuccessfulStatusUpdate() error {
return nil
}
func (w *Worker) sendWorkerStatus(cancelCtx context.Context, sessionManager session.Manager, addressReceivers *[]addressReceiver) {
func (w *Worker) sendWorkerStatus(cancelCtx context.Context, sessionManager session.Manager, addressReceivers *[]addressReceiver, recorderManager recorderManager) {
const op = "worker.(Worker).sendWorkerStatus"
w.statusLock.Lock()
defer w.statusLock.Unlock()
// Collect the different session ids that are being monitored by this worker
var monitoredSessionIds []string
if recorderManager != nil {
recSessIds, err := recorderManager.SessionsManaged(cancelCtx)
if err != nil {
event.WriteError(cancelCtx, op, err, event.WithInfoMsg("error getting session ids from recorderManager"))
} else {
monitoredSessionIds = append(monitoredSessionIds, recSessIds...)
}
}
// First send info as-is. We'll perform cleanup duties after we
// get cancel/job change info back.
var activeJobs []*pbs.JobStatus
for _, sid := range monitoredSessionIds {
activeJobs = append(activeJobs, &pbs.JobStatus{
Job: &pbs.Job{
Type: pbs.JOBTYPE_JOBTYPE_MONITOR_SESSION,
JobInfo: &pbs.Job_MonitorSessionInfo{
MonitorSessionInfo: &pbs.MonitorSessionJobInfo{
SessionId: sid,
Status: pbs.SESSIONSTATUS_SESSIONSTATUS_ACTIVE,
},
},
},
})
}
// Range over known sessions and collect info
sessionManager.ForEachLocalSession(func(s session.Session) bool {
var jobInfo pbs.SessionJobInfo
@ -154,7 +180,7 @@ func (w *Worker) sendWorkerStatus(cancelCtx context.Context, sessionManager sess
})
// Send status information
client := w.controllerStatusConn.Load().(pbs.ServerCoordinationServiceClient)
client := pbs.NewServerCoordinationServiceClient(w.GrpcClientConn)
var tags []*pb.TagPair
// If we're not going to request a tag update, no reason to have these
// marshaled on every status call.
@ -283,10 +309,17 @@ func (w *Worker) sendWorkerStatus(cancelCtx context.Context, sessionManager sess
w.lastStatusSuccess.Store(&LastStatusInformation{StatusResponse: result, StatusTime: time.Now(), LastCalculatedUpstreams: addrs})
var nonActiveMonitoredSessionIds []string
for _, request := range result.GetJobsRequests() {
switch request.GetRequestType() {
case pbs.CHANGETYPE_CHANGETYPE_UPDATE_STATE:
switch request.GetJob().GetType() {
case pbs.JOBTYPE_JOBTYPE_MONITOR_SESSION:
si := request.GetJob().GetMonitorSessionInfo()
if si != nil && si.Status != pbs.SESSIONSTATUS_SESSIONSTATUS_ACTIVE {
nonActiveMonitoredSessionIds = append(nonActiveMonitoredSessionIds, si.GetSessionId())
}
case pbs.JOBTYPE_JOBTYPE_SESSION:
sessInfo := request.GetJob().GetSessionInfo()
sessionId := sessInfo.GetSessionId()
@ -307,6 +340,11 @@ func (w *Worker) sendWorkerStatus(cancelCtx context.Context, sessionManager sess
}
}
}
if recorderManager != nil {
if err := recorderManager.ReauthorizeAllExcept(cancelCtx, nonActiveMonitoredSessionIds); err != nil {
event.WriteError(cancelCtx, op, err)
}
}
// Standard cleanup: Run through current jobs. Cancel connections
// for any canceling session or any session that is expired.

@ -77,7 +77,15 @@ type downstreamers interface {
// recorderManager updates worker status updates with relevant session
// recording information.
type recorderManager interface {
	// ReauthorizeAllExcept should be called with the result of the status
	// update to reauthorize all recorders for the relevant sessions except
	// the ones provided.
	ReauthorizeAllExcept(ctx context.Context, closedSessions []string) error
	// SessionsManaged gets the list of session ids managed by this
	// recorderManager.
	SessionsManaged(ctx context.Context) ([]string, error)
	// Shutdown must be called prior to exiting the process.
	Shutdown(ctx context.Context)
}
// reverseConnReceiverFactory provides a simple factory which a Worker can use to
// create its reverseConnReceiver
@ -119,11 +127,10 @@ type Worker struct {
recorderManager recorderManager
controllerStatusConn *atomic.Value
everAuthenticated *ua.Uint32
lastStatusSuccess *atomic.Value
workerStartTime time.Time
operationalState *atomic.Value
everAuthenticated *ua.Uint32
lastStatusSuccess *atomic.Value
workerStartTime time.Time
operationalState *atomic.Value
controllerMultihopConn *atomic.Value
@ -185,7 +192,6 @@ func New(ctx context.Context, conf *Config) (*Worker, error) {
conf: conf,
logger: conf.Logger.Named("worker"),
started: ua.NewBool(false),
controllerStatusConn: new(atomic.Value),
everAuthenticated: ua.NewUint32(authenticationStatusNeverAuthenticated),
lastStatusSuccess: new(atomic.Value),
controllerMultihopConn: new(atomic.Value),
@ -200,6 +206,8 @@ func New(ctx context.Context, conf *Config) (*Worker, error) {
statusCallTimeoutDuration: new(atomic.Int64),
}
w.operationalState.Store(server.UnknownOperationalState)
if reverseConnReceiverFactory != nil {
w.downstreamReceiver = reverseConnReceiverFactory()
}
@ -358,8 +366,6 @@ func (w *Worker) Start() error {
return nil
}
w.operationalState.Store(server.UnknownOperationalState)
if !w.conf.RawConfig.Worker.UseDeprecatedKmsAuthMethod {
// In this section, we look for existing worker credentials. The two
// variables below store whether to create new credentials and whether to
@ -521,7 +527,7 @@ func (w *Worker) Start() error {
w.tickerWg.Add(2)
go func() {
defer w.tickerWg.Done()
w.startStatusTicking(w.baseContext, w.sessionManager, &w.addressReceivers)
w.startStatusTicking(w.baseContext, w.sessionManager, &w.addressReceivers, w.recorderManager)
}()
go func() {
defer w.tickerWg.Done()
@ -614,6 +620,18 @@ func (w *Worker) Shutdown() error {
return fmt.Errorf("error stopping worker servers and listeners: %w", err)
}
var recManWg sync.WaitGroup
if w.recorderManager != nil {
recManWg.Add(1)
go func() {
// Shutdown recorder manager to close all recorders, done in a go routine
// since it will not force shutdown of channels until the passed in context
// is Done.
defer recManWg.Done()
w.recorderManager.Shutdown(w.baseContext)
}()
}
// Shut down all connections.
w.cleanupConnections(w.baseContext, true, w.sessionManager)
@ -644,6 +662,7 @@ func (w *Worker) Shutdown() error {
w.started.Store(false)
w.tickerWg.Wait()
recManWg.Wait()
if w.conf.Eventer != nil {
if err := w.conf.Eventer.FlushNodes(context.Background()); err != nil {
return fmt.Errorf("error flushing worker eventer nodes: %w", err)

@ -5,7 +5,6 @@ package worker
import (
"context"
"sync/atomic"
pbs "github.com/hashicorp/boundary/internal/gen/controller/servers/services"
"google.golang.org/grpc"
@ -15,8 +14,7 @@ type workerProxyServiceServer struct {
pbs.UnsafeServerCoordinationServiceServer
pbs.UnsafeSessionServiceServer
scsClient *atomic.Value
ssClient pbs.SessionServiceClient
cc *grpc.ClientConn
}
var (
@ -26,16 +24,14 @@ var (
// NewWorkerProxyServiceServer returns a service server that proxies
// ServerCoordinationService and SessionService requests over the provided
// gRPC client connection. Clients are created from cc per call, so the
// server always uses the connection's current state.
func NewWorkerProxyServiceServer(
	cc *grpc.ClientConn,
) *workerProxyServiceServer {
	return &workerProxyServiceServer{
		cc: cc,
	}
}
func (ws *workerProxyServiceServer) Status(ctx context.Context, req *pbs.StatusRequest) (*pbs.StatusResponse, error) {
resp, err := ws.scsClient.Load().(pbs.ServerCoordinationServiceClient).Status(ctx, req)
resp, err := pbs.NewServerCoordinationServiceClient(ws.cc).Status(ctx, req)
if resp != nil {
// We don't currently support distributing new addresses to workers
@ -47,29 +43,29 @@ func (ws *workerProxyServiceServer) Status(ctx context.Context, req *pbs.StatusR
}
// ListHcpbWorkers proxies the ListHcpbWorkers request to the upstream
// server coordination service over the worker's gRPC client connection.
func (ws *workerProxyServiceServer) ListHcpbWorkers(ctx context.Context, req *pbs.ListHcpbWorkersRequest) (*pbs.ListHcpbWorkersResponse, error) {
	return pbs.NewServerCoordinationServiceClient(ws.cc).ListHcpbWorkers(ctx, req)
}
// LookupSession proxies the LookupSession request to the upstream session
// service over the worker's gRPC client connection.
func (ws *workerProxyServiceServer) LookupSession(ctx context.Context, req *pbs.LookupSessionRequest) (*pbs.LookupSessionResponse, error) {
	return pbs.NewSessionServiceClient(ws.cc).LookupSession(ctx, req)
}
// CancelSession proxies the CancelSession request to the upstream session
// service over the worker's gRPC client connection.
func (ws *workerProxyServiceServer) CancelSession(ctx context.Context, req *pbs.CancelSessionRequest) (*pbs.CancelSessionResponse, error) {
	return pbs.NewSessionServiceClient(ws.cc).CancelSession(ctx, req)
}
// ActivateSession proxies the ActivateSession request to the upstream
// session service over the worker's gRPC client connection.
func (ws *workerProxyServiceServer) ActivateSession(ctx context.Context, req *pbs.ActivateSessionRequest) (*pbs.ActivateSessionResponse, error) {
	return pbs.NewSessionServiceClient(ws.cc).ActivateSession(ctx, req)
}
// AuthorizeConnection proxies the AuthorizeConnection request to the
// upstream session service over the worker's gRPC client connection.
func (ws *workerProxyServiceServer) AuthorizeConnection(ctx context.Context, req *pbs.AuthorizeConnectionRequest) (*pbs.AuthorizeConnectionResponse, error) {
	return pbs.NewSessionServiceClient(ws.cc).AuthorizeConnection(ctx, req)
}
// ConnectConnection proxies the ConnectConnection request to the upstream
// session service over the worker's gRPC client connection.
func (ws *workerProxyServiceServer) ConnectConnection(ctx context.Context, req *pbs.ConnectConnectionRequest) (*pbs.ConnectConnectionResponse, error) {
	return pbs.NewSessionServiceClient(ws.cc).ConnectConnection(ctx, req)
}
// CloseConnection proxies the CloseConnection request to the upstream
// session service over the worker's gRPC client connection.
func (ws *workerProxyServiceServer) CloseConnection(ctx context.Context, req *pbs.CloseConnectionRequest) (*pbs.CloseConnectionResponse, error) {
	return pbs.NewSessionServiceClient(ws.cc).CloseConnection(ctx, req)
}

Loading…
Cancel
Save