Refactor cloud test run cancellation logic to always intercept signals (#33930)

* Refactor cloud test run cancellation logic to always intercept signals

* add test

* add more comments explaining cancellation flow
pull/33941/merge
Liam Cervante 2 years ago committed by GitHub
parent ae6b36b247
commit e37526b4e5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -24,6 +24,7 @@ import (
"github.com/hashicorp/terraform/internal/command/jsonformat"
"github.com/hashicorp/terraform/internal/command/views"
"github.com/hashicorp/terraform/internal/configs"
"github.com/hashicorp/terraform/internal/logging"
"github.com/hashicorp/terraform/internal/moduletest"
"github.com/hashicorp/terraform/internal/plans"
"github.com/hashicorp/terraform/internal/terminal"
@ -212,15 +213,56 @@ func (runner *TestSuiteRunner) Test() (moduletest.Status, tfdiags.Diagnostics) {
return moduletest.Error, diags
}
var waitDiags tfdiags.Diagnostics
run, waitDiags = runner.waitForRun(client, run, id)
runningCtx, done := context.WithCancel(context.Background())
go func() {
defer logging.PanicHandler()
defer done()
// Let's wait for the test run to start separately, so we can provide
// some nice updates while we wait.
completed := false
started := time.Now()
updated := started
for i := 0; !completed; i++ {
run, err := client.TestRuns.Read(context.Background(), id, run.ID)
if err != nil {
diags = diags.Append(generalError("Failed to retrieve test run", err))
return // exit early
}
if run.Status != tfe.TestRunQueued {
// We block as long as the test run is still queued.
completed = true
continue // We can render the logs now.
}
current := time.Now()
if i == 0 || current.Sub(updated).Seconds() > 30 {
updated = current
// TODO: Provide better updates based on queue status etc.
// We could look through the queue to find out exactly where the
// test run is and give a count down. Other stuff like that.
// For now, we'll just print a simple status update.
runner.View.TFCStatusUpdate(run.Status, current.Sub(started))
}
}
// The test run has actually started now, so let's render the logs.
logDiags := runner.renderLogs(client, run, id)
diags = diags.Append(logDiags)
}()
// We're doing a couple of things in the wait function. Firstly, waiting
// for the test run to actually finish. Secondly, listening for interrupt
// signals and forwarding them onto TFC.
waitDiags := runner.wait(runningCtx, client, run, id)
diags = diags.Append(waitDiags)
if waitDiags.HasErrors() {
return moduletest.Error, diags
}
logDiags := runner.renderLogs(client, run, id)
diags = diags.Append(logDiags)
if diags.HasErrors() {
return moduletest.Error, diags
}
@ -354,42 +396,10 @@ func (runner *TestSuiteRunner) client(addr tfaddr.Module, id tfe.RegistryModuleI
return client, module, diags
}
func (runner *TestSuiteRunner) waitForRun(client *tfe.Client, original *tfe.TestRun, moduleId tfe.RegistryModuleID) (*tfe.TestRun, tfdiags.Diagnostics) {
func (runner *TestSuiteRunner) wait(ctx context.Context, client *tfe.Client, run *tfe.TestRun, moduleId tfe.RegistryModuleID) tfdiags.Diagnostics {
var diags tfdiags.Diagnostics
run := original
started := time.Now()
updated := started
completed := func(i int) bool {
var err error
if run, err = client.TestRuns.Read(context.Background(), moduleId, run.ID); err != nil {
diags = diags.Append(generalError("Failed to retrieve test run", err))
return true
}
if run.Status != tfe.TestRunQueued {
// We block as long as the test run is still queued.
return true
}
current := time.Now()
if i == 0 || current.Sub(updated).Seconds() > 30 {
updated = current
// TODO: Provide better updates based on queue status etc.
// We could look through the queue to find out exactly where the
// test run is and give a count down. Other stuff like that.
// For now, we'll just print a simple status update.
runner.View.TFCStatusUpdate(run.Status, current.Sub(started))
}
return false
}
handleCancelled := func(i int) {
handleCancelled := func() {
if err := client.TestRuns.ForceCancel(context.Background(), moduleId, run.ID); err != nil {
diags = diags.Append(tfdiags.Sourceless(
tfdiags.Error,
@ -398,25 +408,14 @@ func (runner *TestSuiteRunner) waitForRun(client *tfe.Client, original *tfe.Test
return
}
// Otherwise, we'll still wait for the operation to finish as we want to
// render the logs later.
//
// At this point Terraform will just kill itself if the operation takes
// too long to finish. We don't need to handle that here.
for ; ; i++ {
// Wait for the backoff.
time.Sleep(backoff(backoffMin, backoffMax, i))
// Check if we're done.
if completed(i) {
return
}
}
// At this point we've requested a force cancel, and we know that
// Terraform locally is just going to quit after some amount of time so
// we'll just wait for that to happen or for TFC to finish, whichever
// happens first.
<-ctx.Done()
}
handleStopped := func(i int) {
handleStopped := func() {
if err := client.TestRuns.Cancel(context.Background(), moduleId, run.ID); err != nil {
diags = diags.Append(tfdiags.Sourceless(
tfdiags.Error,
@ -425,40 +424,31 @@ func (runner *TestSuiteRunner) waitForRun(client *tfe.Client, original *tfe.Test
return
}
// We've requested a cancel, we'll just happily wait for the remote
// operation to trigger everything and shut down nicely.
for ; ; i++ {
select {
case <-runner.CancelledCtx.Done():
handleCancelled(i)
return
case <-time.After(backoff(backoffMin, backoffMax, i)):
// Timer up, show status
}
if completed(i) {
return
}
}
}
for i := 0; ; i++ {
// We've requested a cancel, so we're happy to just wait for TFC to cancel
// the run appropriately.
select {
case <-runner.StoppedCtx.Done():
handleStopped(i)
return run, diags
case <-runner.CancelledCtx.Done():
handleCancelled(i)
return run, diags
case <-time.After(backoff(backoffMin, backoffMax, i)):
// Timer up, show status
// We got more pushy, let's force cancel.
handleCancelled()
case <-ctx.Done():
// It finished normally after we requested the cancel. Do nothing.
}
}
if completed(i) {
return run, diags
}
select {
case <-runner.StoppedCtx.Done():
// The StoppedCtx is passed in from the command package, which is
// listening for interrupts from the user. After the first interrupt the
// StoppedCtx is triggered.
handleStopped()
case <-runner.CancelledCtx.Done():
// After the second interrupt the CancelledCtx is triggered.
handleCancelled()
case <-ctx.Done():
// The remote run finished normally! Do nothing.
}
return diags
}
func (runner *TestSuiteRunner) renderLogs(client *tfe.Client, run *tfe.TestRun, moduleId tfe.RegistryModuleID) tfdiags.Diagnostics {

@ -459,6 +459,119 @@ Success! 1 passed, 0 failed, 1 skipped.
}
}
// TestTest_DelayedCancel makes sure that a stop request which only arrives
// during the log reading stage is still forwarded to the remote test run.
//
// The stop is not triggered by the test body directly. Instead the cancel
// function of the runner's StoppedCtx is handed to the mock client through
// delayedCancel, and the mock is responsible for calling it.
func TestTest_DelayedCancel(t *testing.T) {
	streams, outputFn := terminal.StreamsForTesting(t)
	view := views.NewTest(arguments.ViewHuman, views.NewView(streams))

	colorize := mockColorize()
	colorize.Disable = true

	mock := NewMockClient()
	client := &tfe.Client{
		ConfigurationVersions: mock.ConfigurationVersions,
		Organizations:         mock.Organizations,
		RegistryModules:       mock.RegistryModules,
		TestRuns:              mock.TestRuns,
	}

	if _, err := client.Organizations.Create(context.Background(), tfe.OrganizationCreateOptions{
		Name: tfe.String("organisation"),
	}); err != nil {
		t.Fatalf("failed to create organisation: %v", err)
	}

	module, err := client.RegistryModules.Create(context.Background(), "organisation", tfe.RegistryModuleCreateOptions{
		Name:         tfe.String("name"),
		Provider:     tfe.String("provider"),
		RegistryName: "app.terraform.io",
		Namespace:    "organisation",
	})
	if err != nil {
		t.Fatalf("failed to create registry module: %v", err)
	}

	doneContext, done := context.WithCancel(context.Background())
	stopContext, stop := context.WithCancel(context.Background())
	// The mock normally calls stop for us; deferring it as well guarantees the
	// context is released even if the test bails out early. Cancelling a
	// context twice is harmless.
	defer stop()

	mock.TestRuns.delayedCancel = stop

	runner := TestSuiteRunner{
		// Configuration data.
		ConfigDirectory:  "testdata/test-cancel",
		TestingDirectory: "tests",
		Config:           nil, // We don't need this for this test.
		Source:           "app.terraform.io/organisation/name/provider",

		// Cancellation controls. StoppedCtx is cancelled by the mock client
		// through delayedCancel. We never force cancel in this test, so
		// CancelledCtx is a context that never fires.
		Stopped:      false,
		Cancelled:    false,
		StoppedCtx:   stopContext,
		CancelledCtx: context.Background(),

		// Test Options, empty for this test.
		GlobalVariables: nil,
		Verbose:         false,
		Filters:         nil,

		// Outputs
		Renderer: &jsonformat.Renderer{
			Streams:             streams,
			Colorize:            colorize,
			RunningInAutomation: false,
		},
		View:    view,
		Streams: streams,

		// Networking
		Services:       nil, // Don't need this when the client is overridden.
		clientOverride: client,
	}

	var diags tfdiags.Diagnostics
	go func() {
		defer done()
		_, diags = runner.Test()
	}()

	// Wait for finish! Cancelling doneContext also orders the write to diags
	// above before the read below.
	<-doneContext.Done()

	if len(diags) > 0 {
		t.Errorf("found diags and expected none: %s", diags.ErrWithWarnings())
	}

	output := outputFn(t)
	actual := output.All()
	expected := `main.tftest.hcl... in progress
Interrupt received.
Please wait for Terraform to exit or data loss may occur.
Gracefully shutting down...
defaults... pass
overrides... skip
main.tftest.hcl... tearing down
main.tftest.hcl... pass
Success! 1 passed, 0 failed, 1 skipped.
`

	if diff := cmp.Diff(expected, actual); len(diff) > 0 {
		t.Errorf("expected:\n%s\nactual:\n%s\ndiff:\n%s", expected, actual, diff)
	}

	// We want to make sure the cancel signal actually made it through.
	// Luckily we can access the test runs directly in the mock client.
	runs := mock.TestRuns.modules[module.ID]
	if len(runs) == 0 {
		// Fail cleanly instead of panicking on the index below.
		t.Fatalf("expected at least one test run to have been created for module %s", module.ID)
	}

	tr := runs[0]
	if tr.Status != tfe.TestRunCanceled {
		t.Errorf("expected test run to have been cancelled but was %s", tr.Status)
	}
}
func TestTest_ForceCancel(t *testing.T) {
loader, close := configload.NewLoaderForTests(t)

@ -1638,16 +1638,22 @@ type MockTestRuns struct {
TestRuns map[string]*tfe.TestRun
modules map[string][]*tfe.TestRun
logs map[string]string
// delayedCancel allows a mock test run to cancel an operation instead of
// completing an operation. It's used to simulate an interrupt arriving while
// the logs are being read.
delayedCancel context.CancelFunc
cancelled bool
}
var _ tfe.TestRuns = (*MockTestRuns)(nil)
// newMockTestRuns returns a MockTestRuns bound to the given mock client, with
// all of its lookup maps initialised and empty.
//
// delayedCancel is left unset here; a test that wants it assigns it afterwards.
func newMockTestRuns(client *MockClient) *MockTestRuns {
	return &MockTestRuns{
		client:    client,
		TestRuns:  make(map[string]*tfe.TestRun),
		modules:   make(map[string][]*tfe.TestRun),
		logs:      make(map[string]string),
		cancelled: false,
	}
}
@ -1679,13 +1685,11 @@ func (m *MockTestRuns) Read(ctx context.Context, moduleID tfe.RegistryModuleID,
if tr, exists := m.TestRuns[testRunID]; exists {
// This just simulates some natural progression
// This just simulates some natural progression, the first time a
// test run is read it'll progress from queued to running.
switch tr.Status {
case tfe.TestRunQueued:
tr.Status = tfe.TestRunRunning
case tfe.TestRunRunning:
tr.Status = tfe.TestRunFinished
tr.TestStatus = tfe.TestPass
}
return tr, nil
@ -1750,11 +1754,25 @@ func (m *MockTestRuns) Logs(ctx context.Context, moduleID tfe.RegistryModuleID,
switch tr.Status {
case tfe.TestRunRunning:
// Update the status so that on the next call it thinks it's
// finished.
tr.Status = tfe.TestRunFinished
tr.TestStatus = tfe.TestPass
return false, nil
// The first time the done function is called we'll progress from
// running into finished. We may instead cancel this if the
// delayedCancel trigger is set.
if m.delayedCancel != nil {
if !m.cancelled {
// Make sure we only trigger the cancel once.
m.delayedCancel()
m.cancelled = true
}
return false, nil
} else {
// Update the status so that on the next call it thinks it's
// finished.
tr.Status = tfe.TestRunFinished
tr.TestStatus = tfe.TestPass
return false, nil
}
case tfe.TestRunFinished, tfe.TestRunCanceled:
// We're done.

Loading…
Cancel
Save