|
|
|
|
@ -50,14 +50,14 @@ import (
|
|
|
|
|
"google.golang.org/grpc"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// downstreamRouter defines a min interface which must be met by a
|
|
|
|
|
// Controller.downstreamRoutes field
|
|
|
|
|
type downstreamRouter interface {
|
|
|
|
|
// StartRouteMgmtTicking starts a ticker which manages the router's
|
|
|
|
|
// downstreamReceiver defines a min interface which must be met by a
|
|
|
|
|
// Controller.downstreamConns field
|
|
|
|
|
type downstreamReceiver interface {
|
|
|
|
|
// StartConnectionMgmtTicking starts a ticker which manages the receiver's
|
|
|
|
|
// connections.
|
|
|
|
|
StartConnectionMgmtTicking(context.Context, func() string, int) error
|
|
|
|
|
|
|
|
|
|
// ProcessPendingConnections starts a function that continually processes
|
|
|
|
|
// StartProcessingPendingConnections starts a function that continually processes
|
|
|
|
|
// incoming client connections. This only returns when the provided context
|
|
|
|
|
// is done.
|
|
|
|
|
StartProcessingPendingConnections(context.Context, func() string) error
|
|
|
|
|
@ -72,10 +72,10 @@ type downstreamWorkersTicker interface {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var (
|
|
|
|
|
downstreamRouterFactory func() downstreamRouter
|
|
|
|
|
downstreamReceiverFactory func() downstreamReceiver
|
|
|
|
|
|
|
|
|
|
downstreamersFactory func(context.Context, string, string) (common.Downstreamers, error)
|
|
|
|
|
downstreamWorkersTickerFactory func(context.Context, string, string, common.Downstreamers, downstreamRouter) (downstreamWorkersTicker, error)
|
|
|
|
|
downstreamWorkersTickerFactory func(context.Context, string, string, common.Downstreamers, downstreamReceiver) (downstreamWorkersTicker, error)
|
|
|
|
|
commandClientFactory func(context.Context, *Controller) error
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
@ -94,7 +94,7 @@ type Controller struct {
|
|
|
|
|
|
|
|
|
|
// downstream workers and routes to those workers
|
|
|
|
|
downstreamWorkers common.Downstreamers
|
|
|
|
|
downstreamRoutes downstreamRouter
|
|
|
|
|
downstreamConns downstreamReceiver
|
|
|
|
|
|
|
|
|
|
apiListeners []*base.ServerListener
|
|
|
|
|
clusterListener *base.ServerListener
|
|
|
|
|
@ -157,8 +157,8 @@ func New(ctx context.Context, conf *Config) (*Controller, error) {
|
|
|
|
|
livenessTimeToStale: new(atomic.Int64),
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if downstreamRouterFactory != nil {
|
|
|
|
|
c.downstreamRoutes = downstreamRouterFactory()
|
|
|
|
|
if downstreamReceiverFactory != nil {
|
|
|
|
|
c.downstreamConns = downstreamReceiverFactory()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
c.started.Store(false)
|
|
|
|
|
@ -454,7 +454,7 @@ func (c *Controller) Start() error {
|
|
|
|
|
return errors.Wrap(c.baseContext, err, op)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if c.downstreamRoutes != nil {
|
|
|
|
|
if c.downstreamConns != nil {
|
|
|
|
|
c.tickerWg.Add(2)
|
|
|
|
|
|
|
|
|
|
servNameFn := func() string {
|
|
|
|
|
@ -467,11 +467,11 @@ func (c *Controller) Start() error {
|
|
|
|
|
}
|
|
|
|
|
go func() {
|
|
|
|
|
defer c.tickerWg.Done()
|
|
|
|
|
c.downstreamRoutes.StartProcessingPendingConnections(c.baseContext, servNameFn)
|
|
|
|
|
c.downstreamConns.StartProcessingPendingConnections(c.baseContext, servNameFn)
|
|
|
|
|
}()
|
|
|
|
|
go func() {
|
|
|
|
|
defer c.tickerWg.Done()
|
|
|
|
|
err := c.downstreamRoutes.StartConnectionMgmtTicking(
|
|
|
|
|
err := c.downstreamConns.StartConnectionMgmtTicking(
|
|
|
|
|
c.baseContext,
|
|
|
|
|
servNameFn,
|
|
|
|
|
-1,
|
|
|
|
|
@ -485,7 +485,7 @@ func (c *Controller) Start() error {
|
|
|
|
|
// we'll use "root" to designate that this is the root of the graph (aka
|
|
|
|
|
// a controller)
|
|
|
|
|
boundVer := version.Get().VersionNumber()
|
|
|
|
|
dswTicker, err := downstreamWorkersTickerFactory(c.baseContext, "root", boundVer, c.downstreamWorkers, c.downstreamRoutes)
|
|
|
|
|
dswTicker, err := downstreamWorkersTickerFactory(c.baseContext, "root", boundVer, c.downstreamWorkers, c.downstreamConns)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("error creating downstream workers ticker: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|