From fe09f61a85b2c4f17587423b694628a9eccae23c Mon Sep 17 00:00:00 2001 From: "Sorawis Nilparuk (Bo)" Date: Mon, 5 Jan 2026 10:40:28 -0800 Subject: [PATCH] chore(test) fix flaky tests (#6315) * internal/cmd/commands/server/listener_reload_test.go remove t.Parallel() comment * internal/cmd/commands/server/worker_initial_upstreams_reload_test.go remove t.Parallel * internal/cmd/commands/server/worker_tags_reload_test.go wait by polling worker instead of sleep * internal/daemon/worker/routing_info.go make deep copy in LastRoutingInfoSuccess to avoid race condition * internal/tests/cluster/parallel/orphaned_connections_test.go raise grace period time to allow test to wait for longer * improve logging in sequential tests * move ports around and use t.Cleanup to send shutdown signal to test server * fix one typo * do final assert in t.Cleanup instead of defer * undo t.Cleanup change * add another signal to force shutdown running servers * add exit early and increase server start timeout * tighten workerRPC settings on session tests * tighten more workerRPC timing * make gen * add comment explaining deep copying lastRoutingInfoSuccess --- .../server/controller_db_swap_test.go | 33 +++++++++---- .../controller_ratelimit_reload_test.go | 49 ++++++++++++++----- .../commands/server/listener_reload_test.go | 16 +++--- .../worker_initial_upstreams_reload_test.go | 2 - .../server/worker_tags_reload_test.go | 40 +++++++++------ internal/credential/vault/repository_test.go | 6 ++- internal/daemon/worker/routing_info.go | 12 +++-- .../cluster/parallel/ipv6_listener_test.go | 2 +- .../parallel/multi_controller_worker_test.go | 6 +-- .../parallel/orphaned_connections_test.go | 8 +-- .../cluster/parallel/unix_listener_test.go | 2 +- .../sequential/session_cleanup_test.go | 4 +- .../sequential/worker_bytesupdown_test.go | 11 +++-- .../cluster/sequential/worker_proxy_test.go | 11 +++-- .../cluster/sequential/worker_tagging_test.go | 4 +- 15 files changed, 130 insertions(+), 76 deletions(-) diff --git a/internal/cmd/commands/server/controller_db_swap_test.go b/internal/cmd/commands/server/controller_db_swap_test.go index f6ee0f49e1..bcb052a403 100644 --- a/internal/cmd/commands/server/controller_db_swap_test.go +++ b/internal/cmd/commands/server/controller_db_swap_test.go @@ -59,12 +59,12 @@ kms "aead" { listener "tcp" { purpose = "api" - address = "127.0.0.1:9500" + address = "127.0.0.1:9600" tls_disable = true } listener "tcp" { - address = "127.0.0.1:9501" + address = "127.0.0.1:9601" purpose = "cluster" } ` @@ -108,22 +108,30 @@ func TestReloadControllerDatabase(t *testing.T) { wg := &sync.WaitGroup{} wg.Add(1) + earlyExitChan := make(chan struct{}) go func() { defer wg.Done() - args := []string{"-config", td + "/config.hcl"} exitCode := cmd.Run(args) if exitCode != 0 { output := cmd.UI.(*cli.MockUi).ErrorWriter.String() + cmd.UI.(*cli.MockUi).OutputWriter.String() fmt.Printf("%s: got a non-zero exit status: %s", t.Name(), output) } + // send non-blocking message to a channel to signal that the server has exited + // this channel is used to avoid waiting for the full timeout in case of early exit + select { + case earlyExitChan <- struct{}{}: + default: + } }() // Wait until things are up and running (or timeout). select { case <-cmd.startedCh: - case <-time.After(15 * time.Second): - t.Fatal("timeout") + case <-earlyExitChan: + t.Fatal("server exited early") + case <-time.After(30 * time.Second): + t.Fatal("timeout waiting for server to start") } require.NotNil(t, cmd.schemaManager) @@ -158,7 +166,7 @@ func TestReloadControllerDatabase(t *testing.T) { select { case <-cmd.reloadedCh: case <-time.After(15 * time.Second): - t.Fatal("timeout") + t.Fatal("timeout waiting for server reload") } // Assert that the schema manager ptr and value changed @@ -243,6 +251,7 @@ func TestReloadControllerDatabase_InvalidNewDatabaseState(t *testing.T) { wg := &sync.WaitGroup{} wg.Add(1) + earlyExitChan := make(chan struct{}) go func() { defer wg.Done() @@ -252,13 +261,19 @@ func TestReloadControllerDatabase_InvalidNewDatabaseState(t *testing.T) { output := cmd.UI.(*cli.MockUi).ErrorWriter.String() + cmd.UI.(*cli.MockUi).OutputWriter.String() fmt.Printf("%s: got a non-zero exit status: %s", t.Name(), output) } + select { + case earlyExitChan <- struct{}{}: + default: + } }() // Wait until things are up and running (or timeout). select { case <-cmd.startedCh: - case <-time.After(15 * time.Second): - t.Fatal("timeout") + case <-earlyExitChan: + t.Fatal("server exited early") + case <-time.After(30 * time.Second): + t.Fatal("timeout waiting for server to start") } require.NotNil(t, cmd.schemaManager) @@ -292,7 +307,7 @@ func TestReloadControllerDatabase_InvalidNewDatabaseState(t *testing.T) { select { case <-cmd.reloadedCh: case <-time.After(15 * time.Second): - t.Fatal("timeout") + t.Fatal("timeout waiting for server reload") } // Assert that the schema manager ptr and value did not change. diff --git a/internal/cmd/commands/server/controller_ratelimit_reload_test.go b/internal/cmd/commands/server/controller_ratelimit_reload_test.go index 2fce90741e..ef12930387 100644 --- a/internal/cmd/commands/server/controller_ratelimit_reload_test.go +++ b/internal/cmd/commands/server/controller_ratelimit_reload_test.go @@ -180,7 +180,7 @@ listener "tcp" { listener "tcp" { address = "127.0.0.1:9501" purpose = "cluster" -} +} ` ) @@ -202,22 +202,28 @@ func TestReloadControllerRateLimits(t *testing.T) { wg := &sync.WaitGroup{} wg.Add(1) + earlyExitChan := make(chan struct{}) go func() { defer wg.Done() - args := []string{"-config", td + "/config.hcl"} exitCode := cmd.Run(args) if exitCode != 0 { output := cmd.UI.(*cli.MockUi).ErrorWriter.String() + cmd.UI.(*cli.MockUi).OutputWriter.String() fmt.Printf("%s: got a non-zero exit status: %s", t.Name(), output) } + select { + case earlyExitChan <- struct{}{}: + default: + } }() // Wait until things are up and running (or timeout). select { case <-cmd.startedCh: - case <-time.After(15 * time.Second): - t.Fatal("timeout") + case <-earlyExitChan: + t.Fatal("server exited early") + case <-time.After(30 * time.Second): + t.Fatal("timeout waiting for server to start") } // Change config so it is ready for reloading @@ -262,7 +268,7 @@ func TestReloadControllerRateLimits(t *testing.T) { select { case <-cmd.reloadedCh: case <-time.After(15 * time.Second): - t.Fatal("timeout") + t.Fatal("timeout waiting for reload signal") } // Make another request, the limit should have reset and the new limit @@ -301,6 +307,7 @@ func TestReloadControllerRateLimitsSameConfig(t *testing.T) { wg := &sync.WaitGroup{} wg.Add(1) + earlyExitChan := make(chan struct{}) go func() { defer wg.Done() @@ -310,13 +317,19 @@ func TestReloadControllerRateLimitsSameConfig(t *testing.T) { output := cmd.UI.(*cli.MockUi).ErrorWriter.String() + cmd.UI.(*cli.MockUi).OutputWriter.String() fmt.Printf("%s: got a non-zero exit status: %s", t.Name(), output) } + select { + case earlyExitChan <- struct{}{}: + default: + } }() // Wait until things are up and running (or timeout). select { case <-cmd.startedCh: - case <-time.After(15 * time.Second): - t.Fatal("timeout") + case <-earlyExitChan: + t.Fatal("server exited early") + case <-time.After(30 * time.Second): + t.Fatal("timeout waiting for server to start") } c := http.Client{} @@ -395,6 +408,7 @@ func TestReloadControllerRateLimitsDisable(t *testing.T) { wg := &sync.WaitGroup{} wg.Add(1) + earlyExitChan := make(chan struct{}) go func() { defer wg.Done() @@ -404,13 +418,19 @@ func TestReloadControllerRateLimitsDisable(t *testing.T) { output := cmd.UI.(*cli.MockUi).ErrorWriter.String() + cmd.UI.(*cli.MockUi).OutputWriter.String() fmt.Printf("%s: got a non-zero exit status: %s", t.Name(), output) } + select { + case earlyExitChan <- struct{}{}: + default: + } }() // Wait until things are up and running (or timeout). select { case <-cmd.startedCh: - case <-time.After(15 * time.Second): - t.Fatal("timeout") + case <-earlyExitChan: + t.Fatal("server exited early") + case <-time.After(30 * time.Second): + t.Fatal("timeout waiting for server to start") } // Change config so it is ready for reloading @@ -494,6 +514,7 @@ func TestReloadControllerRateLimitsEnable(t *testing.T) { wg := &sync.WaitGroup{} wg.Add(1) + earlyExitChan := make(chan struct{}) go func() { defer wg.Done() @@ -503,13 +524,19 @@ func TestReloadControllerRateLimitsEnable(t *testing.T) { output := cmd.UI.(*cli.MockUi).ErrorWriter.String() + cmd.UI.(*cli.MockUi).OutputWriter.String() fmt.Printf("%s: got a non-zero exit status: %s", t.Name(), output) } + select { + case earlyExitChan <- struct{}{}: + default: + } }() // Wait until things are up and running (or timeout). select { case <-cmd.startedCh: - case <-time.After(15 * time.Second): - t.Fatal("timeout") + case <-earlyExitChan: + t.Fatal("server exited early") + case <-time.After(30 * time.Second): + t.Fatal("timeout waiting for server to start") } // Change config so it is ready for reloading diff --git a/internal/cmd/commands/server/listener_reload_test.go b/internal/cmd/commands/server/listener_reload_test.go index a5472f4501..30be784db3 100644 --- a/internal/cmd/commands/server/listener_reload_test.go +++ b/internal/cmd/commands/server/listener_reload_test.go @@ -64,7 +64,7 @@ kms "aead" { listener "tcp" { purpose = "api" - address = "127.0.0.1:9500" + address = "127.0.0.1:9700" tls_cert_file = "%s/bundle.pem" tls_key_file = "%s/bundle.pem" cors_enabled = true @@ -72,18 +72,17 @@ listener "tcp" { } listener "tcp" { - address = "127.0.0.1:9501" + address = "127.0.0.1:9701" purpose = "cluster" } listener "tcp" { - address = "127.0.0.1:9502" + address = "127.0.0.1:9702" purpose = "proxy" } ` func TestServer_ReloadListener(t *testing.T) { - // t.Parallel() require := require.New(t) wg := &sync.WaitGroup{} @@ -107,12 +106,11 @@ func TestServer_ReloadListener(t *testing.T) { cmd.WorkerAuthKms = nil cmd.RecoveryKms = nil - defer func() { + t.Cleanup(func() { if cmd.DevDatabaseCleanupFunc != nil { require.NoError(cmd.DevDatabaseCleanupFunc()) } - }() - + }) // Setup initial certs inBytes, err := os.ReadFile(wd + "bundle1.pem") require.NoError(err) @@ -135,9 +133,8 @@ func TestServer_ReloadListener(t *testing.T) { fmt.Printf("%s: got a non-zero exit status: %s", t.Name(), output) } }() - testCertificateSerial := func(serial string) { - conn, err := tls.Dial("tcp", "127.0.0.1:9500", &tls.Config{ + conn, err := tls.Dial("tcp", "127.0.0.1:9700", &tls.Config{ RootCAs: certPool, }) require.NoError(err) @@ -168,7 +165,6 @@ func TestServer_ReloadListener(t *testing.T) { } testCertificateSerial("193080739105342897219784862820114567438786419504") - cmd.ShutdownCh <- struct{}{} wg.Wait() } diff --git a/internal/cmd/commands/server/worker_initial_upstreams_reload_test.go b/internal/cmd/commands/server/worker_initial_upstreams_reload_test.go index eb7e0855c9..058d8d2741 100644 --- a/internal/cmd/commands/server/worker_initial_upstreams_reload_test.go +++ b/internal/cmd/commands/server/worker_initial_upstreams_reload_test.go @@ -45,7 +45,6 @@ worker { ` func TestServer_ReloadInitialUpstreams(t *testing.T) { - t.Parallel() require := require.New(t) rootWrapper, _ := wrapperWithKey(t) @@ -186,7 +185,6 @@ pollSecondController: poll.Reset(time.Second) } } - cmd.ShutdownCh <- struct{}{} wg.Wait() } diff --git a/internal/cmd/commands/server/worker_tags_reload_test.go b/internal/cmd/commands/server/worker_tags_reload_test.go index def7f457fc..68315aaf02 100644 --- a/internal/cmd/commands/server/worker_tags_reload_test.go +++ b/internal/cmd/commands/server/worker_tags_reload_test.go @@ -76,7 +76,7 @@ func TestServer_ReloadWorkerTags(t *testing.T) { recoveryWrapper, _ := wrapperWithKey(t) workerAuthWrapper, key := wrapperWithKey(t) testController := controller.NewTestController(t, controller.WithWorkerAuthKms(workerAuthWrapper), controller.WithRootKms(rootWrapper), controller.WithRecoveryKms(recoveryWrapper)) - defer testController.Shutdown() + t.Cleanup(testController.Shutdown) wg := &sync.WaitGroup{} @@ -98,21 +98,32 @@ func TestServer_ReloadWorkerTags(t *testing.T) { t.Fatalf("timeout") } - fetchWorkerTags := func(name string, key string, values []string) { + waitForWorkerTags := func(name string, key string, values []string) { t.Helper() - serversRepo, err := testController.Controller().ServersRepoFn() - require.NoError(err) - w, err := server.TestLookupWorkerByName(testController.Context(), t, name, serversRepo) - require.NoError(err) - require.NotNil(w) - v, ok := w.CanonicalTags()[key] - require.True(ok) - require.ElementsMatch(values, v) + maxWait := time.NewTimer(time.Second * 15) + for { + select { + case <-maxWait.C: + t.Fatal("timed out waiting for worker tags") + case <-time.After(500 * time.Millisecond): + serversRepo, err := testController.Controller().ServersRepoFn() + require.NoError(err) + w, err := server.TestLookupWorkerByName(testController.Context(), t, name, serversRepo) + require.NoError(err) + require.NotNil(w) + v, ok := w.CanonicalTags()[key] + if !ok { + continue + } + require.True(ok) + require.ElementsMatch(values, v) + return + } + } } // Give time to populate up to the controller - time.Sleep(10 * time.Second) - fetchWorkerTags("test", "type", []string{"dev", "local"}) + waitForWorkerTags("test", "type", []string{"dev", "local"}) cmd.presetConfig.Store(fmt.Sprintf(workerBaseConfig+tag2Config, key, testController.ClusterAddrs()[0])) @@ -123,11 +134,8 @@ func TestServer_ReloadWorkerTags(t *testing.T) { t.Fatalf("timeout") } - time.Sleep(10 * time.Second) - fetchWorkerTags("test", "foo", []string{"bar", "baz"}) - + waitForWorkerTags("test", "foo", []string{"bar", "baz"}) cmd.ShutdownCh <- struct{}{} - wg.Wait() } diff --git a/internal/credential/vault/repository_test.go b/internal/credential/vault/repository_test.go index 0c025d5453..6ab32b2853 100644 --- a/internal/credential/vault/repository_test.go +++ b/internal/credential/vault/repository_test.go @@ -64,8 +64,10 @@ func TestRepository_New(t *testing.T) { w: rw, kms: kmsCache, scheduler: sche, - opts: []Option{WithLimit(5), - WithRandomReader(testReader)}, + opts: []Option{ + WithLimit(5), + WithRandomReader(testReader), + }, }, want: &Repository{ reader: rw, diff --git a/internal/daemon/worker/routing_info.go b/internal/daemon/worker/routing_info.go index 74e0469f6c..4b3ea3bd33 100644 --- a/internal/daemon/worker/routing_info.go +++ b/internal/daemon/worker/routing_info.go @@ -113,10 +113,12 @@ func (w *Worker) startRoutingInfoTicking(cancelCtx context.Context) { // RoutingInfo request. func (w *Worker) LastRoutingInfoSuccess() *LastRoutingInfo { s, ok := w.lastRoutingInfoSuccess.Load().(*LastRoutingInfo) - if !ok { + if !ok || s == nil { return nil } - return s + // make a deep copy to avoid race conditions from accessing underlying data + copied := *s + return &copied } func (w *Worker) sendWorkerRoutingInfo(cancelCtx context.Context) { @@ -178,7 +180,11 @@ func (w *Worker) sendWorkerRoutingInfo(cancelCtx context.Context) { // In the case that the control plane has gone down and come up with different IPs, // append initial upstreams/ cluster addr to the resolver to try if pastGrace, _, _ := w.isPastGrace(); pastGrace && w.GrpcClientConn.Load().GetState() == connectivity.TransientFailure { - lastRoutingInfo := w.lastRoutingInfoSuccess.Load().(*LastRoutingInfo) + routingInfo := w.lastRoutingInfoSuccess.Load().(*LastRoutingInfo) + // make a deep copy to avoid race conditions from accessing underlying data + // since routingInfo may be modified later in this function + copied := *routingInfo + lastRoutingInfo := &copied if lastRoutingInfo != nil && lastRoutingInfo.LastCalculatedUpstreams != nil { addrs := lastRoutingInfo.LastCalculatedUpstreams diff --git a/internal/tests/cluster/parallel/ipv6_listener_test.go b/internal/tests/cluster/parallel/ipv6_listener_test.go index 1af85c7fff..6687b0ff8f 100644 --- a/internal/tests/cluster/parallel/ipv6_listener_test.go +++ b/internal/tests/cluster/parallel/ipv6_listener_test.go @@ -60,7 +60,7 @@ func TestIPv6Listener(t *testing.T) { WorkerAuthKms: c1.Config().WorkerAuthKms, InitialUpstreams: append(c1.ClusterAddrs(), c2.ClusterAddrs()...), Logger: logger.Named("w1"), - WorkerRPCInterval: time.Second, + WorkerRPCInterval: 500 * time.Millisecond, }) wg.Add(2) diff --git a/internal/tests/cluster/parallel/multi_controller_worker_test.go b/internal/tests/cluster/parallel/multi_controller_worker_test.go index aeb47abcd6..0e7de7b1b5 100644 --- a/internal/tests/cluster/parallel/multi_controller_worker_test.go +++ b/internal/tests/cluster/parallel/multi_controller_worker_test.go @@ -53,7 +53,7 @@ func TestMultiControllerMultiWorkerConnections(t *testing.T) { WorkerAuthKms: c1.Config().WorkerAuthKms, InitialUpstreams: append(c1.ClusterAddrs(), c2.ClusterAddrs()...), Logger: logger.Named("w1"), - WorkerRPCInterval: time.Second, + WorkerRPCInterval: 500 * time.Millisecond, }) wg.Add(2) @@ -99,7 +99,7 @@ func TestMultiControllerMultiWorkerConnections(t *testing.T) { WorkerAuthKms: c1.Config().WorkerAuthKms, InitialUpstreams: c1.ClusterAddrs(), Logger: logger.Named("w3"), - WorkerRPCInterval: time.Second, + WorkerRPCInterval: 500 * time.Millisecond, }) wg.Add(2) @@ -157,7 +157,7 @@ func TestWorkerAppendInitialUpstreams(t *testing.T) { InitialUpstreams: initialUpstreams, Logger: logger.Named("w1"), SuccessfulControllerRPCGracePeriodDuration: 1 * time.Second, - WorkerRPCInterval: time.Second, + WorkerRPCInterval: 500 * time.Millisecond, }) // Wait for worker to send routing info diff --git a/internal/tests/cluster/parallel/orphaned_connections_test.go b/internal/tests/cluster/parallel/orphaned_connections_test.go index 674416ff07..38ff81301d 100644 --- a/internal/tests/cluster/parallel/orphaned_connections_test.go +++ b/internal/tests/cluster/parallel/orphaned_connections_test.go @@ -40,9 +40,9 @@ func TestSpeedyOrphanedConnectionCleanup(t *testing.T) { InitialResourcesSuffix: "1234567890", Logger: logger.Named("c1"), PublicClusterAddr: pl.Addr().String(), - WorkerRPCGracePeriod: 2 * time.Second, + WorkerRPCGracePeriod: 10 * time.Second, // Run the scheduler more often to speed up cleanup of orphaned connections - SchedulerRunJobInterval: time.Second, + SchedulerRunJobInterval: 500 * time.Millisecond, }) helper.ExpectWorkers(t, c) @@ -64,8 +64,8 @@ func TestSpeedyOrphanedConnectionCleanup(t *testing.T) { WorkerAuthKms: c.Config().WorkerAuthKms, InitialUpstreams: []string{proxy.ListenerAddr()}, Logger: logger.Named("w1"), - SuccessfulControllerRPCGracePeriodDuration: 2 * time.Second, - WorkerRPCInterval: time.Second, + SuccessfulControllerRPCGracePeriodDuration: 10 * time.Second, + WorkerRPCInterval: 500 * time.Millisecond, }) helper.ExpectWorkers(t, c, w) diff --git a/internal/tests/cluster/parallel/unix_listener_test.go b/internal/tests/cluster/parallel/unix_listener_test.go index a5d8f22f2f..3b38dced68 100644 --- a/internal/tests/cluster/parallel/unix_listener_test.go +++ b/internal/tests/cluster/parallel/unix_listener_test.go @@ -86,7 +86,7 @@ func TestUnixListener(t *testing.T) { WorkerAuthKms: c1.Config().WorkerAuthKms, InitialUpstreams: c1.ClusterAddrs(), Logger: logger.Named("w1"), - WorkerRPCInterval: time.Second, + WorkerRPCInterval: 500 * time.Millisecond, }) helper.ExpectWorkers(t, c1, w1) diff --git a/internal/tests/cluster/sequential/session_cleanup_test.go b/internal/tests/cluster/sequential/session_cleanup_test.go index bb9d98ab59..a9c1017b8b 100644 --- a/internal/tests/cluster/sequential/session_cleanup_test.go +++ b/internal/tests/cluster/sequential/session_cleanup_test.go @@ -88,7 +88,7 @@ func testWorkerSessionCleanupSingle(burdenCase timeoutBurdenType) func(t *testin PublicClusterAddr: pl.Addr().String(), WorkerRPCGracePeriod: controllerGracePeriod(burdenCase), // Run the scheduler more often to speed up cleanup of orphaned connections - SchedulerRunJobInterval: time.Second, + SchedulerRunJobInterval: 500 * time.Millisecond, }) helper.ExpectWorkers(t, c1) @@ -283,7 +283,7 @@ func testWorkerSessionCleanupMulti(burdenCase timeoutBurdenType) func(t *testing InitialUpstreams: []string{p1.ListenerAddr(), p2.ListenerAddr()}, Logger: logger.Named("w1"), SuccessfulControllerRPCGracePeriodDuration: helper.DefaultControllerRPCGracePeriod, - WorkerRPCInterval: time.Second, + WorkerRPCInterval: 500 * time.Millisecond, }) wg.Add(2) diff --git a/internal/tests/cluster/sequential/worker_bytesupdown_test.go b/internal/tests/cluster/sequential/worker_bytesupdown_test.go index ff3be16e56..6c78064ee4 100644 --- a/internal/tests/cluster/sequential/worker_bytesupdown_test.go +++ b/internal/tests/cluster/sequential/worker_bytesupdown_test.go @@ -46,10 +46,11 @@ func TestWorkerBytesUpDown(t *testing.T) { } c1 := controller.NewTestController(t, &controller.TestControllerOpts{ - Config: conf, - InitialResourcesSuffix: "1234567890", - Logger: logger.Named("c1"), - WorkerRPCGracePeriod: helper.DefaultControllerRPCGracePeriod, + Config: conf, + InitialResourcesSuffix: "1234567890", + Logger: logger.Named("c1"), + WorkerRPCGracePeriod: helper.DefaultControllerRPCGracePeriod, + SchedulerRunJobInterval: 500 * time.Millisecond, }) helper.ExpectWorkers(t, c1) @@ -71,7 +72,7 @@ func TestWorkerBytesUpDown(t *testing.T) { Logger: logger.Named("w1"), SuccessfulControllerRPCGracePeriodDuration: helper.DefaultControllerRPCGracePeriod, EnableIPv6: true, - WorkerRPCInterval: time.Second, + WorkerRPCInterval: 500 * time.Millisecond, }) helper.ExpectWorkers(t, c1, w1) diff --git a/internal/tests/cluster/sequential/worker_proxy_test.go b/internal/tests/cluster/sequential/worker_proxy_test.go index f63e65387c..1b36b73ba4 100644 --- a/internal/tests/cluster/sequential/worker_proxy_test.go +++ b/internal/tests/cluster/sequential/worker_proxy_test.go @@ -47,10 +47,11 @@ func TestWorkerSessionProxyMultipleConnections(t *testing.T) { } c1 := controller.NewTestController(t, &controller.TestControllerOpts{ - Config: conf, - InitialResourcesSuffix: "1234567890", - Logger: logger.Named("c1"), - WorkerRPCGracePeriod: helper.DefaultControllerRPCGracePeriod, + Config: conf, + InitialResourcesSuffix: "1234567890", + Logger: logger.Named("c1"), + WorkerRPCGracePeriod: helper.DefaultControllerRPCGracePeriod, + SchedulerRunJobInterval: 500 * time.Millisecond, }) helper.ExpectWorkers(t, c1) @@ -72,7 +73,7 @@ func TestWorkerSessionProxyMultipleConnections(t *testing.T) { Logger: logger.Named("w1"), SuccessfulControllerRPCGracePeriodDuration: helper.DefaultControllerRPCGracePeriod, EnableIPv6: true, - WorkerRPCInterval: time.Second, + WorkerRPCInterval: 500 * time.Millisecond, }) helper.ExpectWorkers(t, c1, w1) diff --git a/internal/tests/cluster/sequential/worker_tagging_test.go b/internal/tests/cluster/sequential/worker_tagging_test.go index 4a310336b0..36bb9bff54 100644 --- a/internal/tests/cluster/sequential/worker_tagging_test.go +++ b/internal/tests/cluster/sequential/worker_tagging_test.go @@ -79,7 +79,7 @@ func TestWorkerTagging(t *testing.T) { WorkerAuthKms: c1.Config().WorkerAuthKms, InitialUpstreams: c1.ClusterAddrs(), Logger: logger.Named("w2"), - WorkerRPCInterval: time.Second, + WorkerRPCInterval: 500 * time.Millisecond, }) w2Addr := w2.ProxyAddrs()[0] @@ -96,7 +96,7 @@ func TestWorkerTagging(t *testing.T) { WorkerAuthKms: c1.Config().WorkerAuthKms, InitialUpstreams: c1.ClusterAddrs(), Logger: logger.Named("w3"), - WorkerRPCInterval: time.Second, + WorkerRPCInterval: 500 * time.Millisecond, }) w3Addr := w3.ProxyAddrs()[0]