address op state review feedback (#2453)

* address op state review feedback
pull/2461/head
Irena Rindos 3 years ago committed by GitHub
parent c73a13aa3e
commit 821d8317dc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -681,7 +681,7 @@ func (s Service) AuthorizeSession(ctx context.Context, req *pbs.AuthorizeSession
// worker IDs below is used to contain their IDs in the same order. This is
// used to fetch tags for filtering. But we avoid allocation unless we
// actually need it.
selectedWorkers, err := serversRepo.ListWorkers(ctx, []string{scope.Global.String()}, server.WithExcludeShutdownWorkers(true))
selectedWorkers, err := serversRepo.ListWorkers(ctx, []string{scope.Global.String()}, server.WithActiveWorkers(true))
if err != nil {
return nil, err
}

@ -9,7 +9,7 @@ alter table server_worker
drop view server_worker_aggregate;
-- Updates view created in 34/04_views.up.sql
-- Updated in 52/01_worker_operational_state.up.sql
create view server_worker_aggregate as
with worker_config_tags(worker_id, source, tags) as (
select

@ -44,7 +44,7 @@ type options struct {
WithCreateControllerLedActivationToken bool
withReleaseVersion string
withOperationalState string
withExcludeShutdownWorkers bool
withActiveWorkers bool
}
func getDefaultOptions() options {
@ -215,9 +215,9 @@ func WithOperationalState(state string) Option {
}
}
// WithExcludeShutdownWorkers provides an optional to filter out workers in shutdown
func WithExcludeShutdownWorkers(exclude bool) Option {
// WithActiveWorkers provides an optional filter to only include active workers
func WithActiveWorkers(withActive bool) Option {
return func(o *options) {
o.withExcludeShutdownWorkers = exclude
o.withActiveWorkers = withActive
}
}

@ -198,8 +198,8 @@ func Test_GetOpts(t *testing.T) {
})
t.Run("WithExcludeShutdown", func(t *testing.T) {
opts := getDefaultOptions()
assert.Empty(t, opts.withExcludeShutdownWorkers)
opts = GetOpts(WithExcludeShutdownWorkers(true))
assert.Equal(t, true, opts.withExcludeShutdownWorkers)
assert.Empty(t, opts.withActiveWorkers)
opts = GetOpts(WithActiveWorkers(true))
assert.Equal(t, true, opts.withActiveWorkers)
})
}

@ -164,7 +164,7 @@ func lookupWorker(ctx context.Context, reader db.Reader, id string) (*Worker, er
// then the last status update time is ignored.
// If WithLimit < 0, then unlimited results are returned. If WithLimit == 0, then
// default limits are used for results.
// Also supports: WithWorkerType, WithExcludeShutdownWorkers
// Also supports: WithWorkerType, WithActiveWorkers
func (r *Repository) ListWorkers(ctx context.Context, scopeIds []string, opt ...Option) ([]*Worker, error) {
const op = "server.(Repository).ListWorkers"
switch {
@ -197,9 +197,9 @@ func (r *Repository) ListWorkers(ctx context.Context, scopeIds []string, opt ...
return nil, errors.New(ctx, errors.InvalidParameter, op, fmt.Sprintf("unknown worker type %v", opts.withWorkerType))
}
if opts.withExcludeShutdownWorkers {
where = append(where, "operational_state not in (?)")
whereArgs = append(whereArgs, ShutdownOperationalState.String())
if opts.withActiveWorkers {
where = append(where, "operational_state = ?")
whereArgs = append(whereArgs, ActiveOperationalState.String())
}
limit := r.defaultLimit

@ -290,14 +290,14 @@ func TestUpsertWorkerStatus(t *testing.T) {
assert.Equal(t, "new_address", worker.GetAddress())
// Expect this worker to be returned as it is active
workers, err := repo.ListWorkers(ctx, []string{scope.Global.String()}, server.WithExcludeShutdownWorkers(true))
workers, err := repo.ListWorkers(ctx, []string{scope.Global.String()}, server.WithActiveWorkers(true))
require.NoError(t, err)
assert.Len(t, workers, 1)
// update again with a shutdown state
wStatus3 := server.NewWorker(scope.Global.String(),
server.WithAddress("new_address"), server.WithName("config_name1"),
server.WithOperationalState("shutdown"))
server.WithOperationalState("shutdown"), server.WithReleaseVersion("Boundary v0.11.0"))
worker, err = repo.UpsertWorkerStatus(ctx, wStatus3)
require.NoError(t, err)
assert.Greater(t, worker.GetLastStatusTime().AsTime(), worker.GetCreateTime().AsTime())
@ -306,7 +306,7 @@ func TestUpsertWorkerStatus(t *testing.T) {
assert.Equal(t, "shutdown", worker.GetOperationalState())
// Should no longer see this worker in listing if we exclude shutdown workers
workers, err = repo.ListWorkers(ctx, []string{scope.Global.String()}, server.WithExcludeShutdownWorkers(true))
workers, err = repo.ListWorkers(ctx, []string{scope.Global.String()}, server.WithActiveWorkers(true))
require.NoError(t, err)
assert.Len(t, workers, 0)
})
@ -455,7 +455,8 @@ func TestUpsertWorkerStatus(t *testing.T) {
t.Run("add another status", func(t *testing.T) {
anotherStatus := server.NewWorker(scope.Global.String(),
server.WithName("another_test_worker"),
server.WithAddress("address"))
server.WithAddress("address"),
server.WithReleaseVersion("Boundary v0.11.0"))
_, err = repo.UpsertWorkerStatus(ctx, anotherStatus)
require.NoError(t, err)
@ -468,12 +469,13 @@ func TestUpsertWorkerStatus(t *testing.T) {
anotherStatus := server.NewWorker(scope.Global.String(),
server.WithName("another_test_worker"),
server.WithAddress("address"),
server.WithOperationalState("shutdown"))
server.WithOperationalState("shutdown"),
server.WithReleaseVersion("Boundary v0.11.0"))
_, err = repo.UpsertWorkerStatus(ctx, anotherStatus)
require.NoError(t, err)
// Filtering out shutdown workers will remove the shutdown KMS and this shutdown worker, resulting in 1
workers, err := repo.ListWorkers(ctx, []string{scope.Global.String()}, server.WithExcludeShutdownWorkers(true))
workers, err := repo.ListWorkers(ctx, []string{scope.Global.String()}, server.WithActiveWorkers(true))
require.NoError(t, err)
assert.Len(t, workers, 1)
})
@ -643,7 +645,7 @@ func TestListWorkers(t *testing.T) {
}
}
func TestListWorkers_WithExcludeShutdown(t *testing.T) {
func TestListWorkers_WithActiveWorkers(t *testing.T) {
t.Parallel()
require := require.New(t)
conn, _ := db.TestSetup(t, "postgres")
@ -662,41 +664,106 @@ func TestListWorkers_WithExcludeShutdown(t *testing.T) {
require.NoError(err)
require.Len(result, 3)
// Push an upsert to the first worker so that its status has been
// updated.
_, err = serversRepo.UpsertWorkerStatus(ctx,
server.NewWorker(scope.Global.String(),
server.WithName(worker1.GetName()),
server.WithAddress(worker1.GetAddress()),
server.WithOperationalState(server.ShutdownOperationalState.String()),
server.WithPublicId(worker1.GetPublicId())))
require.NoError(err)
result, err = serversRepo.ListWorkers(ctx, []string{scope.Global.String()}, server.WithExcludeShutdownWorkers(true))
require.NoError(err)
require.Len(result, 2)
_, err = serversRepo.UpsertWorkerStatus(ctx,
server.NewWorker(scope.Global.String(),
server.WithName(worker2.GetName()),
server.WithAddress(worker2.GetAddress()),
server.WithOperationalState(server.ShutdownOperationalState.String())),
server.WithPublicId(worker2.GetPublicId()))
require.NoError(err)
result, err = serversRepo.ListWorkers(ctx, []string{scope.Global.String()}, server.WithExcludeShutdownWorkers(true))
require.NoError(err)
require.Len(result, 1)
_, err = serversRepo.UpsertWorkerStatus(ctx,
server.NewWorker(scope.Global.String(),
server.WithName(worker3.GetName()),
server.WithAddress(worker3.GetAddress()),
server.WithOperationalState(server.ShutdownOperationalState.String())),
server.WithPublicId(worker3.GetPublicId()))
require.NoError(err)
result, err = serversRepo.ListWorkers(ctx, []string{scope.Global.String()}, server.WithExcludeShutdownWorkers(true))
require.NoError(err)
require.Len(result, 0)
tests := []struct {
name string
upsertFn func() (*server.Worker, error)
wantCnt int
wantState string
}{
{
name: "upsert-worker1-to-shutdown",
upsertFn: func() (*server.Worker, error) {
return serversRepo.UpsertWorkerStatus(ctx,
server.NewWorker(scope.Global.String(),
server.WithName(worker1.GetName()),
server.WithAddress(worker1.GetAddress()),
server.WithOperationalState(server.ShutdownOperationalState.String()),
server.WithReleaseVersion("Boundary v.0.11"),
server.WithPublicId(worker1.GetPublicId())))
},
wantCnt: 2,
wantState: server.ShutdownOperationalState.String(),
},
{
name: "upsert-worker2-to-shutdown",
upsertFn: func() (*server.Worker, error) {
return serversRepo.UpsertWorkerStatus(ctx,
server.NewWorker(scope.Global.String(),
server.WithName(worker2.GetName()),
server.WithAddress(worker2.GetAddress()),
server.WithOperationalState(server.ShutdownOperationalState.String()),
server.WithReleaseVersion("Boundary v.0.11"),
server.WithPublicId(worker2.GetPublicId())))
},
wantCnt: 1,
wantState: server.ShutdownOperationalState.String(),
},
{
name: "upsert-worker3-to-shutdown",
upsertFn: func() (*server.Worker, error) {
return serversRepo.UpsertWorkerStatus(ctx,
server.NewWorker(scope.Global.String(),
server.WithName(worker3.GetName()),
server.WithAddress(worker3.GetAddress()),
server.WithOperationalState(server.ShutdownOperationalState.String()),
server.WithReleaseVersion("Boundary v.0.11"),
server.WithPublicId(worker3.GetPublicId())))
},
wantCnt: 0,
wantState: server.ShutdownOperationalState.String(),
},
{ // Upsert without a release version or state and expect to get a hit- test backwards compatibility
// Pre 0.11 workers will default to Active
name: "upsert-no-release-version-no-state",
upsertFn: func() (*server.Worker, error) {
return serversRepo.UpsertWorkerStatus(ctx,
server.NewWorker(scope.Global.String(),
server.WithName(worker3.GetName()),
server.WithAddress(worker3.GetAddress())),
server.WithPublicId(worker3.GetPublicId()))
},
wantCnt: 1,
wantState: server.ActiveOperationalState.String(),
},
{ // Upsert with active status and no version and expect to get a hit- test backwards compatibility
name: "upsert-no-release-version-active-state",
upsertFn: func() (*server.Worker, error) {
return serversRepo.UpsertWorkerStatus(ctx,
server.NewWorker(scope.Global.String(),
server.WithName(worker3.GetName()),
server.WithAddress(worker3.GetAddress()),
server.WithOperationalState(server.ActiveOperationalState.String())),
server.WithPublicId(worker3.GetPublicId()))
},
wantCnt: 1,
wantState: server.ActiveOperationalState.String(),
},
{ // Upsert with unknown status and do not expect to get a hit- test worker create before status
name: "upsert-unknown-status",
upsertFn: func() (*server.Worker, error) {
return serversRepo.UpsertWorkerStatus(ctx,
server.NewWorker(scope.Global.String(),
server.WithName(worker3.GetName()),
server.WithAddress(worker3.GetAddress()),
server.WithOperationalState(server.UnknownOperationalState.String())),
server.WithPublicId(worker3.GetPublicId()))
},
wantCnt: 0,
wantState: server.UnknownOperationalState.String(),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
worker, err := tt.upsertFn()
require.NoError(err)
got, err := serversRepo.ListWorkers(ctx, []string{scope.Global.String()}, server.WithActiveWorkers(true))
require.NoError(err)
assert.Len(t, got, tt.wantCnt)
if len(tt.wantState) > 0 {
assert.Equal(t, tt.wantState, worker.OperationalState)
}
})
}
}
func TestListWorkers_WithLiveness(t *testing.T) {

Loading…
Cancel
Save