From 0d308110da7f4d1eb2abd0265de84ae4adebd84f Mon Sep 17 00:00:00 2001 From: Martin Atkins Date: Mon, 9 Oct 2023 17:25:08 -0700 Subject: [PATCH] rpcapi: Shared hooks for both PlanStackChanges and ApplyStackChanges Previously we needed two separate hook implementations because the progress event messages were of different Go interface types for each operation. Now the RPC API shares a single StackChangeProgress message type we can factor out the logic that actually emits the events and have the two phases differ only in how they send those common message types to the client. This also hooks some additional apply-time events that we weren't previously hooking, passing them through as RPC events in the same way as we were doing for the plan phase. --- internal/rpcapi/stacks.go | 222 +++++++++++++++++++++----------------- 1 file changed, 123 insertions(+), 99 deletions(-) diff --git a/internal/rpcapi/stacks.go b/internal/rpcapi/stacks.go index ee69dac744..df8ff6b64a 100644 --- a/internal/rpcapi/stacks.go +++ b/internal/rpcapi/stacks.go @@ -448,6 +448,35 @@ Events: } func stackPlanHooks(evts *syncPlanStackChangesServer, mainStackSource sourceaddrs.FinalSource) *stackruntime.Hooks { + return stackChangeHooks( + func(scp *terraform1.StackChangeProgress) error { + return evts.Send(&terraform1.PlanStackChanges_Event{ + Event: &terraform1.PlanStackChanges_Event_Progress{ + Progress: scp, + }, + }) + }, + mainStackSource, + ) +} + +func stackApplyHooks(evts *syncApplyStackChangesServer, mainStackSource sourceaddrs.FinalSource) *stackruntime.Hooks { + return stackChangeHooks( + func(scp *terraform1.StackChangeProgress) error { + return evts.Send(&terraform1.ApplyStackChanges_Event{ + Event: &terraform1.ApplyStackChanges_Event_Progress{ + Progress: scp, + }, + }) + }, + mainStackSource, + ) +} + +// stackChangeHooks is the shared hook-handling logic for both [stackPlanHooks] +// and [stackApplyHooks]. Each phase emits a different subset of the events +// handled here. +func stackChangeHooks(send func(*terraform1.StackChangeProgress) error, mainStackSource sourceaddrs.FinalSource) *stackruntime.Hooks { return &stackruntime.Hooks{ // For any BeginFunc-shaped hook that returns an OpenTelemetry tracing // span, we'll wrap it in a context so that the runtime's downstream @@ -475,6 +504,21 @@ func stackPlanHooks(evts *syncPlanStackChangesServer, mainStackSource sourceaddr return nil }, + // For the overall apply operation we don't emit any events to the client, + // since it already knows it has asked us to apply, but we do establish + // a root tracing span for all of the downstream planning operations to + // attach themselves to. + BeginApply: func(ctx context.Context, s struct{}) any { + _, span := tracer.Start(ctx, "applying", trace.WithAttributes( + attribute.String("main_stack_source", mainStackSource.String()), + )) + return span + }, + EndApply: func(ctx context.Context, span any, s struct{}) any { + span.(trace.Span).End() + return nil + }, + // After expanding a component, we emit an event to the client to // list all of the resulting instances. In the common case of an // unexpanded component, this will be a single address. @@ -483,15 +527,11 @@ func stackPlanHooks(evts *syncPlanStackChangesServer, mainStackSource sourceaddr for _, ia := range ce.InstanceAddrs { ias = append(ias, ia.String()) } - evts.Send(&terraform1.PlanStackChanges_Event{ - Event: &terraform1.PlanStackChanges_Event_Progress{ - Progress: &terraform1.StackChangeProgress{ - Event: &terraform1.StackChangeProgress_ComponentInstances_{ - ComponentInstances: &terraform1.StackChangeProgress_ComponentInstances{ - ComponentAddr: ce.ComponentAddr.String(), - InstanceAddrs: ias, - }, - }, + send(&terraform1.StackChangeProgress{ + Event: &terraform1.StackChangeProgress_ComponentInstances_{ + ComponentInstances: &terraform1.StackChangeProgress_ComponentInstances{ + ComponentAddr: ce.ComponentAddr.String(), + InstanceAddrs: ias, }, }, }) @@ -501,38 +541,42 @@ func stackPlanHooks(evts *syncPlanStackChangesServer, mainStackSource sourceaddr // client, reporting the status of the plan operation. We also create a // nested tracing span for the component instance. PendingComponentInstancePlan: func(ctx context.Context, ci stackaddrs.AbsComponentInstance) { - evts.Send(&terraform1.PlanStackChanges_Event{ - Event: &terraform1.PlanStackChanges_Event_Progress{ - Progress: evtComponentInstanceStatus(ci, hooks.ComponentInstancePending), - }, - }) + send(evtComponentInstanceStatus(ci, hooks.ComponentInstancePending)) }, BeginComponentInstancePlan: func(ctx context.Context, ci stackaddrs.AbsComponentInstance) any { - evts.Send(&terraform1.PlanStackChanges_Event{ - Event: &terraform1.PlanStackChanges_Event_Progress{ - Progress: evtComponentInstanceStatus(ci, hooks.ComponentInstancePlanning), - }, - }) + send(evtComponentInstanceStatus(ci, hooks.ComponentInstancePlanning)) _, span := tracer.Start(ctx, "planning", trace.WithAttributes( attribute.String("component_instance", ci.String()), )) return span }, EndComponentInstancePlan: func(ctx context.Context, span any, ci stackaddrs.AbsComponentInstance) any { - evts.Send(&terraform1.PlanStackChanges_Event{ - Event: &terraform1.PlanStackChanges_Event_Progress{ - Progress: evtComponentInstanceStatus(ci, hooks.ComponentInstancePlanned), - }, - }) + send(evtComponentInstanceStatus(ci, hooks.ComponentInstancePlanned)) span.(trace.Span).End() return nil }, ErrorComponentInstancePlan: func(ctx context.Context, span any, ci stackaddrs.AbsComponentInstance) any { - evts.Send(&terraform1.PlanStackChanges_Event{ - Event: &terraform1.PlanStackChanges_Event_Progress{ - Progress: evtComponentInstanceStatus(ci, hooks.ComponentInstanceErrored), - }, - }) + send(evtComponentInstanceStatus(ci, hooks.ComponentInstanceErrored)) + span.(trace.Span).End() + return nil + }, + PendingComponentInstanceApply: func(ctx context.Context, ci stackaddrs.AbsComponentInstance) { + send(evtComponentInstanceStatus(ci, hooks.ComponentInstancePending)) + }, + BeginComponentInstanceApply: func(ctx context.Context, ci stackaddrs.AbsComponentInstance) any { + send(evtComponentInstanceStatus(ci, hooks.ComponentInstanceApplying)) + _, span := tracer.Start(ctx, "applying", trace.WithAttributes( + attribute.String("component_instance", ci.String()), + )) + return span + }, + EndComponentInstanceApply: func(ctx context.Context, span any, ci stackaddrs.AbsComponentInstance) any { + send(evtComponentInstanceStatus(ci, hooks.ComponentInstanceApplied)) + span.(trace.Span).End() + return nil + }, + ErrorComponentInstanceApply: func(ctx context.Context, span any, ci stackaddrs.AbsComponentInstance) any { + send(evtComponentInstanceStatus(ci, hooks.ComponentInstanceErrored)) span.(trace.Span).End() return nil }, @@ -540,18 +584,14 @@ func stackPlanHooks(evts *syncPlanStackChangesServer, mainStackSource sourceaddr // When Terraform core reports a resource instance plan status, we // forward it to the events client. ReportResourceInstanceStatus: func(ctx context.Context, span any, rihd *hooks.ResourceInstanceStatusHookData) any { - evts.Send(&terraform1.PlanStackChanges_Event{ - Event: &terraform1.PlanStackChanges_Event_Progress{ - Progress: &terraform1.StackChangeProgress{ - Event: &terraform1.StackChangeProgress_ResourceInstanceStatus_{ - ResourceInstanceStatus: &terraform1.StackChangeProgress_ResourceInstanceStatus{ - Addr: &terraform1.ResourceInstanceInStackAddr{ - ComponentInstanceAddr: rihd.Addr.Component.String(), - ResourceInstanceAddr: rihd.Addr.Item.String(), - }, - Status: rihd.Status.ForProtobuf(), - }, + send(&terraform1.StackChangeProgress{ + Event: &terraform1.StackChangeProgress_ResourceInstanceStatus_{ + ResourceInstanceStatus: &terraform1.StackChangeProgress_ResourceInstanceStatus{ + Addr: &terraform1.ResourceInstanceInStackAddr{ + ComponentInstanceAddr: rihd.Addr.Component.String(), + ResourceInstanceAddr: rihd.Addr.Item.String(), }, + Status: rihd.Status.ForProtobuf(), }, }, }) @@ -580,20 +620,16 @@ func stackPlanHooks(evts *syncPlanStackChangesServer, mainStackSource sourceaddr imported.ImportId = ric.Change.Importing.ID } - evts.Send(&terraform1.PlanStackChanges_Event{ - Event: &terraform1.PlanStackChanges_Event_Progress{ - Progress: &terraform1.StackChangeProgress{ - Event: &terraform1.StackChangeProgress_ResourceInstancePlannedChange_{ - ResourceInstancePlannedChange: &terraform1.StackChangeProgress_ResourceInstancePlannedChange{ - Addr: &terraform1.ResourceInstanceInStackAddr{ - ComponentInstanceAddr: ric.Addr.Component.String(), - ResourceInstanceAddr: ric.Addr.Item.String(), - }, - Actions: actions, - Moved: moved, - Imported: imported, - }, + send(&terraform1.StackChangeProgress{ + Event: &terraform1.StackChangeProgress_ResourceInstancePlannedChange_{ + ResourceInstancePlannedChange: &terraform1.StackChangeProgress_ResourceInstancePlannedChange{ + Addr: &terraform1.ResourceInstanceInStackAddr{ + ComponentInstanceAddr: ric.Addr.Component.String(), + ResourceInstanceAddr: ric.Addr.Item.String(), }, + Actions: actions, + Moved: moved, + Imported: imported, }, }, }) @@ -601,61 +637,49 @@ func stackPlanHooks(evts *syncPlanStackChangesServer, mainStackSource sourceaddr }, // We also report a roll-up of planned resource action counts after each - // component instance plan completes. + // component instance plan or apply completes. ReportComponentInstancePlanned: func(ctx context.Context, span any, cic *hooks.ComponentInstanceChange) any { - evts.Send(&terraform1.PlanStackChanges_Event{ - Event: &terraform1.PlanStackChanges_Event_Progress{ - Progress: &terraform1.StackChangeProgress{ - Event: &terraform1.StackChangeProgress_ComponentInstanceChanges_{ - ComponentInstanceChanges: &terraform1.StackChangeProgress_ComponentInstanceChanges{ - Addr: &terraform1.ComponentInstanceInStackAddr{ - ComponentAddr: stackaddrs.ConfigComponentForAbsInstance(cic.Addr).String(), - ComponentInstanceAddr: cic.Addr.String(), - }, - Total: int32(cic.Total()), - Add: int32(cic.Add), - Change: int32(cic.Change), - Import: int32(cic.Import), - Remove: int32(cic.Remove), - }, + send(&terraform1.StackChangeProgress{ + Event: &terraform1.StackChangeProgress_ComponentInstanceChanges_{ + ComponentInstanceChanges: &terraform1.StackChangeProgress_ComponentInstanceChanges{ + Addr: &terraform1.ComponentInstanceInStackAddr{ + ComponentAddr: stackaddrs.ConfigComponentForAbsInstance(cic.Addr).String(), + ComponentInstanceAddr: cic.Addr.String(), }, + Total: int32(cic.Total()), + Add: int32(cic.Add), + Change: int32(cic.Change), + Import: int32(cic.Import), + Remove: int32(cic.Remove), }, }, }) return span }, - } -} - -func stackApplyHooks(evts *syncApplyStackChangesServer, mainStackSource sourceaddrs.FinalSource) *stackruntime.Hooks { - return &stackruntime.Hooks{ - // For any BeginFunc-shaped hook that returns an OpenTelemetry tracing - // span, we'll wrap it in a context so that the runtime's downstream - // operations will appear as children of it. - ContextAttach: func(parent context.Context, tracking any) context.Context { - span, ok := tracking.(trace.Span) - if !ok { - return parent - } - return trace.ContextWithSpan(parent, span) - }, - - // For the overall apply operation we don't emit any events to the client, - // since it already knows it has asked us to apply, but we do establish - // a root tracing span for all of the downstream planning operations to - // attach themselves to. - BeginApply: func(ctx context.Context, s struct{}) any { - _, span := tracer.Start(ctx, "applying", trace.WithAttributes( - attribute.String("main_stack_source", mainStackSource.String()), - )) + // The apply rollup should typically report the same information as + // the plan one did earlier, but could vary in some situations if + // e.g. a planned update turned out to be a no-op once some unknown + // values were known, or if the apply phase is handled by a different + // version of the agent than the plan phase which has support for + // a different set of possible change types. + ReportComponentInstanceApplied: func(ctx context.Context, span any, cic *hooks.ComponentInstanceChange) any { + send(&terraform1.StackChangeProgress{ + Event: &terraform1.StackChangeProgress_ComponentInstanceChanges_{ + ComponentInstanceChanges: &terraform1.StackChangeProgress_ComponentInstanceChanges{ + Addr: &terraform1.ComponentInstanceInStackAddr{ + ComponentAddr: stackaddrs.ConfigComponentForAbsInstance(cic.Addr).String(), + ComponentInstanceAddr: cic.Addr.String(), + }, + Total: int32(cic.Total()), + Add: int32(cic.Add), + Change: int32(cic.Change), + Import: int32(cic.Import), + Remove: int32(cic.Remove), + }, + }, + }) return span }, - EndApply: func(ctx context.Context, span any, s struct{}) any { - span.(trace.Span).End() - return nil - }, - - // TODO: other event types } }