From 70e20b5cf54ec6b3ac8a9ffef8ed630c0b5418fa Mon Sep 17 00:00:00 2001 From: Irena Rindos Date: Thu, 17 Nov 2022 14:51:29 -0500 Subject: [PATCH] revert merge llb-multihop-sessions (#2628) --- go.mod | 2 +- go.sum | 4 ++-- internal/daemon/common/const.go | 5 +---- internal/daemon/controller/controller.go | 13 +++++------- internal/daemon/controller/listeners.go | 12 ++++++----- internal/daemon/worker/addressreceiver.go | 8 ++++---- .../daemon/worker/controller_connection.go | 14 +++---------- internal/daemon/worker/listeners.go | 20 +++++-------------- internal/daemon/worker/status.go | 8 ++++---- internal/daemon/worker/worker.go | 6 ++---- 10 files changed, 34 insertions(+), 58 deletions(-) diff --git a/go.mod b/go.mod index fba6bde927..4f3d47bf43 100644 --- a/go.mod +++ b/go.mod @@ -93,7 +93,7 @@ require ( github.com/DATA-DOG/go-sqlmock v1.5.0 github.com/cenkalti/backoff/v4 v4.1.0 github.com/hashicorp/go-kms-wrapping/extras/kms/v2 v2.0.0-20220914160710-1c6d04de2431 - github.com/hashicorp/nodeenrollment v0.1.18 + github.com/hashicorp/nodeenrollment v0.1.17 github.com/kelseyhightower/envconfig v1.4.0 golang.org/x/exp v0.0.0-20220921164117-439092de6870 golang.org/x/net v0.0.0-20220722155237-a158d28d115b diff --git a/go.sum b/go.sum index 0da453bb45..55a2f851db 100644 --- a/go.sum +++ b/go.sum @@ -743,8 +743,8 @@ github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+l github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= -github.com/hashicorp/nodeenrollment v0.1.18 h1:fqFMZ5e2cWupeQDxKledF9DGHsk9GoJl7K2Wrdiys6k= -github.com/hashicorp/nodeenrollment v0.1.18/go.mod h1:N5gYsm8mWiDfIw/j+1IQ6NBO1cWCmhPpvQ9GB1QUnsU= +github.com/hashicorp/nodeenrollment v0.1.17 h1:ZaGNugd3EOZIdJxgC5bUA1CbHZ/OKZcgXnquzoFql6E= +github.com/hashicorp/nodeenrollment v0.1.17/go.mod h1:N5gYsm8mWiDfIw/j+1IQ6NBO1cWCmhPpvQ9GB1QUnsU= github.com/hashicorp/vault/api v1.3.1 h1:pkDkcgTh47PRjY1NEFeofqR4W/HkNUi9qIakESO2aRM= github.com/hashicorp/vault/api v1.3.1/go.mod h1:QeJoWxMFt+MsuWcYhmwRLwKEXrjwAFFywzhptMsTIUw= github.com/hashicorp/vault/sdk v0.1.13/go.mod h1:B+hVj7TpuQY1Y/GPbCpffmgd+tSEwvhkWnjtSYCaS2M= diff --git a/internal/daemon/common/const.go b/internal/daemon/common/const.go index 6c8c96046f..27b5e2925c 100644 --- a/internal/daemon/common/const.go +++ b/internal/daemon/common/const.go @@ -1,6 +1,3 @@ package common -const ( - ReverseGrpcConnectionAlpnValue = "the-downstream-dialer-plays-an-uno-reverse-card" - WebsocketProxyingAlpnValue = "i-herd-you-like-proxies-so-i-put-a-proxy-in-your-proxy" -) +const ReverseGrpcConnectionAlpnValue = "the-downstream-dialer-plays-an-uno-reverse-card" diff --git a/internal/daemon/controller/controller.go b/internal/daemon/controller/controller.go index f92e3175b9..90d147d78b 100644 --- a/internal/daemon/controller/controller.go +++ b/internal/daemon/controller/controller.go @@ -39,7 +39,6 @@ import ( host_plugin_assets "github.com/hashicorp/boundary/plugins/host" "github.com/hashicorp/boundary/sdk/pbs/plugin" external_host_plugins "github.com/hashicorp/boundary/sdk/plugins/host" - "github.com/hashicorp/boundary/version" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-secure-stdlib/mlock" "github.com/hashicorp/go-secure-stdlib/pluginutil/v2" @@ -57,7 +56,7 @@ type downstreamRouter interface { // ProcessPendingConnections starts a function that continually processes // incoming client connections. This only returns when the provided context // is done. - StartProcessingPendingConnections(context.Context, func() string) error + StartProcessingPendingConnections(context.Context, func() string) } // downstreamWorkersTicker defines an interface for a ticker that maintains the @@ -79,8 +78,8 @@ type downstreamers interface { var ( downstreamRouterFactory func() downstreamRouter - downstreamersFactory func(context.Context, string, string) (downstreamers, error) - downstreamWorkersTickerFactory func(context.Context, string, string, downstreamers, downstreamRouter) (downstreamWorkersTicker, error) + downstreamersFactory func(context.Context, string) (downstreamers, error) + downstreamWorkersTickerFactory func(context.Context, string, downstreamers, downstreamRouter) (downstreamWorkersTicker, error) commandClientFactory func(context.Context, *Controller) error ) @@ -398,8 +397,7 @@ func New(ctx context.Context, conf *Config) (*Controller, error) { } if downstreamersFactory != nil { - boundVer := version.Get().VersionNumber() - c.downstreamWorkers, err = downstreamersFactory(ctx, "root", boundVer) + c.downstreamWorkers, err = downstreamersFactory(ctx, "root") if err != nil { return nil, fmt.Errorf("unable to initialize downstream workers graph: %w", err) } @@ -489,8 +487,7 @@ func (c *Controller) Start() error { if downstreamWorkersTickerFactory != nil { // we'll use "root" to designate that this is the root of the graph (aka // a controller) - boundVer := version.Get().VersionNumber() - dswTicker, err := downstreamWorkersTickerFactory(c.baseContext, "root", boundVer, c.downstreamWorkers, c.downstreamRoutes) + dswTicker, err := downstreamWorkersTickerFactory(c.baseContext, "root", c.downstreamWorkers, c.downstreamRoutes) if err != nil { return fmt.Errorf("error creating downstream workers ticker: %w", err) } diff --git a/internal/daemon/controller/listeners.go b/internal/daemon/controller/listeners.go index 6f4a273367..32b6af46ce 100644 --- a/internal/daemon/controller/listeners.go +++ b/internal/daemon/controller/listeners.go @@ -26,7 +26,7 @@ import ( // the function that handles a secondary connection over a provided listener var handleSecondaryConnection = closeListener -func closeListener(_ context.Context, l, _ net.Listener, _ any) error { +func closeListener(_ context.Context, l net.Listener, _ any, _ int) error { if l != nil { return l.Close() } @@ -242,16 +242,18 @@ func (c *Controller) configureForCluster(ln *base.ServerListener) (func(), error ln.GrpcServer = workerServer return func() { - err := handleSecondaryConnection(c.baseContext, multiplexingReverseGrpcListener, nil, c.downstreamRoutes) - if err != nil { - event.WriteError(c.baseContext, op, err, event.WithInfoMsg("handleSecondaryConnection error")) - } go func() { err := splitListener.Start() if err != nil { event.WriteError(c.baseContext, op, err, event.WithInfoMsg("splitListener.Start() error")) } }() + go func() { + err := handleSecondaryConnection(c.baseContext, multiplexingReverseGrpcListener, c.downstreamRoutes, -1) + if err != nil { + event.WriteError(c.baseContext, op, err, event.WithInfoMsg("handleSecondaryConnection error")) + } + }() go func() { err := ln.GrpcServer.Serve(multiplexingAuthedListener) if err != nil { diff --git a/internal/daemon/worker/addressreceiver.go b/internal/daemon/worker/addressreceiver.go index 68a4636b69..b86136dbfb 100644 --- a/internal/daemon/worker/addressreceiver.go +++ b/internal/daemon/worker/addressreceiver.go @@ -16,9 +16,9 @@ func noopAddressReceivers(context.Context, *Worker) ([]addressReceiver, error) { type receiverType uint const ( - UnknownReceiverType receiverType = iota - grpcResolverReceiverType - secondaryConnectionReceiverType + UnknownReceiverType receiverType = 0 + grpcResolverReceiverType receiverType = 1 + dialingListenerReceiverType receiverType = 2 ) // String returns a string representation of the receiverType @@ -26,7 +26,7 @@ func (s receiverType) String() string { return [...]string{ "unknown", "grpcResolver", - "secondaryConnections", + "dialingListener", }[s] } diff --git a/internal/daemon/worker/controller_connection.go b/internal/daemon/worker/controller_connection.go index 30e331d19b..e8bb3355f0 100644 --- a/internal/daemon/worker/controller_connection.go +++ b/internal/daemon/worker/controller_connection.go @@ -33,7 +33,6 @@ import ( "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/resolver" "google.golang.org/protobuf/proto" - "google.golang.org/protobuf/types/known/structpb" ) const hcpbUrlSuffix = ".proxy.boundary.hashicorp.cloud:9202" @@ -78,14 +77,8 @@ func (w *Worker) StartControllerConnections() error { return nil } -// upstreamDialerFunc dials an upstream server. extraAlpnProtos can be provided -// to kms and pki connections and are used for identifying, on the server side, -// the intended purpose of the connection. upstreamDialerFunc takes an optional -// state struct which, if the connection is made using the node enrolment -// library, sends the state to the server and can be retrieved from the -// resulting protocol.Conn. -func (w *Worker) upstreamDialerFunc(state *structpb.Struct, extraAlpnProtos ...string) func(context.Context, string) (net.Conn, error) { - const op = "worker.(Worker).upstreamDialerFunc" +func (w *Worker) controllerDialerFunc(extraAlpnProtos ...string) func(context.Context, string) (net.Conn, error) { + const op = "worker.(Worker).controllerDialerFunc" return func(ctx context.Context, addr string) (net.Conn, error) { var conn net.Conn var err error @@ -97,7 +90,6 @@ func (w *Worker) upstreamDialerFunc(state *structpb.Struct, extraAlpnProtos ...s ctx, w.WorkerAuthStorage, addr, - nodeenrollment.WithState(state), nodeenrollment.WithWrapper(w.conf.WorkerAuthStorageKms), nodeenrollment.WithExtraAlpnProtos(extraAlpnProtos), // If the activation token hasn't been populated, this won't do @@ -210,7 +202,7 @@ func (w *Worker) createClientConn(addr string) error { grpc.WithUnaryInterceptor(metric.InstrumentClusterClient()), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32)), grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(math.MaxInt32)), - grpc.WithContextDialer(w.upstreamDialerFunc(nil)), + grpc.WithContextDialer(w.controllerDialerFunc()), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(defServiceConfig), // Don't have the resolver reach out for a service config from the diff --git a/internal/daemon/worker/listeners.go b/internal/daemon/worker/listeners.go index 1a2eccef19..e091f738ef 100644 --- a/internal/daemon/worker/listeners.go +++ b/internal/daemon/worker/listeners.go @@ -32,12 +32,9 @@ import ( // the function that handles a secondary connection over a provided listener var handleSecondaryConnection = closeListener -func closeListener(_ context.Context, l, l2 net.Listener, _ any) error { +func closeListener(_ context.Context, l net.Listener, _ any, _ int) error { if l != nil { - l.Close() - } - if l2 != nil { - l2.Close() + return l.Close() } return nil } @@ -171,16 +168,9 @@ func (w *Worker) configureForWorker(ln *base.ServerListener, logger *log.Logger, // Connections coming in here are authed by nodeenrollment and are for the // reverse grpc purpose - reverseGrpcListener, err := w.workerAuthSplitListener.GetListener(common.ReverseGrpcConnectionAlpnValue, nodee.WithNativeConns(true)) - if err != nil { - return nil, fmt.Errorf("error instantiating reverse grpc split listener: %w", err) - } - - // Connections coming in here are authed by nodeenrollment and are for the - // multi-hop session-proxying - websocketProxyingListener, err := w.workerAuthSplitListener.GetListener(common.WebsocketProxyingAlpnValue, nodee.WithNativeConns(true)) + reverseGrpcListener, err := w.workerAuthSplitListener.GetListener(common.ReverseGrpcConnectionAlpnValue) if err != nil { - return nil, fmt.Errorf("error instantiating websocket proxying split listener: %w", err) + return nil, fmt.Errorf("error instantiating non-worker split listener: %w", err) } // This wraps the reverse grpc pki worker connections with a listener which @@ -223,10 +213,10 @@ func (w *Worker) configureForWorker(ln *base.ServerListener, logger *log.Logger, } return func() { - handleSecondaryConnection(cancelCtx, revPkiWorkerTrackingListener, websocketProxyingListener, w.downstreamRoutes) go w.workerAuthSplitListener.Start() go httpServer.Serve(proxyListener) go ln.GrpcServer.Serve(pkiWorkerTrackingListener) + go handleSecondaryConnection(cancelCtx, revPkiWorkerTrackingListener, w.downstreamRoutes, -1) }, nil } diff --git a/internal/daemon/worker/status.go b/internal/daemon/worker/status.go index b59cb80e90..09c1f1b629 100644 --- a/internal/daemon/worker/status.go +++ b/internal/daemon/worker/status.go @@ -20,7 +20,7 @@ import ( var firstStatusCheckPostHooks []func(context.Context, *Worker) error -var downstreamWorkersFactory func(ctx context.Context, workerId string, ver string) (downstreamers, error) +var downstreamWorkersFactory func(ctx context.Context, workerId string) (downstreamers, error) type LastStatusInformation struct { *pbs.StatusResponse @@ -261,10 +261,10 @@ func (w *Worker) sendWorkerStatus(cancelCtx context.Context, sessionManager sess } // regardless of whether or not it's a new address, we need to set - // them for secondaryConnections + // them for dialingListeners for _, as := range *addressReceivers { switch { - case as.Type() == secondaryConnectionReceiverType: + case as.Type() == dialingListenerReceiverType: tmpAddrs := make([]string, len(addrs)) copy(tmpAddrs, addrs) if len(tmpAddrs) == 0 { @@ -308,7 +308,7 @@ func (w *Worker) sendWorkerStatus(cancelCtx context.Context, sessionManager sess // If we have post hooks for after the first status check, run them now if w.everAuthenticated.CAS(authenticationStatusFirstAuthentication, authenticationStatusFirstStatusRpcSuccessful) { if downstreamWorkersFactory != nil { - w.downstreamWorkers, err = downstreamWorkersFactory(cancelCtx, w.LastStatusSuccess().WorkerId, versionInfo.VersionNumber()) + w.downstreamWorkers, err = downstreamWorkersFactory(cancelCtx, w.LastStatusSuccess().WorkerId) if err != nil { event.WriteError(cancelCtx, op, err) w.conf.ServerSideShutdownCh <- struct{}{} diff --git a/internal/daemon/worker/worker.go b/internal/daemon/worker/worker.go index 8e2a492560..1d31742491 100644 --- a/internal/daemon/worker/worker.go +++ b/internal/daemon/worker/worker.go @@ -53,7 +53,7 @@ type downstreamRouter interface { // ProcessPendingConnections starts a function that continually processes // incoming client connections. This only returns when the provided context // is done. - StartProcessingPendingConnections(context.Context, func() string) error + StartProcessingPendingConnections(context.Context, func() string) } // downstreamers provides at least a minimum interface that must be met by a @@ -447,9 +447,7 @@ func (w *Worker) Start() error { } go func() { defer w.tickerWg.Done() - if err := w.downstreamRoutes.StartProcessingPendingConnections(w.baseContext, servNameFn); err != nil { - errors.Wrap(w.baseContext, err, op) - } + w.downstreamRoutes.StartProcessingPendingConnections(w.baseContext, servNameFn) }() go func() { defer w.tickerWg.Done()