diff --git a/internal/daemon/controller/controller.go b/internal/daemon/controller/controller.go index c9d89258dd..3025443530 100644 --- a/internal/daemon/controller/controller.go +++ b/internal/daemon/controller/controller.go @@ -52,7 +52,7 @@ import ( type downstreamRouter interface { // StartRouteMgmtTicking starts a ticker which manages the router's // connections. - StartRouteMgmtTicking(context.Context, func() string, int) error + StartConnectionMgmtTicking(context.Context, func() string, int) error // ProcessPendingConnections starts a function that continually processes // incoming client connections. This only returns when the provided context @@ -468,7 +468,7 @@ func (c *Controller) Start() error { }() go func() { defer c.tickerWg.Done() - err := c.downstreamRoutes.StartRouteMgmtTicking( + err := c.downstreamRoutes.StartConnectionMgmtTicking( c.baseContext, servNameFn, -1, diff --git a/internal/daemon/controller/listeners.go b/internal/daemon/controller/listeners.go index 6f65e18c20..e363bf82a3 100644 --- a/internal/daemon/controller/listeners.go +++ b/internal/daemon/controller/listeners.go @@ -26,7 +26,7 @@ import ( // the function that handles a secondary connection over a provided listener var handleSecondaryConnection = closeListener -func closeListener(_ context.Context, l, _ net.Listener, _ any) error { +func closeListener(_ context.Context, l net.Listener, _ any) error { if l != nil { return l.Close() } @@ -244,7 +244,7 @@ func (c *Controller) configureForCluster(ln *base.ServerListener) (func(), error return func() { err := handleSecondaryConnection(c.baseContext, metric.InstrumentClusterTrackingListener(multiplexingReverseGrpcListener, "reverse-grpc"), - nil, c.downstreamRoutes) + c.downstreamRoutes) if err != nil { event.WriteError(c.baseContext, op, err, event.WithInfoMsg("handleSecondaryConnection error")) } diff --git a/internal/daemon/worker/handler.go b/internal/daemon/worker/handler.go index 7f652c38be..7f4d2d0d94 100644 --- a/internal/daemon/worker/handler.go +++ b/internal/daemon/worker/handler.go @@ -272,7 +272,7 @@ func (w *Worker) handleProxy(listenerCfg *listenerutil.ListenerConfig, sessionMa } } - pDialer, err := proxyHandlers.GetEndpointDialer(ctx, endpointUrl.Host, workerId, acResp, w.downstreamRoutes) + pDialer, err := proxyHandlers.GetEndpointDialer(ctx, endpointUrl.Host, workerId, acResp, w.downstreamReceiver) if err != nil { conn.Close(proxyHandlers.WebsocketStatusProtocolSetupError, "unable to get endpoint dialer") event.WriteError(ctx, op, err) diff --git a/internal/daemon/worker/listeners.go b/internal/daemon/worker/listeners.go index cb48cd501b..b639d9ad94 100644 --- a/internal/daemon/worker/listeners.go +++ b/internal/daemon/worker/listeners.go @@ -234,7 +234,7 @@ func (w *Worker) configureForWorker(ln *base.ServerListener, logger *log.Logger, return func() { handleSecondaryConnection(cancelCtx, metric.InstrumentWorkerClusterTrackingListener(revPkiWorkerTrackingListener, "reverse-grpc"), - metric.InstrumentWorkerClusterTrackingListener(dataPlaneProxyTrackingListener, "multihop-proxy-dataplane"), w.downstreamRoutes) + metric.InstrumentWorkerClusterTrackingListener(dataPlaneProxyTrackingListener, "multihop-proxy-dataplane"), w.downstreamReceiver) go w.workerAuthSplitListener.Start() go httpServer.Serve(proxyListener) go ln.GrpcServer.Serve(metric.InstrumentWorkerClusterTrackingListener(pkiWorkerTrackingListener, "grpc")) diff --git a/internal/daemon/worker/worker.go b/internal/daemon/worker/worker.go index 2c08855a79..cfd4a48f8d 100644 --- a/internal/daemon/worker/worker.go +++ b/internal/daemon/worker/worker.go @@ -44,15 +44,15 @@ import ( type randFn func(length int) (string, error) -// downstreamRouter defines a min interface which must be met by a -// Worker.downstreamRoutes field -type downstreamRouter interface { - // StartRouteMgmtTicking starts a ticker which manages the router's +// reverseConnReceiver defines a min interface which must be met by a +// Worker.downstreamReceiver field +type reverseConnReceiver interface { + // StartConnectionMgmtTicking starts a ticker which manages the receiver's // connections. - StartRouteMgmtTicking(context.Context, func() string, int) error + StartConnectionMgmtTicking(context.Context, func() string, int) error - // ProcessPendingConnections starts a function that continually processes - // incoming client connections. This only returns when the provided context + // StartProcessingPendingConnections is a function that continually + // processes incoming connections. This only returns when the provided context // is done. StartProcessingPendingConnections(context.Context, func() string) error } @@ -65,9 +65,9 @@ type downstreamers interface { RootId() string } -// downstreamRouterFactory provides a simple factory which a Worker can use to -// create its downstreamRouter -var downstreamRouterFactory func() downstreamRouter +// reverseConnReceiverFactory provides a simple factory which a Worker can use to +// create its reverseConnReceiver +var reverseConnReceiverFactory func() reverseConnReceiver var initializeReverseGrpcClientCollectors = noopInitializePromCollectors @@ -127,8 +127,8 @@ type Worker struct { workerAuthSplitListener *nodeenet.SplitListener // downstream workers and routes to those workers - downstreamWorkers downstreamers - downstreamRoutes downstreamRouter + downstreamWorkers downstreamers + downstreamReceiver reverseConnReceiver // Timing variables. These are atomics for SIGHUP support, and are int64 // because they are casted to time.Duration. @@ -170,8 +170,8 @@ func New(conf *Config) (*Worker, error) { statusCallTimeoutDuration: new(atomic.Int64), } - if downstreamRouterFactory != nil { - w.downstreamRoutes = downstreamRouterFactory() + if reverseConnReceiverFactory != nil { + w.downstreamReceiver = reverseConnReceiverFactory() } w.lastStatusSuccess.Store((*LastStatusInformation)(nil)) @@ -437,7 +437,7 @@ func (w *Worker) Start() error { w.startAuthRotationTicking(w.baseContext) }() - if w.downstreamRoutes != nil { + if w.downstreamReceiver != nil { w.tickerWg.Add(2) servNameFn := func() string { if s := w.LastStatusSuccess(); s != nil { @@ -447,13 +447,13 @@ func (w *Worker) Start() error { } go func() { defer w.tickerWg.Done() - if err := w.downstreamRoutes.StartProcessingPendingConnections(w.baseContext, servNameFn); err != nil { + if err := w.downstreamReceiver.StartProcessingPendingConnections(w.baseContext, servNameFn); err != nil { errors.Wrap(w.baseContext, err, op) } }() go func() { defer w.tickerWg.Done() - err := w.downstreamRoutes.StartRouteMgmtTicking( + err := w.downstreamReceiver.StartConnectionMgmtTicking( w.baseContext, servNameFn, -1, // indicates the ticker should run until cancelled.