Merge pull request #35247 from hashicorp/alisdair/rpcapi-stop

rpcapi: Allow stopping long-running operations
pull/35262/head
Alisdair McDiarmid 2 years ago committed by GitHub
commit c8eec89c3d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -45,7 +45,7 @@ func registerGRPCServices(s *grpc.Server, opts *serviceOpts) {
terraform1.RegisterSetupServer(s, setup)
}
func serverHandshake(s *grpc.Server, opts *serviceOpts) func(context.Context, *terraform1.Handshake_Request) (*terraform1.ServerCapabilities, error) {
func serverHandshake(s *grpc.Server, opts *serviceOpts) func(context.Context, *terraform1.Handshake_Request, *stopper) (*terraform1.ServerCapabilities, error) {
dependencies := dynrpcserver.NewDependenciesStub()
terraform1.RegisterDependenciesServer(s, dependencies)
stacks := dynrpcserver.NewStacksStub()
@ -53,7 +53,7 @@ func serverHandshake(s *grpc.Server, opts *serviceOpts) func(context.Context, *t
packages := dynrpcserver.NewPackagesStub()
terraform1.RegisterPackagesServer(s, packages)
return func(ctx context.Context, request *terraform1.Handshake_Request) (*terraform1.ServerCapabilities, error) {
return func(ctx context.Context, request *terraform1.Handshake_Request, stopper *stopper) (*terraform1.ServerCapabilities, error) {
// All of our servers will share a common handles table so that objects
// can be passed from one service to another.
handles := newHandleTable()
@ -79,7 +79,7 @@ func serverHandshake(s *grpc.Server, opts *serviceOpts) func(context.Context, *t
// doing real work. In future the details of what we register here
// might vary based on the negotiated capabilities.
dependencies.ActivateRPCServer(newDependenciesServer(handles, services))
stacks.ActivateRPCServer(newStacksServer(handles, opts))
stacks.ActivateRPCServer(newStacksServer(stopper, handles, opts))
packages.ActivateRPCServer(newPackagesServer(services))
// If the client requested any extra capabilities that we're going

@ -31,7 +31,7 @@ func ServePlugin(ctx context.Context, opts ServerOpts) error {
plugin.Serve(&plugin.ServeConfig{
HandshakeConfig: handshake,
VersionedPlugins: map[int]plugin.PluginSet{
1: plugin.PluginSet{
1: {
"tfcore": &corePlugin{
experimentsAllowed: opts.ExperimentsAllowed,
},

@ -16,7 +16,7 @@ import (
// setupServer is an implementation of the "Setup" service defined in our
// terraform1 package.
//
// This service is here only to offer the "Handshake" function, which clients
// This service is here mainly to offer the "Handshake" function, which clients
// must call to negotiate access to any other services. This is really just
// an adapter around a handshake function implemented on [corePlugin].
type setupServer struct {
@ -25,13 +25,19 @@ type setupServer struct {
// initOthers is the callback used to perform the capability negotiation
// step and initialize all of the other API services based on what was
// negotiated.
initOthers func(context.Context, *terraform1.Handshake_Request) (*terraform1.ServerCapabilities, error)
mu sync.Mutex
initOthers func(context.Context, *terraform1.Handshake_Request, *stopper) (*terraform1.ServerCapabilities, error)
// stopper is used to track and stop long-running operations when the Stop
// RPC is called.
stopper *stopper
mu sync.Mutex
}
func newSetupServer(initOthers func(context.Context, *terraform1.Handshake_Request) (*terraform1.ServerCapabilities, error)) terraform1.SetupServer {
// newSetupServer builds the Setup service implementation.
//
// The given initOthers callback is retained so that Handshake can invoke it,
// and the server is given its own freshly-created stopper, which Handshake
// passes on to initOthers and which the Stop RPC later triggers.
func newSetupServer(initOthers func(context.Context, *terraform1.Handshake_Request, *stopper) (*terraform1.ServerCapabilities, error)) terraform1.SetupServer {
	srv := new(setupServer)
	srv.initOthers = initOthers
	srv.stopper = newStopper()
	return srv
}
@ -47,7 +53,7 @@ func (s *setupServer) Handshake(ctx context.Context, req *terraform1.Handshake_R
var err error
{
ctx, span := tracer.Start(ctx, "initialize RPC services")
serverCaps, err = s.initOthers(ctx, req)
serverCaps, err = s.initOthers(ctx, req, s.stopper)
span.End()
}
s.initOthers = nil // cannot handshake again
@ -58,3 +64,12 @@ func (s *setupServer) Handshake(ctx context.Context, req *terraform1.Handshake_R
Capabilities: serverCaps,
}, nil
}
// Stop implements the Stop RPC of the Setup service. It signals every
// operation currently registered with the server's stopper that it should
// stop.
//
// Neither ctx nor req is inspected. The response is always empty and the
// returned error is always nil.
func (s *setupServer) Stop(ctx context.Context, req *terraform1.Stop_Request) (*terraform1.Stop_Response, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	// A setupServer assembled as a bare struct literal, rather than through
	// newSetupServer, has no stopper. In that case no operation can have
	// registered a stop channel, so there is nothing to signal and we must
	// not dereference the nil pointer.
	if s.stopper != nil {
		s.stopper.stop()
	}

	return &terraform1.Stop_Response{}, nil
}

@ -0,0 +1,85 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package rpcapi
import (
"context"
"strings"
"sync"
"testing"
"github.com/hashicorp/terraform/internal/rpcapi/terraform1"
)
// TestSetupServer_Handshake checks that the first Handshake call invokes the
// initOthers callback exactly once with the client's request, and that a
// second Handshake call is rejected with a "handshake already completed"
// error without invoking the callback again.
func TestSetupServer_Handshake(t *testing.T) {
	called := 0
	server := newSetupServer(func(ctx context.Context, req *terraform1.Handshake_Request, stopper *stopper) (*terraform1.ServerCapabilities, error) {
		called++
		if got, want := req.Config.Credentials["localterraform.com"].Token, "boop"; got != want {
			t.Fatalf("incorrect token. got %q, want %q", got, want)
		}
		return &terraform1.ServerCapabilities{}, nil
	})

	req := &terraform1.Handshake_Request{
		Capabilities: &terraform1.ClientCapabilities{},
		Config: &terraform1.Config{
			Credentials: map[string]*terraform1.HostCredential{
				"localterraform.com": {
					Token: "boop",
				},
			},
		},
	}

	// The first handshake must succeed and run the callback once.
	_, err := server.Handshake(context.Background(), req)
	if err != nil {
		t.Fatalf("unexpected error: %s", err)
	}
	if called != 1 {
		t.Errorf("unexpected initOthers call count %d, want 1", called)
	}

	// The second handshake must fail. The nil case is reported separately so
	// that the failure message never tries to format a nil error.
	_, err = server.Handshake(context.Background(), req)
	if err == nil {
		t.Fatal("second handshake succeeded, want error")
	}
	if want := "handshake already completed"; !strings.Contains(err.Error(), want) {
		t.Fatalf("wrong error from second handshake\ngot:  %s\nwant substring: %q", err, want)
	}
	if called != 1 {
		t.Errorf("unexpected initOthers call count %d, want 1", called)
	}
}
// TestSetupServer_Stop checks that the stopper handed to initOthers during
// Handshake is the one triggered by the Stop RPC: every stop channel
// registered on it must be closed once Stop has been called.
func TestSetupServer_Stop(t *testing.T) {
	var s *stopper
	server := newSetupServer(func(ctx context.Context, req *terraform1.Handshake_Request, stopper *stopper) (*terraform1.ServerCapabilities, error) {
		// Capture the stopper so the test can register stop channels on it.
		s = stopper
		return &terraform1.ServerCapabilities{}, nil
	})

	_, err := server.Handshake(context.Background(), nil)
	if err != nil {
		t.Fatalf("unexpected error: %s", err)
	}
	if s == nil {
		t.Fatal("stopper not passed to initOthers")
	}

	// Register two stop channels, each watched by a goroutine that reports
	// back through the wait group once its channel has been closed.
	var wg sync.WaitGroup
	for range 2 {
		stop := s.add()
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-stop
		}()
	}

	if _, err := server.Stop(context.Background(), nil); err != nil {
		t.Fatalf("unexpected error from Stop: %s", err)
	}

	// Returns only once both channels have been closed.
	wg.Wait()
}

@ -31,14 +31,16 @@ import (
// stacksServer is this package's implementation of the terraform1 Stacks
// service.
type stacksServer struct {
terraform1.UnimplementedStacksServer
// stopper supplies the stop channels that PlanStackChanges and
// ApplyStackChanges watch in order to cancel their work.
stopper *stopper
// handles is the handle table passed to newStacksServer.
handles *handleTable
// experimentsAllowed is copied from serviceOpts.experimentsAllowed by
// newStacksServer.
experimentsAllowed bool
}
// Compile-time assertion that *stacksServer implements
// terraform1.StacksServer.
var _ terraform1.StacksServer = (*stacksServer)(nil)
func newStacksServer(handles *handleTable, opts *serviceOpts) *stacksServer {
func newStacksServer(stopper *stopper, handles *handleTable, opts *serviceOpts) *stacksServer {
return &stacksServer{
stopper: stopper,
handles: handles,
experimentsAllowed: opts.experimentsAllowed,
}
@ -271,10 +273,21 @@ func (s *stacksServer) PlanStackChanges(req *terraform1.PlanStackChanges_Request
Diagnostics: diagsCh,
}
// As a long-running operation, the plan RPC must be able to be stopped. We
// do this by requesting a stop channel from the stopper, and using it to
// cancel the planning process.
stopCh := s.stopper.add()
defer s.stopper.remove(stopCh)
// We create a new cancellable context for the stack plan operation to
// allow us to respond to stop requests.
planCtx, cancelPlan := context.WithCancel(ctx)
defer cancelPlan()
// The actual plan operation runs in the background, and emits events
// to us via the channels in rtResp before finally closing changesCh
// to signal that the process is complete.
go stackruntime.Plan(ctx, &rtReq, &rtResp)
go stackruntime.Plan(planCtx, &rtReq, &rtResp)
emitDiag := func(diag tfdiags.Diagnostic) {
diags := tfdiags.Diagnostics{diag}
@ -345,6 +358,12 @@ Events:
}
emitDiag(diag)
case <-stopCh:
// If our stop channel is signalled, we need to cancel the plan.
// This may result in remaining changes or diagnostics being
// emitted, so we continue to monitor those channels if they're
// still active.
cancelPlan()
}
}
@ -408,10 +427,21 @@ func (s *stacksServer) ApplyStackChanges(req *terraform1.ApplyStackChanges_Reque
Diagnostics: diagsCh,
}
// As a long-running operation, the apply RPC must be able to be stopped.
// We do this by requesting a stop channel from the stopper, and using it
// to cancel the apply process.
stopCh := s.stopper.add()
defer s.stopper.remove(stopCh)
// We create a new cancellable context for the stack apply operation to
// allow us to respond to stop requests.
applyCtx, cancelApply := context.WithCancel(ctx)
defer cancelApply()
// The actual apply operation runs in the background, and emits events
// to us via the channels in rtResp before finally closing changesCh
// to signal that the process is complete.
go stackruntime.Apply(ctx, &rtReq, &rtResp)
go stackruntime.Apply(applyCtx, &rtReq, &rtResp)
emitDiag := func(diag tfdiags.Diagnostic) {
diags := tfdiags.Diagnostics{diag}
@ -485,6 +515,13 @@ Events:
}
emitDiag(diag)
case <-stopCh:
// If our stop channel is signalled, we need to cancel the apply.
// This may result in remaining changes or diagnostics being
// emitted, so we continue to monitor those channels if they're
// still active.
cancelApply()
}
}

@ -28,7 +28,7 @@ func TestStacksOpenCloseStackConfiguration(t *testing.T) {
ctx := context.Background()
handles := newHandleTable()
stacksServer := newStacksServer(handles, &serviceOpts{})
stacksServer := newStacksServer(newStopper(), handles, &serviceOpts{})
// In normal use a client would have previously opened a source bundle
// using Dependencies.OpenSourceBundle, so we'll simulate the effect
@ -110,7 +110,7 @@ func TestStacksFindStackConfigurationComponents(t *testing.T) {
ctx := context.Background()
handles := newHandleTable()
stacksServer := newStacksServer(handles, &serviceOpts{})
stacksServer := newStacksServer(newStopper(), handles, &serviceOpts{})
// In normal use a client would have previously opened a source bundle
// using Dependencies.OpenSourceBundle, so we'll simulate the effect
@ -230,7 +230,7 @@ func TestStacksPlanStackChanges(t *testing.T) {
ctx := context.Background()
handles := newHandleTable()
stacksServer := newStacksServer(handles, &serviceOpts{})
stacksServer := newStacksServer(newStopper(), handles, &serviceOpts{})
fakeSourceBundle := &sourcebundle.Bundle{}
bundleHnd := handles.NewSourceBundle(fakeSourceBundle)

@ -0,0 +1,57 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package rpcapi
import (
"sync"
)
// stopChan is the signal channel handed to one long-running operation. No
// value is ever sent on it: the stopper closes it to tell the operation to
// stop.
type stopChan chan struct{}

// stopper lets the RPC API interrupt long-running operations that are still
// in progress.
//
// An operation registers itself with add, which hands back a stop channel,
// and deregisters with remove once it no longer needs to be notified. When
// stop is called, every channel that is registered at that moment is closed
// and forgotten; operations observe the close as their signal to abort.
type stopper struct {
	// mu guards stops.
	mu sync.Mutex

	// stops is the set of channels that are currently registered and have
	// not yet been closed by stop.
	stops map[stopChan]struct{}
}

// newStopper returns a stopper with no registered stop channels.
func newStopper() *stopper {
	registry := make(map[stopChan]struct{})
	return &stopper{stops: registry}
}

// add registers a new stop channel and returns it. The channel is closed by
// the next call to stop, unless it is removed first.
func (s *stopper) add() stopChan {
	ch := make(stopChan)

	s.mu.Lock()
	s.stops[ch] = struct{}{}
	s.mu.Unlock()

	return ch
}

// remove deregisters the given stop channel without closing it. Removing a
// channel that is not registered is a no-op.
func (s *stopper) remove(stop stopChan) {
	s.mu.Lock()
	delete(s.stops, stop)
	s.mu.Unlock()
}

// stop closes every registered stop channel and empties the registry, so a
// channel is never closed twice and repeated calls are harmless.
func (s *stopper) stop() {
	s.mu.Lock()
	defer s.mu.Unlock()

	for ch := range s.stops {
		delete(s.stops, ch)
		close(ch)
	}
}

@ -225,7 +225,7 @@ func TestTelemetryInTestsGRPC(t *testing.T) {
client, close := grpcClientForTesting(ctx, t, func(srv *grpc.Server) {
setup := &setupServer{
initOthers: func(ctx context.Context, cc *terraform1.Handshake_Request) (*terraform1.ServerCapabilities, error) {
initOthers: func(ctx context.Context, cc *terraform1.Handshake_Request, stopper *stopper) (*terraform1.ServerCapabilities, error) {
return &terraform1.ServerCapabilities{}, nil
},
}

File diff suppressed because it is too large Load Diff

@ -13,6 +13,10 @@ service Setup {
//
// This function can be called only once per RPC server.
rpc Handshake(Handshake.Request) returns (Handshake.Response);
// At any time after handshaking, clients may call Stop to ask the server
// to gracefully stop any long-running operations that are in progress.
rpc Stop(Stop.Request) returns (Stop.Response);
}
message Handshake {
@ -25,6 +29,13 @@ message Handshake {
}
}
// Stop groups the request and response messages for the Setup.Stop RPC.
message Stop {
// Request currently carries no fields.
message Request {
}
// Response currently carries no fields.
message Response {
}
}
// The capabilities that the client wishes to advertise to the server during
// handshake.
message ClientCapabilities {

@ -503,6 +503,19 @@ func (c *ComponentConfig) checkValid(ctx context.Context, phase EvalPhase) tfdia
}
}()
// When our given context is cancelled, we want to instruct the
// modules runtime to stop the running operation. We use this
// nested context to ensure that we don't leak a goroutine when the
// parent context isn't cancelled.
operationCtx, operationCancel := context.WithCancel(ctx)
defer operationCancel()
go func() {
<-operationCtx.Done()
if ctx.Err() == context.Canceled {
tfCtx.Stop()
}
}()
diags = diags.Append(tfCtx.Validate(moduleTree, &terraform.ValidateOpts{
ExternalProviders: providerClients,
}))

@ -581,6 +581,19 @@ func (c *ComponentInstance) CheckModuleTreePlan(ctx context.Context) (*plans.Pla
deferred = true
}
// When our given context is cancelled, we want to instruct the
// modules runtime to stop the running operation. We use this
// nested context to ensure that we don't leak a goroutine when the
// parent context isn't cancelled.
operationCtx, operationCancel := context.WithCancel(ctx)
defer operationCancel()
go func() {
<-operationCtx.Done()
if ctx.Err() == context.Canceled {
tfCtx.Stop()
}
}()
// NOTE: This ComponentInstance type only deals with component
// instances currently declared in the configuration. See
// [ComponentInstanceRemoved] for the model of a component instance
@ -804,6 +817,19 @@ func (c *ComponentInstance) ApplyModuleTreePlan(ctx context.Context, plan *plans
var newState *states.State
if modifiedPlan.Applyable {
// When our given context is cancelled, we want to instruct the
// modules runtime to stop the running operation. We use this
// nested context to ensure that we don't leak a goroutine when the
// parent context isn't cancelled.
operationCtx, operationCancel := context.WithCancel(ctx)
defer operationCancel()
go func() {
<-operationCtx.Done()
if ctx.Err() == context.Canceled {
tfCtx.Stop()
}
}()
// NOTE: tfCtx.Apply tends to make changes to the given plan while it
// works, and so code after this point should not make any further use
// of either "modifiedPlan" or "plan" (since they share lots of the same

@ -206,10 +206,28 @@ func (p *ProviderInstance) CheckClient(ctx context.Context, phase EvalPhase) (pr
return stubConfiguredProvider{unknown: false}, diags
}
// If the context we received gets cancelled then we want providers
// to try to cancel any operations they have in progress, so we'll
// watch for that in a separate goroutine. This extra context
// is here just so we can avoid leaking this goroutine if the
// parent doesn't get cancelled.
providerCtx, localCancel := context.WithCancel(ctx)
go func() {
<-providerCtx.Done()
if ctx.Err() == context.Canceled {
// Not all providers respond to this, but some will quickly
// abort operations currently in progress and return a
// cancellation error, thus allowing us to halt more quickly
// when interrupted.
client.Stop()
}
}()
// If this provider is implemented as a separate plugin then we
// must terminate its child process once evaluation is complete.
p.main.RegisterCleanup(func(ctx context.Context) tfdiags.Diagnostics {
var diags tfdiags.Diagnostics
localCancel() // make sure our cancel-monitoring goroutine terminates
err := client.Close()
if err != nil {
diags = diags.Append(&hcl.Diagnostic{

Loading…
Cancel
Save