// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package proxy

import (
	"context"
	"crypto/tls"
	"errors"
	"fmt"
	"net"
	"net/http"
	"net/netip"
	"sync"
	"sync/atomic"
	"time"

	"github.com/hashicorp/boundary/api"
	"github.com/hashicorp/boundary/api/sessions"
	"github.com/hashicorp/boundary/api/targets"
	cleanhttp "github.com/hashicorp/go-cleanhttp"
	"github.com/hashicorp/go-secure-stdlib/base62"
	"github.com/hashicorp/go-secure-stdlib/temperror"
	ua "go.uber.org/atomic"
)

// Note: Much of the proxying behavior is tested in various tests within
// internal/tests, but those cannot be included in this package as they would
// create a dependency on the main module from this module.

// Sending a session cancel request can take more time than you might expect,
// especially if a lot of these requests are sent at once, so the timeout is
// quite long. We could allow a custom timeout to be an option if we wish.
const sessionCancelTimeout = 30 * time.Second

type ClientProxy struct {
	tofuToken               string
	cachedListenerAddress   *ua.String
	connectionsLeft         *atomic.Int32
	connsLeftCh             chan int32
	callerConnectionsLeftCh chan int32
	apiClient               *api.Client
	sessionAuthzData        *targets.SessionAuthorizationData
	createTime              time.Time
	expiration              time.Time
	ctx                     context.Context
	cancel                  context.CancelFunc
	transport               *http.Transport
	workerAddr              string
	listenAddrPort          netip.AddrPort
	listener                *atomic.Value
	listenerCloseOnce       *sync.Once
	clientTlsConf           *tls.Config
	connWg                  *sync.WaitGroup
	started                 *atomic.Bool
	skipSessionTeardown     bool
}

// New creates a new client proxy. The given context should be cancelable; once
// the proxy is started, cancel the context to stop the proxy. The proxy may
// also cancel on its own if the session expires or there are no connections
// left.
//
// Supported options:
//
// * WithListenAddrPort - Specify a TCP address and port on which to listen
//
// * WithListener - Specify a custom listener on which to accept connections;
// overrides WithListenAddrPort if both are set
//
// * WithSessionAuthorizationData - Specify an already-unmarshaled session
// authorization object. If set, authzToken can be empty.
//
// * WithConnectionsLeftCh - Specify a channel on which to send the number of
// remaining connections as they are consumed
//
// * WithWorkerHost - If set, use this host name as the SNI host when making the
// TLS connection to the worker
//
// EXPERIMENTAL: While this API is not expected to change, it is new and
// feedback from users may necessitate changes.
func New(ctx context.Context, authzToken string, opt ...Option) (*ClientProxy, error) {
	opts, err := getOpts(opt...)
	if err != nil {
		return nil, fmt.Errorf("could not parse options: %w", err)
	}

	if authzToken == "" && opts.WithSessionAuthorizationData == nil {
		return nil, fmt.Errorf("empty session authorization token and object")
	}

	p := &ClientProxy{
		cachedListenerAddress:   ua.NewString(""),
		connsLeftCh:             make(chan int32),
		connectionsLeft:         new(atomic.Int32),
		listener:                new(atomic.Value),
		listenerCloseOnce:       new(sync.Once),
		connWg:                  new(sync.WaitGroup),
		listenAddrPort:          opts.WithListenAddrPort,
		callerConnectionsLeftCh: opts.WithConnectionsLeftCh,
		started:                 new(atomic.Bool),
		skipSessionTeardown:     opts.WithSkipSessionTeardown,
		apiClient:               opts.withApiClient,
	}

	if opts.WithListener != nil {
		p.listener.Store(opts.WithListener)
	}

	p.tofuToken, err = base62.Random(20)
	if err != nil {
		return nil, fmt.Errorf("could not derive random bytes for tofu token: %w", err)
	}

	p.sessionAuthzData = opts.WithSessionAuthorizationData
	if p.sessionAuthzData == nil {
		p.sessionAuthzData, err = targets.SessionAuthorization{AuthorizationToken: authzToken}.GetSessionAuthorizationData()
		if err != nil {
			return nil, fmt.Errorf("error turning authz token into authorization data: %w", err)
		}
	}

	if len(p.sessionAuthzData.WorkerInfo) == 0 {
		return nil, errors.New("no workers found in authorization data")
	}

	if opts.WithListener == nil {
		if p.listenAddrPort.Port() == 0 {
			p.listenAddrPort = netip.AddrPortFrom(p.listenAddrPort.Addr(), uint16(p.sessionAuthzData.DefaultClientPort))
		}
	}
	p.connectionsLeft.Store(p.sessionAuthzData.ConnectionLimit)
	p.workerAddr = p.sessionAuthzData.WorkerInfo[0].Address

	tlsConf, err := p.clientTlsConfig(opt...)
	if err != nil {
		return nil, fmt.Errorf("error creating TLS configuration: %w", err)
	}
	p.createTime = p.sessionAuthzData.CreatedTime
	p.expiration = p.sessionAuthzData.Expiration

	// We don't _rely_ on client-side timeout verification, but this prevents us
	// from seeming to be ready for a connection that will immediately fail when
	// we try to actually make it
	p.ctx, p.cancel = context.WithDeadline(ctx, p.expiration)

	transport := cleanhttp.DefaultTransport()
	transport.DisableKeepAlives = false
	// This isn't/shouldn't be used anyway because the connection is hijacked;
	// it's set here just for completeness
	transport.IdleConnTimeout = 0
	transport.DialContext = func(ctx context.Context, network, addr string) (net.Conn, error) {
		dialer := &tls.Dialer{Config: tlsConf}
		return dialer.DialContext(ctx, network, addr)
	}
	p.transport = transport

	return p, nil
}

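// Example (illustrative sketch only, not drawn from this package's tests): a
// caller typically obtains an authorization token, e.g. from
// "boundary targets authorize-session", and constructs the proxy with it. The
// option constructor signatures shown are assumptions based on the option
// names documented on New, and authzToken is assumed to hold the session
// authorization token.
//
//	ctx, cancel := context.WithCancel(context.Background())
//	defer cancel()
//
//	connsLeft := make(chan int32)
//	p, err := New(ctx, authzToken,
//		WithListenAddrPort(netip.MustParseAddrPort("127.0.0.1:9200")),
//		WithConnectionsLeftCh(connsLeft),
//	)
//	if err != nil {
//		// handle the error; the token may be malformed or expired
//	}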

// Start starts the listener for client proxying. It ends, with any errors, when
// the listener is closed and no connections are left. Cancel the context passed
// to New to force this to happen early. It is not safe to call Start twice,
// including once it has exited, and it will immediately error in that case;
// create a new ClientProxy with New().
//
// Note: if a custom listener implementation is used and the implementation can
// return a Temporary error, the listener will not be closed on that condition
// and no feedback will be given. It is up to the listener implementation to
// inform the client, if needed, of any status causing a Temporary error to be
// returned on accept.
//
// EXPERIMENTAL: While this API is not expected to change, it is new and
// feedback from users may necessitate changes.
func (p *ClientProxy) Start(opt ...Option) (retErr error) {
	opts, err := getOpts(opt...)
	if err != nil {
		return fmt.Errorf("could not parse options: %w", err)
	}
	if !p.started.CompareAndSwap(false, true) {
		return errors.New("proxy was already started")
	}

	defer p.cancel()

	if opts.withSessionTeardownTimeout == 0 {
		opts.withSessionTeardownTimeout = sessionCancelTimeout
	}

	if p.listener.Load() == nil {
		var err error
		ln, err := net.ListenTCP("tcp", &net.TCPAddr{
			IP:   p.listenAddrPort.Addr().AsSlice(),
			Port: int(p.listenAddrPort.Port()),
		})
		if err != nil {
			return fmt.Errorf("unable to start listening: %w", err)
		}
		p.listener.Store(ln)
	}

	listenerCloseFunc := func() {
		p.listenerCloseOnce.Do(func() {
			// Forces the for loop to exit instead of spinning on errors
			p.connectionsLeft.Store(0)
			if err := p.listener.Load().(net.Listener).Close(); err != nil && err != net.ErrClosed {
				retErr = errors.Join(retErr, fmt.Errorf("error closing proxy listener: %w", err))
			}
		})
	}

	// Ensure closing the listener runs on any other return condition
	defer listenerCloseFunc()

	fin := make(chan error, 10)
	p.connWg.Add(1)
	go func() {
		defer p.connWg.Done()
		for {
			listeningConn, err := p.listener.Load().(net.Listener).Accept()
			if err != nil {
				select {
				case <-p.ctx.Done():
					return
				default:
					if errors.Is(err, net.ErrClosed) {
						// Generally this will be because we canceled the
						// context or ran out of session connections and are
						// winding down. This will never revert, so return.
						return
					}
					// If the upstream listener indicates that this is an error
					// with e.g. just this connection, don't close, just
					// continue
					if temperror.IsTempError(err) {
						continue
					}
					// No reason to think we can successfully handle the next
					// connection that comes our way, so cancel the proxy
					fin <- fmt.Errorf("error from accept: %w", err)
					listenerCloseFunc()
					p.cancel()
					return
				}
			}
			p.connWg.Add(1)
			go func() {
				defer listeningConn.Close()
				defer p.connWg.Done()
				wsConn, err := p.getWsConn(p.ctx)
				if err != nil {
					fin <- fmt.Errorf("error from getWsConn: %w", err)
					// No reason to think we can successfully handle the next
					// connection that comes our way, so cancel the proxy
					listenerCloseFunc()
					p.cancel()
					return
				}
				if err := p.runTcpProxyV1(wsConn, listeningConn); err != nil {
					fin <- fmt.Errorf("error from runTcpProxyV1: %w", err)
					// No reason to think we can successfully handle the next
					// connection that comes our way, so cancel the proxy
					listenerCloseFunc()
					p.cancel()
					return
				}

				// TODO: Determine if this is useful or if there is a better
				// approach that we may use in the long term.
				if p.apiClient != nil {
					// If we can tell that the session for the connection we
					// just closed is terminated, we can close the listener;
					// otherwise we might as well leave it open so the next
					// connection can be tried.
					sess, err := sessions.NewClient(p.apiClient).Read(p.ctx, p.sessionAuthzData.SessionId)
					if err != nil || sess == nil || sess.Item == nil || sess.Item.Status == "active" {
						return
					}

					// We got a valid session response for the session we just
					// closed a connection for. Since the status isn't active
					// we can treat the session as no longer being able to
					// support connections, so close this proxy.
					fin <- fmt.Errorf("session no longer active")
					listenerCloseFunc()
					p.cancel()
				}
			}()
		}
	}()

	p.connWg.Add(1)
	go func() {
		defer func() {
			// Run this function last, after connWg is done, to ensure we drain
			// the channel in case any connections started while this number
			// was changing are still trying to send the information down it
			for {
				select {
				case <-p.connsLeftCh:
				default:
					return
				}
			}
		}()
		defer p.connWg.Done()
		defer listenerCloseFunc()

		for {
			select {
			case <-p.ctx.Done():
				return
			case connsLeft := <-p.connsLeftCh:
				p.connectionsLeft.Store(connsLeft)
				if p.callerConnectionsLeftCh != nil {
					p.callerConnectionsLeftCh <- connsLeft
				}
				if connsLeft == 0 {
					// Close the listener as we can't authorize any more
					// connections
					return
				}
			}
		}
	}()

	p.connWg.Wait()
	defer p.cancel()

	{
		// The goroutines are done, so we can safely close the channel and
		// range over any errors
		close(fin)
		var finErrors []error
		for err := range fin {
			finErrors = append(finErrors, err)
		}
		if len(finErrors) > 0 {
			return errors.Join(finErrors...)
		}
	}

	var sendSessionCancel bool
	// If we're not past expiration, ensure there is a bit of buffer in case
	// clocks are not quite the same between the worker and this machine
	if time.Now().Before(p.expiration.Add(-5 * time.Minute)) {
		sendSessionCancel = true
	}

	if !sendSessionCancel || p.skipSessionTeardown {
		return nil
	}

	return p.CloseSession(opts.withSessionTeardownTimeout)
}

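// Example (illustrative sketch only): Start blocks until the listener is
// closed and no connections remain, so a caller typically runs it in a
// goroutine, drains the channel supplied via WithConnectionsLeftCh, and
// cancels the context passed to New to shut the proxy down early. The
// variables p, ctx, cancel, and connsLeft refer to the sketch following New
// above and are illustrative only.
//
//	errCh := make(chan error, 1)
//	go func() {
//		errCh <- p.Start()
//	}()
//
//	go func() {
//		for {
//			select {
//			case left := <-connsLeft:
//				log.Printf("session connections remaining: %d", left)
//			case <-ctx.Done():
//				return
//			}
//		}
//	}()
//
//	// ... direct local clients at the proxy's listener ...
//
//	cancel() // stop the proxy early
//	if err := <-errCh; err != nil {
//		// handle Start's return value, including any listener errors
//	}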

// CloseSession attempts to close the currently proxied session by sending a
// request to do so to the worker proxying the connection.
func (p *ClientProxy) CloseSession(sessionTeardownTimeout time.Duration) error {
	if sessionTeardownTimeout == 0 {
		sessionTeardownTimeout = sessionCancelTimeout
	}
	ctx, cancel := context.WithTimeout(context.Background(), sessionTeardownTimeout)
	defer cancel()
	if err := p.sendSessionTeardown(ctx); err != nil {
		return fmt.Errorf("error sending session teardown request to worker: %w", err)
	}
	return nil
}

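// Example (illustrative sketch only): callers that opted to skip automatic
// session teardown during Start can tear the session down explicitly once
// they are finished with the proxy. Passing zero falls back to
// sessionCancelTimeout.
//
//	if err := p.CloseSession(10 * time.Second); err != nil {
//		// the worker could not be reached or rejected the teardown request
//	}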

// ListenerAddress returns the address of the client proxy listener. Because the
// listener is started with Start(), this could be called before listening
// occurs. To avoid returning until we have a valid value, pass a context;
// canceling the context will cause the function to return an empty string if
// the address is not yet known. Otherwise the function will return when the
// address is available. In either case, test the result to ensure it's not
// empty.
//
// Warning: a non-cancelable context will cause this call to block forever until
// the listener's address can be determined.
//
// EXPERIMENTAL: While this API is not expected to change, it is new and
// feedback from users may necessitate changes.
func (p *ClientProxy) ListenerAddress(ctx context.Context) string {
	switch {
	case p.cachedListenerAddress.Load() != "":
		return p.cachedListenerAddress.Load()
	case p.listener.Load() != nil:
		addr := p.listener.Load().(net.Listener).Addr().String()
		p.cachedListenerAddress.Store(addr)
		return addr
	case ctx == nil:
		return ""
	}
	timer := time.NewTimer(0)
	for {
		select {
		case <-ctx.Done():
			timer.Stop()
			return ""
		case <-timer.C:
			if p.listener.Load() != nil {
				timer.Stop()
				addr := p.listener.Load().(net.Listener).Addr().String()
				p.cachedListenerAddress.Store(addr)
				return addr
			}
			timer.Reset(10 * time.Millisecond)
		}
	}
}

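// Example (illustrative sketch only): bounding the wait with a context
// deadline avoids blocking forever if Start never brings the listener up.
// This assumes the same p from the sketch following New above.
//
//	addrCtx, addrCancel := context.WithTimeout(context.Background(), 5*time.Second)
//	defer addrCancel()
//	if addr := p.ListenerAddress(addrCtx); addr != "" {
//		fmt.Println("proxy listening on", addr)
//	} else {
//		// the listener address was not available before the deadline
//	}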

// SessionCreatedTime returns the creation time of the session
//
// EXPERIMENTAL: While this API is not expected to change, it is new and
// feedback from users may necessitate changes.
func (p *ClientProxy) SessionCreatedTime() time.Time {
	return p.createTime
}

// SessionExpiration returns the expiration time of the session
//
// EXPERIMENTAL: While this API is not expected to change, it is new and
// feedback from users may necessitate changes.
func (p *ClientProxy) SessionExpiration() time.Time {
	return p.expiration
}

// ConnectionsLeft returns the number of connections left in the session, or -1
// if unlimited.
//
// EXPERIMENTAL: While this API is not expected to change, it is new and
// feedback from users may necessitate changes.
func (p *ClientProxy) ConnectionsLeft() int32 {
	return p.connectionsLeft.Load()
}