worker: ensure connections are closed in local state on controller fail (#1369)

This backports some test work that is being added in #1340 to assert
that the worker is actually marking connections closed in its *local*
state on a failure to contact the controller. This was missed in #1357
during some late-stage refactoring.

Summary of the changes:

* If we can't contact the controller during connection closure to mark
the connections as closed, we construct a "fake" response that simply
includes a closed state for all connections requested. This is then
used to update the local connection state.

* Added a timeout (statusTimeout, now 5 seconds; see below) to the
CloseConnection request to the controller to ensure the call to close
connections completes in a timely manner and does not hang indefinitely.

Testing enhancements:

* Added a new helper in TestHelper called LookupSession that allows the
lookup of a session from the worker's internal session state.

* This is used in the new ExpectConnectionStateOnWorker check in the E2E
test, which is part of a larger amount of enhancements to the E2E test
that are coming in #1340.

* Added a ConnectionStatusFromProtoVal helper function to the session
package to allow the conversion of protobuf connection status values to
their higher-level counterparts; essentially it is a reverse of
ConnectionStatus.ProtoVal. This is used in some of our tests (with more
coming in #1340) to just help with keeping things more idiomatic.

Finally, there is some minor error refactoring, and we also change the
timeout on internal status and connection close requests to the
controller to 5 seconds.
build-48e55f156a0fbdcb4e1e711b04271e57bc8f952e-7706fefd870195c1
Chris Marchesi 5 years ago committed by GitHub
parent b3a74584ca
commit 48e55f156a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -15,10 +15,9 @@ import (
"github.com/hashicorp/boundary/internal/session"
)
const (
validateSessionTimeout = 90 * time.Second
errMakeSessionCloseInfoNilCloseInfo = "nil closeInfo supplied to makeSessionCloseInfo, this is a bug, please report it"
)
const validateSessionTimeout = 90 * time.Second
var errMakeSessionCloseInfoNilCloseInfo = errors.New("nil closeInfo supplied to makeSessionCloseInfo, this is a bug, please report it")
type connInfo struct {
id string
@ -267,17 +266,41 @@ func (w *Worker) closeConnections(ctx context.Context, closeInfo map[string]stri
}
w.logger.Trace("marking connections as closed", "session_and_connection_ids", fmt.Sprintf("%#v", closeInfo))
response, err := w.closeConnection(ctx, w.makeCloseConnectionRequest(closeInfo))
// How we handle close info depends on whether or not we succeeded with
// marking them closed on the controller.
var sessionCloseInfo map[string][]*pbs.CloseConnectionResponseData
var err error
// TODO: This, along with the status call to the controller, probably needs a
// bit of formalization in terms of how we handle timeouts. For now, this
// just ensures consistency with the same status call in that it times out
// within an adequate period of time.
closeConnCtx, closeConnCancel := context.WithTimeout(ctx, statusTimeout)
defer closeConnCancel()
response, err := w.closeConnection(closeConnCtx, w.makeCloseConnectionRequest(closeInfo))
if err != nil {
w.logger.Error("error marking connections closed", "error", err)
w.logger.Warn(
"error contacting controller, connections will be closed only on worker",
"session_and_connection_ids", fmt.Sprintf("%#v", closeInfo),
)
// Since we could not reach the controller, we have to make a "fake" response set.
sessionCloseInfo, err = w.makeFakeSessionCloseInfo(closeInfo)
} else {
// Connection succeeded, so we can proceed with making the sessionCloseInfo
// off of the response data.
sessionCloseInfo, err = w.makeSessionCloseInfo(closeInfo, response)
}
if err != nil {
w.logger.Error(err.Error())
w.logger.Error("serious error in processing return data from controller, aborting marking connections as closed")
}
// Mark connections as closed
closedIds, errs := w.setCloseTimeForResponse(w.makeSessionCloseInfo(closeInfo, response))
closedIds, errs := w.setCloseTimeForResponse(sessionCloseInfo)
if len(errs) > 0 {
for _, err := range errs {
w.logger.Error("error marking connection closed in state", "err", err)
@ -316,11 +339,9 @@ func (w *Worker) makeCloseConnectionRequest(closeInfo map[string]string) *pbs.Cl
func (w *Worker) makeSessionCloseInfo(
closeInfo map[string]string,
response *pbs.CloseConnectionResponse,
) map[string][]*pbs.CloseConnectionResponseData {
) (map[string][]*pbs.CloseConnectionResponseData, error) {
if closeInfo == nil {
// Should never happen, panic if it does. Results will be
// undefined.
panic(errMakeSessionCloseInfoNilCloseInfo)
return nil, errMakeSessionCloseInfoNilCloseInfo
}
result := make(map[string][]*pbs.CloseConnectionResponseData)
@ -328,7 +349,27 @@ func (w *Worker) makeSessionCloseInfo(
result[closeInfo[v.GetConnectionId()]] = append(result[closeInfo[v.GetConnectionId()]], v)
}
return result
return result, nil
}
// makeFakeSessionCloseInfo builds a stand-in for the session close info
// that is normally derived from a controller response. It is intended for
// use when the controller cannot be contacted: every connection named in
// closeInfo (keyed by connection ID, valued by session ID) is reported as
// closed and grouped under its session ID.
//
// A nil closeInfo yields errMakeSessionCloseInfoNilCloseInfo.
func (w *Worker) makeFakeSessionCloseInfo(
	closeInfo map[string]string,
) (map[string][]*pbs.CloseConnectionResponseData, error) {
	if closeInfo == nil {
		return nil, errMakeSessionCloseInfoNilCloseInfo
	}

	bySession := make(map[string][]*pbs.CloseConnectionResponseData)
	for connId, sessId := range closeInfo {
		closed := &pbs.CloseConnectionResponseData{
			ConnectionId: connId,
			Status:       pbs.CONNECTIONSTATUS_CONNECTIONSTATUS_CLOSED,
		}
		bySession[sessId] = append(bySession[sessId], closed)
	}

	return bySession, nil
}
// setCloseTimeForResponse iterates a CloseConnectionResponse and

@ -41,22 +41,58 @@ func TestMakeSessionCloseInfo(t *testing.T) {
{ConnectionId: "bar", Status: pbs.CONNECTIONSTATUS_CONNECTIONSTATUS_CLOSED},
},
}
actual := new(Worker).makeSessionCloseInfo(closeInfo, response)
actual, err := new(Worker).makeSessionCloseInfo(closeInfo, response)
require.NoError(err)
require.Equal(expected, actual)
}
func TestMakeSessionCloseInfoPanicIfCloseInfoNil(t *testing.T) {
func TestMakeSessionCloseInfoErrorIfCloseInfoNil(t *testing.T) {
require := require.New(t)
require.Panics(func() {
new(Worker).makeSessionCloseInfo(nil, nil)
})
actual, err := new(Worker).makeSessionCloseInfo(nil, nil)
require.Nil(actual)
require.ErrorIs(err, errMakeSessionCloseInfoNilCloseInfo)
}
func TestMakeSessionCloseInfoEmpty(t *testing.T) {
require := require.New(t)
actual, err := new(Worker).makeSessionCloseInfo(make(map[string]string), nil)
require.NoError(err)
require.Equal(
make(map[string][]*pbs.CloseConnectionResponseData),
new(Worker).makeSessionCloseInfo(make(map[string]string), nil),
actual,
)
}
func TestMakeFakeSessionCloseInfo(t *testing.T) {
require := require.New(t)
closeInfo := map[string]string{"foo": "one", "bar": "two"}
expected := map[string][]*pbs.CloseConnectionResponseData{
"one": {
{ConnectionId: "foo", Status: pbs.CONNECTIONSTATUS_CONNECTIONSTATUS_CLOSED},
},
"two": {
{ConnectionId: "bar", Status: pbs.CONNECTIONSTATUS_CONNECTIONSTATUS_CLOSED},
},
}
actual, err := new(Worker).makeFakeSessionCloseInfo(closeInfo)
require.NoError(err)
require.Equal(expected, actual)
}
func TestMakeFakeSessionCloseInfoPanicIfCloseInfoNil(t *testing.T) {
require := require.New(t)
actual, err := new(Worker).makeFakeSessionCloseInfo(nil)
require.Nil(actual)
require.ErrorIs(err, errMakeSessionCloseInfoNilCloseInfo)
}
func TestMakeFakeSessionCloseInfoEmpty(t *testing.T) {
require := require.New(t)
actual, err := new(Worker).makeFakeSessionCloseInfo(make(map[string]string))
require.NoError(err)
require.Equal(
make(map[string][]*pbs.CloseConnectionResponseData),
actual,
)
}

@ -16,7 +16,7 @@ import (
// In the future we could make this configurable
const (
statusInterval = 2 * time.Second
statusTimeout = 10 * time.Second
statusTimeout = 5 * time.Second
defaultStatusGracePeriod = 30 * time.Second
statusGracePeriodEnvVar = "BOUNDARY_STATUS_GRACE_PERIOD"
)

@ -5,9 +5,11 @@ import (
"fmt"
"net"
"testing"
"time"
"github.com/hashicorp/boundary/internal/cmd/base"
"github.com/hashicorp/boundary/internal/cmd/config"
pbs "github.com/hashicorp/boundary/internal/gen/controller/servers/services"
"github.com/hashicorp/go-hclog"
wrapping "github.com/hashicorp/go-kms-wrapping"
"github.com/hashicorp/vault/sdk/helper/base62"
@ -76,6 +78,64 @@ func (tw *TestWorker) ProxyAddrs() []string {
return tw.addrs
}
// TestSessionInfo provides detail about a particular session from
// the worker's local session state. This detail is a point-in-time
// snapshot of what's in sessionInfoMap for a particular session, and
// may not contain all of the information that is contained within
// it, or the underlying connInfoMap. Only details that are really
// important to testing are passed along.
type TestSessionInfo struct {
// Id is the session ID as recorded in the worker's local state.
Id string
// Status is the session status recorded in the worker's local state
// at the time of the lookup.
Status pbs.SESSIONSTATUS
// Connections is indexed by connection ID, which is also included
// within TestConnectionInfo for convenience.
Connections map[string]TestConnectionInfo
}
// TestConnectionInfo provides detail about a particular connection
// as a part of TestSessionInfo. See that struct for details about
// the purpose of this data and how it's gathered.
type TestConnectionInfo struct {
// Id is the connection ID as recorded in the worker's local state.
Id string
// Status is the connection status recorded in the worker's local
// state at the time of the lookup.
Status pbs.CONNECTIONSTATUS
// CloseTime is copied from the close time the worker has recorded for
// the connection; presumably the zero time if the connection has not
// been marked closed — confirm against where closeTime is set.
CloseTime time.Time
}
// LookupSession fetches a snapshot of a session from the worker's local
// session state.
//
// The boolean result reports whether the session was found; when it is
// false, the returned TestSessionInfo is the zero value.
//
// The session is read-locked while the snapshot is taken. See
// TestSessionInfo for details on how to use the returned data.
func (tw *TestWorker) LookupSession(id string) (TestSessionInfo, bool) {
	entry, found := tw.w.sessionInfoMap.Load(id)
	if !found {
		return TestSessionInfo{}, false
	}

	si := entry.(*sessionInfo)
	si.RLock()
	defer si.RUnlock()

	snapshot := TestSessionInfo{
		Id:          si.id,
		Status:      si.status,
		Connections: make(map[string]TestConnectionInfo),
	}
	// Copy out only the connection fields that tests care about.
	for _, ci := range si.connInfoMap {
		snapshot.Connections[ci.id] = TestConnectionInfo{
			Id:        ci.id,
			Status:    ci.status,
			CloseTime: ci.closeTime,
		}
	}

	return snapshot, true
}
// Shutdown runs any cleanup functions; be sure to run this after your test is
// done
func (tw *TestWorker) Shutdown() {

@ -0,0 +1,61 @@
package worker
import (
"sync"
"testing"
"time"
pbs "github.com/hashicorp/boundary/internal/gen/controller/servers/services"
"github.com/stretchr/testify/require"
)
// TestTestWorkerLookupSession verifies that LookupSession returns a
// faithful snapshot of a session (and its connections) stored in the
// worker's local session state.
func TestTestWorkerLookupSession(t *testing.T) {
	// A stable, non-zero timestamp for the connection close time. The
	// value is modelled on Go's layout reference time; any fixed non-zero
	// instant would do.
	closedAt, err := time.Parse(time.RFC3339, "2006-01-02T15:04:05+07:00")
	require.NoError(t, err)

	// Seed the worker's session state with one session holding one
	// closed connection.
	sessions := new(sync.Map)
	sessions.Store("foo", &sessionInfo{
		id:     "foo",
		status: pbs.SESSIONSTATUS_SESSIONSTATUS_ACTIVE,
		connInfoMap: map[string]*connInfo{
			"one": {
				id:        "one",
				status:    pbs.CONNECTIONSTATUS_CONNECTIONSTATUS_CLOSED,
				closeTime: closedAt,
			},
		},
	})
	tw := &TestWorker{w: &Worker{sessionInfoMap: sessions}}

	want := TestSessionInfo{
		Id:     "foo",
		Status: pbs.SESSIONSTATUS_SESSIONSTATUS_ACTIVE,
		Connections: map[string]TestConnectionInfo{
			"one": {
				Id:        "one",
				Status:    pbs.CONNECTIONSTATUS_CONNECTIONSTATUS_CLOSED,
				CloseTime: closedAt,
			},
		},
	}

	got, found := tw.LookupSession("foo")
	require.True(t, found)
	require.Equal(t, want, got)
}
func TestTestWorkerLookupSessionMissing(t *testing.T) {
require := require.New(t)
tw := new(TestWorker)
tw.w = &Worker{
sessionInfoMap: new(sync.Map),
}
actual, ok := tw.LookupSession("missing")
require.False(ok)
require.Equal(TestSessionInfo{}, actual)
}

@ -19,9 +19,10 @@ const (
type ConnectionStatus string
const (
StatusAuthorized ConnectionStatus = "authorized"
StatusConnected ConnectionStatus = "connected"
StatusClosed ConnectionStatus = "closed"
StatusAuthorized ConnectionStatus = "authorized"
StatusConnected ConnectionStatus = "connected"
StatusClosed ConnectionStatus = "closed"
StatusUnspecified ConnectionStatus = "unspecified" // Utility state not valid in the DB
)
// String representation of the state's status
@ -42,6 +43,20 @@ func (s ConnectionStatus) ProtoVal() workerpbs.CONNECTIONSTATUS {
return workerpbs.CONNECTIONSTATUS_CONNECTIONSTATUS_UNSPECIFIED
}
// ConnectionStatusFromProtoVal converts a protobuf connection status into
// its ConnectionStatus counterpart; it is the reverse of
// ConnectionStatus.ProtoVal. Any value other than authorized, connected
// or closed maps to StatusUnspecified.
func ConnectionStatusFromProtoVal(s workerpbs.CONNECTIONSTATUS) ConnectionStatus {
	status := StatusUnspecified

	switch s {
	case workerpbs.CONNECTIONSTATUS_CONNECTIONSTATUS_AUTHORIZED:
		status = StatusAuthorized
	case workerpbs.CONNECTIONSTATUS_CONNECTIONSTATUS_CONNECTED:
		status = StatusConnected
	case workerpbs.CONNECTIONSTATUS_CONNECTIONSTATUS_CLOSED:
		status = StatusClosed
	}

	return status
}
// ConnectionState of the state of the connection
type ConnectionState struct {
// ConnectionId is used to access the state via an API

@ -11,6 +11,7 @@ import (
"io"
"net"
"net/http"
"reflect"
"strconv"
"testing"
"time"
@ -22,18 +23,37 @@ import (
"github.com/hashicorp/boundary/internal/proxy"
"github.com/hashicorp/boundary/internal/servers/controller"
"github.com/hashicorp/boundary/internal/servers/worker"
"github.com/hashicorp/boundary/internal/session"
"github.com/hashicorp/dawdle"
"github.com/hashicorp/go-cleanhttp"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/vault/sdk/helper/base62"
"github.com/mr-tron/base58"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
"nhooyr.io/websocket"
"nhooyr.io/websocket/wspb"
)
const testSendRecvSendMax = 60
const (
// testSendRecvSendMax is presumably an upper bound used by the
// send/receive test helpers — confirm against its users.
testSendRecvSendMax = 60
// defaultGracePeriod presumably mirrors the worker's default status
// grace period (30 seconds) — TODO confirm it is kept in sync.
defaultGracePeriod = time.Second * 30
// expectConnectionStateOnWorkerTimeout bounds how long
// ExpectConnectionStateOnWorker polls before giving up.
expectConnectionStateOnWorkerTimeout = defaultGracePeriod * 2
// This is the interval that we check states on in the worker. It
// needs to be particularly granular to ensure that we allow for
// adequate time to catch any edge cases where the connection state
// disappears from the worker before we can update the state.
//
// As of this writing, the (hardcoded) status request interval on
// the worker is 2 seconds, and a session is removed from the state
// when it has no connections left on the *next* pass. A one second
// interval would only mean two chances to check with a high
// possibility of skew; while it seems okay I'm still not 100%
// comfortable with this little resolution.
expectConnectionStateOnWorkerInterval = time.Millisecond * 100
)
func TestWorkerSessionCleanup(t *testing.T) {
require := require.New(t)
@ -99,7 +119,7 @@ func TestWorkerSessionCleanup(t *testing.T) {
require.NotNil(tgt)
// Authorize and connect
sess := newTestSession(ctx, t, tcl, "ttcp_1234567890")
sess := newTestSession(ctx, t, logger, tcl, "ttcp_1234567890")
sConn := sess.Connect(ctx, t, logger)
// Run initial send/receive test, make sure things are working
@ -114,10 +134,13 @@ func TestWorkerSessionCleanup(t *testing.T) {
// Assert we have no connections left (should be default behavior)
sess.TestNoConnectionsLeft(t)
// Assert connection has been marked as closed in the local worker state
sess.ExpectConnectionStateOnWorker(ctx, t, w1, session.StatusClosed)
// Resume the connection, and reconnect.
proxy.Resume()
time.Sleep(time.Second * 10) // Sleep to wait for worker to report back as healthy
sess = newTestSession(ctx, t, tcl, "ttcp_1234567890") // re-assign, other connection will close in t.Cleanup()
time.Sleep(time.Second * 10) // Sleep to wait for worker to report back as healthy
sess = newTestSession(ctx, t, logger, tcl, "ttcp_1234567890") // re-assign, other connection will close in t.Cleanup()
sConn = sess.Connect(ctx, t, logger)
sConn.TestSendRecvAll(t)
}
@ -219,7 +242,7 @@ func TestWorkerSessionCleanupMultiController(t *testing.T) {
require.NotNil(tgt)
// Authorize and connect
sess := newTestSession(ctx, t, tcl, "ttcp_1234567890")
sess := newTestSession(ctx, t, logger, tcl, "ttcp_1234567890")
sConn := sess.Connect(ctx, t, logger)
// Run initial send/receive test, make sure things are working
@ -240,21 +263,29 @@ func TestWorkerSessionCleanupMultiController(t *testing.T) {
p1.Pause()
sConn.TestSendRecvFail(t)
// Assert we have no connections left (should be default behavior)
sess.TestNoConnectionsLeft(t)
// Assert connection has been marked as closed in the local worker state
sess.ExpectConnectionStateOnWorker(ctx, t, w1, session.StatusClosed)
// Finally resume both, try again. Should behave as per normal.
p1.Resume()
p2.Resume()
time.Sleep(time.Second * 10) // Sleep to wait for worker to report back as healthy
sess = newTestSession(ctx, t, tcl, "ttcp_1234567890") // re-assign, other connection will close in t.Cleanup()
time.Sleep(time.Second * 10) // Sleep to wait for worker to report back as healthy
sess = newTestSession(ctx, t, logger, tcl, "ttcp_1234567890") // re-assign, other connection will close in t.Cleanup()
sConn = sess.Connect(ctx, t, logger)
sConn.TestSendRecvAll(t)
}
// testSession represents an authorized session.
type testSession struct {
// sessionId is the ID of the authorized session; it is used to look
// the session up in a worker's local state.
sessionId string
// workerAddr is presumably the address of the worker to connect
// through — confirm against newTestSession.
workerAddr string
// transport is presumably the HTTP transport used when connecting to
// the worker — confirm against newTestSession.
transport *http.Transport
// tofuToken is presumably the trust-on-first-use token sent when
// connecting — confirm against Connect.
tofuToken string
// connectionsLeft is checked by TestNoConnectionsLeft, which requires
// it to be zero.
connectionsLeft int32
// logger receives debug output from the session's test helpers.
logger hclog.Logger
}
// newTestSession authorizes a session and creates all of the data
@ -262,6 +293,7 @@ type testSession struct {
func newTestSession(
ctx context.Context,
t *testing.T,
logger hclog.Logger,
tcl *targets.Client,
targetId string,
) *testSession {
@ -271,7 +303,10 @@ func newTestSession(
require.NoError(err)
require.NotNil(sar)
s := new(testSession)
s := &testSession{
sessionId: sar.Item.SessionId,
logger: logger,
}
authzString := sar.GetItem().(*targets.SessionAuthorization).AuthorizationToken
marshaled, err := base58.FastBase58Decoding(authzString)
require.NoError(err)
@ -363,6 +398,88 @@ func (s *testSession) TestNoConnectionsLeft(t *testing.T) {
require.Zero(t, s.connectionsLeft)
}
// ExpectConnectionStateOnWorker waits until all connections in a
// session have transitioned to a particular state on the worker.
//
// The set of connections to watch is taken from the worker's local
// state when the call starts. The worker is then polled every
// expectConnectionStateOnWorkerInterval until every watched connection
// reports expectState, or until expectConnectionStateOnWorkerTimeout
// elapses (or ctx is otherwise cancelled), whichever comes first.
//
// The test fails if the deadline is hit, if the final states do not
// match, or if the session disappears from (or has no connections in)
// the worker's local state while polling.
func (s *testSession) ExpectConnectionStateOnWorker(
	ctx context.Context,
	t *testing.T,
	tw *worker.TestWorker,
	expectState session.ConnectionStatus,
) {
	t.Helper()
	require := require.New(t)
	assert := assert.New(t)

	ctx, cancel := context.WithTimeout(ctx, expectConnectionStateOnWorkerTimeout)
	defer cancel()

	// This is just for initialization of the actual state set.
	const sessionStatusUnknown session.ConnectionStatus = "unknown"

	connIds := s.testWorkerConnectionIds(t, tw)
	// One entry per connection: what we have observed so far, and what
	// we expect to observe eventually.
	actualStates := make(map[string]session.ConnectionStatus, len(connIds))
	expectStates := make(map[string]session.ConnectionStatus, len(connIds))
	for _, id := range connIds {
		actualStates[id] = sessionStatusUnknown
		expectStates[id] = expectState
	}

	for ctx.Err() == nil {
		for _, conn := range s.testWorkerConnectionInfo(t, tw) {
			actualStates[conn.Id] = session.ConnectionStatusFromProtoVal(conn.Status)
		}

		if reflect.DeepEqual(expectStates, actualStates) {
			break
		}

		// Wait for the next poll, but wake up immediately if the
		// context is done so that we never overshoot the deadline by a
		// full polling interval.
		timer := time.NewTimer(expectConnectionStateOnWorkerInterval)
		select {
		case <-ctx.Done():
			timer.Stop()
		case <-timer.C:
		}
	}

	// "non-fatal" assert here, so that we can surface both timeouts
	// and invalid state
	assert.NoError(ctx.Err())

	// Assert
	require.Equal(expectStates, actualStates)
	s.logger.Debug("successfully asserted all connection states on worker", "expected_states", expectStates, "actual_states", actualStates)
}
// testWorkerConnectionInfo returns the connections that the given worker
// currently holds for this session, indexed by connection ID. It fails
// the test immediately if the session is not present in the worker's
// local state or if the session has no connections.
func (s *testSession) testWorkerConnectionInfo(t *testing.T, tw *worker.TestWorker) map[string]worker.TestConnectionInfo {
	t.Helper()

	info, found := tw.LookupSession(s.sessionId)
	// A session that has been removed from the local state is always an
	// error here.
	require.True(t, found)

	// The helper is only meaningful for sessions that have connections.
	conns := info.Connections
	require.Greater(t, len(conns), 0, "should have at least one connection")

	return conns
}
// testWorkerConnectionIds returns the IDs of the connections that the
// given worker currently holds for this session, in no particular order.
// It fails the test under the same conditions as
// testWorkerConnectionInfo.
func (s *testSession) testWorkerConnectionIds(t *testing.T, tw *worker.TestWorker) []string {
	t.Helper()

	conns := s.testWorkerConnectionInfo(t, tw)
	ids := make([]string, len(conns))
	next := 0
	for _, c := range conns {
		ids[next] = c.Id
		next++
	}

	return ids
}
// testSessionConnection abstracts a connected session.
type testSessionConnection struct {
conn net.Conn

Loading…
Cancel
Save