test(session): Speed up some tests around dead worker cleanup

This removes or reduces the time that is waited in a number of tests by
allowing the grace period time be a configuration option on the
connection repository.
pull/1919/head
Timothy Messier 4 years ago
parent 79670180a7
commit 9aa2d4cd84
No known key found for this signature in database
GPG Key ID: EFD2F184F7600572

@ -303,7 +303,7 @@ func (c *Controller) registerJobs() error {
// registerSessionConnectionCleanupJob is a helper method to abstract
// registering the session connection cleanup job specifically.
func (c *Controller) registerSessionConnectionCleanupJob() error {
sessionConnectionCleanupJob, err := newSessionConnectionCleanupJob(c.ConnectionRepoFn, int(c.conf.StatusGracePeriodDuration.Seconds()))
sessionConnectionCleanupJob, err := newSessionConnectionCleanupJob(c.ConnectionRepoFn, c.conf.StatusGracePeriodDuration)
if err != nil {
return fmt.Errorf("error creating session cleanup job: %w", err)
}

@ -3,7 +3,6 @@ package controller_test
import (
"encoding/json"
"testing"
"time"
"github.com/hashicorp/boundary/api/authmethods"
"github.com/hashicorp/boundary/api/authtokens"
@ -38,7 +37,6 @@ func TestAuthenticationMulti(t *testing.T) {
require.NoError(json.Unmarshal(token1Result.GetRawAttributes(), token1))
require.NotNil(token1)
time.Sleep(5 * time.Second)
auth = authmethods.NewClient(c2.Client())
token2Result, err := auth.Authenticate(c2.Context(), c2.Server().DevPasswordAuthMethodId, "login", map[string]interface{}{"login_name": c2.Server().DevLoginName, "password": c2.Server().DevPassword})
require.Nil(err)

@ -27,7 +27,7 @@ type sessionConnectionCleanupJob struct {
// The amount of time to give disconnected workers before marking
// their connections as closed.
gracePeriod int
gracePeriod time.Duration
// The total number of connections closed in the last run.
totalClosed int
@ -36,7 +36,7 @@ type sessionConnectionCleanupJob struct {
// newSessionConnectionCleanupJob instantiates the session cleanup job.
func newSessionConnectionCleanupJob(
connectionRepoFn common.ConnectionRepoFactory,
gracePeriod int,
gracePeriod time.Duration,
) (*sessionConnectionCleanupJob, error) {
const op = "controller.newNewSessionConnectionCleanupJob"
switch {
@ -44,7 +44,7 @@ func newSessionConnectionCleanupJob(
return nil, errors.NewDeprecated(errors.InvalidParameter, op, "missing connectionRepoFn")
case gracePeriod < session.DeadWorkerConnCloseMinGrace:
return nil, errors.NewDeprecated(
errors.InvalidParameter, op, fmt.Sprintf("invalid gracePeriod, must be greater than %d", session.DeadWorkerConnCloseMinGrace))
errors.InvalidParameter, op, fmt.Sprintf("invalid gracePeriod, must be greater than %s", session.DeadWorkerConnCloseMinGrace))
}
return &sessionConnectionCleanupJob{
@ -102,7 +102,7 @@ func (j *sessionConnectionCleanupJob) Run(ctx context.Context) error {
event.WithInfo(
"private_id", result.ServerId,
"update_time", result.LastUpdateTime,
"grace_period_seconds", j.gracePeriod,
"grace_period_seconds", j.gracePeriod.Seconds(),
"number_connections_closed", result.NumberConnectionsClosed,
))

@ -26,6 +26,9 @@ var _ = scheduler.Job(new(sessionConnectionCleanupJob))
func TestSessionConnectionCleanupJob(t *testing.T) {
t.Parallel()
ctx := context.Background()
const gracePeriod = 1 * time.Second
require, assert := require.New(t), assert.New(t)
conn, _ := db.TestSetup(t, "postgres")
rw := db.New(conn)
@ -35,7 +38,7 @@ func TestSessionConnectionCleanupJob(t *testing.T) {
serversRepo, err := servers.NewRepository(rw, rw, kms)
require.NoError(err)
sessionRepo, err := session.NewRepository(rw, rw, kms)
connectionRepo, err := session.NewConnectionRepository(ctx, rw, rw, kms)
connectionRepo, err := session.NewConnectionRepository(ctx, rw, rw, kms, session.WithDeadWorkerConnCloseMinGrace(gracePeriod))
require.NoError(err)
numConns := 12
@ -101,10 +104,11 @@ func TestSessionConnectionCleanupJob(t *testing.T) {
func() (*session.ConnectionRepository, error) { return connectionRepo, nil },
session.DeadWorkerConnCloseMinGrace,
)
job.gracePeriod = gracePeriod // by-pass factory assert so we dont have to wait so long
require.NoError(err)
// sleep the status grace period.
time.Sleep(time.Second * time.Duration(session.DeadWorkerConnCloseMinGrace))
time.Sleep(gracePeriod)
// Push an upsert to the first worker so that its status has been
// updated.
@ -160,7 +164,7 @@ func TestSessionConnectionCleanupJobNewJobErr(t *testing.T) {
ctx,
errors.WithCode(errors.InvalidParameter),
errors.WithOp(op),
errors.WithMsg(fmt.Sprintf("invalid gracePeriod, must be greater than %d", session.DeadWorkerConnCloseMinGrace)),
errors.WithMsg(fmt.Sprintf("invalid gracePeriod, must be greater than %s", session.DeadWorkerConnCloseMinGrace)),
))
require.Nil(job)
}

@ -21,22 +21,24 @@ type Option func(*options)
// options = how options are represented
type options struct {
withLimit int
withOrderByCreateTime db.OrderBy
withScopeIds []string
withUserId string
withExpirationTime *timestamp.Timestamp
withTestTofu []byte
withListingConvert bool
withSessionIds []string
withServerId string
withDbOpts []db.Option
withWorkerStateDelay time.Duration
withLimit int
withOrderByCreateTime db.OrderBy
withScopeIds []string
withUserId string
withExpirationTime *timestamp.Timestamp
withTestTofu []byte
withListingConvert bool
withSessionIds []string
withServerId string
withDbOpts []db.Option
withWorkerStateDelay time.Duration
withDeadWorkerConnCloseMinGrace time.Duration
}
func getDefaultOptions() options {
return options{
withWorkerStateDelay: 10 * time.Second,
withWorkerStateDelay: 10 * time.Second,
withDeadWorkerConnCloseMinGrace: DeadWorkerConnCloseMinGrace,
}
}
@ -114,10 +116,19 @@ func WithDbOpts(opts ...db.Option) Option {
}
}
// WithWorkerStateDelay is used byt queries to account for a delay in state
// WithWorkerStateDelay is used by queries to account for a delay in state
// propagation between worker and controller.
func WithWorkerStateDelay(d time.Duration) Option {
return func(o *options) {
o.withWorkerStateDelay = d
}
}
// WithDeadWorkerConnCloseMinGrace is used to set the minimum allowable setting
// for the CloseConnectionsForDeadWorkers method. This defaults to the default
// server liveness setting.
func WithDeadWorkerConnCloseMinGrace(d time.Duration) Option {
return func(o *options) {
o.withDeadWorkerConnCloseMinGrace = d
}
}

@ -310,7 +310,7 @@ where
dead_servers (server_id, last_update_time) as (
select private_id, update_time
from server
where update_time < wt_sub_seconds_from_now(?)
where update_time < wt_sub_seconds_from_now(@grace_period_seconds)
),
closed_connections (connection_id, server_id) as (
update session_connection

@ -16,6 +16,11 @@ import (
"google.golang.org/grpc/status"
)
// DeadWorkerConnCloseMinGrace is the minimum allowable setting for
// the CloseConnectionsForDeadWorkers method. This is synced with
// the default server liveness setting.
var DeadWorkerConnCloseMinGrace = servers.DefaultLiveness
// ConnectionRepository is the session connection database repository.
type ConnectionRepository struct {
reader db.Reader
@ -28,6 +33,11 @@ type ConnectionRepository struct {
// workerStateDelay is used by queries to account for a delay in state propagation between
// worker and controller
workerStateDelay time.Duration
// deadWorkerConnCloseMinGrace is the minimum allowable setting for
// the CloseConnectionsForDeadWorkers method. This defaults to
// the default server liveness setting.
deadWorkerConnCloseMinGrace time.Duration
}
// NewConnectionRepository creates a new session Connection Repository. Supports the options: WithLimit
@ -48,12 +58,14 @@ func NewConnectionRepository(ctx context.Context, r db.Reader, w db.Writer, kms
// zero signals the boundary defaults should be used.
opts.withLimit = db.DefaultLimit
}
return &ConnectionRepository{
reader: r,
writer: w,
kms: kms,
defaultLimit: opts.withLimit,
workerStateDelay: opts.withWorkerStateDelay,
reader: r,
writer: w,
kms: kms,
defaultLimit: opts.withLimit,
workerStateDelay: opts.withWorkerStateDelay,
deadWorkerConnCloseMinGrace: opts.withDeadWorkerConnCloseMinGrace,
}, nil
}
@ -134,11 +146,6 @@ func (r *ConnectionRepository) AuthorizeConnection(ctx context.Context, sessionI
return &connection, connectionStates, nil
}
// deadWorkerConnCloseMinGrace is the minimum allowable setting for
// the CloseConnectionsForDeadWorkers method. This is synced with
// the default server liveness setting.
var DeadWorkerConnCloseMinGrace = int(servers.DefaultLiveness.Seconds())
// LookupConnection will look up a connection in the repository and return the connection
// with its states. If the connection is not found, it will return nil, nil, nil.
// No options are currently supported.
@ -362,20 +369,23 @@ type CloseConnectionsForDeadWorkersResult struct {
// sending status updates to the controller(s).
//
// The only input to the method is the grace period, in seconds.
func (r *ConnectionRepository) CloseConnectionsForDeadWorkers(ctx context.Context, gracePeriod int) ([]CloseConnectionsForDeadWorkersResult, error) {
func (r *ConnectionRepository) CloseConnectionsForDeadWorkers(ctx context.Context, gracePeriod time.Duration) ([]CloseConnectionsForDeadWorkersResult, error) {
const op = "session.(ConnectionRepository).CloseConnectionsForDeadWorkers"
if gracePeriod < DeadWorkerConnCloseMinGrace {
if gracePeriod < r.deadWorkerConnCloseMinGrace {
return nil, errors.New(ctx,
errors.InvalidParameter, op, fmt.Sprintf("gracePeriod must be at least %d seconds", DeadWorkerConnCloseMinGrace))
errors.InvalidParameter, op, fmt.Sprintf("gracePeriod must be at least %s", r.deadWorkerConnCloseMinGrace))
}
args := []interface{}{
sql.Named("grace_period_seconds", gracePeriod.Seconds()),
}
results := make([]CloseConnectionsForDeadWorkersResult, 0)
_, err := r.writer.DoTx(
ctx,
db.StdRetryCnt,
db.ExpBackoff{},
func(reader db.Reader, w db.Writer) error {
rows, err := w.Query(ctx, closeConnectionsForDeadServersCte, []interface{}{gracePeriod})
rows, err := w.Query(ctx, closeConnectionsForDeadServersCte, args)
if err != nil {
return errors.Wrap(ctx, err, op)
}

@ -446,7 +446,8 @@ func TestRepository_CloseConnectionsForDeadWorkers(t *testing.T) {
iamRepo := iam.TestRepo(t, conn, wrapper)
kms := kms.TestKms(t, conn, wrapper)
repo, err := NewRepository(rw, rw, kms)
connRepo, err := NewConnectionRepository(ctx, rw, rw, kms)
deadWorkerConnCloseMinGrace := 1 * time.Second
connRepo, err := NewConnectionRepository(ctx, rw, rw, kms, WithDeadWorkerConnCloseMinGrace(deadWorkerConnCloseMinGrace))
require.NoError(err)
serversRepo, err := servers.NewRepository(rw, rw, kms)
require.NoError(err)
@ -548,10 +549,6 @@ func TestRepository_CloseConnectionsForDeadWorkers(t *testing.T) {
}
}
// There is a 15 second delay to account for time for the connections to
// transition
time.Sleep(15 * time.Second)
// updateServer is a helper for updating the update time for our
// servers. The controller is read back so that we can reference
// the most up-to-date fields.
@ -616,11 +613,11 @@ func TestRepository_CloseConnectionsForDeadWorkers(t *testing.T) {
// Now try some scenarios.
{
// First, test the error/validation case.
result, err := connRepo.CloseConnectionsForDeadWorkers(ctx, 0)
result, err := connRepo.CloseConnectionsForDeadWorkers(ctx, -1)
require.Equal(err, errors.E(ctx,
errors.WithCode(errors.InvalidParameter),
errors.WithOp("session.(ConnectionRepository).CloseConnectionsForDeadWorkers"),
errors.WithMsg(fmt.Sprintf("gracePeriod must be at least %d seconds", DeadWorkerConnCloseMinGrace)),
errors.WithMsg(fmt.Sprintf("gracePeriod must be at least %s", deadWorkerConnCloseMinGrace)),
))
require.Nil(result)
}
@ -632,7 +629,7 @@ func TestRepository_CloseConnectionsForDeadWorkers(t *testing.T) {
worker3 = updateServer(t, worker3)
updateServer(t, worker4) // no re-assignment here because we never reference the server again
result, err := connRepo.CloseConnectionsForDeadWorkers(ctx, DeadWorkerConnCloseMinGrace)
result, err := connRepo.CloseConnectionsForDeadWorkers(ctx, deadWorkerConnCloseMinGrace)
require.NoError(err)
require.Empty(result)
// Expect appropriate split connection state on worker1
@ -647,12 +644,12 @@ func TestRepository_CloseConnectionsForDeadWorkers(t *testing.T) {
// Now try a zero case - similar to the basis, but only in that no results
// are expected to be returned for workers with no connections, even if
// they are dead. Here, the server with no connections is worker #4.
time.Sleep(time.Second * time.Duration(DeadWorkerConnCloseMinGrace))
time.Sleep(deadWorkerConnCloseMinGrace)
worker1 = updateServer(t, worker1)
worker2 = updateServer(t, worker2)
worker3 = updateServer(t, worker3)
result, err := connRepo.CloseConnectionsForDeadWorkers(ctx, DeadWorkerConnCloseMinGrace)
result, err := connRepo.CloseConnectionsForDeadWorkers(ctx, deadWorkerConnCloseMinGrace)
require.NoError(err)
require.Empty(result)
// Expect appropriate split connection state on worker1
@ -666,11 +663,11 @@ func TestRepository_CloseConnectionsForDeadWorkers(t *testing.T) {
{
// The first induction is letting the first worker "die" by not updating it
// too. All of its authorized and connected connections should be dead.
time.Sleep(time.Second * time.Duration(DeadWorkerConnCloseMinGrace))
time.Sleep(deadWorkerConnCloseMinGrace)
worker2 = updateServer(t, worker2)
worker3 = updateServer(t, worker3)
result, err := connRepo.CloseConnectionsForDeadWorkers(ctx, DeadWorkerConnCloseMinGrace)
result, err := connRepo.CloseConnectionsForDeadWorkers(ctx, deadWorkerConnCloseMinGrace)
require.NoError(err)
// Assert that we have one result with the appropriate ID and
// number of connections closed. Due to how things are
@ -693,9 +690,9 @@ func TestRepository_CloseConnectionsForDeadWorkers(t *testing.T) {
// The final case is having the other two workers die. After
// this, we should have all connections closed with the
// appropriate message from the next two servers acted on.
time.Sleep(time.Second * time.Duration(DeadWorkerConnCloseMinGrace))
time.Sleep(deadWorkerConnCloseMinGrace)
result, err := connRepo.CloseConnectionsForDeadWorkers(ctx, DeadWorkerConnCloseMinGrace)
result, err := connRepo.CloseConnectionsForDeadWorkers(ctx, deadWorkerConnCloseMinGrace)
require.NoError(err)
// Assert that we have one result with the appropriate ID and number of connections closed.
require.Equal([]CloseConnectionsForDeadWorkersResult{

Loading…
Cancel
Save