rpcapi: Shared hooks for both PlanStackChanges and ApplyStackChanges

Previously we needed two separate hook implementations because the progress
event messages were of different Go interface types for each operation.
Now the RPC API shares a single StackChangeProgress message type we can
factor out the logic that actually emits the events and have the two
phases differ only in how they send those common message types to the
client.

This also hooks some additional apply-time events that we weren't
previously hooking, passing them through as RPC events in the same way
as we were doing for the plan phase.
pull/34738/head
Martin Atkins 3 years ago
parent 947c483c49
commit 0d308110da

@ -448,6 +448,35 @@ Events:
}
func stackPlanHooks(evts *syncPlanStackChangesServer, mainStackSource sourceaddrs.FinalSource) *stackruntime.Hooks {
return stackChangeHooks(
func(scp *terraform1.StackChangeProgress) error {
return evts.Send(&terraform1.PlanStackChanges_Event{
Event: &terraform1.PlanStackChanges_Event_Progress{
Progress: scp,
},
})
},
mainStackSource,
)
}
func stackApplyHooks(evts *syncApplyStackChangesServer, mainStackSource sourceaddrs.FinalSource) *stackruntime.Hooks {
return stackChangeHooks(
func(scp *terraform1.StackChangeProgress) error {
return evts.Send(&terraform1.ApplyStackChanges_Event{
Event: &terraform1.ApplyStackChanges_Event_Progress{
Progress: scp,
},
})
},
mainStackSource,
)
}
// stackChangeHooks is the shared hook-handling logic for both [stackPlanHooks]
// and [stackApplyHooks]. Each phase emits a different subset of the events
// handled here.
func stackChangeHooks(send func(*terraform1.StackChangeProgress) error, mainStackSource sourceaddrs.FinalSource) *stackruntime.Hooks {
return &stackruntime.Hooks{
// For any BeginFunc-shaped hook that returns an OpenTelemetry tracing
// span, we'll wrap it in a context so that the runtime's downstream
@ -475,6 +504,21 @@ func stackPlanHooks(evts *syncPlanStackChangesServer, mainStackSource sourceaddr
return nil
},
// For the overall apply operation we don't emit any events to the client,
// since it already knows it has asked us to apply, but we do establish
// a root tracing span for all of the downstream planning operations to
// attach themselves to.
BeginApply: func(ctx context.Context, s struct{}) any {
_, span := tracer.Start(ctx, "applying", trace.WithAttributes(
attribute.String("main_stack_source", mainStackSource.String()),
))
return span
},
EndApply: func(ctx context.Context, span any, s struct{}) any {
span.(trace.Span).End()
return nil
},
// After expanding a component, we emit an event to the client to
// list all of the resulting instances. In the common case of an
// unexpanded component, this will be a single address.
@ -483,15 +527,11 @@ func stackPlanHooks(evts *syncPlanStackChangesServer, mainStackSource sourceaddr
for _, ia := range ce.InstanceAddrs {
ias = append(ias, ia.String())
}
evts.Send(&terraform1.PlanStackChanges_Event{
Event: &terraform1.PlanStackChanges_Event_Progress{
Progress: &terraform1.StackChangeProgress{
Event: &terraform1.StackChangeProgress_ComponentInstances_{
ComponentInstances: &terraform1.StackChangeProgress_ComponentInstances{
ComponentAddr: ce.ComponentAddr.String(),
InstanceAddrs: ias,
},
},
send(&terraform1.StackChangeProgress{
Event: &terraform1.StackChangeProgress_ComponentInstances_{
ComponentInstances: &terraform1.StackChangeProgress_ComponentInstances{
ComponentAddr: ce.ComponentAddr.String(),
InstanceAddrs: ias,
},
},
})
@ -501,38 +541,42 @@ func stackPlanHooks(evts *syncPlanStackChangesServer, mainStackSource sourceaddr
// client, reporting the status of the plan operation. We also create a
// nested tracing span for the component instance.
PendingComponentInstancePlan: func(ctx context.Context, ci stackaddrs.AbsComponentInstance) {
evts.Send(&terraform1.PlanStackChanges_Event{
Event: &terraform1.PlanStackChanges_Event_Progress{
Progress: evtComponentInstanceStatus(ci, hooks.ComponentInstancePending),
},
})
send(evtComponentInstanceStatus(ci, hooks.ComponentInstancePending))
},
BeginComponentInstancePlan: func(ctx context.Context, ci stackaddrs.AbsComponentInstance) any {
evts.Send(&terraform1.PlanStackChanges_Event{
Event: &terraform1.PlanStackChanges_Event_Progress{
Progress: evtComponentInstanceStatus(ci, hooks.ComponentInstancePlanning),
},
})
send(evtComponentInstanceStatus(ci, hooks.ComponentInstancePlanning))
_, span := tracer.Start(ctx, "planning", trace.WithAttributes(
attribute.String("component_instance", ci.String()),
))
return span
},
EndComponentInstancePlan: func(ctx context.Context, span any, ci stackaddrs.AbsComponentInstance) any {
evts.Send(&terraform1.PlanStackChanges_Event{
Event: &terraform1.PlanStackChanges_Event_Progress{
Progress: evtComponentInstanceStatus(ci, hooks.ComponentInstancePlanned),
},
})
send(evtComponentInstanceStatus(ci, hooks.ComponentInstancePlanned))
span.(trace.Span).End()
return nil
},
ErrorComponentInstancePlan: func(ctx context.Context, span any, ci stackaddrs.AbsComponentInstance) any {
evts.Send(&terraform1.PlanStackChanges_Event{
Event: &terraform1.PlanStackChanges_Event_Progress{
Progress: evtComponentInstanceStatus(ci, hooks.ComponentInstanceErrored),
},
})
send(evtComponentInstanceStatus(ci, hooks.ComponentInstanceErrored))
span.(trace.Span).End()
return nil
},
PendingComponentInstanceApply: func(ctx context.Context, ci stackaddrs.AbsComponentInstance) {
send(evtComponentInstanceStatus(ci, hooks.ComponentInstancePending))
},
BeginComponentInstanceApply: func(ctx context.Context, ci stackaddrs.AbsComponentInstance) any {
send(evtComponentInstanceStatus(ci, hooks.ComponentInstanceApplying))
_, span := tracer.Start(ctx, "applying", trace.WithAttributes(
attribute.String("component_instance", ci.String()),
))
return span
},
EndComponentInstanceApply: func(ctx context.Context, span any, ci stackaddrs.AbsComponentInstance) any {
send(evtComponentInstanceStatus(ci, hooks.ComponentInstanceApplied))
span.(trace.Span).End()
return nil
},
ErrorComponentInstanceApply: func(ctx context.Context, span any, ci stackaddrs.AbsComponentInstance) any {
send(evtComponentInstanceStatus(ci, hooks.ComponentInstanceErrored))
span.(trace.Span).End()
return nil
},
@ -540,18 +584,14 @@ func stackPlanHooks(evts *syncPlanStackChangesServer, mainStackSource sourceaddr
// When Terraform core reports a resource instance plan status, we
// forward it to the events client.
ReportResourceInstanceStatus: func(ctx context.Context, span any, rihd *hooks.ResourceInstanceStatusHookData) any {
evts.Send(&terraform1.PlanStackChanges_Event{
Event: &terraform1.PlanStackChanges_Event_Progress{
Progress: &terraform1.StackChangeProgress{
Event: &terraform1.StackChangeProgress_ResourceInstanceStatus_{
ResourceInstanceStatus: &terraform1.StackChangeProgress_ResourceInstanceStatus{
Addr: &terraform1.ResourceInstanceInStackAddr{
ComponentInstanceAddr: rihd.Addr.Component.String(),
ResourceInstanceAddr: rihd.Addr.Item.String(),
},
Status: rihd.Status.ForProtobuf(),
},
send(&terraform1.StackChangeProgress{
Event: &terraform1.StackChangeProgress_ResourceInstanceStatus_{
ResourceInstanceStatus: &terraform1.StackChangeProgress_ResourceInstanceStatus{
Addr: &terraform1.ResourceInstanceInStackAddr{
ComponentInstanceAddr: rihd.Addr.Component.String(),
ResourceInstanceAddr: rihd.Addr.Item.String(),
},
Status: rihd.Status.ForProtobuf(),
},
},
})
@ -580,20 +620,16 @@ func stackPlanHooks(evts *syncPlanStackChangesServer, mainStackSource sourceaddr
imported.ImportId = ric.Change.Importing.ID
}
evts.Send(&terraform1.PlanStackChanges_Event{
Event: &terraform1.PlanStackChanges_Event_Progress{
Progress: &terraform1.StackChangeProgress{
Event: &terraform1.StackChangeProgress_ResourceInstancePlannedChange_{
ResourceInstancePlannedChange: &terraform1.StackChangeProgress_ResourceInstancePlannedChange{
Addr: &terraform1.ResourceInstanceInStackAddr{
ComponentInstanceAddr: ric.Addr.Component.String(),
ResourceInstanceAddr: ric.Addr.Item.String(),
},
Actions: actions,
Moved: moved,
Imported: imported,
},
send(&terraform1.StackChangeProgress{
Event: &terraform1.StackChangeProgress_ResourceInstancePlannedChange_{
ResourceInstancePlannedChange: &terraform1.StackChangeProgress_ResourceInstancePlannedChange{
Addr: &terraform1.ResourceInstanceInStackAddr{
ComponentInstanceAddr: ric.Addr.Component.String(),
ResourceInstanceAddr: ric.Addr.Item.String(),
},
Actions: actions,
Moved: moved,
Imported: imported,
},
},
})
@ -601,61 +637,49 @@ func stackPlanHooks(evts *syncPlanStackChangesServer, mainStackSource sourceaddr
},
// We also report a roll-up of planned resource action counts after each
// component instance plan completes.
// component instance plan or apply completes.
ReportComponentInstancePlanned: func(ctx context.Context, span any, cic *hooks.ComponentInstanceChange) any {
evts.Send(&terraform1.PlanStackChanges_Event{
Event: &terraform1.PlanStackChanges_Event_Progress{
Progress: &terraform1.StackChangeProgress{
Event: &terraform1.StackChangeProgress_ComponentInstanceChanges_{
ComponentInstanceChanges: &terraform1.StackChangeProgress_ComponentInstanceChanges{
Addr: &terraform1.ComponentInstanceInStackAddr{
ComponentAddr: stackaddrs.ConfigComponentForAbsInstance(cic.Addr).String(),
ComponentInstanceAddr: cic.Addr.String(),
},
Total: int32(cic.Total()),
Add: int32(cic.Add),
Change: int32(cic.Change),
Import: int32(cic.Import),
Remove: int32(cic.Remove),
},
send(&terraform1.StackChangeProgress{
Event: &terraform1.StackChangeProgress_ComponentInstanceChanges_{
ComponentInstanceChanges: &terraform1.StackChangeProgress_ComponentInstanceChanges{
Addr: &terraform1.ComponentInstanceInStackAddr{
ComponentAddr: stackaddrs.ConfigComponentForAbsInstance(cic.Addr).String(),
ComponentInstanceAddr: cic.Addr.String(),
},
Total: int32(cic.Total()),
Add: int32(cic.Add),
Change: int32(cic.Change),
Import: int32(cic.Import),
Remove: int32(cic.Remove),
},
},
})
return span
},
}
}
func stackApplyHooks(evts *syncApplyStackChangesServer, mainStackSource sourceaddrs.FinalSource) *stackruntime.Hooks {
return &stackruntime.Hooks{
// For any BeginFunc-shaped hook that returns an OpenTelemetry tracing
// span, we'll wrap it in a context so that the runtime's downstream
// operations will appear as children of it.
ContextAttach: func(parent context.Context, tracking any) context.Context {
span, ok := tracking.(trace.Span)
if !ok {
return parent
}
return trace.ContextWithSpan(parent, span)
},
// For the overall apply operation we don't emit any events to the client,
// since it already knows it has asked us to apply, but we do establish
// a root tracing span for all of the downstream planning operations to
// attach themselves to.
BeginApply: func(ctx context.Context, s struct{}) any {
_, span := tracer.Start(ctx, "applying", trace.WithAttributes(
attribute.String("main_stack_source", mainStackSource.String()),
))
// The apply rollup should typically report the same information as
// the plan one did earlier, but could vary in some situations if
// e.g. a planned update turned out to be a no-op once some unknown
// values were known, or if the apply phase is handled by a different
// version of the agent than the plan phase which has support for
// a different set of possible change types.
ReportComponentInstanceApplied: func(ctx context.Context, span any, cic *hooks.ComponentInstanceChange) any {
send(&terraform1.StackChangeProgress{
Event: &terraform1.StackChangeProgress_ComponentInstanceChanges_{
ComponentInstanceChanges: &terraform1.StackChangeProgress_ComponentInstanceChanges{
Addr: &terraform1.ComponentInstanceInStackAddr{
ComponentAddr: stackaddrs.ConfigComponentForAbsInstance(cic.Addr).String(),
ComponentInstanceAddr: cic.Addr.String(),
},
Total: int32(cic.Total()),
Add: int32(cic.Add),
Change: int32(cic.Change),
Import: int32(cic.Import),
Remove: int32(cic.Remove),
},
},
})
return span
},
EndApply: func(ctx context.Context, span any, s struct{}) any {
span.(trace.Span).End()
return nil
},
// TODO: other event types
}
}

Loading…
Cancel
Save