diff --git a/internal/daemon/controller/handlers/targets/target_service.go b/internal/daemon/controller/handlers/targets/target_service.go index 979c0660af..0829c5e99c 100644 --- a/internal/daemon/controller/handlers/targets/target_service.go +++ b/internal/daemon/controller/handlers/targets/target_service.go @@ -681,7 +681,7 @@ func (s Service) AuthorizeSession(ctx context.Context, req *pbs.AuthorizeSession // worker IDs below is used to contain their IDs in the same order. This is // used to fetch tags for filtering. But we avoid allocation unless we // actually need it. - selectedWorkers, err := serversRepo.ListWorkers(ctx, []string{scope.Global.String()}, server.WithExcludeShutdownWorkers(true)) + selectedWorkers, err := serversRepo.ListWorkers(ctx, []string{scope.Global.String()}, server.WithActiveWorkers(true)) if err != nil { return nil, err } diff --git a/internal/db/schema/migrations/oss/postgres/51/01_server_worker_release_version.up.sql b/internal/db/schema/migrations/oss/postgres/51/01_server_worker_release_version.up.sql index c40664df1e..75bc781d58 100644 --- a/internal/db/schema/migrations/oss/postgres/51/01_server_worker_release_version.up.sql +++ b/internal/db/schema/migrations/oss/postgres/51/01_server_worker_release_version.up.sql @@ -9,7 +9,7 @@ alter table server_worker drop view server_worker_aggregate; --- Updates view created in 34/04_views.up.sql +-- Updated in 52/01_worker_operational_state.up.sql create view server_worker_aggregate as with worker_config_tags(worker_id, source, tags) as ( select diff --git a/internal/server/options.go b/internal/server/options.go index d44ef7fe85..cd9ac6daf5 100644 --- a/internal/server/options.go +++ b/internal/server/options.go @@ -44,7 +44,7 @@ type options struct { WithCreateControllerLedActivationToken bool withReleaseVersion string withOperationalState string - withExcludeShutdownWorkers bool + withActiveWorkers bool } func getDefaultOptions() options { @@ -215,9 +215,9 @@ func WithOperationalState(state string) Option { } } -// WithExcludeShutdownWorkers provides an optional to filter out workers in shutdown -func WithExcludeShutdownWorkers(exclude bool) Option { +// WithActiveWorkers provides an optional filter to only include active workers +func WithActiveWorkers(withActive bool) Option { return func(o *options) { - o.withExcludeShutdownWorkers = exclude + o.withActiveWorkers = withActive } } diff --git a/internal/server/options_test.go b/internal/server/options_test.go index 8f7af23209..8c4a3703de 100644 --- a/internal/server/options_test.go +++ b/internal/server/options_test.go @@ -198,8 +198,8 @@ func Test_GetOpts(t *testing.T) { }) t.Run("WithExcludeShutdown", func(t *testing.T) { opts := getDefaultOptions() - assert.Empty(t, opts.withExcludeShutdownWorkers) - opts = GetOpts(WithExcludeShutdownWorkers(true)) - assert.Equal(t, true, opts.withExcludeShutdownWorkers) + assert.Empty(t, opts.withActiveWorkers) + opts = GetOpts(WithActiveWorkers(true)) + assert.Equal(t, true, opts.withActiveWorkers) }) } diff --git a/internal/server/repository_worker.go b/internal/server/repository_worker.go index 500937fd05..2c73f728ff 100644 --- a/internal/server/repository_worker.go +++ b/internal/server/repository_worker.go @@ -164,7 +164,7 @@ func lookupWorker(ctx context.Context, reader db.Reader, id string) (*Worker, er // then the last status update time is ignored. // If WithLimit < 0, then unlimited results are returned. If WithLimit == 0, then // default limits are used for results. -// Also supports: WithWorkerType, WithExcludeShutdownWorkers +// Also supports: WithWorkerType, WithActiveWorkers func (r *Repository) ListWorkers(ctx context.Context, scopeIds []string, opt ...Option) ([]*Worker, error) { const op = "server.(Repository).ListWorkers" switch { @@ -197,9 +197,9 @@ func (r *Repository) ListWorkers(ctx context.Context, scopeIds []string, opt ... return nil, errors.New(ctx, errors.InvalidParameter, op, fmt.Sprintf("unknown worker type %v", opts.withWorkerType)) } - if opts.withExcludeShutdownWorkers { - where = append(where, "operational_state not in (?)") - whereArgs = append(whereArgs, ShutdownOperationalState.String()) + if opts.withActiveWorkers { + where = append(where, "operational_state = ?") + whereArgs = append(whereArgs, ActiveOperationalState.String()) } limit := r.defaultLimit diff --git a/internal/server/repository_worker_test.go b/internal/server/repository_worker_test.go index d9de5447e4..f68e7b9f1c 100644 --- a/internal/server/repository_worker_test.go +++ b/internal/server/repository_worker_test.go @@ -290,14 +290,14 @@ func TestUpsertWorkerStatus(t *testing.T) { assert.Equal(t, "new_address", worker.GetAddress()) // Expect this worker to be returned as it is active - workers, err := repo.ListWorkers(ctx, []string{scope.Global.String()}, server.WithExcludeShutdownWorkers(true)) + workers, err := repo.ListWorkers(ctx, []string{scope.Global.String()}, server.WithActiveWorkers(true)) require.NoError(t, err) assert.Len(t, workers, 1) // update again with a shutdown state wStatus3 := server.NewWorker(scope.Global.String(), server.WithAddress("new_address"), server.WithName("config_name1"), - server.WithOperationalState("shutdown")) + server.WithOperationalState("shutdown"), server.WithReleaseVersion("Boundary v0.11.0")) worker, err = repo.UpsertWorkerStatus(ctx, wStatus3) require.NoError(t, err) assert.Greater(t, worker.GetLastStatusTime().AsTime(), worker.GetCreateTime().AsTime()) @@ -306,7 +306,7 @@ func TestUpsertWorkerStatus(t *testing.T) { assert.Equal(t, "shutdown", worker.GetOperationalState()) // Should no longer see this worker in listing if we exclude shutdown workers - workers, err = repo.ListWorkers(ctx, []string{scope.Global.String()}, server.WithExcludeShutdownWorkers(true)) + workers, err = repo.ListWorkers(ctx, []string{scope.Global.String()}, server.WithActiveWorkers(true)) require.NoError(t, err) assert.Len(t, workers, 0) }) @@ -455,7 +455,8 @@ func TestUpsertWorkerStatus(t *testing.T) { t.Run("add another status", func(t *testing.T) { anotherStatus := server.NewWorker(scope.Global.String(), server.WithName("another_test_worker"), - server.WithAddress("address")) + server.WithAddress("address"), + server.WithReleaseVersion("Boundary v0.11.0")) _, err = repo.UpsertWorkerStatus(ctx, anotherStatus) require.NoError(t, err) @@ -468,12 +469,13 @@ func TestUpsertWorkerStatus(t *testing.T) { anotherStatus := server.NewWorker(scope.Global.String(), server.WithName("another_test_worker"), server.WithAddress("address"), - server.WithOperationalState("shutdown")) + server.WithOperationalState("shutdown"), + server.WithReleaseVersion("Boundary v0.11.0")) _, err = repo.UpsertWorkerStatus(ctx, anotherStatus) require.NoError(t, err) // Filtering out shutdown workers will remove the shutdown KMS and this shutdown worker, resulting in 1 - workers, err := repo.ListWorkers(ctx, []string{scope.Global.String()}, server.WithExcludeShutdownWorkers(true)) + workers, err := repo.ListWorkers(ctx, []string{scope.Global.String()}, server.WithActiveWorkers(true)) require.NoError(t, err) assert.Len(t, workers, 1) }) @@ -643,7 +645,7 @@ func TestListWorkers(t *testing.T) { } } -func TestListWorkers_WithExcludeShutdown(t *testing.T) { +func TestListWorkers_WithActiveWorkers(t *testing.T) { t.Parallel() require := require.New(t) conn, _ := db.TestSetup(t, "postgres") @@ -662,41 +664,106 @@ func TestListWorkers_WithExcludeShutdown(t *testing.T) { require.NoError(err) require.Len(result, 3) - // Push an upsert to the first worker so that its status has been - // updated. - _, err = serversRepo.UpsertWorkerStatus(ctx, - server.NewWorker(scope.Global.String(), - server.WithName(worker1.GetName()), - server.WithAddress(worker1.GetAddress()), - server.WithOperationalState(server.ShutdownOperationalState.String()), - server.WithPublicId(worker1.GetPublicId()))) - require.NoError(err) - - result, err = serversRepo.ListWorkers(ctx, []string{scope.Global.String()}, server.WithExcludeShutdownWorkers(true)) - require.NoError(err) - require.Len(result, 2) - - _, err = serversRepo.UpsertWorkerStatus(ctx, - server.NewWorker(scope.Global.String(), - server.WithName(worker2.GetName()), - server.WithAddress(worker2.GetAddress()), - server.WithOperationalState(server.ShutdownOperationalState.String())), - server.WithPublicId(worker2.GetPublicId())) - require.NoError(err) - result, err = serversRepo.ListWorkers(ctx, []string{scope.Global.String()}, server.WithExcludeShutdownWorkers(true)) - require.NoError(err) - require.Len(result, 1) - - _, err = serversRepo.UpsertWorkerStatus(ctx, - server.NewWorker(scope.Global.String(), - server.WithName(worker3.GetName()), - server.WithAddress(worker3.GetAddress()), - server.WithOperationalState(server.ShutdownOperationalState.String())), - server.WithPublicId(worker3.GetPublicId())) - require.NoError(err) - result, err = serversRepo.ListWorkers(ctx, []string{scope.Global.String()}, server.WithExcludeShutdownWorkers(true)) - require.NoError(err) - require.Len(result, 0) + tests := []struct { + name string + upsertFn func() (*server.Worker, error) + wantCnt int + wantState string + }{ + { + name: "upsert-worker1-to-shutdown", + upsertFn: func() (*server.Worker, error) { + return serversRepo.UpsertWorkerStatus(ctx, + server.NewWorker(scope.Global.String(), + server.WithName(worker1.GetName()), + server.WithAddress(worker1.GetAddress()), + server.WithOperationalState(server.ShutdownOperationalState.String()), + server.WithReleaseVersion("Boundary v.0.11"), + server.WithPublicId(worker1.GetPublicId()))) + }, + wantCnt: 2, + wantState: server.ShutdownOperationalState.String(), + }, + { + name: "upsert-worker2-to-shutdown", + upsertFn: func() (*server.Worker, error) { + return serversRepo.UpsertWorkerStatus(ctx, + server.NewWorker(scope.Global.String(), + server.WithName(worker2.GetName()), + server.WithAddress(worker2.GetAddress()), + server.WithOperationalState(server.ShutdownOperationalState.String()), + server.WithReleaseVersion("Boundary v.0.11"), + server.WithPublicId(worker2.GetPublicId()))) + }, + wantCnt: 1, + wantState: server.ShutdownOperationalState.String(), + }, + { + name: "upsert-worker3-to-shutdown", + upsertFn: func() (*server.Worker, error) { + return serversRepo.UpsertWorkerStatus(ctx, + server.NewWorker(scope.Global.String(), + server.WithName(worker3.GetName()), + server.WithAddress(worker3.GetAddress()), + server.WithOperationalState(server.ShutdownOperationalState.String()), + server.WithReleaseVersion("Boundary v.0.11"), + server.WithPublicId(worker3.GetPublicId()))) + }, + wantCnt: 0, + wantState: server.ShutdownOperationalState.String(), + }, + { // Upsert without a release version or state and expect to get a hit- test backwards compatibility + // Pre 0.11 workers will default to Active + name: "upsert-no-release-version-no-state", + upsertFn: func() (*server.Worker, error) { + return serversRepo.UpsertWorkerStatus(ctx, + server.NewWorker(scope.Global.String(), + server.WithName(worker3.GetName()), + server.WithAddress(worker3.GetAddress())), + server.WithPublicId(worker3.GetPublicId())) + }, + wantCnt: 1, + wantState: server.ActiveOperationalState.String(), + }, + { // Upsert with active status and no version and expect to get a hit- test backwards compatibility + name: "upsert-no-release-version-active-state", + upsertFn: func() (*server.Worker, error) { + return serversRepo.UpsertWorkerStatus(ctx, + server.NewWorker(scope.Global.String(), + server.WithName(worker3.GetName()), + server.WithAddress(worker3.GetAddress()), + server.WithOperationalState(server.ActiveOperationalState.String())), + server.WithPublicId(worker3.GetPublicId())) + }, + wantCnt: 1, + wantState: server.ActiveOperationalState.String(), + }, + { // Upsert with unknown status and do not expect to get a hit- test worker create before status + name: "upsert-unknown-status", + upsertFn: func() (*server.Worker, error) { + return serversRepo.UpsertWorkerStatus(ctx, + server.NewWorker(scope.Global.String(), + server.WithName(worker3.GetName()), + server.WithAddress(worker3.GetAddress()), + server.WithOperationalState(server.UnknownOperationalState.String())), + server.WithPublicId(worker3.GetPublicId())) + }, + wantCnt: 0, + wantState: server.UnknownOperationalState.String(), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + worker, err := tt.upsertFn() + require.NoError(err) + got, err := serversRepo.ListWorkers(ctx, []string{scope.Global.String()}, server.WithActiveWorkers(true)) + require.NoError(err) + assert.Len(t, got, tt.wantCnt) + if len(tt.wantState) > 0 { + assert.Equal(t, tt.wantState, worker.OperationalState) + } + }) + } } func TestListWorkers_WithLiveness(t *testing.T) {