diff --git a/internal/servers/worker/session.go b/internal/servers/worker/session.go index ebd5da269c..e387df6a80 100644 --- a/internal/servers/worker/session.go +++ b/internal/servers/worker/session.go @@ -15,10 +15,9 @@ import ( "github.com/hashicorp/boundary/internal/session" ) -const ( - validateSessionTimeout = 90 * time.Second - errMakeSessionCloseInfoNilCloseInfo = "nil closeInfo supplied to makeSessionCloseInfo, this is a bug, please report it" -) +const validateSessionTimeout = 90 * time.Second + +var errMakeSessionCloseInfoNilCloseInfo = errors.New("nil closeInfo supplied to makeSessionCloseInfo, this is a bug, please report it") type connInfo struct { id string @@ -267,17 +266,41 @@ func (w *Worker) closeConnections(ctx context.Context, closeInfo map[string]stri } w.logger.Trace("marking connections as closed", "session_and_connection_ids", fmt.Sprintf("%#v", closeInfo)) - response, err := w.closeConnection(ctx, w.makeCloseConnectionRequest(closeInfo)) + + // How we handle close info depends on whether or not we succeeded with + // marking them closed on the controller. + var sessionCloseInfo map[string][]*pbs.CloseConnectionResponseData + var err error + + // TODO: This, along with the status call to the controller, probably needs a + // bit of formalization in terms of how we handle timeouts. For now, this + // just ensures consistency with the same status call in that it times out + // within an adequate period of time. + closeConnCtx, closeConnCancel := context.WithTimeout(ctx, statusTimeout) + defer closeConnCancel() + response, err := w.closeConnection(closeConnCtx, w.makeCloseConnectionRequest(closeInfo)) if err != nil { w.logger.Error("error marking connections closed", "error", err) w.logger.Warn( "error contacting controller, connections will be closed only on worker", "session_and_connection_ids", fmt.Sprintf("%#v", closeInfo), ) + + // Since we could not reach the controller, we have to make a "fake" response set. 
+ sessionCloseInfo, err = w.makeFakeSessionCloseInfo(closeInfo) + } else { + // Connection succeeded, so we can proceed with making the sessionCloseInfo + // off of the response data. + sessionCloseInfo, err = w.makeSessionCloseInfo(closeInfo, response) + } + + if err != nil { + w.logger.Error(err.Error()) + w.logger.Error("serious error in processing return data from controller, aborting marking connections as closed") } // Mark connections as closed - closedIds, errs := w.setCloseTimeForResponse(w.makeSessionCloseInfo(closeInfo, response)) + closedIds, errs := w.setCloseTimeForResponse(sessionCloseInfo) if len(errs) > 0 { for _, err := range errs { w.logger.Error("error marking connection closed in state", "err", err) @@ -316,11 +339,9 @@ func (w *Worker) makeCloseConnectionRequest(closeInfo map[string]string) *pbs.Cl func (w *Worker) makeSessionCloseInfo( closeInfo map[string]string, response *pbs.CloseConnectionResponse, -) map[string][]*pbs.CloseConnectionResponseData { +) (map[string][]*pbs.CloseConnectionResponseData, error) { if closeInfo == nil { - // Should never happen, panic if it does. Results will be - // undefined. - panic(errMakeSessionCloseInfoNilCloseInfo) + return nil, errMakeSessionCloseInfoNilCloseInfo } result := make(map[string][]*pbs.CloseConnectionResponseData) @@ -328,7 +349,27 @@ func (w *Worker) makeSessionCloseInfo( result[closeInfo[v.GetConnectionId()]] = append(result[closeInfo[v.GetConnectionId()]], v) } - return result + return result, nil +} + +// makeFakeSessionCloseInfo makes a "fake" sessionCloseInfo map, intended +// for use when we can't contact the controller. 
+func (w *Worker) makeFakeSessionCloseInfo( + closeInfo map[string]string, +) (map[string][]*pbs.CloseConnectionResponseData, error) { + if closeInfo == nil { + return nil, errMakeSessionCloseInfoNilCloseInfo + } + + result := make(map[string][]*pbs.CloseConnectionResponseData) + for connectionId, sessionId := range closeInfo { + result[sessionId] = append(result[sessionId], &pbs.CloseConnectionResponseData{ + ConnectionId: connectionId, + Status: pbs.CONNECTIONSTATUS_CONNECTIONSTATUS_CLOSED, + }) + } + + return result, nil } // setCloseTimeForResponse iterates a CloseConnectionResponse and diff --git a/internal/servers/worker/session_test.go b/internal/servers/worker/session_test.go index 4d89038fd8..58f2559d5f 100644 --- a/internal/servers/worker/session_test.go +++ b/internal/servers/worker/session_test.go @@ -41,22 +41,58 @@ func TestMakeSessionCloseInfo(t *testing.T) { {ConnectionId: "bar", Status: pbs.CONNECTIONSTATUS_CONNECTIONSTATUS_CLOSED}, }, } - actual := new(Worker).makeSessionCloseInfo(closeInfo, response) + actual, err := new(Worker).makeSessionCloseInfo(closeInfo, response) + require.NoError(err) require.Equal(expected, actual) } -func TestMakeSessionCloseInfoPanicIfCloseInfoNil(t *testing.T) { +func TestMakeSessionCloseInfoErrorIfCloseInfoNil(t *testing.T) { require := require.New(t) - require.Panics(func() { - new(Worker).makeSessionCloseInfo(nil, nil) - }) + actual, err := new(Worker).makeSessionCloseInfo(nil, nil) + require.Nil(actual) + require.ErrorIs(err, errMakeSessionCloseInfoNilCloseInfo) } func TestMakeSessionCloseInfoEmpty(t *testing.T) { require := require.New(t) + actual, err := new(Worker).makeSessionCloseInfo(make(map[string]string), nil) + require.NoError(err) require.Equal( make(map[string][]*pbs.CloseConnectionResponseData), - new(Worker).makeSessionCloseInfo(make(map[string]string), nil), + actual, + ) +} + +func TestMakeFakeSessionCloseInfo(t *testing.T) { + require := require.New(t) + closeInfo := map[string]string{"foo": "one", 
"bar": "two"} + expected := map[string][]*pbs.CloseConnectionResponseData{ + "one": { + {ConnectionId: "foo", Status: pbs.CONNECTIONSTATUS_CONNECTIONSTATUS_CLOSED}, + }, + "two": { + {ConnectionId: "bar", Status: pbs.CONNECTIONSTATUS_CONNECTIONSTATUS_CLOSED}, + }, + } + actual, err := new(Worker).makeFakeSessionCloseInfo(closeInfo) + require.NoError(err) + require.Equal(expected, actual) +} + +func TestMakeFakeSessionCloseInfoPanicIfCloseInfoNil(t *testing.T) { + require := require.New(t) + actual, err := new(Worker).makeFakeSessionCloseInfo(nil) + require.Nil(actual) + require.ErrorIs(err, errMakeSessionCloseInfoNilCloseInfo) +} + +func TestMakeFakeSessionCloseInfoEmpty(t *testing.T) { + require := require.New(t) + actual, err := new(Worker).makeFakeSessionCloseInfo(make(map[string]string)) + require.NoError(err) + require.Equal( + make(map[string][]*pbs.CloseConnectionResponseData), + actual, ) } diff --git a/internal/servers/worker/status.go b/internal/servers/worker/status.go index 96a670041d..821ceac7ba 100644 --- a/internal/servers/worker/status.go +++ b/internal/servers/worker/status.go @@ -16,7 +16,7 @@ import ( // In the future we could make this configurable const ( statusInterval = 2 * time.Second - statusTimeout = 10 * time.Second + statusTimeout = 5 * time.Second defaultStatusGracePeriod = 30 * time.Second statusGracePeriodEnvVar = "BOUNDARY_STATUS_GRACE_PERIOD" ) diff --git a/internal/servers/worker/testing.go b/internal/servers/worker/testing.go index 19b5755e60..c41b4d4b4b 100644 --- a/internal/servers/worker/testing.go +++ b/internal/servers/worker/testing.go @@ -5,9 +5,11 @@ import ( "fmt" "net" "testing" + "time" "github.com/hashicorp/boundary/internal/cmd/base" "github.com/hashicorp/boundary/internal/cmd/config" + pbs "github.com/hashicorp/boundary/internal/gen/controller/servers/services" "github.com/hashicorp/go-hclog" wrapping "github.com/hashicorp/go-kms-wrapping" "github.com/hashicorp/vault/sdk/helper/base62" @@ -76,6 +78,64 @@ func (tw 
*TestWorker) ProxyAddrs() []string { return tw.addrs } +// TestSessionInfo provides detail about a particular session from +// the worker's local session state. This detail is a point-in-time +// snapshot of what's in sessionInfoMap for a particular session, and +// may not contain all of the information that is contained within +// it, or the underlying connInfoMap. Only details that are really +// important to testing are passed along. +type TestSessionInfo struct { + Id string + Status pbs.SESSIONSTATUS + + // Connections is indexed by connection ID, which is also included + // within TestConnectionInfo for convenience. + Connections map[string]TestConnectionInfo +} + +// TestConnectionInfo provides detail about a particular connection +// as a part of TestSessionInfo. See that struct for details about +// the purpose of this data and how it's gathered. +type TestConnectionInfo struct { + Id string + Status pbs.CONNECTIONSTATUS + CloseTime time.Time +} + +// LookupSession returns session info from the worker's local session +// state. +// +// The return boolean will be true if the session was found, false if +// it wasn't. +// +// See TestSessionInfo for details on how to use this info. 
+func (tw *TestWorker) LookupSession(id string) (TestSessionInfo, bool) { + var result TestSessionInfo + raw, ok := tw.w.sessionInfoMap.Load(id) + if !ok { + return result, false + } + + sess := raw.(*sessionInfo) + sess.RLock() + defer sess.RUnlock() + + conns := make(map[string]TestConnectionInfo) + for _, conn := range sess.connInfoMap { + conns[conn.id] = TestConnectionInfo{ + Id: conn.id, + Status: conn.status, + CloseTime: conn.closeTime, + } + } + + result.Id = sess.id + result.Status = sess.status + result.Connections = conns + + return result, true +} + // Shutdown runs any cleanup functions; be sure to run this after your test is // done func (tw *TestWorker) Shutdown() { diff --git a/internal/servers/worker/testing_test.go b/internal/servers/worker/testing_test.go new file mode 100644 index 0000000000..5befad1eff --- /dev/null +++ b/internal/servers/worker/testing_test.go @@ -0,0 +1,61 @@ +package worker + +import ( + "sync" + "testing" + "time" + + pbs "github.com/hashicorp/boundary/internal/gen/controller/servers/services" + "github.com/stretchr/testify/require" +) + +func TestTestWorkerLookupSession(t *testing.T) { + require := require.New(t) + // This loads the golang reference time, see those docs for more details. We + // just use this as a stable non-zero time. 
+ refTime, err := time.Parse(time.RFC3339, "2006-01-02T15:04:05+07:00") + require.NoError(err) + + tw := new(TestWorker) + tw.w = &Worker{ + sessionInfoMap: new(sync.Map), + } + tw.w.sessionInfoMap.Store("foo", &sessionInfo{ + id: "foo", + status: pbs.SESSIONSTATUS_SESSIONSTATUS_ACTIVE, + connInfoMap: map[string]*connInfo{ + "one": &connInfo{ + id: "one", + status: pbs.CONNECTIONSTATUS_CONNECTIONSTATUS_CLOSED, + closeTime: refTime, + }, + }, + }) + + expected := TestSessionInfo{ + Id: "foo", + Status: pbs.SESSIONSTATUS_SESSIONSTATUS_ACTIVE, + Connections: map[string]TestConnectionInfo{ + "one": TestConnectionInfo{ + Id: "one", + Status: pbs.CONNECTIONSTATUS_CONNECTIONSTATUS_CLOSED, + CloseTime: refTime, + }, + }, + } + + actual, ok := tw.LookupSession("foo") + require.True(ok) + require.Equal(expected, actual) +} + +func TestTestWorkerLookupSessionMissing(t *testing.T) { + require := require.New(t) + tw := new(TestWorker) + tw.w = &Worker{ + sessionInfoMap: new(sync.Map), + } + actual, ok := tw.LookupSession("missing") + require.False(ok) + require.Equal(TestSessionInfo{}, actual) +} diff --git a/internal/session/connection_state.go b/internal/session/connection_state.go index 858bf9408b..b92b75e49a 100644 --- a/internal/session/connection_state.go +++ b/internal/session/connection_state.go @@ -19,9 +19,10 @@ const ( type ConnectionStatus string const ( - StatusAuthorized ConnectionStatus = "authorized" - StatusConnected ConnectionStatus = "connected" - StatusClosed ConnectionStatus = "closed" + StatusAuthorized ConnectionStatus = "authorized" + StatusConnected ConnectionStatus = "connected" + StatusClosed ConnectionStatus = "closed" + StatusUnspecified ConnectionStatus = "unspecified" // Utility state not valid in the DB ) // String representation of the state's status @@ -42,6 +43,20 @@ func (s ConnectionStatus) ProtoVal() workerpbs.CONNECTIONSTATUS { return workerpbs.CONNECTIONSTATUS_CONNECTIONSTATUS_UNSPECIFIED } +// ConnectionStatusFromProtoVal is the reverse 
of +// ConnectionStatus.ProtoVal. +func ConnectionStatusFromProtoVal(s workerpbs.CONNECTIONSTATUS) ConnectionStatus { + switch s { + case workerpbs.CONNECTIONSTATUS_CONNECTIONSTATUS_AUTHORIZED: + return StatusAuthorized + case workerpbs.CONNECTIONSTATUS_CONNECTIONSTATUS_CONNECTED: + return StatusConnected + case workerpbs.CONNECTIONSTATUS_CONNECTIONSTATUS_CLOSED: + return StatusClosed + } + return StatusUnspecified +} + // ConnectionState of the state of the connection type ConnectionState struct { // ConnectionId is used to access the state via an API diff --git a/internal/tests/cluster/session_cleanup_test.go b/internal/tests/cluster/session_cleanup_test.go index 4b5f773d6e..aa81c0a980 100644 --- a/internal/tests/cluster/session_cleanup_test.go +++ b/internal/tests/cluster/session_cleanup_test.go @@ -11,6 +11,7 @@ import ( "io" "net" "net/http" + "reflect" "strconv" "testing" "time" @@ -22,18 +23,37 @@ import ( "github.com/hashicorp/boundary/internal/proxy" "github.com/hashicorp/boundary/internal/servers/controller" "github.com/hashicorp/boundary/internal/servers/worker" + "github.com/hashicorp/boundary/internal/session" "github.com/hashicorp/dawdle" "github.com/hashicorp/go-cleanhttp" "github.com/hashicorp/go-hclog" "github.com/hashicorp/vault/sdk/helper/base62" "github.com/mr-tron/base58" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" "nhooyr.io/websocket" "nhooyr.io/websocket/wspb" ) -const testSendRecvSendMax = 60 +const ( + testSendRecvSendMax = 60 + defaultGracePeriod = time.Second * 30 + expectConnectionStateOnWorkerTimeout = defaultGracePeriod * 2 + + // This is the interval that we check states on in the worker. It + // needs to be particularly granular to ensure that we allow for + // adequate time to catch any edge cases where the connection state + // disappears from the worker before we can update the state. 
+ // + // As of this writing, the (hardcoded) status request interval on + // the worker is 2 seconds, and a session is removed from the state + // when it has no connections left on the *next* pass. A one second + // interval would only mean two chances to check with a high + // possibility of skew; while it seems okay I'm still not 100% + // comfortable with this little resolution. + expectConnectionStateOnWorkerInterval = time.Millisecond * 100 +) func TestWorkerSessionCleanup(t *testing.T) { require := require.New(t) @@ -99,7 +119,7 @@ func TestWorkerSessionCleanup(t *testing.T) { require.NotNil(tgt) // Authorize and connect - sess := newTestSession(ctx, t, tcl, "ttcp_1234567890") + sess := newTestSession(ctx, t, logger, tcl, "ttcp_1234567890") sConn := sess.Connect(ctx, t, logger) // Run initial send/receive test, make sure things are working @@ -114,10 +134,13 @@ func TestWorkerSessionCleanup(t *testing.T) { // Assert we have no connections left (should be default behavior) sess.TestNoConnectionsLeft(t) + // Assert connection has been marked closed in the local worker state + sess.ExpectConnectionStateOnWorker(ctx, t, w1, session.StatusClosed) + // Resume the connection, and reconnect. 
proxy.Resume() - time.Sleep(time.Second * 10) // Sleep to wait for worker to report back as healthy - sess = newTestSession(ctx, t, tcl, "ttcp_1234567890") // re-assign, other connection will close in t.Cleanup() + time.Sleep(time.Second * 10) // Sleep to wait for worker to report back as healthy + sess = newTestSession(ctx, t, logger, tcl, "ttcp_1234567890") // re-assign, other connection will close in t.Cleanup() sConn = sess.Connect(ctx, t, logger) sConn.TestSendRecvAll(t) } @@ -219,7 +242,7 @@ func TestWorkerSessionCleanupMultiController(t *testing.T) { require.NotNil(tgt) // Authorize and connect - sess := newTestSession(ctx, t, tcl, "ttcp_1234567890") + sess := newTestSession(ctx, t, logger, tcl, "ttcp_1234567890") sConn := sess.Connect(ctx, t, logger) // Run initial send/receive test, make sure things are working @@ -240,21 +263,29 @@ func TestWorkerSessionCleanupMultiController(t *testing.T) { p1.Pause() sConn.TestSendRecvFail(t) + // Assert we have no connections left (should be default behavior) + sess.TestNoConnectionsLeft(t) + + // Assert connection has been marked closed in the local worker state + sess.ExpectConnectionStateOnWorker(ctx, t, w1, session.StatusClosed) + // Finally resume both, try again. Should behave as per normal. p1.Resume() p2.Resume() - time.Sleep(time.Second * 10) // Sleep to wait for worker to report back as healthy - sess = newTestSession(ctx, t, tcl, "ttcp_1234567890") // re-assign, other connection will close in t.Cleanup() + time.Sleep(time.Second * 10) // Sleep to wait for worker to report back as healthy + sess = newTestSession(ctx, t, logger, tcl, "ttcp_1234567890") // re-assign, other connection will close in t.Cleanup() sConn = sess.Connect(ctx, t, logger) sConn.TestSendRecvAll(t) } // testSession represents an authorized session. 
type testSession struct { + sessionId string workerAddr string transport *http.Transport tofuToken string connectionsLeft int32 + logger hclog.Logger } // newTestSession authorizes a session and creates all of the data @@ -262,6 +293,7 @@ type testSession struct { func newTestSession( ctx context.Context, t *testing.T, + logger hclog.Logger, tcl *targets.Client, targetId string, ) *testSession { @@ -271,7 +303,10 @@ func newTestSession( require.NoError(err) require.NotNil(sar) - s := new(testSession) + s := &testSession{ + sessionId: sar.Item.SessionId, + logger: logger, + } authzString := sar.GetItem().(*targets.SessionAuthorization).AuthorizationToken marshaled, err := base58.FastBase58Decoding(authzString) require.NoError(err) @@ -363,6 +398,88 @@ func (s *testSession) TestNoConnectionsLeft(t *testing.T) { require.Zero(t, s.connectionsLeft) } +// ExpectConnectionStateOnWorker waits until all connections in a +// session have transitioned to a particular state on the worker. +func (s *testSession) ExpectConnectionStateOnWorker( + ctx context.Context, + t *testing.T, + tw *worker.TestWorker, + expectState session.ConnectionStatus, +) { + t.Helper() + require := require.New(t) + assert := assert.New(t) + + ctx, cancel := context.WithTimeout(ctx, expectConnectionStateOnWorkerTimeout) + defer cancel() + + // This is just for initialization of the actual state set. 
+ const sessionStatusUnknown session.ConnectionStatus = "unknown" + + connIds := s.testWorkerConnectionIds(t, tw) + // Make a set of states, 1 per connection + actualStates := make(map[string]session.ConnectionStatus, len(connIds)) + for _, id := range connIds { + actualStates[id] = sessionStatusUnknown + } + + // Make expect set for comparison + expectStates := make(map[string]session.ConnectionStatus, len(connIds)) + for _, id := range connIds { + expectStates[id] = expectState + } + + for { + if ctx.Err() != nil { + break + } + + for _, conn := range s.testWorkerConnectionInfo(t, tw) { + actualStates[conn.Id] = session.ConnectionStatusFromProtoVal(conn.Status) + } + + if reflect.DeepEqual(expectStates, actualStates) { + break + } + + time.Sleep(expectConnectionStateOnWorkerInterval) + } + + // "non-fatal" assert here, so that we can surface both timeouts + // and invalid state + assert.NoError(ctx.Err()) + + // Assert + require.Equal(expectStates, actualStates) + s.logger.Debug("successfully asserted all connection states on worker", "expected_states", expectStates, "actual_states", actualStates) +} + +func (s *testSession) testWorkerConnectionInfo(t *testing.T, tw *worker.TestWorker) map[string]worker.TestConnectionInfo { + t.Helper() + require := require.New(t) + si, ok := tw.LookupSession(s.sessionId) + // This is always an error if the session has been removed from the + // local state. + require.True(ok) + + // Likewise, we require the helper to always be used with + // connections. + require.Greater(len(si.Connections), 0, "should have at least one connection") + + return si.Connections +} + +func (s *testSession) testWorkerConnectionIds(t *testing.T, tw *worker.TestWorker) []string { + t.Helper() + conns := s.testWorkerConnectionInfo(t, tw) + result := make([]string, 0, len(conns)) + for _, conn := range conns { + result = append(result, conn.Id) + } + + return result +} + // testSessionConnection abstracts a connected session. 
type testSessionConnection struct { conn net.Conn