|
|
|
|
@ -135,7 +135,18 @@ func (w *Worker) configureForWorker(ln *base.ServerListener, logger *log.Logger,
|
|
|
|
|
return nil, fmt.Errorf("error instantiating node auth listener: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
w.workerAuthSplitListener = nodeenet.NewSplitListener(interceptingListener)
|
|
|
|
|
w.workerAuthSplitListener, err = nodeenet.NewSplitListener(interceptingListener)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, fmt.Errorf("error instantiating split listener: %w", err)
|
|
|
|
|
}
|
|
|
|
|
workerListener, err := w.workerAuthSplitListener.GetListener(nodeenet.AuthenticatedNonSpecificNextProto)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, fmt.Errorf("error instantiating worker split listener: %w", err)
|
|
|
|
|
}
|
|
|
|
|
nonWorkerListener, err := w.workerAuthSplitListener.GetListener(nodeenet.UnauthenticatedNextProto)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, fmt.Errorf("error instantiating non-worker split listener: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
downstreamServer := grpc.NewServer(
|
|
|
|
|
grpc.MaxRecvMsgSize(math.MaxInt32),
|
|
|
|
|
@ -158,11 +169,11 @@ func (w *Worker) configureForWorker(ln *base.ServerListener, logger *log.Logger,
|
|
|
|
|
|
|
|
|
|
return func() {
|
|
|
|
|
go w.workerAuthSplitListener.Start()
|
|
|
|
|
go httpServer.Serve(w.workerAuthSplitListener.OtherListener())
|
|
|
|
|
go httpServer.Serve(nonWorkerListener)
|
|
|
|
|
go ln.GrpcServer.Serve(
|
|
|
|
|
&eventingListener{
|
|
|
|
|
ctx: cancelCtx,
|
|
|
|
|
baseLn: w.workerAuthSplitListener.NodeEnrollmentListener(),
|
|
|
|
|
baseLn: workerListener,
|
|
|
|
|
},
|
|
|
|
|
)
|
|
|
|
|
}, nil
|
|
|
|
|
@ -173,30 +184,8 @@ func (w *Worker) stopServersAndListeners() error {
|
|
|
|
|
mg.Go(w.stopHttpServer)
|
|
|
|
|
mg.Go(w.stopClusterGrpcServer)
|
|
|
|
|
|
|
|
|
|
// FIXME (jeff): For some reason, unlike the controller, the grpc server
|
|
|
|
|
// really likes to hang on closing. Maybe because it's never served a
|
|
|
|
|
// connection? This is a workaround to force it until I can dig in.
|
|
|
|
|
var cancel context.CancelFunc
|
|
|
|
|
if w.workerAuthSplitListener != nil {
|
|
|
|
|
var ctx context.Context
|
|
|
|
|
ctx, cancel = context.WithTimeout(w.baseContext, 2*time.Second)
|
|
|
|
|
go func() {
|
|
|
|
|
<-ctx.Done()
|
|
|
|
|
w.workerAuthSplitListener.Stop()
|
|
|
|
|
cancel()
|
|
|
|
|
}()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
stopErrors := mg.Wait()
|
|
|
|
|
|
|
|
|
|
if w.workerAuthSplitListener != nil {
|
|
|
|
|
cancel()
|
|
|
|
|
err := w.workerAuthSplitListener.Stop()
|
|
|
|
|
if err != nil {
|
|
|
|
|
stopErrors = multierror.Append(stopErrors, err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
err := w.stopAnyListeners()
|
|
|
|
|
if err != nil {
|
|
|
|
|
stopErrors = multierror.Append(stopErrors, err)
|
|
|
|
|
@ -240,19 +229,11 @@ func (w *Worker) stopAnyListeners() error {
|
|
|
|
|
if w.proxyListener == nil {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
var closeErrors *multierror.Error
|
|
|
|
|
var err error
|
|
|
|
|
if w.workerAuthSplitListener != nil {
|
|
|
|
|
err = w.workerAuthSplitListener.Stop()
|
|
|
|
|
} else if w.proxyListener.ProxyListener != nil {
|
|
|
|
|
err = w.proxyListener.ProxyListener.Close()
|
|
|
|
|
}
|
|
|
|
|
err = listenerCloseErrorCheck("proxy", err)
|
|
|
|
|
if err != nil {
|
|
|
|
|
closeErrors = multierror.Append(closeErrors, err)
|
|
|
|
|
if w.proxyListener.ProxyListener == nil {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return closeErrors.ErrorOrNil()
|
|
|
|
|
return listenerCloseErrorCheck("proxy", w.proxyListener.ProxyListener.Close())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// listenerCloseErrorCheck does some validation on an error returned
|
|
|
|
|
|