chore(test) fix flaky tests (#6315)

* internal/cmd/commands/server/listener_reload_test.go remove t.Parallel() comment

* internal/cmd/commands/server/worker_initial_upstreams_reload_test.go remove t.Parallel

* internal/cmd/commands/server/worker_tags_reload_test.go wait by polling worker instead of sleep

* internal/daemon/worker/routing_info.go make deep copy in LastRoutingInfoSuccess to avoid race condition

* internal/tests/cluster/parallel/orphaned_connections_test.go raise grace period time to allow test to wait for longer

* improve logging in sequential tests

* move ports around and use t.Cleanup to send shutdown signal to test server

* fix one typo

* do final assert in t.Cleanup instead of defer

* undo t.Cleanup change

* add another signal to force shutdown running servers

* add exit early and increase server start timeout

* tighten workerRPC settings on session tests

* tighten more workerRPC timing

* make gen

* add comment explaining deep copying lastRoutingInfoSuccess
pull/6316/head^2
Sorawis Nilparuk (Bo) 4 months ago committed by GitHub
parent 0a165c0921
commit fe09f61a85
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -59,12 +59,12 @@ kms "aead" {
listener "tcp" {
purpose = "api"
address = "127.0.0.1:9500"
address = "127.0.0.1:9600"
tls_disable = true
}
listener "tcp" {
address = "127.0.0.1:9501"
address = "127.0.0.1:9601"
purpose = "cluster"
}
`
@ -108,22 +108,30 @@ func TestReloadControllerDatabase(t *testing.T) {
wg := &sync.WaitGroup{}
wg.Add(1)
earlyExitChan := make(chan struct{})
go func() {
defer wg.Done()
args := []string{"-config", td + "/config.hcl"}
exitCode := cmd.Run(args)
if exitCode != 0 {
output := cmd.UI.(*cli.MockUi).ErrorWriter.String() + cmd.UI.(*cli.MockUi).OutputWriter.String()
fmt.Printf("%s: got a non-zero exit status: %s", t.Name(), output)
}
// send non-blocking message to a channel to signal that the server has exited
// this channel is used to avoid waiting for the full timeout in case of early exit
select {
case earlyExitChan <- struct{}{}:
default:
}
}()
// Wait until things are up and running (or timeout).
select {
case <-cmd.startedCh:
case <-time.After(15 * time.Second):
t.Fatal("timeout")
case <-earlyExitChan:
t.Fatal("server exited early")
case <-time.After(30 * time.Second):
t.Fatal("timeout waiting for server to start")
}
require.NotNil(t, cmd.schemaManager)
@ -158,7 +166,7 @@ func TestReloadControllerDatabase(t *testing.T) {
select {
case <-cmd.reloadedCh:
case <-time.After(15 * time.Second):
t.Fatal("timeout")
t.Fatal("timeout waiting for server reload")
}
// Assert that the schema manager ptr and value changed
@ -243,6 +251,7 @@ func TestReloadControllerDatabase_InvalidNewDatabaseState(t *testing.T) {
wg := &sync.WaitGroup{}
wg.Add(1)
earlyExitChan := make(chan struct{})
go func() {
defer wg.Done()
@ -252,13 +261,19 @@ func TestReloadControllerDatabase_InvalidNewDatabaseState(t *testing.T) {
output := cmd.UI.(*cli.MockUi).ErrorWriter.String() + cmd.UI.(*cli.MockUi).OutputWriter.String()
fmt.Printf("%s: got a non-zero exit status: %s", t.Name(), output)
}
select {
case earlyExitChan <- struct{}{}:
default:
}
}()
// Wait until things are up and running (or timeout).
select {
case <-cmd.startedCh:
case <-time.After(15 * time.Second):
t.Fatal("timeout")
case <-earlyExitChan:
t.Fatal("server exited early")
case <-time.After(30 * time.Second):
t.Fatal("timeout waiting for server to start")
}
require.NotNil(t, cmd.schemaManager)
@ -292,7 +307,7 @@ func TestReloadControllerDatabase_InvalidNewDatabaseState(t *testing.T) {
select {
case <-cmd.reloadedCh:
case <-time.After(15 * time.Second):
t.Fatal("timeout")
t.Fatal("timeout waiting for server reload")
}
// Assert that the schema manager ptr and value did not change.

@ -180,7 +180,7 @@ listener "tcp" {
listener "tcp" {
address = "127.0.0.1:9501"
purpose = "cluster"
}
}
`
)
@ -202,22 +202,28 @@ func TestReloadControllerRateLimits(t *testing.T) {
wg := &sync.WaitGroup{}
wg.Add(1)
earlyExitChan := make(chan struct{})
go func() {
defer wg.Done()
args := []string{"-config", td + "/config.hcl"}
exitCode := cmd.Run(args)
if exitCode != 0 {
output := cmd.UI.(*cli.MockUi).ErrorWriter.String() + cmd.UI.(*cli.MockUi).OutputWriter.String()
fmt.Printf("%s: got a non-zero exit status: %s", t.Name(), output)
}
select {
case earlyExitChan <- struct{}{}:
default:
}
}()
// Wait until things are up and running (or timeout).
select {
case <-cmd.startedCh:
case <-time.After(15 * time.Second):
t.Fatal("timeout")
case <-earlyExitChan:
t.Fatal("server exited early")
case <-time.After(30 * time.Second):
t.Fatal("timeout waiting for server to start")
}
// Change config so it is ready for reloading
@ -262,7 +268,7 @@ func TestReloadControllerRateLimits(t *testing.T) {
select {
case <-cmd.reloadedCh:
case <-time.After(15 * time.Second):
t.Fatal("timeout")
t.Fatal("timeout waiting for reload signal")
}
// Make another request, the limit should have reset and the new limit
@ -301,6 +307,7 @@ func TestReloadControllerRateLimitsSameConfig(t *testing.T) {
wg := &sync.WaitGroup{}
wg.Add(1)
earlyExitChan := make(chan struct{})
go func() {
defer wg.Done()
@ -310,13 +317,19 @@ func TestReloadControllerRateLimitsSameConfig(t *testing.T) {
output := cmd.UI.(*cli.MockUi).ErrorWriter.String() + cmd.UI.(*cli.MockUi).OutputWriter.String()
fmt.Printf("%s: got a non-zero exit status: %s", t.Name(), output)
}
select {
case earlyExitChan <- struct{}{}:
default:
}
}()
// Wait until things are up and running (or timeout).
select {
case <-cmd.startedCh:
case <-time.After(15 * time.Second):
t.Fatal("timeout")
case <-earlyExitChan:
t.Fatal("server exited early")
case <-time.After(30 * time.Second):
t.Fatal("timeout waiting for server to start")
}
c := http.Client{}
@ -395,6 +408,7 @@ func TestReloadControllerRateLimitsDisable(t *testing.T) {
wg := &sync.WaitGroup{}
wg.Add(1)
earlyExitChan := make(chan struct{})
go func() {
defer wg.Done()
@ -404,13 +418,19 @@ func TestReloadControllerRateLimitsDisable(t *testing.T) {
output := cmd.UI.(*cli.MockUi).ErrorWriter.String() + cmd.UI.(*cli.MockUi).OutputWriter.String()
fmt.Printf("%s: got a non-zero exit status: %s", t.Name(), output)
}
select {
case earlyExitChan <- struct{}{}:
default:
}
}()
// Wait until things are up and running (or timeout).
select {
case <-cmd.startedCh:
case <-time.After(15 * time.Second):
t.Fatal("timeout")
case <-earlyExitChan:
t.Fatal("server exited early")
case <-time.After(30 * time.Second):
t.Fatal("timeout waiting for server to start")
}
// Change config so it is ready for reloading
@ -494,6 +514,7 @@ func TestReloadControllerRateLimitsEnable(t *testing.T) {
wg := &sync.WaitGroup{}
wg.Add(1)
earlyExitChan := make(chan struct{})
go func() {
defer wg.Done()
@ -503,13 +524,19 @@ func TestReloadControllerRateLimitsEnable(t *testing.T) {
output := cmd.UI.(*cli.MockUi).ErrorWriter.String() + cmd.UI.(*cli.MockUi).OutputWriter.String()
fmt.Printf("%s: got a non-zero exit status: %s", t.Name(), output)
}
select {
case earlyExitChan <- struct{}{}:
default:
}
}()
// Wait until things are up and running (or timeout).
select {
case <-cmd.startedCh:
case <-time.After(15 * time.Second):
t.Fatal("timeout")
case <-earlyExitChan:
t.Fatal("server exited early")
case <-time.After(30 * time.Second):
t.Fatal("timeout waiting for server to start")
}
// Change config so it is ready for reloading

@ -64,7 +64,7 @@ kms "aead" {
listener "tcp" {
purpose = "api"
address = "127.0.0.1:9500"
address = "127.0.0.1:9700"
tls_cert_file = "%s/bundle.pem"
tls_key_file = "%s/bundle.pem"
cors_enabled = true
@ -72,18 +72,17 @@ listener "tcp" {
}
listener "tcp" {
address = "127.0.0.1:9501"
address = "127.0.0.1:9701"
purpose = "cluster"
}
listener "tcp" {
address = "127.0.0.1:9502"
address = "127.0.0.1:9702"
purpose = "proxy"
}
`
func TestServer_ReloadListener(t *testing.T) {
// t.Parallel()
require := require.New(t)
wg := &sync.WaitGroup{}
@ -107,12 +106,11 @@ func TestServer_ReloadListener(t *testing.T) {
cmd.WorkerAuthKms = nil
cmd.RecoveryKms = nil
defer func() {
t.Cleanup(func() {
if cmd.DevDatabaseCleanupFunc != nil {
require.NoError(cmd.DevDatabaseCleanupFunc())
}
}()
})
// Setup initial certs
inBytes, err := os.ReadFile(wd + "bundle1.pem")
require.NoError(err)
@ -135,9 +133,8 @@ func TestServer_ReloadListener(t *testing.T) {
fmt.Printf("%s: got a non-zero exit status: %s", t.Name(), output)
}
}()
testCertificateSerial := func(serial string) {
conn, err := tls.Dial("tcp", "127.0.0.1:9500", &tls.Config{
conn, err := tls.Dial("tcp", "127.0.0.1:9700", &tls.Config{
RootCAs: certPool,
})
require.NoError(err)
@ -168,7 +165,6 @@ func TestServer_ReloadListener(t *testing.T) {
}
testCertificateSerial("193080739105342897219784862820114567438786419504")
cmd.ShutdownCh <- struct{}{}
wg.Wait()
}

@ -45,7 +45,6 @@ worker {
`
func TestServer_ReloadInitialUpstreams(t *testing.T) {
t.Parallel()
require := require.New(t)
rootWrapper, _ := wrapperWithKey(t)
@ -186,7 +185,6 @@ pollSecondController:
poll.Reset(time.Second)
}
}
cmd.ShutdownCh <- struct{}{}
wg.Wait()
}

@ -76,7 +76,7 @@ func TestServer_ReloadWorkerTags(t *testing.T) {
recoveryWrapper, _ := wrapperWithKey(t)
workerAuthWrapper, key := wrapperWithKey(t)
testController := controller.NewTestController(t, controller.WithWorkerAuthKms(workerAuthWrapper), controller.WithRootKms(rootWrapper), controller.WithRecoveryKms(recoveryWrapper))
defer testController.Shutdown()
t.Cleanup(testController.Shutdown)
wg := &sync.WaitGroup{}
@ -98,21 +98,32 @@ func TestServer_ReloadWorkerTags(t *testing.T) {
t.Fatalf("timeout")
}
fetchWorkerTags := func(name string, key string, values []string) {
waitForWorkerTags := func(name string, key string, values []string) {
t.Helper()
serversRepo, err := testController.Controller().ServersRepoFn()
require.NoError(err)
w, err := server.TestLookupWorkerByName(testController.Context(), t, name, serversRepo)
require.NoError(err)
require.NotNil(w)
v, ok := w.CanonicalTags()[key]
require.True(ok)
require.ElementsMatch(values, v)
maxWait := time.NewTimer(time.Second * 15)
for {
select {
case <-maxWait.C:
t.Fatal("timed out waiting for worker tags")
case <-time.After(500 * time.Millisecond):
serversRepo, err := testController.Controller().ServersRepoFn()
require.NoError(err)
w, err := server.TestLookupWorkerByName(testController.Context(), t, name, serversRepo)
require.NoError(err)
require.NotNil(w)
v, ok := w.CanonicalTags()[key]
if !ok {
continue
}
require.True(ok)
require.ElementsMatch(values, v)
return
}
}
}
// Give time to populate up to the controller
time.Sleep(10 * time.Second)
fetchWorkerTags("test", "type", []string{"dev", "local"})
waitForWorkerTags("test", "type", []string{"dev", "local"})
cmd.presetConfig.Store(fmt.Sprintf(workerBaseConfig+tag2Config, key, testController.ClusterAddrs()[0]))
@ -123,11 +134,8 @@ func TestServer_ReloadWorkerTags(t *testing.T) {
t.Fatalf("timeout")
}
time.Sleep(10 * time.Second)
fetchWorkerTags("test", "foo", []string{"bar", "baz"})
waitForWorkerTags("test", "foo", []string{"bar", "baz"})
cmd.ShutdownCh <- struct{}{}
wg.Wait()
}

@ -64,8 +64,10 @@ func TestRepository_New(t *testing.T) {
w: rw,
kms: kmsCache,
scheduler: sche,
opts: []Option{WithLimit(5),
WithRandomReader(testReader)},
opts: []Option{
WithLimit(5),
WithRandomReader(testReader),
},
},
want: &Repository{
reader: rw,

@ -113,10 +113,12 @@ func (w *Worker) startRoutingInfoTicking(cancelCtx context.Context) {
// RoutingInfo request.
func (w *Worker) LastRoutingInfoSuccess() *LastRoutingInfo {
s, ok := w.lastRoutingInfoSuccess.Load().(*LastRoutingInfo)
if !ok {
if !ok || s == nil {
return nil
}
return s
// make a deep copy to avoid race conditions from accessing underlying data
copied := *s
return &copied
}
func (w *Worker) sendWorkerRoutingInfo(cancelCtx context.Context) {
@ -178,7 +180,11 @@ func (w *Worker) sendWorkerRoutingInfo(cancelCtx context.Context) {
// In the case that the control plane has gone down and come up with different IPs,
// append initial upstreams/ cluster addr to the resolver to try
if pastGrace, _, _ := w.isPastGrace(); pastGrace && w.GrpcClientConn.Load().GetState() == connectivity.TransientFailure {
lastRoutingInfo := w.lastRoutingInfoSuccess.Load().(*LastRoutingInfo)
routingInfo := w.lastRoutingInfoSuccess.Load().(*LastRoutingInfo)
// make a deep copy to avoid race conditions from accessing underlying data
// since routingInfo may be modified later in this function
copied := *routingInfo
lastRoutingInfo := &copied
if lastRoutingInfo != nil && lastRoutingInfo.LastCalculatedUpstreams != nil {
addrs := lastRoutingInfo.LastCalculatedUpstreams

@ -60,7 +60,7 @@ func TestIPv6Listener(t *testing.T) {
WorkerAuthKms: c1.Config().WorkerAuthKms,
InitialUpstreams: append(c1.ClusterAddrs(), c2.ClusterAddrs()...),
Logger: logger.Named("w1"),
WorkerRPCInterval: time.Second,
WorkerRPCInterval: 500 * time.Millisecond,
})
wg.Add(2)

@ -53,7 +53,7 @@ func TestMultiControllerMultiWorkerConnections(t *testing.T) {
WorkerAuthKms: c1.Config().WorkerAuthKms,
InitialUpstreams: append(c1.ClusterAddrs(), c2.ClusterAddrs()...),
Logger: logger.Named("w1"),
WorkerRPCInterval: time.Second,
WorkerRPCInterval: 500 * time.Millisecond,
})
wg.Add(2)
@ -99,7 +99,7 @@ func TestMultiControllerMultiWorkerConnections(t *testing.T) {
WorkerAuthKms: c1.Config().WorkerAuthKms,
InitialUpstreams: c1.ClusterAddrs(),
Logger: logger.Named("w3"),
WorkerRPCInterval: time.Second,
WorkerRPCInterval: 500 * time.Millisecond,
})
wg.Add(2)
@ -157,7 +157,7 @@ func TestWorkerAppendInitialUpstreams(t *testing.T) {
InitialUpstreams: initialUpstreams,
Logger: logger.Named("w1"),
SuccessfulControllerRPCGracePeriodDuration: 1 * time.Second,
WorkerRPCInterval: time.Second,
WorkerRPCInterval: 500 * time.Millisecond,
})
// Wait for worker to send routing info

@ -40,9 +40,9 @@ func TestSpeedyOrphanedConnectionCleanup(t *testing.T) {
InitialResourcesSuffix: "1234567890",
Logger: logger.Named("c1"),
PublicClusterAddr: pl.Addr().String(),
WorkerRPCGracePeriod: 2 * time.Second,
WorkerRPCGracePeriod: 10 * time.Second,
// Run the scheduler more often to speed up cleanup of orphaned connections
SchedulerRunJobInterval: time.Second,
SchedulerRunJobInterval: 500 * time.Millisecond,
})
helper.ExpectWorkers(t, c)
@ -64,8 +64,8 @@ func TestSpeedyOrphanedConnectionCleanup(t *testing.T) {
WorkerAuthKms: c.Config().WorkerAuthKms,
InitialUpstreams: []string{proxy.ListenerAddr()},
Logger: logger.Named("w1"),
SuccessfulControllerRPCGracePeriodDuration: 2 * time.Second,
WorkerRPCInterval: time.Second,
SuccessfulControllerRPCGracePeriodDuration: 10 * time.Second,
WorkerRPCInterval: 500 * time.Millisecond,
})
helper.ExpectWorkers(t, c, w)

@ -86,7 +86,7 @@ func TestUnixListener(t *testing.T) {
WorkerAuthKms: c1.Config().WorkerAuthKms,
InitialUpstreams: c1.ClusterAddrs(),
Logger: logger.Named("w1"),
WorkerRPCInterval: time.Second,
WorkerRPCInterval: 500 * time.Millisecond,
})
helper.ExpectWorkers(t, c1, w1)

@ -88,7 +88,7 @@ func testWorkerSessionCleanupSingle(burdenCase timeoutBurdenType) func(t *testin
PublicClusterAddr: pl.Addr().String(),
WorkerRPCGracePeriod: controllerGracePeriod(burdenCase),
// Run the scheduler more often to speed up cleanup of orphaned connections
SchedulerRunJobInterval: time.Second,
SchedulerRunJobInterval: 500 * time.Millisecond,
})
helper.ExpectWorkers(t, c1)
@ -283,7 +283,7 @@ func testWorkerSessionCleanupMulti(burdenCase timeoutBurdenType) func(t *testing
InitialUpstreams: []string{p1.ListenerAddr(), p2.ListenerAddr()},
Logger: logger.Named("w1"),
SuccessfulControllerRPCGracePeriodDuration: helper.DefaultControllerRPCGracePeriod,
WorkerRPCInterval: time.Second,
WorkerRPCInterval: 500 * time.Millisecond,
})
wg.Add(2)

@ -46,10 +46,11 @@ func TestWorkerBytesUpDown(t *testing.T) {
}
c1 := controller.NewTestController(t, &controller.TestControllerOpts{
Config: conf,
InitialResourcesSuffix: "1234567890",
Logger: logger.Named("c1"),
WorkerRPCGracePeriod: helper.DefaultControllerRPCGracePeriod,
Config: conf,
InitialResourcesSuffix: "1234567890",
Logger: logger.Named("c1"),
WorkerRPCGracePeriod: helper.DefaultControllerRPCGracePeriod,
SchedulerRunJobInterval: 500 * time.Millisecond,
})
helper.ExpectWorkers(t, c1)
@ -71,7 +72,7 @@ func TestWorkerBytesUpDown(t *testing.T) {
Logger: logger.Named("w1"),
SuccessfulControllerRPCGracePeriodDuration: helper.DefaultControllerRPCGracePeriod,
EnableIPv6: true,
WorkerRPCInterval: time.Second,
WorkerRPCInterval: 500 * time.Millisecond,
})
helper.ExpectWorkers(t, c1, w1)

@ -47,10 +47,11 @@ func TestWorkerSessionProxyMultipleConnections(t *testing.T) {
}
c1 := controller.NewTestController(t, &controller.TestControllerOpts{
Config: conf,
InitialResourcesSuffix: "1234567890",
Logger: logger.Named("c1"),
WorkerRPCGracePeriod: helper.DefaultControllerRPCGracePeriod,
Config: conf,
InitialResourcesSuffix: "1234567890",
Logger: logger.Named("c1"),
WorkerRPCGracePeriod: helper.DefaultControllerRPCGracePeriod,
SchedulerRunJobInterval: 500 * time.Millisecond,
})
helper.ExpectWorkers(t, c1)
@ -72,7 +73,7 @@ func TestWorkerSessionProxyMultipleConnections(t *testing.T) {
Logger: logger.Named("w1"),
SuccessfulControllerRPCGracePeriodDuration: helper.DefaultControllerRPCGracePeriod,
EnableIPv6: true,
WorkerRPCInterval: time.Second,
WorkerRPCInterval: 500 * time.Millisecond,
})
helper.ExpectWorkers(t, c1, w1)

@ -79,7 +79,7 @@ func TestWorkerTagging(t *testing.T) {
WorkerAuthKms: c1.Config().WorkerAuthKms,
InitialUpstreams: c1.ClusterAddrs(),
Logger: logger.Named("w2"),
WorkerRPCInterval: time.Second,
WorkerRPCInterval: 500 * time.Millisecond,
})
w2Addr := w2.ProxyAddrs()[0]
@ -96,7 +96,7 @@ func TestWorkerTagging(t *testing.T) {
WorkerAuthKms: c1.Config().WorkerAuthKms,
InitialUpstreams: c1.ClusterAddrs(),
Logger: logger.Named("w3"),
WorkerRPCInterval: time.Second,
WorkerRPCInterval: 500 * time.Millisecond,
})
w3Addr := w3.ProxyAddrs()[0]

Loading…
Cancel
Save