diff --git a/internal/servers/controller/controller.go b/internal/servers/controller/controller.go index 1c1a93791b..0268302c79 100644 --- a/internal/servers/controller/controller.go +++ b/internal/servers/controller/controller.go @@ -303,7 +303,7 @@ func (c *Controller) registerJobs() error { // registerSessionConnectionCleanupJob is a helper method to abstract // registering the session connection cleanup job specifically. func (c *Controller) registerSessionConnectionCleanupJob() error { - sessionConnectionCleanupJob, err := newSessionConnectionCleanupJob(c.ConnectionRepoFn, int(c.conf.StatusGracePeriodDuration.Seconds())) + sessionConnectionCleanupJob, err := newSessionConnectionCleanupJob(c.ConnectionRepoFn, c.conf.StatusGracePeriodDuration) if err != nil { return fmt.Errorf("error creating session cleanup job: %w", err) } diff --git a/internal/servers/controller/multi_test.go b/internal/servers/controller/multi_test.go index bbe874a9eb..c2d7cec213 100644 --- a/internal/servers/controller/multi_test.go +++ b/internal/servers/controller/multi_test.go @@ -3,7 +3,6 @@ package controller_test import ( "encoding/json" "testing" - "time" "github.com/hashicorp/boundary/api/authmethods" "github.com/hashicorp/boundary/api/authtokens" @@ -38,7 +37,6 @@ func TestAuthenticationMulti(t *testing.T) { require.NoError(json.Unmarshal(token1Result.GetRawAttributes(), token1)) require.NotNil(token1) - time.Sleep(5 * time.Second) auth = authmethods.NewClient(c2.Client()) token2Result, err := auth.Authenticate(c2.Context(), c2.Server().DevPasswordAuthMethodId, "login", map[string]interface{}{"login_name": c2.Server().DevLoginName, "password": c2.Server().DevPassword}) require.Nil(err) diff --git a/internal/servers/controller/session_cleanup_job.go b/internal/servers/controller/session_cleanup_job.go index 534065c58b..1d63e22e8c 100644 --- a/internal/servers/controller/session_cleanup_job.go +++ b/internal/servers/controller/session_cleanup_job.go @@ -27,7 +27,7 @@ type sessionConnectionCleanupJob struct { // The amount of time to give disconnected workers before marking // their connections as closed. - gracePeriod int + gracePeriod time.Duration // The total number of connections closed in the last run. totalClosed int @@ -36,7 +36,7 @@ type sessionConnectionCleanupJob struct { // newSessionConnectionCleanupJob instantiates the session cleanup job. func newSessionConnectionCleanupJob( connectionRepoFn common.ConnectionRepoFactory, - gracePeriod int, + gracePeriod time.Duration, ) (*sessionConnectionCleanupJob, error) { const op = "controller.newNewSessionConnectionCleanupJob" switch { @@ -44,7 +44,7 @@ func newSessionConnectionCleanupJob( return nil, errors.NewDeprecated(errors.InvalidParameter, op, "missing connectionRepoFn") case gracePeriod < session.DeadWorkerConnCloseMinGrace: return nil, errors.NewDeprecated( - errors.InvalidParameter, op, fmt.Sprintf("invalid gracePeriod, must be greater than %d", session.DeadWorkerConnCloseMinGrace)) + errors.InvalidParameter, op, fmt.Sprintf("invalid gracePeriod, must be greater than %s", session.DeadWorkerConnCloseMinGrace)) } return &sessionConnectionCleanupJob{ @@ -102,7 +102,7 @@ func (j *sessionConnectionCleanupJob) Run(ctx context.Context) error { event.WithInfo( "private_id", result.ServerId, "update_time", result.LastUpdateTime, - "grace_period_seconds", j.gracePeriod, + "grace_period_seconds", j.gracePeriod.Seconds(), "number_connections_closed", result.NumberConnectionsClosed, )) diff --git a/internal/servers/controller/session_cleanup_job_test.go b/internal/servers/controller/session_cleanup_job_test.go index 721389267f..094a0602d3 100644 --- a/internal/servers/controller/session_cleanup_job_test.go +++ b/internal/servers/controller/session_cleanup_job_test.go @@ -26,6 +26,9 @@ var _ = scheduler.Job(new(sessionConnectionCleanupJob)) func TestSessionConnectionCleanupJob(t *testing.T) { t.Parallel() ctx := context.Background() + + const gracePeriod = 1 * time.Second + require, assert := require.New(t), assert.New(t) conn, _ := db.TestSetup(t, "postgres") rw := db.New(conn) @@ -35,7 +38,7 @@ func TestSessionConnectionCleanupJob(t *testing.T) { serversRepo, err := servers.NewRepository(rw, rw, kms) require.NoError(err) sessionRepo, err := session.NewRepository(rw, rw, kms) - connectionRepo, err := session.NewConnectionRepository(ctx, rw, rw, kms) + connectionRepo, err := session.NewConnectionRepository(ctx, rw, rw, kms, session.WithDeadWorkerConnCloseMinGrace(gracePeriod)) require.NoError(err) numConns := 12 @@ -101,10 +104,11 @@ func TestSessionConnectionCleanupJob(t *testing.T) { func() (*session.ConnectionRepository, error) { return connectionRepo, nil }, session.DeadWorkerConnCloseMinGrace, ) + job.gracePeriod = gracePeriod // by-pass factory assert so we dont have to wait so long require.NoError(err) // sleep the status grace period. - time.Sleep(time.Second * time.Duration(session.DeadWorkerConnCloseMinGrace)) + time.Sleep(gracePeriod) // Push an upsert to the first worker so that its status has been // updated. @@ -160,7 +164,7 @@ func TestSessionConnectionCleanupJobNewJobErr(t *testing.T) { ctx, errors.WithCode(errors.InvalidParameter), errors.WithOp(op), - errors.WithMsg(fmt.Sprintf("invalid gracePeriod, must be greater than %d", session.DeadWorkerConnCloseMinGrace)), + errors.WithMsg(fmt.Sprintf("invalid gracePeriod, must be greater than %s", session.DeadWorkerConnCloseMinGrace)), )) require.Nil(job) } diff --git a/internal/session/options.go b/internal/session/options.go index 7d10ae8189..715d67cd74 100644 --- a/internal/session/options.go +++ b/internal/session/options.go @@ -21,22 +21,24 @@ type Option func(*options) // options = how options are represented type options struct { - withLimit int - withOrderByCreateTime db.OrderBy - withScopeIds []string - withUserId string - withExpirationTime *timestamp.Timestamp - withTestTofu []byte - withListingConvert bool - withSessionIds []string - withServerId string - withDbOpts []db.Option - withWorkerStateDelay time.Duration + withLimit int + withOrderByCreateTime db.OrderBy + withScopeIds []string + withUserId string + withExpirationTime *timestamp.Timestamp + withTestTofu []byte + withListingConvert bool + withSessionIds []string + withServerId string + withDbOpts []db.Option + withWorkerStateDelay time.Duration + withDeadWorkerConnCloseMinGrace time.Duration } func getDefaultOptions() options { return options{ - withWorkerStateDelay: 10 * time.Second, + withWorkerStateDelay: 10 * time.Second, + withDeadWorkerConnCloseMinGrace: DeadWorkerConnCloseMinGrace, } } @@ -114,10 +116,19 @@ func WithDbOpts(opts ...db.Option) Option { } } -// WithWorkerStateDelay is used byt queries to account for a delay in state +// WithWorkerStateDelay is used by queries to account for a delay in state // propagation between worker and controller. func WithWorkerStateDelay(d time.Duration) Option { return func(o *options) { o.withWorkerStateDelay = d } } + +// WithDeadWorkerConnCloseMinGrace is used to set the minimum allowable setting +// for the CloseConnectionsForDeadWorkers method. This defaults to the default +// server liveness setting. +func WithDeadWorkerConnCloseMinGrace(d time.Duration) Option { + return func(o *options) { + o.withDeadWorkerConnCloseMinGrace = d + } +} diff --git a/internal/session/query.go b/internal/session/query.go index 39066ac80a..430ae4a08d 100644 --- a/internal/session/query.go +++ b/internal/session/query.go @@ -310,7 +310,7 @@ where dead_servers (server_id, last_update_time) as ( select private_id, update_time from server - where update_time < wt_sub_seconds_from_now(?) + where update_time < wt_sub_seconds_from_now(@grace_period_seconds) ), closed_connections (connection_id, server_id) as ( update session_connection diff --git a/internal/session/repository_connection.go b/internal/session/repository_connection.go index 4f384f9001..b42e7ad933 100644 --- a/internal/session/repository_connection.go +++ b/internal/session/repository_connection.go @@ -16,6 +16,11 @@ import ( "google.golang.org/grpc/status" ) +// DeadWorkerConnCloseMinGrace is the minimum allowable setting for +// the CloseConnectionsForDeadWorkers method. This is synced with +// the default server liveness setting. +var DeadWorkerConnCloseMinGrace = servers.DefaultLiveness + // ConnectionRepository is the session connection database repository. type ConnectionRepository struct { reader db.Reader @@ -28,6 +33,11 @@ type ConnectionRepository struct { // workerStateDelay is used by queries to account for a delay in state propagation between // worker and controller workerStateDelay time.Duration + + // deadWorkerConnCloseMinGrace is the minimum allowable setting for + // the CloseConnectionsForDeadWorkers method. This defaults to + // the default server liveness setting. + deadWorkerConnCloseMinGrace time.Duration } // NewConnectionRepository creates a new session Connection Repository. Supports the options: WithLimit @@ -48,12 +58,14 @@ func NewConnectionRepository(ctx context.Context, r db.Reader, w db.Writer, kms // zero signals the boundary defaults should be used. opts.withLimit = db.DefaultLimit } + return &ConnectionRepository{ - reader: r, - writer: w, - kms: kms, - defaultLimit: opts.withLimit, - workerStateDelay: opts.withWorkerStateDelay, + reader: r, + writer: w, + kms: kms, + defaultLimit: opts.withLimit, + workerStateDelay: opts.withWorkerStateDelay, + deadWorkerConnCloseMinGrace: opts.withDeadWorkerConnCloseMinGrace, }, nil } @@ -134,11 +146,6 @@ func (r *ConnectionRepository) AuthorizeConnection(ctx context.Context, sessionI return &connection, connectionStates, nil } -// deadWorkerConnCloseMinGrace is the minimum allowable setting for -// the CloseConnectionsForDeadWorkers method. This is synced with -// the default server liveness setting. -var DeadWorkerConnCloseMinGrace = int(servers.DefaultLiveness.Seconds()) - // LookupConnection will look up a connection in the repository and return the connection // with its states. If the connection is not found, it will return nil, nil, nil. // No options are currently supported. @@ -362,20 +369,23 @@ type CloseConnectionsForDeadWorkersResult struct { // sending status updates to the controller(s). // // The only input to the method is the grace period, in seconds. -func (r *ConnectionRepository) CloseConnectionsForDeadWorkers(ctx context.Context, gracePeriod int) ([]CloseConnectionsForDeadWorkersResult, error) { +func (r *ConnectionRepository) CloseConnectionsForDeadWorkers(ctx context.Context, gracePeriod time.Duration) ([]CloseConnectionsForDeadWorkersResult, error) { const op = "session.(ConnectionRepository).CloseConnectionsForDeadWorkers" - if gracePeriod < DeadWorkerConnCloseMinGrace { + if gracePeriod < r.deadWorkerConnCloseMinGrace { return nil, errors.New(ctx, - errors.InvalidParameter, op, fmt.Sprintf("gracePeriod must be at least %d seconds", DeadWorkerConnCloseMinGrace)) + errors.InvalidParameter, op, fmt.Sprintf("gracePeriod must be at least %s", r.deadWorkerConnCloseMinGrace)) } + args := []interface{}{ + sql.Named("grace_period_seconds", gracePeriod.Seconds()), + } results := make([]CloseConnectionsForDeadWorkersResult, 0) _, err := r.writer.DoTx( ctx, db.StdRetryCnt, db.ExpBackoff{}, func(reader db.Reader, w db.Writer) error { - rows, err := w.Query(ctx, closeConnectionsForDeadServersCte, []interface{}{gracePeriod}) + rows, err := w.Query(ctx, closeConnectionsForDeadServersCte, args) if err != nil { return errors.Wrap(ctx, err, op) } diff --git a/internal/session/repository_connection_test.go b/internal/session/repository_connection_test.go index 10ffb29ec7..764c9a831d 100644 --- a/internal/session/repository_connection_test.go +++ b/internal/session/repository_connection_test.go @@ -446,7 +446,8 @@ func TestRepository_CloseConnectionsForDeadWorkers(t *testing.T) { iamRepo := iam.TestRepo(t, conn, wrapper) kms := kms.TestKms(t, conn, wrapper) repo, err := NewRepository(rw, rw, kms) - connRepo, err := NewConnectionRepository(ctx, rw, rw, kms) + deadWorkerConnCloseMinGrace := 1 * time.Second + connRepo, err := NewConnectionRepository(ctx, rw, rw, kms, WithDeadWorkerConnCloseMinGrace(deadWorkerConnCloseMinGrace)) require.NoError(err) serversRepo, err := servers.NewRepository(rw, rw, kms) require.NoError(err) @@ -548,10 +549,6 @@ func TestRepository_CloseConnectionsForDeadWorkers(t *testing.T) { } } - // There is a 15 second delay to account for time for the connections to - // transition - time.Sleep(15 * time.Second) - // updateServer is a helper for updating the update time for our // servers. The controller is read back so that we can reference // the most up-to-date fields. @@ -616,11 +613,11 @@ func TestRepository_CloseConnectionsForDeadWorkers(t *testing.T) { // Now try some scenarios. { // First, test the error/validation case. - result, err := connRepo.CloseConnectionsForDeadWorkers(ctx, 0) + result, err := connRepo.CloseConnectionsForDeadWorkers(ctx, -1) require.Equal(err, errors.E(ctx, errors.WithCode(errors.InvalidParameter), errors.WithOp("session.(ConnectionRepository).CloseConnectionsForDeadWorkers"), - errors.WithMsg(fmt.Sprintf("gracePeriod must be at least %d seconds", DeadWorkerConnCloseMinGrace)), + errors.WithMsg(fmt.Sprintf("gracePeriod must be at least %s", deadWorkerConnCloseMinGrace)), )) require.Nil(result) } @@ -632,7 +629,7 @@ func TestRepository_CloseConnectionsForDeadWorkers(t *testing.T) { worker3 = updateServer(t, worker3) updateServer(t, worker4) // no re-assignment here because we never reference the server again - result, err := connRepo.CloseConnectionsForDeadWorkers(ctx, DeadWorkerConnCloseMinGrace) + result, err := connRepo.CloseConnectionsForDeadWorkers(ctx, deadWorkerConnCloseMinGrace) require.NoError(err) require.Empty(result) // Expect appropriate split connection state on worker1 @@ -647,12 +644,12 @@ func TestRepository_CloseConnectionsForDeadWorkers(t *testing.T) { // Now try a zero case - similar to the basis, but only in that no results // are expected to be returned for workers with no connections, even if // they are dead. Here, the server with no connections is worker #4. - time.Sleep(time.Second * time.Duration(DeadWorkerConnCloseMinGrace)) + time.Sleep(deadWorkerConnCloseMinGrace) worker1 = updateServer(t, worker1) worker2 = updateServer(t, worker2) worker3 = updateServer(t, worker3) - result, err := connRepo.CloseConnectionsForDeadWorkers(ctx, DeadWorkerConnCloseMinGrace) + result, err := connRepo.CloseConnectionsForDeadWorkers(ctx, deadWorkerConnCloseMinGrace) require.NoError(err) require.Empty(result) // Expect appropriate split connection state on worker1 @@ -666,11 +663,11 @@ func TestRepository_CloseConnectionsForDeadWorkers(t *testing.T) { { // The first induction is letting the first worker "die" by not updating it // too. All of its authorized and connected connections should be dead. - time.Sleep(time.Second * time.Duration(DeadWorkerConnCloseMinGrace)) + time.Sleep(deadWorkerConnCloseMinGrace) worker2 = updateServer(t, worker2) worker3 = updateServer(t, worker3) - result, err := connRepo.CloseConnectionsForDeadWorkers(ctx, DeadWorkerConnCloseMinGrace) + result, err := connRepo.CloseConnectionsForDeadWorkers(ctx, deadWorkerConnCloseMinGrace) require.NoError(err) // Assert that we have one result with the appropriate ID and // number of connections closed. Due to how things are @@ -693,9 +690,9 @@ func TestRepository_CloseConnectionsForDeadWorkers(t *testing.T) { // The final case is having the other two workers die. After // this, we should have all connections closed with the // appropriate message from the next two servers acted on. - time.Sleep(time.Second * time.Duration(DeadWorkerConnCloseMinGrace)) + time.Sleep(deadWorkerConnCloseMinGrace) - result, err := connRepo.CloseConnectionsForDeadWorkers(ctx, DeadWorkerConnCloseMinGrace) + result, err := connRepo.CloseConnectionsForDeadWorkers(ctx, deadWorkerConnCloseMinGrace) require.NoError(err) // Assert that we have one result with the appropriate ID and number of connections closed. require.Equal([]CloseConnectionsForDeadWorkersResult{