mirror of https://github.com/hashicorp/terraform
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1345 lines
49 KiB
1345 lines
49 KiB
// Copyright (c) HashiCorp, Inc.
|
|
// SPDX-License-Identifier: BUSL-1.1
|
|
|
|
package rpcapi
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"time"
|
|
|
|
"github.com/hashicorp/go-slug/sourceaddrs"
|
|
"github.com/hashicorp/go-slug/sourcebundle"
|
|
"github.com/hashicorp/terraform-svchost/disco"
|
|
"go.opentelemetry.io/otel/attribute"
|
|
otelCodes "go.opentelemetry.io/otel/codes"
|
|
"go.opentelemetry.io/otel/trace"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/status"
|
|
|
|
"github.com/hashicorp/terraform/internal/addrs"
|
|
"github.com/hashicorp/terraform/internal/depsfile"
|
|
"github.com/hashicorp/terraform/internal/plans"
|
|
"github.com/hashicorp/terraform/internal/providercache"
|
|
"github.com/hashicorp/terraform/internal/providers"
|
|
"github.com/hashicorp/terraform/internal/rpcapi/terraform1"
|
|
"github.com/hashicorp/terraform/internal/rpcapi/terraform1/stacks"
|
|
"github.com/hashicorp/terraform/internal/stacks/stackaddrs"
|
|
"github.com/hashicorp/terraform/internal/stacks/stackconfig"
|
|
"github.com/hashicorp/terraform/internal/stacks/stackmigrate"
|
|
"github.com/hashicorp/terraform/internal/stacks/stackplan"
|
|
"github.com/hashicorp/terraform/internal/stacks/stackruntime"
|
|
"github.com/hashicorp/terraform/internal/stacks/stackruntime/hooks"
|
|
"github.com/hashicorp/terraform/internal/stacks/stackstate"
|
|
"github.com/hashicorp/terraform/internal/states"
|
|
"github.com/hashicorp/terraform/internal/states/statefile"
|
|
"github.com/hashicorp/terraform/internal/tfdiags"
|
|
)
|
|
|
|
type stacksServer struct {
|
|
stacks.UnimplementedStacksServer
|
|
|
|
stopper *stopper
|
|
services *disco.Disco
|
|
handles *handleTable
|
|
experimentsAllowed bool
|
|
|
|
// providerCacheOverride is a map of provider names to provider factories
|
|
// that should be used instead of the default provider cache. This is used
|
|
// within tests to side load providers without needing a real provider
|
|
// cache.
|
|
providerCacheOverride map[addrs.Provider]providers.Factory
|
|
// providerDependencyLockOverride is an in-memory override of the provider
|
|
// lockfile used for testing when the real provider is side-loaded.
|
|
providerDependencyLockOverride *depsfile.Locks
|
|
// planTimestampOverride is an in-memory override of the plan timestamp used
|
|
// for testing. This just ensures our tests aren't flaky as we can use a
|
|
// constant timestamp for the plan.
|
|
planTimestampOverride *time.Time
|
|
}
|
|
|
|
var (
|
|
_ stacks.StacksServer = (*stacksServer)(nil)
|
|
|
|
WorkspaceNameEnvVar = "TF_WORKSPACE"
|
|
)
|
|
|
|
func newStacksServer(stopper *stopper, handles *handleTable, services *disco.Disco, opts *serviceOpts) *stacksServer {
|
|
return &stacksServer{
|
|
stopper: stopper,
|
|
services: services,
|
|
handles: handles,
|
|
experimentsAllowed: opts.experimentsAllowed,
|
|
}
|
|
}
|
|
|
|
func (s *stacksServer) OpenStackConfiguration(ctx context.Context, req *stacks.OpenStackConfiguration_Request) (*stacks.OpenStackConfiguration_Response, error) {
|
|
sourcesHnd := handle[*sourcebundle.Bundle](req.SourceBundleHandle)
|
|
sources := s.handles.SourceBundle(sourcesHnd)
|
|
if sources == nil {
|
|
return nil, status.Error(codes.InvalidArgument, "the given source bundle handle is invalid")
|
|
}
|
|
|
|
sourceAddr, err := resolveFinalSourceAddr(req.SourceAddress, sources)
|
|
if err != nil {
|
|
return nil, status.Errorf(codes.InvalidArgument, "invalid source address: %s", err)
|
|
}
|
|
|
|
config, diags := stackconfig.LoadConfigDir(sourceAddr, sources)
|
|
if diags.HasErrors() {
|
|
// For errors in the configuration itself we treat that as a successful
|
|
// result from OpenStackConfiguration but with diagnostics in the
|
|
// response and no source handle.
|
|
return &stacks.OpenStackConfiguration_Response{
|
|
Diagnostics: diagnosticsToProto(diags),
|
|
}, nil
|
|
}
|
|
|
|
configHnd, err := s.handles.NewStackConfig(config, sourcesHnd)
|
|
if err != nil {
|
|
// The only reasonable way we can fail here is if the caller made
|
|
// a concurrent call to Dependencies.CloseSourceBundle after we
|
|
// checked the handle validity above. That'd be a very strange thing
|
|
// to do, but in the event it happens we'll just discard the config
|
|
// we loaded (since its source files on disk might be gone imminently)
|
|
// and return an error.
|
|
return nil, status.Errorf(codes.Unknown, "allocating config handle: %s", err)
|
|
}
|
|
|
|
// If we get here then we're guaranteed that the source bundle handle
|
|
// cannot be closed until the config handle is closed -- enforced by
|
|
// [handleTable]'s dependency tracking -- and so we can return the config
|
|
// handle. (The caller is required to ensure that the source bundle files
|
|
// on disk are not modified for as long as the source bundle handle remains
|
|
// open, and its lifetime will necessarily exceed the config handle.)
|
|
return &stacks.OpenStackConfiguration_Response{
|
|
StackConfigHandle: configHnd.ForProtobuf(),
|
|
Diagnostics: diagnosticsToProto(diags),
|
|
}, nil
|
|
}
|
|
|
|
func (s *stacksServer) CloseStackConfiguration(ctx context.Context, req *stacks.CloseStackConfiguration_Request) (*stacks.CloseStackConfiguration_Response, error) {
|
|
hnd := handle[*stackconfig.Config](req.StackConfigHandle)
|
|
err := s.handles.CloseStackConfig(hnd)
|
|
if err != nil {
|
|
return nil, status.Error(codes.InvalidArgument, err.Error())
|
|
}
|
|
return &stacks.CloseStackConfiguration_Response{}, nil
|
|
}
|
|
|
|
func (s *stacksServer) ValidateStackConfiguration(ctx context.Context, req *stacks.ValidateStackConfiguration_Request) (*stacks.ValidateStackConfiguration_Response, error) {
|
|
cfgHnd := handle[*stackconfig.Config](req.StackConfigHandle)
|
|
cfg := s.handles.StackConfig(cfgHnd)
|
|
if cfg == nil {
|
|
return nil, status.Error(codes.InvalidArgument, "the given stack configuration handle is invalid")
|
|
}
|
|
depsHnd := handle[*depsfile.Locks](req.DependencyLocksHandle)
|
|
var deps *depsfile.Locks
|
|
if !depsHnd.IsNil() {
|
|
deps = s.handles.DependencyLocks(depsHnd)
|
|
if deps == nil {
|
|
return nil, status.Error(codes.InvalidArgument, "the given dependency locks handle is invalid")
|
|
}
|
|
} else {
|
|
deps = depsfile.NewLocks()
|
|
}
|
|
providerCacheHnd := handle[*providercache.Dir](req.ProviderCacheHandle)
|
|
var providerCache *providercache.Dir
|
|
if !providerCacheHnd.IsNil() {
|
|
providerCache = s.handles.ProviderPluginCache(providerCacheHnd)
|
|
if providerCache == nil {
|
|
return nil, status.Error(codes.InvalidArgument, "the given provider cache handle is invalid")
|
|
}
|
|
}
|
|
|
|
// (providerFactoriesForLocks explicitly supports a nil providerCache)
|
|
providerFactories, err := providerFactoriesForLocks(deps, providerCache)
|
|
if err != nil {
|
|
return nil, status.Errorf(codes.InvalidArgument, "provider dependencies are inconsistent: %s", err)
|
|
}
|
|
|
|
diags := stackruntime.Validate(ctx, &stackruntime.ValidateRequest{
|
|
Config: cfg,
|
|
ExperimentsAllowed: s.experimentsAllowed,
|
|
ProviderFactories: providerFactories,
|
|
DependencyLocks: *deps,
|
|
})
|
|
return &stacks.ValidateStackConfiguration_Response{
|
|
Diagnostics: diagnosticsToProto(diags),
|
|
}, nil
|
|
}
|
|
|
|
func (s *stacksServer) FindStackConfigurationComponents(ctx context.Context, req *stacks.FindStackConfigurationComponents_Request) (*stacks.FindStackConfigurationComponents_Response, error) {
|
|
cfgHnd := handle[*stackconfig.Config](req.StackConfigHandle)
|
|
cfg := s.handles.StackConfig(cfgHnd)
|
|
if cfg == nil {
|
|
return nil, status.Error(codes.InvalidArgument, "the given stack configuration handle is invalid")
|
|
}
|
|
|
|
return &stacks.FindStackConfigurationComponents_Response{
|
|
Config: stackConfigMetaforProto(cfg.Root, stackaddrs.RootStack),
|
|
}, nil
|
|
}
|
|
|
|
func stackConfigMetaforProto(cfgNode *stackconfig.ConfigNode, stackAddr stackaddrs.Stack) *stacks.FindStackConfigurationComponents_StackConfig {
|
|
ret := &stacks.FindStackConfigurationComponents_StackConfig{
|
|
Components: make(map[string]*stacks.FindStackConfigurationComponents_Component),
|
|
EmbeddedStacks: make(map[string]*stacks.FindStackConfigurationComponents_EmbeddedStack),
|
|
InputVariables: make(map[string]*stacks.FindStackConfigurationComponents_InputVariable),
|
|
OutputValues: make(map[string]*stacks.FindStackConfigurationComponents_OutputValue),
|
|
Removed: make(map[string]*stacks.FindStackConfigurationComponents_Removed),
|
|
}
|
|
|
|
for name, cc := range cfgNode.Stack.Components {
|
|
cProto := &stacks.FindStackConfigurationComponents_Component{
|
|
SourceAddr: cc.FinalSourceAddr.String(),
|
|
ComponentAddr: stackaddrs.Config(stackAddr, stackaddrs.Component{Name: cc.Name}).String(),
|
|
}
|
|
switch {
|
|
case cc.ForEach != nil:
|
|
cProto.Instances = stacks.FindStackConfigurationComponents_FOR_EACH
|
|
default:
|
|
cProto.Instances = stacks.FindStackConfigurationComponents_SINGLE
|
|
}
|
|
ret.Components[name] = cProto
|
|
}
|
|
|
|
for name, sn := range cfgNode.Children {
|
|
sc := cfgNode.Stack.EmbeddedStacks[name]
|
|
sProto := &stacks.FindStackConfigurationComponents_EmbeddedStack{
|
|
SourceAddr: sn.Stack.SourceAddr.String(),
|
|
Config: stackConfigMetaforProto(sn, stackAddr.Child(name)),
|
|
}
|
|
switch {
|
|
case sc.ForEach != nil:
|
|
sProto.Instances = stacks.FindStackConfigurationComponents_FOR_EACH
|
|
default:
|
|
sProto.Instances = stacks.FindStackConfigurationComponents_SINGLE
|
|
}
|
|
ret.EmbeddedStacks[name] = sProto
|
|
}
|
|
|
|
for name, vc := range cfgNode.Stack.InputVariables {
|
|
vProto := &stacks.FindStackConfigurationComponents_InputVariable{
|
|
Optional: !vc.DefaultValue.IsNull(),
|
|
Sensitive: vc.Sensitive,
|
|
Ephemeral: vc.Ephemeral,
|
|
}
|
|
ret.InputVariables[name] = vProto
|
|
}
|
|
|
|
for name, oc := range cfgNode.Stack.OutputValues {
|
|
oProto := &stacks.FindStackConfigurationComponents_OutputValue{
|
|
Sensitive: oc.Sensitive,
|
|
Ephemeral: oc.Ephemeral,
|
|
}
|
|
ret.OutputValues[name] = oProto
|
|
}
|
|
|
|
// Currently Components are the only thing that can be removed
|
|
for name, rc := range cfgNode.Stack.RemovedComponents.All() {
|
|
var blocks []*stacks.FindStackConfigurationComponents_Removed_Block
|
|
for _, rc := range rc {
|
|
relativeAddress := rc.From.TargetConfigComponent()
|
|
cProto := &stacks.FindStackConfigurationComponents_Removed_Block{
|
|
SourceAddr: rc.FinalSourceAddr.String(),
|
|
ComponentAddr: stackaddrs.Config(append(stackAddr, relativeAddress.Stack...), relativeAddress.Item).String(),
|
|
Destroy: rc.Destroy,
|
|
}
|
|
switch {
|
|
case rc.ForEach != nil:
|
|
cProto.Instances = stacks.FindStackConfigurationComponents_FOR_EACH
|
|
default:
|
|
cProto.Instances = stacks.FindStackConfigurationComponents_SINGLE
|
|
}
|
|
blocks = append(blocks, cProto)
|
|
}
|
|
relativeAddress := rc[0].From.TargetConfigComponent()
|
|
ret.Removed[name.String()] = &stacks.FindStackConfigurationComponents_Removed{
|
|
// in order to ensure as much backwards and forwards compatibility
|
|
// as possible, we're going to set the deprecated single fields
|
|
// with the first run block
|
|
|
|
SourceAddr: rc[0].FinalSourceAddr.String(),
|
|
Instances: func() stacks.FindStackConfigurationComponents_Instances {
|
|
switch {
|
|
case rc[0].ForEach != nil:
|
|
return stacks.FindStackConfigurationComponents_FOR_EACH
|
|
default:
|
|
return stacks.FindStackConfigurationComponents_SINGLE
|
|
}
|
|
}(),
|
|
ComponentAddr: stackaddrs.Config(append(stackAddr, relativeAddress.Stack...), relativeAddress.Item).String(),
|
|
Destroy: rc[0].Destroy,
|
|
|
|
// We return all the values here:
|
|
|
|
Blocks: blocks,
|
|
}
|
|
}
|
|
|
|
return ret
|
|
}
|
|
|
|
func (s *stacksServer) OpenState(stream stacks.Stacks_OpenStateServer) error {
|
|
loader := stackstate.NewLoader()
|
|
for {
|
|
item, err := stream.Recv()
|
|
if err == io.EOF {
|
|
break // All done!
|
|
} else if err != nil {
|
|
return err
|
|
}
|
|
err = loader.AddRaw(item.Raw.Key, item.Raw.Value)
|
|
if err != nil {
|
|
return status.Errorf(codes.InvalidArgument, "invalid raw state element: %s", err)
|
|
}
|
|
}
|
|
|
|
hnd := s.handles.NewStackState(loader.State())
|
|
return stream.SendAndClose(&stacks.OpenStackState_Response{
|
|
StateHandle: hnd.ForProtobuf(),
|
|
})
|
|
}
|
|
|
|
func (s *stacksServer) CloseState(ctx context.Context, req *stacks.CloseStackState_Request) (*stacks.CloseStackState_Response, error) {
|
|
hnd := handle[*stackstate.State](req.StateHandle)
|
|
err := s.handles.CloseStackState(hnd)
|
|
if err != nil {
|
|
return nil, status.Error(codes.InvalidArgument, err.Error())
|
|
}
|
|
return &stacks.CloseStackState_Response{}, nil
|
|
}
|
|
|
|
func (s *stacksServer) PlanStackChanges(req *stacks.PlanStackChanges_Request, evts stacks.Stacks_PlanStackChangesServer) error {
|
|
ctx := evts.Context()
|
|
syncEvts := newSyncStreamingRPCSender(evts)
|
|
evts = nil // Prevent accidental unsynchronized usage of this server
|
|
|
|
cfgHnd := handle[*stackconfig.Config](req.StackConfigHandle)
|
|
cfg := s.handles.StackConfig(cfgHnd)
|
|
if cfg == nil {
|
|
return status.Error(codes.InvalidArgument, "the given stack configuration handle is invalid")
|
|
}
|
|
depsHnd := handle[*depsfile.Locks](req.DependencyLocksHandle)
|
|
var deps *depsfile.Locks
|
|
if s.providerDependencyLockOverride != nil {
|
|
deps = s.providerDependencyLockOverride
|
|
} else if !depsHnd.IsNil() {
|
|
deps = s.handles.DependencyLocks(depsHnd)
|
|
if deps == nil {
|
|
return status.Error(codes.InvalidArgument, "the given dependency locks handle is invalid")
|
|
}
|
|
} else {
|
|
deps = depsfile.NewLocks()
|
|
}
|
|
providerCacheHnd := handle[*providercache.Dir](req.ProviderCacheHandle)
|
|
var providerCache *providercache.Dir
|
|
if !providerCacheHnd.IsNil() {
|
|
providerCache = s.handles.ProviderPluginCache(providerCacheHnd)
|
|
if providerCache == nil {
|
|
return status.Error(codes.InvalidArgument, "the given provider cache handle is invalid")
|
|
}
|
|
}
|
|
// NOTE: providerCache can be nil if no handle was provided, in which
|
|
// case the call can only use built-in providers. All code below
|
|
// must avoid panicking when providerCache is nil, but is allowed to
|
|
// return an InvalidArgument error in that case.
|
|
|
|
if req.PreviousStateHandle != 0 && len(req.PreviousState) != 0 {
|
|
return status.Error(codes.InvalidArgument, "must not set both previous_state_handle and previous_state")
|
|
}
|
|
|
|
inputValues, err := externalInputValuesFromProto(req.InputValues)
|
|
if err != nil {
|
|
return status.Errorf(codes.InvalidArgument, "invalid input values: %s", err)
|
|
}
|
|
|
|
var providerFactories map[addrs.Provider]providers.Factory
|
|
if s.providerCacheOverride != nil {
|
|
// This is only used in tests to side load providers without needing a
|
|
// real provider cache.
|
|
providerFactories = s.providerCacheOverride
|
|
} else {
|
|
// (providerFactoriesForLocks explicitly supports a nil providerCache)
|
|
providerFactories, err = providerFactoriesForLocks(deps, providerCache)
|
|
if err != nil {
|
|
return status.Errorf(codes.InvalidArgument, "provider dependencies are inconsistent: %s", err)
|
|
}
|
|
}
|
|
|
|
// We'll hook some internal events in the planning process both to generate
|
|
// tracing information if we're in an OpenTelemetry-aware context and
|
|
// to propagate a subset of the events to our client.
|
|
hooks := stackPlanHooks(syncEvts, cfg.Root.Stack.SourceAddr)
|
|
ctx = stackruntime.ContextWithHooks(ctx, hooks)
|
|
|
|
var planMode plans.Mode
|
|
switch req.PlanMode {
|
|
case stacks.PlanMode_NORMAL:
|
|
planMode = plans.NormalMode
|
|
case stacks.PlanMode_REFRESH_ONLY:
|
|
planMode = plans.RefreshOnlyMode
|
|
case stacks.PlanMode_DESTROY:
|
|
planMode = plans.DestroyMode
|
|
default:
|
|
return status.Errorf(codes.InvalidArgument, "unsupported planning mode %d", req.PlanMode)
|
|
}
|
|
|
|
var prevState *stackstate.State
|
|
if req.PreviousStateHandle != 0 {
|
|
stateHnd := handle[*stackstate.State](req.PreviousStateHandle)
|
|
prevState = s.handles.StackState(stateHnd)
|
|
if prevState == nil {
|
|
return status.Error(codes.InvalidArgument, "the given previous state handle is invalid")
|
|
}
|
|
} else {
|
|
// Deprecated: The previous state is provided inline as a map.
|
|
// FIXME: Remove this old field once our existing clients are updated.
|
|
prevState, err = stackstate.LoadFromProto(req.PreviousState)
|
|
if err != nil {
|
|
return status.Errorf(codes.InvalidArgument, "can't load previous state: %s", err)
|
|
}
|
|
}
|
|
|
|
changesCh := make(chan stackplan.PlannedChange, 8)
|
|
diagsCh := make(chan tfdiags.Diagnostic, 2)
|
|
rtReq := stackruntime.PlanRequest{
|
|
PlanMode: planMode,
|
|
Config: cfg,
|
|
PrevState: prevState,
|
|
ProviderFactories: providerFactories,
|
|
InputValues: inputValues,
|
|
ExperimentsAllowed: s.experimentsAllowed,
|
|
DependencyLocks: *deps,
|
|
|
|
// planTimestampOverride will be null if not set, so it's fine for
|
|
// us to just set this all the time. In practice, this will only have
|
|
// a value in tests.
|
|
ForcePlanTimestamp: s.planTimestampOverride,
|
|
}
|
|
rtResp := stackruntime.PlanResponse{
|
|
PlannedChanges: changesCh,
|
|
Diagnostics: diagsCh,
|
|
}
|
|
|
|
// As a long-running operation, the plan RPC must be able to be stopped. We
|
|
// do this by requesting a stop channel from the stopper, and using it to
|
|
// cancel the planning process.
|
|
stopCh := s.stopper.add()
|
|
defer s.stopper.remove(stopCh)
|
|
|
|
// We create a new cancellable context for the stack plan operation to
|
|
// allow us to respond to stop requests.
|
|
planCtx, cancelPlan := context.WithCancel(ctx)
|
|
defer cancelPlan()
|
|
|
|
// The actual plan operation runs in the background, and emits events
|
|
// to us via the channels in rtResp before finally closing changesCh
|
|
// to signal that the process is complete.
|
|
go stackruntime.Plan(planCtx, &rtReq, &rtResp)
|
|
|
|
emitDiag := func(diag tfdiags.Diagnostic) {
|
|
diags := tfdiags.Diagnostics{diag}
|
|
protoDiags := diagnosticsToProto(diags)
|
|
for _, protoDiag := range protoDiags {
|
|
syncEvts.Send(&stacks.PlanStackChanges_Event{
|
|
Event: &stacks.PlanStackChanges_Event_Diagnostic{
|
|
Diagnostic: protoDiag,
|
|
},
|
|
})
|
|
}
|
|
}
|
|
|
|
// There is no strong ordering between the planned changes and the
|
|
// diagnostics, so we need to be prepared for them to arrive in any
|
|
// order. However, stackruntime.Plan does guarantee that it will
|
|
// close changesCh only after it's finished writing to and closing
|
|
// everything else, and so we can assume that once changesCh is
|
|
// closed we only need to worry about whatever's left in the
|
|
// diagsCh buffer.
|
|
Events:
|
|
for {
|
|
select {
|
|
|
|
case change, ok := <-changesCh:
|
|
if !ok {
|
|
if diagsCh != nil {
|
|
// Async work is done! We do still need to consume the rest
|
|
// of diagsCh before we stop, though, because there might
|
|
// be some extras in the channel's buffer that we didn't
|
|
// get to yet.
|
|
for diag := range diagsCh {
|
|
emitDiag(diag)
|
|
}
|
|
}
|
|
break Events
|
|
}
|
|
|
|
protoChange, err := change.PlannedChangeProto()
|
|
if err != nil {
|
|
// Should not get here: it always indicates a bug in
|
|
// PlannedChangeProto or in the code which constructed
|
|
// the change over in package stackeval.
|
|
emitDiag(tfdiags.Sourceless(
|
|
tfdiags.Error,
|
|
"Incorrectly-constructed change",
|
|
fmt.Sprintf(
|
|
"Failed to serialize a %T value for recording in the saved plan: %s.\n\nThis is a bug in Terraform; please report it!",
|
|
protoChange, err,
|
|
),
|
|
))
|
|
continue
|
|
}
|
|
|
|
syncEvts.Send(&stacks.PlanStackChanges_Event{
|
|
Event: &stacks.PlanStackChanges_Event_PlannedChange{
|
|
PlannedChange: protoChange,
|
|
},
|
|
})
|
|
|
|
case diag, ok := <-diagsCh:
|
|
if !ok {
|
|
// The diagnostics channel has closed, so we'll just stop
|
|
// trying to read from it and wait for changesCh to close,
|
|
// which will be our final signal that everything is done.
|
|
diagsCh = nil
|
|
continue
|
|
}
|
|
emitDiag(diag)
|
|
|
|
case <-stopCh:
|
|
// If our stop channel is signalled, we need to cancel the plan.
|
|
// This may result in remaining changes or diagnostics being
|
|
// emitted, so we continue to monitor those channels if they're
|
|
// still active.
|
|
cancelPlan()
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *stacksServer) OpenPlan(stream stacks.Stacks_OpenPlanServer) error {
|
|
loader := stackplan.NewLoader()
|
|
for {
|
|
item, err := stream.Recv()
|
|
if err == io.EOF {
|
|
break // All done!
|
|
} else if err != nil {
|
|
return err
|
|
}
|
|
err = loader.AddRaw(item.Raw)
|
|
if err != nil {
|
|
return status.Errorf(codes.InvalidArgument, "invalid raw plan element: %s", err)
|
|
}
|
|
}
|
|
|
|
plan, err := loader.Plan()
|
|
if err != nil {
|
|
return status.Errorf(codes.InvalidArgument, "invalid raw plan: %s", err)
|
|
}
|
|
hnd := s.handles.NewStackPlan(plan)
|
|
return stream.SendAndClose(&stacks.OpenStackPlan_Response{
|
|
PlanHandle: hnd.ForProtobuf(),
|
|
})
|
|
}
|
|
|
|
func (s *stacksServer) ClosePlan(ctx context.Context, req *stacks.CloseStackPlan_Request) (*stacks.CloseStackPlan_Response, error) {
|
|
hnd := handle[*stackplan.Plan](req.PlanHandle)
|
|
err := s.handles.CloseStackPlan(hnd)
|
|
if err != nil {
|
|
return nil, status.Error(codes.InvalidArgument, err.Error())
|
|
}
|
|
return &stacks.CloseStackPlan_Response{}, nil
|
|
}
|
|
|
|
func (s *stacksServer) ApplyStackChanges(req *stacks.ApplyStackChanges_Request, evts stacks.Stacks_ApplyStackChangesServer) error {
|
|
ctx := evts.Context()
|
|
syncEvts := newSyncStreamingRPCSender(evts)
|
|
evts = nil // Prevent accidental unsynchronized usage of this server
|
|
|
|
if req.PlanHandle != 0 && len(req.PlannedChanges) != 0 {
|
|
return status.Error(codes.InvalidArgument, "must not set both plan_handle and planned_changes")
|
|
}
|
|
var plan *stackplan.Plan
|
|
if req.PlanHandle != 0 {
|
|
planHnd := handle[*stackplan.Plan](req.PlanHandle)
|
|
plan = s.handles.StackPlan(planHnd)
|
|
if plan == nil {
|
|
return status.Error(codes.InvalidArgument, "the given plan handle is invalid")
|
|
}
|
|
// The plan handle is immediately invalidated by trying to apply it;
|
|
// plans are not reusable because they are valid only against the
|
|
// exact prior state they were generated for.
|
|
if err := s.handles.CloseStackPlan(planHnd); err != nil {
|
|
// It would be very strange to get here!
|
|
return status.Error(codes.Internal, "failed to close the plan handle")
|
|
}
|
|
} else {
|
|
// Deprecated: whole plan specified inline
|
|
// FIXME: Remove this old field once our existing clients are updated.
|
|
var err error
|
|
plan, err = stackplan.LoadFromProto(req.PlannedChanges)
|
|
if err != nil {
|
|
return status.Errorf(codes.InvalidArgument, "invalid planned_changes: %s", err)
|
|
}
|
|
}
|
|
|
|
cfgHnd := handle[*stackconfig.Config](req.StackConfigHandle)
|
|
cfg := s.handles.StackConfig(cfgHnd)
|
|
if cfg == nil {
|
|
return status.Error(codes.InvalidArgument, "the given stack configuration handle is invalid")
|
|
}
|
|
depsHnd := handle[*depsfile.Locks](req.DependencyLocksHandle)
|
|
var deps *depsfile.Locks
|
|
if s.providerDependencyLockOverride != nil {
|
|
deps = s.providerDependencyLockOverride
|
|
} else if !depsHnd.IsNil() {
|
|
deps = s.handles.DependencyLocks(depsHnd)
|
|
if deps == nil {
|
|
return status.Error(codes.InvalidArgument, "the given dependency locks handle is invalid")
|
|
}
|
|
} else {
|
|
deps = depsfile.NewLocks()
|
|
}
|
|
providerCacheHnd := handle[*providercache.Dir](req.ProviderCacheHandle)
|
|
var providerCache *providercache.Dir
|
|
if !providerCacheHnd.IsNil() {
|
|
providerCache = s.handles.ProviderPluginCache(providerCacheHnd)
|
|
if providerCache == nil {
|
|
return status.Error(codes.InvalidArgument, "the given provider cache handle is invalid")
|
|
}
|
|
}
|
|
var providerFactories map[addrs.Provider]providers.Factory
|
|
if s.providerCacheOverride != nil {
|
|
// This is only used in tests to side load providers without needing a
|
|
// real provider cache.
|
|
providerFactories = s.providerCacheOverride
|
|
} else {
|
|
// NOTE: providerCache can be nil if no handle was provided, in which
|
|
// case the call can only use built-in providers. All code below
|
|
// must avoid panicking when providerCache is nil, but is allowed to
|
|
// return an InvalidArgument error in that case.
|
|
// (providerFactoriesForLocks explicitly supports a nil providerCache)
|
|
var err error
|
|
// (providerFactoriesForLocks explicitly supports a nil providerCache)
|
|
providerFactories, err = providerFactoriesForLocks(deps, providerCache)
|
|
if err != nil {
|
|
return status.Errorf(codes.InvalidArgument, "provider dependencies are inconsistent: %s", err)
|
|
}
|
|
}
|
|
|
|
inputValues, err := externalInputValuesFromProto(req.InputValues)
|
|
if err != nil {
|
|
return status.Errorf(codes.InvalidArgument, "invalid input values: %s", err)
|
|
}
|
|
|
|
// We'll hook some internal events in the planning process both to generate
|
|
// tracing information if we're in an OpenTelemetry-aware context and
|
|
// to propagate a subset of the events to our client.
|
|
hooks := stackApplyHooks(syncEvts, cfg.Root.Stack.SourceAddr)
|
|
ctx = stackruntime.ContextWithHooks(ctx, hooks)
|
|
|
|
changesCh := make(chan stackstate.AppliedChange, 8)
|
|
diagsCh := make(chan tfdiags.Diagnostic, 2)
|
|
rtReq := stackruntime.ApplyRequest{
|
|
Config: cfg,
|
|
InputValues: inputValues,
|
|
ProviderFactories: providerFactories,
|
|
Plan: plan,
|
|
ExperimentsAllowed: s.experimentsAllowed,
|
|
DependencyLocks: *deps,
|
|
}
|
|
rtResp := stackruntime.ApplyResponse{
|
|
AppliedChanges: changesCh,
|
|
Diagnostics: diagsCh,
|
|
}
|
|
|
|
// As a long-running operation, the apply RPC must be able to be stopped.
|
|
// We do this by requesting a stop channel from the stopper, and using it
|
|
// to cancel the planning process.
|
|
stopCh := s.stopper.add()
|
|
defer s.stopper.remove(stopCh)
|
|
|
|
// We create a new cancellable context for the stack plan operation to
|
|
// allow us to respond to stop requests.
|
|
applyCtx, cancelApply := context.WithCancel(ctx)
|
|
defer cancelApply()
|
|
|
|
// The actual apply operation runs in the background, and emits events
|
|
// to us via the channels in rtResp before finally closing changesCh
|
|
// to signal that the process is complete.
|
|
go stackruntime.Apply(applyCtx, &rtReq, &rtResp)
|
|
|
|
emitDiag := func(diag tfdiags.Diagnostic) {
|
|
diags := tfdiags.Diagnostics{diag}
|
|
protoDiags := diagnosticsToProto(diags)
|
|
for _, protoDiag := range protoDiags {
|
|
syncEvts.Send(&stacks.ApplyStackChanges_Event{
|
|
Event: &stacks.ApplyStackChanges_Event_Diagnostic{
|
|
Diagnostic: protoDiag,
|
|
},
|
|
})
|
|
}
|
|
}
|
|
|
|
// There is no strong ordering between the planned changes and the
|
|
// diagnostics, so we need to be prepared for them to arrive in any
|
|
// order. However, stackruntime.Apply does guarantee that it will
|
|
// close changesCh only after it's finished writing to and closing
|
|
// everything else, and so we can assume that once changesCh is
|
|
// closed we only need to worry about whatever's left in the
|
|
// diagsCh buffer.
|
|
Events:
|
|
for {
|
|
select {
|
|
|
|
case change, ok := <-changesCh:
|
|
if !ok {
|
|
if diagsCh != nil {
|
|
// Async work is done! We do still need to consume the rest
|
|
// of diagsCh before we stop, though, because there might
|
|
// be some extras in the channel's buffer that we didn't
|
|
// get to yet.
|
|
for diag := range diagsCh {
|
|
emitDiag(diag)
|
|
}
|
|
}
|
|
break Events
|
|
}
|
|
|
|
protoChange, err := change.AppliedChangeProto()
|
|
if err != nil {
|
|
// Should not get here: it always indicates a bug in
|
|
// AppliedChangeProto or in the code which constructed
|
|
// the change over in package stackeval.
|
|
// If we get here then it's likely that something will be
|
|
// left stale in the final stack state, so we should really
|
|
// avoid ever getting here.
|
|
emitDiag(tfdiags.Sourceless(
|
|
tfdiags.Error,
|
|
"Incorrectly-constructed apply result",
|
|
fmt.Sprintf(
|
|
"Failed to serialize a %T value for recording in the updated state: %s.\n\nThis is a bug in Terraform; please report it!",
|
|
protoChange, err,
|
|
),
|
|
))
|
|
continue
|
|
}
|
|
|
|
syncEvts.Send(&stacks.ApplyStackChanges_Event{
|
|
Event: &stacks.ApplyStackChanges_Event_AppliedChange{
|
|
AppliedChange: protoChange,
|
|
},
|
|
})
|
|
|
|
case diag, ok := <-diagsCh:
|
|
if !ok {
|
|
// The diagnostics channel has closed, so we'll just stop
|
|
// trying to read from it and wait for changesCh to close,
|
|
// which will be our final signal that everything is done.
|
|
diagsCh = nil
|
|
continue
|
|
}
|
|
emitDiag(diag)
|
|
|
|
case <-stopCh:
|
|
// If our stop channel is signalled, we need to cancel the apply.
|
|
// This may result in remaining changes or diagnostics being
|
|
// emitted, so we continue to monitor those channels if they're
|
|
// still active.
|
|
cancelApply()
|
|
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *stacksServer) OpenStackInspector(ctx context.Context, req *stacks.OpenStackInspector_Request) (*stacks.OpenStackInspector_Response, error) {
|
|
cfgHnd := handle[*stackconfig.Config](req.StackConfigHandle)
|
|
cfg := s.handles.StackConfig(cfgHnd)
|
|
if cfg == nil {
|
|
return nil, status.Error(codes.InvalidArgument, "the given stack configuration handle is invalid")
|
|
}
|
|
depsHnd := handle[*depsfile.Locks](req.DependencyLocksHandle)
|
|
var deps *depsfile.Locks
|
|
if !depsHnd.IsNil() {
|
|
deps = s.handles.DependencyLocks(depsHnd)
|
|
if deps == nil {
|
|
return nil, status.Error(codes.InvalidArgument, "the given dependency locks handle is invalid")
|
|
}
|
|
} else {
|
|
deps = depsfile.NewLocks()
|
|
}
|
|
providerCacheHnd := handle[*providercache.Dir](req.ProviderCacheHandle)
|
|
var providerCache *providercache.Dir
|
|
if !providerCacheHnd.IsNil() {
|
|
providerCache = s.handles.ProviderPluginCache(providerCacheHnd)
|
|
if providerCache == nil {
|
|
return nil, status.Error(codes.InvalidArgument, "the given provider cache handle is invalid")
|
|
}
|
|
}
|
|
// NOTE: providerCache can be nil if no handle was provided, in which
|
|
// case the call can only use built-in providers. All code below
|
|
// must avoid panicking when providerCache is nil, but is allowed to
|
|
// return an InvalidArgument error in that case.
|
|
// (providerFactoriesForLocks explicitly supports a nil providerCache)
|
|
providerFactories, err := providerFactoriesForLocks(deps, providerCache)
|
|
if err != nil {
|
|
return nil, status.Errorf(codes.InvalidArgument, "provider dependencies are inconsistent: %s", err)
|
|
}
|
|
inputValues, err := externalInputValuesFromProto(req.InputValues)
|
|
if err != nil {
|
|
return nil, status.Errorf(codes.InvalidArgument, "invalid input values: %s", err)
|
|
}
|
|
state, err := stackstate.LoadFromProto(req.State)
|
|
if err != nil {
|
|
return nil, status.Errorf(codes.InvalidArgument, "can't load state snapshot: %s", err)
|
|
}
|
|
|
|
hnd := s.handles.NewStackInspector(&stacksInspector{
|
|
Config: cfg,
|
|
State: state,
|
|
ProviderFactories: providerFactories,
|
|
InputValues: inputValues,
|
|
ExperimentsAllowed: s.experimentsAllowed,
|
|
})
|
|
|
|
return &stacks.OpenStackInspector_Response{
|
|
StackInspectorHandle: hnd.ForProtobuf(),
|
|
// There are currently no situations that return diagnostics, but
|
|
// we reserve the right to add some later.
|
|
}, nil
|
|
}
|
|
|
|
func (s *stacksServer) ListResourceIdentities(ctx context.Context, req *stacks.ListResourceIdentities_Request) (*stacks.ListResourceIdentities_Response, error) {
|
|
hnd := handle[*stackstate.State](req.StateHandle)
|
|
stackState := s.handles.StackState(hnd)
|
|
if stackState == nil {
|
|
return nil, status.Error(codes.InvalidArgument, "the given stack state handle is invalid")
|
|
}
|
|
|
|
depsHnd := handle[*depsfile.Locks](req.DependencyLocksHandle)
|
|
var deps *depsfile.Locks
|
|
if !depsHnd.IsNil() {
|
|
deps = s.handles.DependencyLocks(depsHnd)
|
|
if deps == nil {
|
|
return nil, status.Error(codes.InvalidArgument, "the given dependency locks handle is invalid")
|
|
}
|
|
} else {
|
|
deps = depsfile.NewLocks()
|
|
}
|
|
providerCacheHnd := handle[*providercache.Dir](req.ProviderCacheHandle)
|
|
var providerCache *providercache.Dir
|
|
if !providerCacheHnd.IsNil() {
|
|
providerCache = s.handles.ProviderPluginCache(providerCacheHnd)
|
|
if providerCache == nil {
|
|
return nil, status.Error(codes.InvalidArgument, "the given provider cache handle is invalid")
|
|
}
|
|
}
|
|
// NOTE: providerCache can be nil if no handle was provided, in which
|
|
// case the call can only use built-in providers. All code below
|
|
// must avoid panicking when providerCache is nil, but is allowed to
|
|
// return an InvalidArgument error in that case.
|
|
// (providerFactoriesForLocks explicitly supports a nil providerCache)
|
|
providerFactories, err := providerFactoriesForLocks(deps, providerCache)
|
|
if err != nil {
|
|
return nil, status.Errorf(codes.InvalidArgument, "provider dependencies are inconsistent: %s", err)
|
|
}
|
|
|
|
identitySchemas := make(map[addrs.Provider]map[string]providers.IdentitySchema)
|
|
for name, factory := range providerFactories {
|
|
provider, err := factory()
|
|
if err != nil {
|
|
return nil, status.Errorf(codes.InvalidArgument, "provider %s failed to initialize: %s", name, err)
|
|
}
|
|
|
|
schema := provider.GetResourceIdentitySchemas()
|
|
if len(schema.Diagnostics) > 0 {
|
|
return nil, status.Errorf(codes.InvalidArgument, "provider %s failed to retrieve schema: %s", name, schema.Diagnostics.Err())
|
|
} else {
|
|
identitySchemas[name] = schema.IdentityTypes
|
|
}
|
|
}
|
|
|
|
resourceIdentities, err := listResourceIdentities(stackState, identitySchemas)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &stacks.ListResourceIdentities_Response{
|
|
Resource: resourceIdentities,
|
|
}, nil
|
|
}
|
|
|
|
func (s *stacksServer) InspectExpressionResult(ctx context.Context, req *stacks.InspectExpressionResult_Request) (*stacks.InspectExpressionResult_Response, error) {
|
|
hnd := handle[*stacksInspector](req.StackInspectorHandle)
|
|
insp := s.handles.StackInspector(hnd)
|
|
if insp == nil {
|
|
return nil, status.Error(codes.InvalidArgument, "the given stack inspector handle is invalid")
|
|
}
|
|
return insp.InspectExpressionResult(ctx, req)
|
|
}
|
|
|
|
func (s *stacksServer) OpenTerraformState(ctx context.Context, request *stacks.OpenTerraformState_Request) (*stacks.OpenTerraformState_Response, error) {
|
|
switch data := request.State.(type) {
|
|
case *stacks.OpenTerraformState_Request_ConfigPath:
|
|
// Load the state from the backend.
|
|
// This function should return an empty state even if the diags
|
|
// has errors. This makes it easier for the caller, as they should
|
|
// close the state handle regardless of the diags.
|
|
loader := stackmigrate.Loader{Discovery: s.services}
|
|
state, diags := loader.LoadState(data.ConfigPath)
|
|
|
|
hnd := s.handles.NewTerraformState(state)
|
|
return &stacks.OpenTerraformState_Response{
|
|
StateHandle: hnd.ForProtobuf(),
|
|
Diagnostics: diagnosticsToProto(diags),
|
|
}, nil
|
|
|
|
case *stacks.OpenTerraformState_Request_Raw:
|
|
// load the state from the raw data
|
|
file, err := statefile.Read(bytes.NewReader(data.Raw))
|
|
if err != nil {
|
|
return nil, status.Errorf(codes.InvalidArgument, "invalid raw state data: %s", err)
|
|
}
|
|
|
|
hnd := s.handles.NewTerraformState(file.State)
|
|
return &stacks.OpenTerraformState_Response{
|
|
StateHandle: hnd.ForProtobuf(),
|
|
}, nil
|
|
|
|
default:
|
|
return nil, status.Error(codes.InvalidArgument, "invalid state source")
|
|
}
|
|
}
|
|
|
|
func (s *stacksServer) CloseTerraformState(ctx context.Context, request *stacks.CloseTerraformState_Request) (*stacks.CloseTerraformState_Response, error) {
|
|
hnd := handle[*states.State](request.StateHandle)
|
|
err := s.handles.CloseTerraformState(hnd)
|
|
if err != nil {
|
|
return nil, status.Error(codes.InvalidArgument, err.Error())
|
|
}
|
|
return new(stacks.CloseTerraformState_Response), nil
|
|
}
|
|
|
|
func (s *stacksServer) MigrateTerraformState(request *stacks.MigrateTerraformState_Request, server stacks.Stacks_MigrateTerraformStateServer) error {
|
|
|
|
previousStateHandle := handle[*states.State](request.StateHandle)
|
|
previousState := s.handles.TerraformState(previousStateHandle)
|
|
if previousState == nil {
|
|
return status.Error(codes.InvalidArgument, "the given state handle is invalid")
|
|
}
|
|
|
|
configHandle := handle[*stackconfig.Config](request.ConfigHandle)
|
|
config := s.handles.StackConfig(configHandle)
|
|
if config == nil {
|
|
return status.Error(codes.InvalidArgument, "the given config handle is invalid")
|
|
}
|
|
|
|
dependencyLocksHandle := handle[*depsfile.Locks](request.DependencyLocksHandle)
|
|
dependencyLocks := s.handles.DependencyLocks(dependencyLocksHandle)
|
|
if dependencyLocks == nil {
|
|
return status.Error(codes.InvalidArgument, "the given dependency locks handle is invalid")
|
|
}
|
|
|
|
var providerFactories map[addrs.Provider]providers.Factory
|
|
if s.providerCacheOverride != nil {
|
|
// This is only used in tests to side load providers without needing a
|
|
// real provider cache.
|
|
providerFactories = s.providerCacheOverride
|
|
} else {
|
|
providerCacheHandle := handle[*providercache.Dir](request.ProviderCacheHandle)
|
|
providerCache := s.handles.ProviderPluginCache(providerCacheHandle)
|
|
if providerCache == nil {
|
|
return status.Error(codes.InvalidArgument, "the given provider cache handle is invalid")
|
|
}
|
|
|
|
var err error
|
|
providerFactories, err = providerFactoriesForLocks(dependencyLocks, providerCache)
|
|
if err != nil {
|
|
return status.Errorf(codes.InvalidArgument, "provider dependencies are inconsistent: %s", err)
|
|
}
|
|
}
|
|
|
|
migrate := &stackmigrate.Migration{
|
|
Providers: providerFactories,
|
|
PreviousState: previousState,
|
|
Config: config,
|
|
}
|
|
|
|
emit := func(change stackstate.AppliedChange) {
|
|
proto, err := change.AppliedChangeProto()
|
|
if err != nil {
|
|
server.Send(&stacks.MigrateTerraformState_Event{
|
|
Result: &stacks.MigrateTerraformState_Event_Diagnostic{
|
|
Diagnostic: &terraform1.Diagnostic{
|
|
Severity: terraform1.Diagnostic_ERROR,
|
|
Summary: "Failed to serialize change",
|
|
Detail: fmt.Sprintf("Failed to serialize state change for recording in the migration plan: %s", err),
|
|
},
|
|
},
|
|
})
|
|
return
|
|
}
|
|
|
|
server.Send(&stacks.MigrateTerraformState_Event{
|
|
Result: &stacks.MigrateTerraformState_Event_AppliedChange{
|
|
AppliedChange: proto,
|
|
},
|
|
})
|
|
}
|
|
|
|
emitDiag := func(diagnostic tfdiags.Diagnostic) {
|
|
server.Send(&stacks.MigrateTerraformState_Event{
|
|
Result: &stacks.MigrateTerraformState_Event_Diagnostic{
|
|
Diagnostic: diagnosticToProto(diagnostic),
|
|
},
|
|
})
|
|
}
|
|
|
|
mapping := request.GetMapping()
|
|
if mapping == nil {
|
|
return status.Error(codes.InvalidArgument, "missing migration mapping")
|
|
}
|
|
switch mapping := mapping.(type) {
|
|
case *stacks.MigrateTerraformState_Request_Simple:
|
|
migrate.Migrate(
|
|
mapping.Simple.ResourceAddressMap,
|
|
mapping.Simple.ModuleAddressMap,
|
|
emit, emitDiag)
|
|
default:
|
|
return status.Error(codes.InvalidArgument, "unsupported migration mapping")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func stackPlanHooks(evts *syncPlanStackChangesServer, mainStackSource sourceaddrs.FinalSource) *stackruntime.Hooks {
|
|
return stackChangeHooks(
|
|
func(scp *stacks.StackChangeProgress) error {
|
|
return evts.Send(&stacks.PlanStackChanges_Event{
|
|
Event: &stacks.PlanStackChanges_Event_Progress{
|
|
Progress: scp,
|
|
},
|
|
})
|
|
},
|
|
mainStackSource,
|
|
)
|
|
}
|
|
|
|
func stackApplyHooks(evts *syncApplyStackChangesServer, mainStackSource sourceaddrs.FinalSource) *stackruntime.Hooks {
|
|
return stackChangeHooks(
|
|
func(scp *stacks.StackChangeProgress) error {
|
|
return evts.Send(&stacks.ApplyStackChanges_Event{
|
|
Event: &stacks.ApplyStackChanges_Event_Progress{
|
|
Progress: scp,
|
|
},
|
|
})
|
|
},
|
|
mainStackSource,
|
|
)
|
|
}
|
|
|
|
// stackChangeHooks is the shared hook-handling logic for both [stackPlanHooks]
|
|
// and [stackApplyHooks]. Each phase emits a different subset of the events
|
|
// handled here.
|
|
func stackChangeHooks(send func(*stacks.StackChangeProgress) error, mainStackSource sourceaddrs.FinalSource) *stackruntime.Hooks {
|
|
return &stackruntime.Hooks{
|
|
// For any BeginFunc-shaped hook that returns an OpenTelemetry tracing
|
|
// span, we'll wrap it in a context so that the runtime's downstream
|
|
// operations will appear as children of it.
|
|
ContextAttach: func(parent context.Context, tracking any) context.Context {
|
|
span, ok := tracking.(trace.Span)
|
|
if !ok {
|
|
return parent
|
|
}
|
|
return trace.ContextWithSpan(parent, span)
|
|
},
|
|
|
|
// For the overall plan operation we don't emit any events to the client,
|
|
// since it already knows it has asked us to plan, but we do establish
|
|
// a root tracing span for all of the downstream planning operations to
|
|
// attach themselves to.
|
|
BeginPlan: func(ctx context.Context, s struct{}) any {
|
|
_, span := tracer.Start(ctx, "planning", trace.WithAttributes(
|
|
attribute.String("main_stack_source", mainStackSource.String()),
|
|
))
|
|
return span
|
|
},
|
|
EndPlan: func(ctx context.Context, span any, s struct{}) any {
|
|
span.(trace.Span).End()
|
|
return nil
|
|
},
|
|
|
|
// For the overall apply operation we don't emit any events to the client,
|
|
// since it already knows it has asked us to apply, but we do establish
|
|
// a root tracing span for all of the downstream planning operations to
|
|
// attach themselves to.
|
|
BeginApply: func(ctx context.Context, s struct{}) any {
|
|
_, span := tracer.Start(ctx, "applying", trace.WithAttributes(
|
|
attribute.String("main_stack_source", mainStackSource.String()),
|
|
))
|
|
return span
|
|
},
|
|
EndApply: func(ctx context.Context, span any, s struct{}) any {
|
|
span.(trace.Span).End()
|
|
return nil
|
|
},
|
|
|
|
// After expanding a component, we emit an event to the client to
|
|
// list all of the resulting instances. In the common case of an
|
|
// unexpanded component, this will be a single address.
|
|
ComponentExpanded: func(ctx context.Context, ce *hooks.ComponentInstances) {
|
|
ias := make([]string, 0, len(ce.InstanceAddrs))
|
|
for _, ia := range ce.InstanceAddrs {
|
|
ias = append(ias, ia.String())
|
|
}
|
|
send(&stacks.StackChangeProgress{
|
|
Event: &stacks.StackChangeProgress_ComponentInstances_{
|
|
ComponentInstances: &stacks.StackChangeProgress_ComponentInstances{
|
|
ComponentAddr: ce.ComponentAddr.String(),
|
|
InstanceAddrs: ias,
|
|
},
|
|
},
|
|
})
|
|
},
|
|
|
|
// For each component instance, we emit a series of events to the
|
|
// client, reporting the status of the plan operation. We also create a
|
|
// nested tracing span for the component instance.
|
|
PendingComponentInstancePlan: func(ctx context.Context, ci stackaddrs.AbsComponentInstance) {
|
|
send(evtComponentInstanceStatus(ci, hooks.ComponentInstancePending))
|
|
},
|
|
BeginComponentInstancePlan: func(ctx context.Context, ci stackaddrs.AbsComponentInstance) any {
|
|
send(evtComponentInstanceStatus(ci, hooks.ComponentInstancePlanning))
|
|
_, span := tracer.Start(ctx, "planning", trace.WithAttributes(
|
|
attribute.String("component_instance", ci.String()),
|
|
))
|
|
return span
|
|
},
|
|
EndComponentInstancePlan: func(ctx context.Context, span any, ci stackaddrs.AbsComponentInstance) any {
|
|
send(evtComponentInstanceStatus(ci, hooks.ComponentInstancePlanned))
|
|
span.(trace.Span).SetStatus(otelCodes.Ok, "planning succeeded")
|
|
span.(trace.Span).End()
|
|
return nil
|
|
},
|
|
ErrorComponentInstancePlan: func(ctx context.Context, span any, ci stackaddrs.AbsComponentInstance) any {
|
|
send(evtComponentInstanceStatus(ci, hooks.ComponentInstanceErrored))
|
|
span.(trace.Span).SetStatus(otelCodes.Error, "planning failed")
|
|
span.(trace.Span).End()
|
|
return nil
|
|
},
|
|
DeferComponentInstancePlan: func(ctx context.Context, span any, ci stackaddrs.AbsComponentInstance) any {
|
|
send(evtComponentInstanceStatus(ci, hooks.ComponentInstanceDeferred))
|
|
span.(trace.Span).SetStatus(otelCodes.Error, "planning succeeded, but deferred")
|
|
span.(trace.Span).End()
|
|
return nil
|
|
},
|
|
PendingComponentInstanceApply: func(ctx context.Context, ci stackaddrs.AbsComponentInstance) {
|
|
send(evtComponentInstanceStatus(ci, hooks.ComponentInstancePending))
|
|
},
|
|
BeginComponentInstanceApply: func(ctx context.Context, ci stackaddrs.AbsComponentInstance) any {
|
|
send(evtComponentInstanceStatus(ci, hooks.ComponentInstanceApplying))
|
|
_, span := tracer.Start(ctx, "applying", trace.WithAttributes(
|
|
attribute.String("component_instance", ci.String()),
|
|
))
|
|
return span
|
|
},
|
|
EndComponentInstanceApply: func(ctx context.Context, span any, ci stackaddrs.AbsComponentInstance) any {
|
|
send(evtComponentInstanceStatus(ci, hooks.ComponentInstanceApplied))
|
|
span.(trace.Span).SetStatus(otelCodes.Ok, "applying succeeded")
|
|
span.(trace.Span).End()
|
|
return nil
|
|
},
|
|
ErrorComponentInstanceApply: func(ctx context.Context, span any, ci stackaddrs.AbsComponentInstance) any {
|
|
send(evtComponentInstanceStatus(ci, hooks.ComponentInstanceErrored))
|
|
span.(trace.Span).SetStatus(otelCodes.Error, "applying failed")
|
|
span.(trace.Span).End()
|
|
return nil
|
|
},
|
|
|
|
// When Terraform core reports a resource instance plan status, we
|
|
// forward it to the events client.
|
|
ReportResourceInstanceStatus: func(ctx context.Context, span any, rihd *hooks.ResourceInstanceStatusHookData) any {
|
|
// addrs.Provider.String() will panic on the zero value. In this
|
|
// case, holding a zero provider would mean a bug in our event
|
|
// logging code rather than in core logic, so avoid exploding, but
|
|
// send a blank string to expose the error later.
|
|
providerAddr := ""
|
|
if !rihd.ProviderAddr.IsZero() {
|
|
providerAddr = rihd.ProviderAddr.String()
|
|
}
|
|
send(&stacks.StackChangeProgress{
|
|
Event: &stacks.StackChangeProgress_ResourceInstanceStatus_{
|
|
ResourceInstanceStatus: &stacks.StackChangeProgress_ResourceInstanceStatus{
|
|
Addr: stacks.NewResourceInstanceObjectInStackAddr(rihd.Addr),
|
|
Status: rihd.Status.ForProtobuf(),
|
|
ProviderAddr: providerAddr,
|
|
},
|
|
},
|
|
})
|
|
return span
|
|
},
|
|
|
|
// Upon completion of a component instance plan, we emit a planned
|
|
// change sumary event to the client for each resource instance.
|
|
ReportResourceInstancePlanned: func(ctx context.Context, span any, ric *hooks.ResourceInstanceChange) any {
|
|
span.(trace.Span).AddEvent("planned resource instance", trace.WithAttributes(
|
|
attribute.String("component_instance", ric.Addr.Component.String()),
|
|
attribute.String("resource_instance", ric.Addr.Item.String()),
|
|
))
|
|
|
|
ripc, err := resourceInstancePlanned(ric)
|
|
if err != nil {
|
|
return span
|
|
}
|
|
|
|
send(&stacks.StackChangeProgress{
|
|
Event: &stacks.StackChangeProgress_ResourceInstancePlannedChange_{
|
|
ResourceInstancePlannedChange: ripc,
|
|
},
|
|
})
|
|
return span
|
|
},
|
|
|
|
ReportResourceInstanceDeferred: func(ctx context.Context, span any, change *hooks.DeferredResourceInstanceChange) any {
|
|
span.(trace.Span).AddEvent("deferred resource instance", trace.WithAttributes(
|
|
attribute.String("component_instance", change.Change.Addr.Component.String()),
|
|
attribute.String("resource_instance", change.Change.Addr.Item.String()),
|
|
))
|
|
|
|
ripc, err := resourceInstancePlanned(change.Change)
|
|
if err != nil {
|
|
return span
|
|
}
|
|
|
|
deferred := stackplan.EncodeDeferred(change.Reason)
|
|
|
|
send(&stacks.StackChangeProgress{
|
|
Event: &stacks.StackChangeProgress_DeferredResourceInstancePlannedChange_{
|
|
DeferredResourceInstancePlannedChange: &stacks.StackChangeProgress_DeferredResourceInstancePlannedChange{
|
|
Change: ripc,
|
|
Deferred: deferred,
|
|
},
|
|
},
|
|
})
|
|
return span
|
|
},
|
|
|
|
// We also report a roll-up of planned resource action counts after each
|
|
// component instance plan or apply completes.
|
|
ReportComponentInstancePlanned: func(ctx context.Context, span any, cic *hooks.ComponentInstanceChange) any {
|
|
send(&stacks.StackChangeProgress{
|
|
Event: &stacks.StackChangeProgress_ComponentInstanceChanges_{
|
|
ComponentInstanceChanges: &stacks.StackChangeProgress_ComponentInstanceChanges{
|
|
Addr: &stacks.ComponentInstanceInStackAddr{
|
|
ComponentAddr: stackaddrs.ConfigComponentForAbsInstance(cic.Addr).String(),
|
|
ComponentInstanceAddr: cic.Addr.String(),
|
|
},
|
|
Total: int32(cic.Total()),
|
|
Add: int32(cic.Add),
|
|
Change: int32(cic.Change),
|
|
Import: int32(cic.Import),
|
|
Remove: int32(cic.Remove),
|
|
Defer: int32(cic.Defer),
|
|
Move: int32(cic.Move),
|
|
Forget: int32(cic.Forget),
|
|
},
|
|
},
|
|
})
|
|
return span
|
|
},
|
|
// The apply rollup should typically report the same information as
|
|
// the plan one did earlier, but could vary in some situations if
|
|
// e.g. a planned update turned out to be a no-op once some unknown
|
|
// values were known, or if the apply phase is handled by a different
|
|
// version of the agent than the plan phase which has support for
|
|
// a different set of possible change types.
|
|
ReportComponentInstanceApplied: func(ctx context.Context, span any, cic *hooks.ComponentInstanceChange) any {
|
|
send(&stacks.StackChangeProgress{
|
|
Event: &stacks.StackChangeProgress_ComponentInstanceChanges_{
|
|
ComponentInstanceChanges: &stacks.StackChangeProgress_ComponentInstanceChanges{
|
|
Addr: &stacks.ComponentInstanceInStackAddr{
|
|
ComponentAddr: stackaddrs.ConfigComponentForAbsInstance(cic.Addr).String(),
|
|
ComponentInstanceAddr: cic.Addr.String(),
|
|
},
|
|
Total: int32(cic.Total()),
|
|
Add: int32(cic.Add),
|
|
Change: int32(cic.Change),
|
|
Import: int32(cic.Import),
|
|
Remove: int32(cic.Remove),
|
|
Defer: int32(cic.Defer),
|
|
Move: int32(cic.Move),
|
|
Forget: int32(cic.Forget),
|
|
},
|
|
},
|
|
})
|
|
return span
|
|
},
|
|
}
|
|
}
|
|
|
|
func resourceInstancePlanned(ric *hooks.ResourceInstanceChange) (*stacks.StackChangeProgress_ResourceInstancePlannedChange, error) {
|
|
actions, err := stacks.ChangeTypesForPlanAction(ric.Change.Action)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var moved *stacks.StackChangeProgress_ResourceInstancePlannedChange_Moved
|
|
if !ric.Change.PrevRunAddr.Equal(ric.Change.Addr) {
|
|
moved = &stacks.StackChangeProgress_ResourceInstancePlannedChange_Moved{
|
|
PrevAddr: &stacks.ResourceInstanceInStackAddr{
|
|
ComponentInstanceAddr: ric.Addr.Component.String(),
|
|
ResourceInstanceAddr: ric.Change.PrevRunAddr.String(),
|
|
},
|
|
}
|
|
}
|
|
|
|
var imported *stacks.StackChangeProgress_ResourceInstancePlannedChange_Imported
|
|
if ric.Change.Importing != nil {
|
|
imported = &stacks.StackChangeProgress_ResourceInstancePlannedChange_Imported{
|
|
ImportId: ric.Change.Importing.ID,
|
|
Unknown: ric.Change.Importing.Unknown,
|
|
}
|
|
}
|
|
|
|
return &stacks.StackChangeProgress_ResourceInstancePlannedChange{
|
|
Addr: stacks.NewResourceInstanceObjectInStackAddr(ric.Addr),
|
|
Actions: actions,
|
|
Moved: moved,
|
|
Imported: imported,
|
|
ProviderAddr: ric.Change.ProviderAddr.Provider.String(),
|
|
}, nil
|
|
}
|
|
|
|
func evtComponentInstanceStatus(ci stackaddrs.AbsComponentInstance, status hooks.ComponentInstanceStatus) *stacks.StackChangeProgress {
|
|
return &stacks.StackChangeProgress{
|
|
Event: &stacks.StackChangeProgress_ComponentInstanceStatus_{
|
|
ComponentInstanceStatus: &stacks.StackChangeProgress_ComponentInstanceStatus{
|
|
Addr: &stacks.ComponentInstanceInStackAddr{
|
|
ComponentAddr: stackaddrs.ConfigComponentForAbsInstance(ci).String(),
|
|
ComponentInstanceAddr: ci.String(),
|
|
},
|
|
Status: status.ForProtobuf(),
|
|
},
|
|
},
|
|
}
|
|
}
|
|
|
|
// syncPlanStackChangesServer is a wrapper around a
|
|
// stacks.Stacks_PlanStackChangesServer implementation that makes the
|
|
// Send method concurrency-safe by holding a mutex throughout the underlying
|
|
// call.
|
|
type syncPlanStackChangesServer = syncStreamingRPCSender[stacks.Stacks_PlanStackChangesServer, *stacks.PlanStackChanges_Event]
|
|
|
|
// syncApplyStackChangesServer is a wrapper around a
|
|
// stacks.Stacks_ApplyStackChangesServer implementation that makes the
|
|
// Send method concurrency-safe by holding a mutex throughout the underlying
|
|
// call.
|
|
type syncApplyStackChangesServer = syncStreamingRPCSender[stacks.Stacks_ApplyStackChangesServer, *stacks.ApplyStackChanges_Event]
|