revert merge llb-multihop-sessions (#2628)

pull/2631/head
Irena Rindos 4 years ago committed by GitHub
parent edd323b73a
commit 70e20b5cf5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -93,7 +93,7 @@ require (
github.com/DATA-DOG/go-sqlmock v1.5.0
github.com/cenkalti/backoff/v4 v4.1.0
github.com/hashicorp/go-kms-wrapping/extras/kms/v2 v2.0.0-20220914160710-1c6d04de2431
github.com/hashicorp/nodeenrollment v0.1.18
github.com/hashicorp/nodeenrollment v0.1.17
github.com/kelseyhightower/envconfig v1.4.0
golang.org/x/exp v0.0.0-20220921164117-439092de6870
golang.org/x/net v0.0.0-20220722155237-a158d28d115b

@ -743,8 +743,8 @@ github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+l
github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/hashicorp/nodeenrollment v0.1.18 h1:fqFMZ5e2cWupeQDxKledF9DGHsk9GoJl7K2Wrdiys6k=
github.com/hashicorp/nodeenrollment v0.1.18/go.mod h1:N5gYsm8mWiDfIw/j+1IQ6NBO1cWCmhPpvQ9GB1QUnsU=
github.com/hashicorp/nodeenrollment v0.1.17 h1:ZaGNugd3EOZIdJxgC5bUA1CbHZ/OKZcgXnquzoFql6E=
github.com/hashicorp/nodeenrollment v0.1.17/go.mod h1:N5gYsm8mWiDfIw/j+1IQ6NBO1cWCmhPpvQ9GB1QUnsU=
github.com/hashicorp/vault/api v1.3.1 h1:pkDkcgTh47PRjY1NEFeofqR4W/HkNUi9qIakESO2aRM=
github.com/hashicorp/vault/api v1.3.1/go.mod h1:QeJoWxMFt+MsuWcYhmwRLwKEXrjwAFFywzhptMsTIUw=
github.com/hashicorp/vault/sdk v0.1.13/go.mod h1:B+hVj7TpuQY1Y/GPbCpffmgd+tSEwvhkWnjtSYCaS2M=

@ -1,6 +1,3 @@
package common
const (
ReverseGrpcConnectionAlpnValue = "the-downstream-dialer-plays-an-uno-reverse-card"
WebsocketProxyingAlpnValue = "i-herd-you-like-proxies-so-i-put-a-proxy-in-your-proxy"
)
const ReverseGrpcConnectionAlpnValue = "the-downstream-dialer-plays-an-uno-reverse-card"

@ -39,7 +39,6 @@ import (
host_plugin_assets "github.com/hashicorp/boundary/plugins/host"
"github.com/hashicorp/boundary/sdk/pbs/plugin"
external_host_plugins "github.com/hashicorp/boundary/sdk/plugins/host"
"github.com/hashicorp/boundary/version"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-secure-stdlib/mlock"
"github.com/hashicorp/go-secure-stdlib/pluginutil/v2"
@ -57,7 +56,7 @@ type downstreamRouter interface {
// ProcessPendingConnections starts a function that continually processes
// incoming client connections. This only returns when the provided context
// is done.
StartProcessingPendingConnections(context.Context, func() string) error
StartProcessingPendingConnections(context.Context, func() string)
}
// downstreamWorkersTicker defines an interface for a ticker that maintains the
@ -79,8 +78,8 @@ type downstreamers interface {
var (
downstreamRouterFactory func() downstreamRouter
downstreamersFactory func(context.Context, string, string) (downstreamers, error)
downstreamWorkersTickerFactory func(context.Context, string, string, downstreamers, downstreamRouter) (downstreamWorkersTicker, error)
downstreamersFactory func(context.Context, string) (downstreamers, error)
downstreamWorkersTickerFactory func(context.Context, string, downstreamers, downstreamRouter) (downstreamWorkersTicker, error)
commandClientFactory func(context.Context, *Controller) error
)
@ -398,8 +397,7 @@ func New(ctx context.Context, conf *Config) (*Controller, error) {
}
if downstreamersFactory != nil {
boundVer := version.Get().VersionNumber()
c.downstreamWorkers, err = downstreamersFactory(ctx, "root", boundVer)
c.downstreamWorkers, err = downstreamersFactory(ctx, "root")
if err != nil {
return nil, fmt.Errorf("unable to initialize downstream workers graph: %w", err)
}
@ -489,8 +487,7 @@ func (c *Controller) Start() error {
if downstreamWorkersTickerFactory != nil {
// we'll use "root" to designate that this is the root of the graph (aka
// a controller)
boundVer := version.Get().VersionNumber()
dswTicker, err := downstreamWorkersTickerFactory(c.baseContext, "root", boundVer, c.downstreamWorkers, c.downstreamRoutes)
dswTicker, err := downstreamWorkersTickerFactory(c.baseContext, "root", c.downstreamWorkers, c.downstreamRoutes)
if err != nil {
return fmt.Errorf("error creating downstream workers ticker: %w", err)
}

@ -26,7 +26,7 @@ import (
// the function that handles a secondary connection over a provided listener
var handleSecondaryConnection = closeListener
func closeListener(_ context.Context, l, _ net.Listener, _ any) error {
func closeListener(_ context.Context, l net.Listener, _ any, _ int) error {
if l != nil {
return l.Close()
}
@ -242,16 +242,18 @@ func (c *Controller) configureForCluster(ln *base.ServerListener) (func(), error
ln.GrpcServer = workerServer
return func() {
err := handleSecondaryConnection(c.baseContext, multiplexingReverseGrpcListener, nil, c.downstreamRoutes)
if err != nil {
event.WriteError(c.baseContext, op, err, event.WithInfoMsg("handleSecondaryConnection error"))
}
go func() {
err := splitListener.Start()
if err != nil {
event.WriteError(c.baseContext, op, err, event.WithInfoMsg("splitListener.Start() error"))
}
}()
go func() {
err := handleSecondaryConnection(c.baseContext, multiplexingReverseGrpcListener, c.downstreamRoutes, -1)
if err != nil {
event.WriteError(c.baseContext, op, err, event.WithInfoMsg("handleSecondaryConnection error"))
}
}()
go func() {
err := ln.GrpcServer.Serve(multiplexingAuthedListener)
if err != nil {

@ -16,9 +16,9 @@ func noopAddressReceivers(context.Context, *Worker) ([]addressReceiver, error) {
type receiverType uint
const (
UnknownReceiverType receiverType = iota
grpcResolverReceiverType
secondaryConnectionReceiverType
UnknownReceiverType receiverType = 0
grpcResolverReceiverType receiverType = 1
dialingListenerReceiverType receiverType = 2
)
// String returns a string representation of the receiverType
@ -26,7 +26,7 @@ func (s receiverType) String() string {
return [...]string{
"unknown",
"grpcResolver",
"secondaryConnections",
"dialingListener",
}[s]
}

@ -33,7 +33,6 @@ import (
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/resolver"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/structpb"
)
const hcpbUrlSuffix = ".proxy.boundary.hashicorp.cloud:9202"
@ -78,14 +77,8 @@ func (w *Worker) StartControllerConnections() error {
return nil
}
// upstreamDialerFunc dials an upstream server. extraAlpnProtos can be provided
// to kms and pki connections and are used for identifying, on the server side,
// the intended purpose of the connection. upstreamDialerFunc takes an optional
// state struct which, if the connection is made using the node enrolment
// library, sends the state to the server and can be retrieved from the
// resulting protocol.Conn.
func (w *Worker) upstreamDialerFunc(state *structpb.Struct, extraAlpnProtos ...string) func(context.Context, string) (net.Conn, error) {
const op = "worker.(Worker).upstreamDialerFunc"
func (w *Worker) controllerDialerFunc(extraAlpnProtos ...string) func(context.Context, string) (net.Conn, error) {
const op = "worker.(Worker).controllerDialerFunc"
return func(ctx context.Context, addr string) (net.Conn, error) {
var conn net.Conn
var err error
@ -97,7 +90,6 @@ func (w *Worker) upstreamDialerFunc(state *structpb.Struct, extraAlpnProtos ...s
ctx,
w.WorkerAuthStorage,
addr,
nodeenrollment.WithState(state),
nodeenrollment.WithWrapper(w.conf.WorkerAuthStorageKms),
nodeenrollment.WithExtraAlpnProtos(extraAlpnProtos),
// If the activation token hasn't been populated, this won't do
@ -210,7 +202,7 @@ func (w *Worker) createClientConn(addr string) error {
grpc.WithUnaryInterceptor(metric.InstrumentClusterClient()),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32)),
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(math.MaxInt32)),
grpc.WithContextDialer(w.upstreamDialerFunc(nil)),
grpc.WithContextDialer(w.controllerDialerFunc()),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(defServiceConfig),
// Don't have the resolver reach out for a service config from the

@ -32,12 +32,9 @@ import (
// the function that handles a secondary connection over a provided listener
var handleSecondaryConnection = closeListener
func closeListener(_ context.Context, l, l2 net.Listener, _ any) error {
func closeListener(_ context.Context, l net.Listener, _ any, _ int) error {
if l != nil {
l.Close()
}
if l2 != nil {
l2.Close()
return l.Close()
}
return nil
}
@ -171,16 +168,9 @@ func (w *Worker) configureForWorker(ln *base.ServerListener, logger *log.Logger,
// Connections coming in here are authed by nodeenrollment and are for the
// reverse grpc purpose
reverseGrpcListener, err := w.workerAuthSplitListener.GetListener(common.ReverseGrpcConnectionAlpnValue, nodee.WithNativeConns(true))
if err != nil {
return nil, fmt.Errorf("error instantiating reverse grpc split listener: %w", err)
}
// Connections coming in here are authed by nodeenrollment and are for the
// multi-hop session-proxying
websocketProxyingListener, err := w.workerAuthSplitListener.GetListener(common.WebsocketProxyingAlpnValue, nodee.WithNativeConns(true))
reverseGrpcListener, err := w.workerAuthSplitListener.GetListener(common.ReverseGrpcConnectionAlpnValue)
if err != nil {
return nil, fmt.Errorf("error instantiating websocket proxying split listener: %w", err)
return nil, fmt.Errorf("error instantiating non-worker split listener: %w", err)
}
// This wraps the reverse grpc pki worker connections with a listener which
@ -223,10 +213,10 @@ func (w *Worker) configureForWorker(ln *base.ServerListener, logger *log.Logger,
}
return func() {
handleSecondaryConnection(cancelCtx, revPkiWorkerTrackingListener, websocketProxyingListener, w.downstreamRoutes)
go w.workerAuthSplitListener.Start()
go httpServer.Serve(proxyListener)
go ln.GrpcServer.Serve(pkiWorkerTrackingListener)
go handleSecondaryConnection(cancelCtx, revPkiWorkerTrackingListener, w.downstreamRoutes, -1)
}, nil
}

@ -20,7 +20,7 @@ import (
var firstStatusCheckPostHooks []func(context.Context, *Worker) error
var downstreamWorkersFactory func(ctx context.Context, workerId string, ver string) (downstreamers, error)
var downstreamWorkersFactory func(ctx context.Context, workerId string) (downstreamers, error)
type LastStatusInformation struct {
*pbs.StatusResponse
@ -261,10 +261,10 @@ func (w *Worker) sendWorkerStatus(cancelCtx context.Context, sessionManager sess
}
// regardless of whether or not it's a new address, we need to set
// them for secondaryConnections
// them for dialingListeners
for _, as := range *addressReceivers {
switch {
case as.Type() == secondaryConnectionReceiverType:
case as.Type() == dialingListenerReceiverType:
tmpAddrs := make([]string, len(addrs))
copy(tmpAddrs, addrs)
if len(tmpAddrs) == 0 {
@ -308,7 +308,7 @@ func (w *Worker) sendWorkerStatus(cancelCtx context.Context, sessionManager sess
// If we have post hooks for after the first status check, run them now
if w.everAuthenticated.CAS(authenticationStatusFirstAuthentication, authenticationStatusFirstStatusRpcSuccessful) {
if downstreamWorkersFactory != nil {
w.downstreamWorkers, err = downstreamWorkersFactory(cancelCtx, w.LastStatusSuccess().WorkerId, versionInfo.VersionNumber())
w.downstreamWorkers, err = downstreamWorkersFactory(cancelCtx, w.LastStatusSuccess().WorkerId)
if err != nil {
event.WriteError(cancelCtx, op, err)
w.conf.ServerSideShutdownCh <- struct{}{}

@ -53,7 +53,7 @@ type downstreamRouter interface {
// ProcessPendingConnections starts a function that continually processes
// incoming client connections. This only returns when the provided context
// is done.
StartProcessingPendingConnections(context.Context, func() string) error
StartProcessingPendingConnections(context.Context, func() string)
}
// downstreamers provides at least a minimum interface that must be met by a
@ -447,9 +447,7 @@ func (w *Worker) Start() error {
}
go func() {
defer w.tickerWg.Done()
if err := w.downstreamRoutes.StartProcessingPendingConnections(w.baseContext, servNameFn); err != nil {
errors.Wrap(w.baseContext, err, op)
}
w.downstreamRoutes.StartProcessingPendingConnections(w.baseContext, servNameFn)
}()
go func() {
defer w.tickerWg.Done()

Loading…
Cancel
Save