From b64eadedbf36e6a9d1b548c9ee585ce1b00d1acf Mon Sep 17 00:00:00 2001 From: Jeff Mitchell Date: Thu, 21 Sep 2023 13:25:53 -0500 Subject: [PATCH] Fix flaky TestServer_ReloadInitialUpstreams (#3758) With the KMS changes this should never have worked at all (because the different databases means the credentials conveyed after the KMS-based authentication would never be valid). However, it was sometimes passing because of a race condition: the worker could be found in the database before the new credentials were returned to the worker, causing the worker to start at the KMS step with the second controller. This changes the test to use the same database for two controllers, but to only ever have one running at a time, and uses a sequence of two changed (or unchanged, depending) status updates from the worker to ensure that the reload was successful. --- .../worker_initial_upstreams_reload_test.go | 103 +++++++++++++----- internal/daemon/controller/listeners.go | 4 +- internal/daemon/controller/testing.go | 1 + 3 files changed, 77 insertions(+), 31 deletions(-) diff --git a/internal/cmd/commands/server/worker_initial_upstreams_reload_test.go b/internal/cmd/commands/server/worker_initial_upstreams_reload_test.go index 95ac92ff00..24b7b36fb1 100644 --- a/internal/cmd/commands/server/worker_initial_upstreams_reload_test.go +++ b/internal/cmd/commands/server/worker_initial_upstreams_reload_test.go @@ -11,8 +11,9 @@ import ( "testing" "time" + "github.com/hashicorp/boundary/internal/daemon/controller" "github.com/hashicorp/boundary/internal/server" - "github.com/hashicorp/boundary/testing/controller" + tc "github.com/hashicorp/boundary/testing/controller" "github.com/mitchellh/cli" "github.com/stretchr/testify/require" "go.uber.org/atomic" @@ -51,25 +52,18 @@ func TestServer_ReloadInitialUpstreams(t *testing.T) { recoveryWrapper, _ := wrapperWithKey(t) workerAuthWrapper, key := wrapperWithKey(t) - // Create two controllers, each with their own database. In practice it - // would be odd to have separate databases, but it makes it easy for the - // test to assert that the worker has connected to the second controller - // after a reload signal is sent. - testController := controller.NewTestController( + testController := tc.NewTestController( t, - controller.WithWorkerAuthKms(workerAuthWrapper), - controller.WithRootKms(rootWrapper), - controller.WithRecoveryKms(recoveryWrapper), + tc.WithWorkerAuthKms(workerAuthWrapper), + tc.WithRootKms(rootWrapper), + tc.WithRecoveryKms(recoveryWrapper), + tc.DisableDatabaseDestruction(), ) - defer testController.Shutdown() - testController2 := controller.NewTestController( - t, - controller.WithWorkerAuthKms(workerAuthWrapper), - controller.WithRootKms(rootWrapper), - controller.WithRecoveryKms(recoveryWrapper), - ) - defer testController2.Shutdown() - require.NotEqual(testController.Config().DatabaseUrl, testController2.Config().DatabaseUrl) + t.Cleanup(testController.Shutdown) + + testController2 := testController.AddClusterControllerMember(t, &controller.TestControllerOpts{ + DisableAutoStart: true, + }) wg := &sync.WaitGroup{} @@ -91,25 +85,69 @@ func TestServer_ReloadInitialUpstreams(t *testing.T) { t.Fatalf("timeout waiting for worker start") } - // Wait until the worker has connected to the first controller + // Wait until the worker has connected to the first controller as seen via + // two status updates timeout := time.NewTimer(15 * time.Second) poll := time.NewTimer(0) var w *server.Worker + var lastStatusTime time.Time + serversRepo, err := testController.Controller().ServersRepoFn() + require.NoError(err) pollFirstController: for { select { case <-timeout.C: t.Fatalf("timeout wait for worker to connect to first controller") case <-poll.C: - serversRepo, err := testController.Controller().ServersRepoFn() - require.NoError(err) w, err = serversRepo.LookupWorkerByName(testController.Context(), "test") require.NoError(err) if w != nil { - timeout.Stop() - break pollFirstController + switch { + case lastStatusTime.IsZero(): + lastStatusTime = w.GetLastStatusTime().AsTime().Round(time.Second) + default: + if !lastStatusTime.Equal(w.GetLastStatusTime().AsTime().Round(time.Second)) { + timeout.Stop() + break pollFirstController + } + } + } + poll.Reset(time.Second) + } + } + + // Shut down first controller, start second, then ensure we are no longer + // seeing status updates. + testController.Shutdown() + require.NoError(testController2.Controller().Start()) + t.Cleanup(testController2.Shutdown) + + lastStatusTime = time.Time{} + timeout.Reset(15 * time.Second) + poll.Reset(0) + serversRepo, err = testController2.Controller().ServersRepoFn() + require.NoError(err) +pollForNoStatus: + for { + select { + case <-timeout.C: + // Great, didn't see it + poll.Stop() + break pollForNoStatus + case <-poll.C: + w, err = serversRepo.LookupWorkerByName(testController2.Context(), "test") + require.NoError(err) + if w != nil { + switch { + case lastStatusTime.IsZero(): + lastStatusTime = w.GetLastStatusTime().AsTime().Round(time.Second) + default: + if !lastStatusTime.Equal(w.GetLastStatusTime().AsTime().Round(time.Second)) { + t.Fatal("found updated status times when not expected") + } + } } - poll.Reset(1 * time.Millisecond) + poll.Reset(time.Second) } } @@ -124,21 +162,28 @@ pollFirstController: // Wait until the worker connects to the second controller timeout.Reset(15 * time.Second) - poll.Reset(10 * time.Millisecond) + poll.Reset(time.Second) + lastStatusTime = time.Time{} pollSecondController: for { select { case <-timeout.C: t.Fatalf("timeout wait for worker to connect to second controller") case <-poll.C: - serversRepo, err := testController2.Controller().ServersRepoFn() - require.NoError(err) w, err = serversRepo.LookupWorkerByName(testController2.Context(), "test") require.NoError(err) if w != nil { - break pollSecondController + switch { + case lastStatusTime.IsZero(): + lastStatusTime = w.GetLastStatusTime().AsTime().Round(time.Second) + default: + if !lastStatusTime.Round(time.Second).Equal(w.GetLastStatusTime().AsTime().Round(time.Second)) { + timeout.Stop() + break pollSecondController + } + } } - poll.Reset(1 * time.Millisecond) + poll.Reset(time.Second) } } diff --git a/internal/daemon/controller/listeners.go b/internal/daemon/controller/listeners.go index b5d6966657..b9e1b5137a 100644 --- a/internal/daemon/controller/listeners.go +++ b/internal/daemon/controller/listeners.go @@ -276,13 +276,13 @@ func (c *Controller) configureForCluster(ln *base.ServerListener) (func(), error } go func() { err := splitListener.Start() - if err != nil { + if err != nil && !errors.Is(err, net.ErrClosed) { event.WriteError(c.baseContext, op, err, event.WithInfoMsg("splitListener.Start() error")) } }() go func() { err := ln.GrpcServer.Serve(metric.InstrumentClusterTrackingListener(multiplexingAuthedListener, grpcListenerPurpose)) - if err != nil { + if err != nil && !errors.Is(err, net.ErrClosed) { event.WriteError(c.baseContext, op, err, event.WithInfoMsg("multiplexingAuthedListener error")) } }() diff --git a/internal/daemon/controller/testing.go b/internal/daemon/controller/testing.go index bdbd348e72..0f5957111d 100644 --- a/internal/daemon/controller/testing.go +++ b/internal/daemon/controller/testing.go @@ -810,6 +810,7 @@ func (tc *TestController) AddClusterControllerMember(t testing.TB, opts *TestCon DefaultPassword: tc.b.DevPassword, DisableKmsKeyCreation: true, DisableAuthMethodCreation: true, + DisableAutoStart: opts.DisableAutoStart, PublicClusterAddr: opts.PublicClusterAddr, WorkerStatusGracePeriodDuration: opts.WorkerStatusGracePeriodDuration, LivenessTimeToStaleDuration: opts.LivenessTimeToStaleDuration,