Rename router to receiver for secondary connections (#2816)

pull/2725/head
Todd 3 years ago committed by GitHub
parent b359561e51
commit 87b09d5e27
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -52,7 +52,7 @@ import (
type downstreamRouter interface {
// StartRouteMgmtTicking starts a ticker which manages the router's
// connections.
StartRouteMgmtTicking(context.Context, func() string, int) error
StartConnectionMgmtTicking(context.Context, func() string, int) error
// ProcessPendingConnections starts a function that continually processes
// incoming client connections. This only returns when the provided context
@ -468,7 +468,7 @@ func (c *Controller) Start() error {
}()
go func() {
defer c.tickerWg.Done()
err := c.downstreamRoutes.StartRouteMgmtTicking(
err := c.downstreamRoutes.StartConnectionMgmtTicking(
c.baseContext,
servNameFn,
-1,

@ -26,7 +26,7 @@ import (
// the function that handles a secondary connection over a provided listener
var handleSecondaryConnection = closeListener
func closeListener(_ context.Context, l, _ net.Listener, _ any) error {
func closeListener(_ context.Context, l net.Listener, _ any) error {
if l != nil {
return l.Close()
}
@ -244,7 +244,7 @@ func (c *Controller) configureForCluster(ln *base.ServerListener) (func(), error
return func() {
err := handleSecondaryConnection(c.baseContext, metric.InstrumentClusterTrackingListener(multiplexingReverseGrpcListener, "reverse-grpc"),
nil, c.downstreamRoutes)
c.downstreamRoutes)
if err != nil {
event.WriteError(c.baseContext, op, err, event.WithInfoMsg("handleSecondaryConnection error"))
}

@ -272,7 +272,7 @@ func (w *Worker) handleProxy(listenerCfg *listenerutil.ListenerConfig, sessionMa
}
}
pDialer, err := proxyHandlers.GetEndpointDialer(ctx, endpointUrl.Host, workerId, acResp, w.downstreamRoutes)
pDialer, err := proxyHandlers.GetEndpointDialer(ctx, endpointUrl.Host, workerId, acResp, w.downstreamReceiver)
if err != nil {
conn.Close(proxyHandlers.WebsocketStatusProtocolSetupError, "unable to get endpoint dialer")
event.WriteError(ctx, op, err)

@ -234,7 +234,7 @@ func (w *Worker) configureForWorker(ln *base.ServerListener, logger *log.Logger,
return func() {
handleSecondaryConnection(cancelCtx, metric.InstrumentWorkerClusterTrackingListener(revPkiWorkerTrackingListener, "reverse-grpc"),
metric.InstrumentWorkerClusterTrackingListener(dataPlaneProxyTrackingListener, "multihop-proxy-dataplane"), w.downstreamRoutes)
metric.InstrumentWorkerClusterTrackingListener(dataPlaneProxyTrackingListener, "multihop-proxy-dataplane"), w.downstreamReceiver)
go w.workerAuthSplitListener.Start()
go httpServer.Serve(proxyListener)
go ln.GrpcServer.Serve(metric.InstrumentWorkerClusterTrackingListener(pkiWorkerTrackingListener, "grpc"))

@ -44,15 +44,15 @@ import (
type randFn func(length int) (string, error)
// downstreamRouter defines a min interface which must be met by a
// Worker.downstreamRoutes field
type downstreamRouter interface {
// StartRouteMgmtTicking starts a ticker which manages the router's
// reverseConnReceiver defines a min interface which must be met by a
// Worker.downstreamReceiver field
type reverseConnReceiver interface {
// StartConnectionMgmtTicking starts a ticker which manages the receiver's
// connections.
StartRouteMgmtTicking(context.Context, func() string, int) error
StartConnectionMgmtTicking(context.Context, func() string, int) error
// ProcessPendingConnections starts a function that continually processes
// incoming client connections. This only returns when the provided context
// StartProcessingPendingConnections is a function that continually
// processes incoming connections. This only returns when the provided context
// is done.
StartProcessingPendingConnections(context.Context, func() string) error
}
@ -65,9 +65,9 @@ type downstreamers interface {
RootId() string
}
// downstreamRouterFactory provides a simple factory which a Worker can use to
// create its downstreamRouter
var downstreamRouterFactory func() downstreamRouter
// reverseConnReceiverFactory provides a simple factory which a Worker can use to
// create its reverseConnReceiver
var reverseConnReceiverFactory func() reverseConnReceiver
var initializeReverseGrpcClientCollectors = noopInitializePromCollectors
@ -127,8 +127,8 @@ type Worker struct {
workerAuthSplitListener *nodeenet.SplitListener
// downstream workers and routes to those workers
downstreamWorkers downstreamers
downstreamRoutes downstreamRouter
downstreamWorkers downstreamers
downstreamReceiver reverseConnReceiver
// Timing variables. These are atomics for SIGHUP support, and are int64
// because they are casted to time.Duration.
@ -170,8 +170,8 @@ func New(conf *Config) (*Worker, error) {
statusCallTimeoutDuration: new(atomic.Int64),
}
if downstreamRouterFactory != nil {
w.downstreamRoutes = downstreamRouterFactory()
if reverseConnReceiverFactory != nil {
w.downstreamReceiver = reverseConnReceiverFactory()
}
w.lastStatusSuccess.Store((*LastStatusInformation)(nil))
@ -437,7 +437,7 @@ func (w *Worker) Start() error {
w.startAuthRotationTicking(w.baseContext)
}()
if w.downstreamRoutes != nil {
if w.downstreamReceiver != nil {
w.tickerWg.Add(2)
servNameFn := func() string {
if s := w.LastStatusSuccess(); s != nil {
@ -447,13 +447,13 @@ func (w *Worker) Start() error {
}
go func() {
defer w.tickerWg.Done()
if err := w.downstreamRoutes.StartProcessingPendingConnections(w.baseContext, servNameFn); err != nil {
if err := w.downstreamReceiver.StartProcessingPendingConnections(w.baseContext, servNameFn); err != nil {
errors.Wrap(w.baseContext, err, op)
}
}()
go func() {
defer w.tickerWg.Done()
err := w.downstreamRoutes.StartRouteMgmtTicking(
err := w.downstreamReceiver.StartConnectionMgmtTicking(
w.baseContext,
servNameFn,
-1, // indicates the ticker should run until cancelled.

Loading…
Cancel
Save