From 48e55f156a0fbdcb4e1e711b04271e57bc8f952e Mon Sep 17 00:00:00 2001 From: Chris Marchesi Date: Wed, 30 Jun 2021 10:53:54 -0700 Subject: [PATCH] worker: ensure connections are closed in local state on controller fail (#1369) This backports some test work that is being added in #1340 to assert that the worker is actually marking connections closed in its *local* state on a failure to contact the controller. This was missed in #1357 during some late-stage refactoring. Summary of the changes: * If we can't actually contact the controller during connection closure to mark the connections as closed, we actually construct a "fake" response that just simply includes a closed state for all connections requested. This is then used to update the local connection state. * Added a 3 second timeout to the CloseConnection request to the controller to ensure the call to close connections completed in a timely manner and did not hang indefinitely. Testing enhancements: * Added a new helper in TestHelper called LookupSession that allows the lookup of a session from the worker's internal session state. * This is used in the new ExpectConnectionStateOnWorker check in the E2E test, which is part of a larger amount of enhancements to the E2E test that are coming in #1340. * Added a ConnectionStatusFromProtoVal helper function to the session package to allow the conversion of protobuf connection status values to their higher-level counterparts; essentially it is a reverse of ConnectionStatus.ProtoVal. This is used in some of our tests (with more coming in #1340) to just help with keeping things more idiomatic. Finally, there is some minor error refactoring, and we also change the timeout on internal status and connection close requests to the controller to 5 seconds. --- internal/servers/worker/session.go | 63 +++++++-- internal/servers/worker/session_test.go | 48 ++++++- internal/servers/worker/status.go | 2 +- internal/servers/worker/testing.go | 60 ++++++++ internal/servers/worker/testing_test.go | 61 ++++++++ internal/session/connection_state.go | 21 ++- .../tests/cluster/session_cleanup_test.go | 133 ++++++++++++++++-- 7 files changed, 359 insertions(+), 29 deletions(-) create mode 100644 internal/servers/worker/testing_test.go diff --git a/internal/servers/worker/session.go b/internal/servers/worker/session.go index ebd5da269c..e387df6a80 100644 --- a/internal/servers/worker/session.go +++ b/internal/servers/worker/session.go @@ -15,10 +15,9 @@ import ( "github.com/hashicorp/boundary/internal/session" ) -const ( - validateSessionTimeout = 90 * time.Second - errMakeSessionCloseInfoNilCloseInfo = "nil closeInfo supplied to makeSessionCloseInfo, this is a bug, please report it" -) +const validateSessionTimeout = 90 * time.Second + +var errMakeSessionCloseInfoNilCloseInfo = errors.New("nil closeInfo supplied to makeSessionCloseInfo, this is a bug, please report it") type connInfo struct { id string @@ -267,17 +266,41 @@ func (w *Worker) closeConnections(ctx context.Context, closeInfo map[string]stri } w.logger.Trace("marking connections as closed", "session_and_connection_ids", fmt.Sprintf("%#v", closeInfo)) - response, err := w.closeConnection(ctx, w.makeCloseConnectionRequest(closeInfo)) + + // How we handle close info depends on whether or not we succeeded with + // marking them closed on the controller. + var sessionCloseInfo map[string][]*pbs.CloseConnectionResponseData + var err error + + // TODO: This, along with the status call to the controller, probably needs a + // bit of formalization in terms of how we handle timeouts. For now, this + // just ensures consistency with the same status call in that it times out + // within an adequate period of time. + closeConnCtx, closeConnCancel := context.WithTimeout(ctx, statusTimeout) + defer closeConnCancel() + response, err := w.closeConnection(closeConnCtx, w.makeCloseConnectionRequest(closeInfo)) if err != nil { w.logger.Error("error marking connections closed", "error", err) w.logger.Warn( "error contacting controller, connections will be closed only on worker", "session_and_connection_ids", fmt.Sprintf("%#v", closeInfo), ) + + // Since we could not reach the controller, we have to make a "fake" response set. + sessionCloseInfo, err = w.makeFakeSessionCloseInfo(closeInfo) + } else { + // Connection succeeded, so we can proceed with making the sessionCloseInfo + // off of the response data. + sessionCloseInfo, err = w.makeSessionCloseInfo(closeInfo, response) + } + + if err != nil { + w.logger.Error(err.Error()) + w.logger.Error("serious error in processing return data from controller, aborting marking connections as closed") } // Mark connections as closed - closedIds, errs := w.setCloseTimeForResponse(w.makeSessionCloseInfo(closeInfo, response)) + closedIds, errs := w.setCloseTimeForResponse(sessionCloseInfo) if len(errs) > 0 { for _, err := range errs { w.logger.Error("error marking connection closed in state", "err", err) @@ -316,11 +339,9 @@ func (w *Worker) makeCloseConnectionRequest(closeInfo map[string]string) *pbs.Cl func (w *Worker) makeSessionCloseInfo( closeInfo map[string]string, response *pbs.CloseConnectionResponse, -) map[string][]*pbs.CloseConnectionResponseData { +) (map[string][]*pbs.CloseConnectionResponseData, error) { if closeInfo == nil { - // Should never happen, panic if it does. Results will be - // undefined. - panic(errMakeSessionCloseInfoNilCloseInfo) + return nil, errMakeSessionCloseInfoNilCloseInfo } result := make(map[string][]*pbs.CloseConnectionResponseData) @@ -328,7 +349,27 @@ func (w *Worker) makeSessionCloseInfo( result[closeInfo[v.GetConnectionId()]] = append(result[closeInfo[v.GetConnectionId()]], v) } - return result + return result, nil +} + +// makeFakeSessionCloseInfo makes a "fake" makeFakeSessionCloseInfo, intended +// for use when we can't contact the controller. +func (w *Worker) makeFakeSessionCloseInfo( + closeInfo map[string]string, +) (map[string][]*pbs.CloseConnectionResponseData, error) { + if closeInfo == nil { + return nil, errMakeSessionCloseInfoNilCloseInfo + } + + result := make(map[string][]*pbs.CloseConnectionResponseData) + for connectionId, sessionId := range closeInfo { + result[sessionId] = append(result[sessionId], &pbs.CloseConnectionResponseData{ + ConnectionId: connectionId, + Status: pbs.CONNECTIONSTATUS_CONNECTIONSTATUS_CLOSED, + }) + } + + return result, nil } // setCloseTimeForResponse iterates a CloseConnectionResponse and diff --git a/internal/servers/worker/session_test.go b/internal/servers/worker/session_test.go index 4d89038fd8..58f2559d5f 100644 --- a/internal/servers/worker/session_test.go +++ b/internal/servers/worker/session_test.go @@ -41,22 +41,58 @@ func TestMakeSessionCloseInfo(t *testing.T) { {ConnectionId: "bar", Status: pbs.CONNECTIONSTATUS_CONNECTIONSTATUS_CLOSED}, }, } - actual := new(Worker).makeSessionCloseInfo(closeInfo, response) + actual, err := new(Worker).makeSessionCloseInfo(closeInfo, response) + require.NoError(err) require.Equal(expected, actual) } -func TestMakeSessionCloseInfoPanicIfCloseInfoNil(t *testing.T) { +func TestMakeSessionCloseInfoErrorIfCloseInfoNil(t *testing.T) { require := require.New(t) - require.Panics(func() { - new(Worker).makeSessionCloseInfo(nil, nil) - }) + actual, err := new(Worker).makeSessionCloseInfo(nil, nil) + require.Nil(actual) + require.ErrorIs(err, errMakeSessionCloseInfoNilCloseInfo) } func TestMakeSessionCloseInfoEmpty(t *testing.T) { require := require.New(t) + actual, err := new(Worker).makeSessionCloseInfo(make(map[string]string), nil) + require.NoError(err) require.Equal( make(map[string][]*pbs.CloseConnectionResponseData), - new(Worker).makeSessionCloseInfo(make(map[string]string), nil), + actual, + ) +} + +func TestMakeFakeSessionCloseInfo(t *testing.T) { + require := require.New(t) + closeInfo := map[string]string{"foo": "one", "bar": "two"} + expected := map[string][]*pbs.CloseConnectionResponseData{ + "one": { + {ConnectionId: "foo", Status: pbs.CONNECTIONSTATUS_CONNECTIONSTATUS_CLOSED}, + }, + "two": { + {ConnectionId: "bar", Status: pbs.CONNECTIONSTATUS_CONNECTIONSTATUS_CLOSED}, + }, + } + actual, err := new(Worker).makeFakeSessionCloseInfo(closeInfo) + require.NoError(err) + require.Equal(expected, actual) +} + +func TestMakeFakeSessionCloseInfoPanicIfCloseInfoNil(t *testing.T) { + require := require.New(t) + actual, err := new(Worker).makeFakeSessionCloseInfo(nil) + require.Nil(actual) + require.ErrorIs(err, errMakeSessionCloseInfoNilCloseInfo) +} + +func TestMakeFakeSessionCloseInfoEmpty(t *testing.T) { + require := require.New(t) + actual, err := new(Worker).makeFakeSessionCloseInfo(make(map[string]string)) + require.NoError(err) + require.Equal( + make(map[string][]*pbs.CloseConnectionResponseData), + actual, ) } diff --git a/internal/servers/worker/status.go b/internal/servers/worker/status.go index 96a670041d..821ceac7ba 100644 --- a/internal/servers/worker/status.go +++ b/internal/servers/worker/status.go @@ -16,7 +16,7 @@ import ( // In the future we could make this configurable const ( statusInterval = 2 * time.Second - statusTimeout = 10 * time.Second + statusTimeout = 5 * time.Second defaultStatusGracePeriod = 30 * time.Second statusGracePeriodEnvVar = "BOUNDARY_STATUS_GRACE_PERIOD" ) diff --git a/internal/servers/worker/testing.go b/internal/servers/worker/testing.go index 19b5755e60..c41b4d4b4b 100644 --- a/internal/servers/worker/testing.go +++ b/internal/servers/worker/testing.go @@ -5,9 +5,11 @@ import ( "fmt" "net" "testing" + "time" "github.com/hashicorp/boundary/internal/cmd/base" "github.com/hashicorp/boundary/internal/cmd/config" + pbs "github.com/hashicorp/boundary/internal/gen/controller/servers/services" "github.com/hashicorp/go-hclog" wrapping "github.com/hashicorp/go-kms-wrapping" "github.com/hashicorp/vault/sdk/helper/base62" @@ -76,6 +78,64 @@ func (tw *TestWorker) ProxyAddrs() []string { return tw.addrs } +// TestSessionInfo provides detail about a particular session from +// the worker's local session state. This detail is a point-in-time +// snapshot of what's in sessionInfoMap for a particular session, and +// may not contain all of the information that is contained within +// it, or the underlying connInfoMap. Only details that are really +// important to testing are passed along. +type TestSessionInfo struct { + Id string + Status pbs.SESSIONSTATUS + + // Connections is indexed by connection ID, which is also included + // within TestConnectionInfo for convenience. + Connections map[string]TestConnectionInfo +} + +// TestConnectionInfo provides detail about a particular connection +// as a part of TestSessionInfo. See that struct for details about +// the purpose of this data and how it's gathered. +type TestConnectionInfo struct { + Id string + Status pbs.CONNECTIONSTATUS + CloseTime time.Time +} + +// LookupSession returns session info from the worker's local session +// state. +// +// The return boolean will be true if the session was found, false if +// it wasn't. +// +// See TestSessionInfo for details on how to use this info. +func (tw *TestWorker) LookupSession(id string) (TestSessionInfo, bool) { + var result TestSessionInfo + raw, ok := tw.w.sessionInfoMap.Load(id) + if !ok { + return result, false + } + + sess := raw.(*sessionInfo) + sess.RLock() + defer sess.RUnlock() + + conns := make(map[string]TestConnectionInfo) + for _, conn := range sess.connInfoMap { + conns[conn.id] = TestConnectionInfo{ + Id: conn.id, + Status: conn.status, + CloseTime: conn.closeTime, + } + } + + result.Id = sess.id + result.Status = sess.status + result.Connections = conns + + return result, true +} + // Shutdown runs any cleanup functions; be sure to run this after your test is // done func (tw *TestWorker) Shutdown() { diff --git a/internal/servers/worker/testing_test.go b/internal/servers/worker/testing_test.go new file mode 100644 index 0000000000..5befad1eff --- /dev/null +++ b/internal/servers/worker/testing_test.go @@ -0,0 +1,61 @@ +package worker + +import ( + "sync" + "testing" + "time" + + pbs "github.com/hashicorp/boundary/internal/gen/controller/servers/services" + "github.com/stretchr/testify/require" +) + +func TestTestWorkerLookupSession(t *testing.T) { + require := require.New(t) + // This loads the golang reference time, see those docs for more details. We + // just use this as a stable non-zero time. + refTime, err := time.Parse(time.RFC3339, "2006-01-02T15:04:05+07:00") + require.NoError(err) + + tw := new(TestWorker) + tw.w = &Worker{ + sessionInfoMap: new(sync.Map), + } + tw.w.sessionInfoMap.Store("foo", &sessionInfo{ + id: "foo", + status: pbs.SESSIONSTATUS_SESSIONSTATUS_ACTIVE, + connInfoMap: map[string]*connInfo{ + "one": &connInfo{ + id: "one", + status: pbs.CONNECTIONSTATUS_CONNECTIONSTATUS_CLOSED, + closeTime: refTime, + }, + }, + }) + + expected := TestSessionInfo{ + Id: "foo", + Status: pbs.SESSIONSTATUS_SESSIONSTATUS_ACTIVE, + Connections: map[string]TestConnectionInfo{ + "one": TestConnectionInfo{ + Id: "one", + Status: pbs.CONNECTIONSTATUS_CONNECTIONSTATUS_CLOSED, + CloseTime: refTime, + }, + }, + } + + actual, ok := tw.LookupSession("foo") + require.True(ok) + require.Equal(expected, actual) +} + +func TestTestWorkerLookupSessionMissing(t *testing.T) { + require := require.New(t) + tw := new(TestWorker) + tw.w = &Worker{ + sessionInfoMap: new(sync.Map), + } + actual, ok := tw.LookupSession("missing") + require.False(ok) + require.Equal(TestSessionInfo{}, actual) +} diff --git a/internal/session/connection_state.go b/internal/session/connection_state.go index 858bf9408b..b92b75e49a 100644 --- a/internal/session/connection_state.go +++ b/internal/session/connection_state.go @@ -19,9 +19,10 @@ const ( type ConnectionStatus string const ( - StatusAuthorized ConnectionStatus = "authorized" - StatusConnected ConnectionStatus = "connected" - StatusClosed ConnectionStatus = "closed" + StatusAuthorized ConnectionStatus = "authorized" + StatusConnected ConnectionStatus = "connected" + StatusClosed ConnectionStatus = "closed" + StatusUnspecified ConnectionStatus = "unspecified" // Utility state not valid in the DB ) // String representation of the state's status @@ -42,6 +43,20 @@ func (s ConnectionStatus) ProtoVal() workerpbs.CONNECTIONSTATUS { return workerpbs.CONNECTIONSTATUS_CONNECTIONSTATUS_UNSPECIFIED } +// ConnectionStatusFromProtoVal is the reverse of +// ConnectionStatus.ProtoVal. +func ConnectionStatusFromProtoVal(s workerpbs.CONNECTIONSTATUS) ConnectionStatus { + switch s { + case workerpbs.CONNECTIONSTATUS_CONNECTIONSTATUS_AUTHORIZED: + return StatusAuthorized + case workerpbs.CONNECTIONSTATUS_CONNECTIONSTATUS_CONNECTED: + return StatusConnected + case workerpbs.CONNECTIONSTATUS_CONNECTIONSTATUS_CLOSED: + return StatusClosed + } + return StatusUnspecified +} + // ConnectionState of the state of the connection type ConnectionState struct { // ConnectionId is used to access the state via an API diff --git a/internal/tests/cluster/session_cleanup_test.go b/internal/tests/cluster/session_cleanup_test.go index 4b5f773d6e..aa81c0a980 100644 --- a/internal/tests/cluster/session_cleanup_test.go +++ b/internal/tests/cluster/session_cleanup_test.go @@ -11,6 +11,7 @@ import ( "io" "net" "net/http" + "reflect" "strconv" "testing" "time" @@ -22,18 +23,37 @@ import ( "github.com/hashicorp/boundary/internal/proxy" "github.com/hashicorp/boundary/internal/servers/controller" "github.com/hashicorp/boundary/internal/servers/worker" + "github.com/hashicorp/boundary/internal/session" "github.com/hashicorp/dawdle" "github.com/hashicorp/go-cleanhttp" "github.com/hashicorp/go-hclog" "github.com/hashicorp/vault/sdk/helper/base62" "github.com/mr-tron/base58" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" "nhooyr.io/websocket" "nhooyr.io/websocket/wspb" ) -const testSendRecvSendMax = 60 +const ( + testSendRecvSendMax = 60 + defaultGracePeriod = time.Second * 30 + expectConnectionStateOnWorkerTimeout = defaultGracePeriod * 2 + + // This is the interval that we check states on in the worker. It + // needs to be particularly granular to ensure that we allow for + // adequate time to catch any edge cases where the connection state + // disappears from the worker before we can update the state. + // + // As of this writing, the (hardcoded) status request interval on + // the worker is 2 seconds, and a session is removed from the state + // when it has no connections left on the *next* pass. A one second + // interval would only mean two chances to check with a high + // possibility of skew; while it seems okay I'm still not 100% + // comfortable with this little resolution. + expectConnectionStateOnWorkerInterval = time.Millisecond * 100 +) func TestWorkerSessionCleanup(t *testing.T) { require := require.New(t) @@ -99,7 +119,7 @@ func TestWorkerSessionCleanup(t *testing.T) { require.NotNil(tgt) // Authorize and connect - sess := newTestSession(ctx, t, tcl, "ttcp_1234567890") + sess := newTestSession(ctx, t, logger, tcl, "ttcp_1234567890") sConn := sess.Connect(ctx, t, logger) // Run initial send/receive test, make sure things are working @@ -114,10 +134,13 @@ func TestWorkerSessionCleanup(t *testing.T) { // Assert we have no connections left (should be default behavior) sess.TestNoConnectionsLeft(t) + // Assert connection has been removed from the local worker state + sess.ExpectConnectionStateOnWorker(ctx, t, w1, session.StatusClosed) + // Resume the connection, and reconnect. proxy.Resume() - time.Sleep(time.Second * 10) // Sleep to wait for worker to report back as healthy - sess = newTestSession(ctx, t, tcl, "ttcp_1234567890") // re-assign, other connection will close in t.Cleanup() + time.Sleep(time.Second * 10) // Sleep to wait for worker to report back as healthy + sess = newTestSession(ctx, t, logger, tcl, "ttcp_1234567890") // re-assign, other connection will close in t.Cleanup() sConn = sess.Connect(ctx, t, logger) sConn.TestSendRecvAll(t) } @@ -219,7 +242,7 @@ func TestWorkerSessionCleanupMultiController(t *testing.T) { require.NotNil(tgt) // Authorize and connect - sess := newTestSession(ctx, t, tcl, "ttcp_1234567890") + sess := newTestSession(ctx, t, logger, tcl, "ttcp_1234567890") sConn := sess.Connect(ctx, t, logger) // Run initial send/receive test, make sure things are working @@ -240,21 +263,29 @@ func TestWorkerSessionCleanupMultiController(t *testing.T) { p1.Pause() sConn.TestSendRecvFail(t) + // Assert we have no connections left (should be default behavior) + sess.TestNoConnectionsLeft(t) + + // Assert connection has been removed from the local worker state + sess.ExpectConnectionStateOnWorker(ctx, t, w1, session.StatusClosed) + // Finally resume both, try again. Should behave as per normal. p1.Resume() p2.Resume() - time.Sleep(time.Second * 10) // Sleep to wait for worker to report back as healthy - sess = newTestSession(ctx, t, tcl, "ttcp_1234567890") // re-assign, other connection will close in t.Cleanup() + time.Sleep(time.Second * 10) // Sleep to wait for worker to report back as healthy + sess = newTestSession(ctx, t, logger, tcl, "ttcp_1234567890") // re-assign, other connection will close in t.Cleanup() sConn = sess.Connect(ctx, t, logger) sConn.TestSendRecvAll(t) } // testSession represents an authorized session. type testSession struct { + sessionId string workerAddr string transport *http.Transport tofuToken string connectionsLeft int32 + logger hclog.Logger } // newTestSession authorizes a session and creates all of the data @@ -262,6 +293,7 @@ type testSession struct { func newTestSession( ctx context.Context, t *testing.T, + logger hclog.Logger, tcl *targets.Client, targetId string, ) *testSession { @@ -271,7 +303,10 @@ func newTestSession( require.NoError(err) require.NotNil(sar) - s := new(testSession) + s := &testSession{ + sessionId: sar.Item.SessionId, + logger: logger, + } authzString := sar.GetItem().(*targets.SessionAuthorization).AuthorizationToken marshaled, err := base58.FastBase58Decoding(authzString) require.NoError(err) @@ -363,6 +398,88 @@ func (s *testSession) TestNoConnectionsLeft(t *testing.T) { require.Zero(t, s.connectionsLeft) } +// ExpectConnectionStateOnWorker waits until all connections in a +// session have transitioned to a particular state on the worker. +func (s *testSession) ExpectConnectionStateOnWorker( + ctx context.Context, + t *testing.T, + tw *worker.TestWorker, + expectState session.ConnectionStatus, +) { + t.Helper() + require := require.New(t) + assert := assert.New(t) + + ctx, cancel := context.WithTimeout(ctx, expectConnectionStateOnWorkerTimeout) + defer cancel() + + // This is just for initialization of the actual state set. + const sessionStatusUnknown session.ConnectionStatus = "unknown" + + connIds := s.testWorkerConnectionIds(t, tw) + // Make a set of states, 1 per connection + actualStates := make(map[string]session.ConnectionStatus, len(connIds)) + for _, id := range connIds { + actualStates[id] = sessionStatusUnknown + } + + // Make expect set for comparison + expectStates := make(map[string]session.ConnectionStatus, len(connIds)) + for _, id := range connIds { + expectStates[id] = expectState + } + + for { + if ctx.Err() != nil { + break + } + + for _, conn := range s.testWorkerConnectionInfo(t, tw) { + actualStates[conn.Id] = session.ConnectionStatusFromProtoVal(conn.Status) + } + + if reflect.DeepEqual(expectStates, actualStates) { + break + } + + time.Sleep(expectConnectionStateOnWorkerInterval) + } + + // "non-fatal" assert here, so that we can surface both timeouts + // and invalid state + assert.NoError(ctx.Err()) + + // Assert + require.Equal(expectStates, actualStates) + s.logger.Debug("successfully asserted all connection states on worker", "expected_states", expectStates, "actual_states", actualStates) +} + +func (s *testSession) testWorkerConnectionInfo(t *testing.T, tw *worker.TestWorker) map[string]worker.TestConnectionInfo { + t.Helper() + require := require.New(t) + si, ok := tw.LookupSession(s.sessionId) + // This is always an error if the session has been removed from the + // local state. + require.True(ok) + + // Likewise, we require the helper to always be used with + // connections. + require.Greater(len(si.Connections), 0, "should have at least one connection") + + return si.Connections +} + +func (s *testSession) testWorkerConnectionIds(t *testing.T, tw *worker.TestWorker) []string { + t.Helper() + conns := s.testWorkerConnectionInfo(t, tw) + result := make([]string, 0, len(conns)) + for _, conn := range conns { + result = append(result, conn.Id) + } + + return result +} + // testSessionConnection abstracts a connected session. type testSessionConnection struct { conn net.Conn