diff --git a/internal/rpcapi/stacks.go b/internal/rpcapi/stacks.go index ee69dac744..df8ff6b64a 100644 --- a/internal/rpcapi/stacks.go +++ b/internal/rpcapi/stacks.go @@ -448,6 +448,35 @@ Events: } func stackPlanHooks(evts *syncPlanStackChangesServer, mainStackSource sourceaddrs.FinalSource) *stackruntime.Hooks { + return stackChangeHooks( + func(scp *terraform1.StackChangeProgress) error { + return evts.Send(&terraform1.PlanStackChanges_Event{ + Event: &terraform1.PlanStackChanges_Event_Progress{ + Progress: scp, + }, + }) + }, + mainStackSource, + ) +} + +func stackApplyHooks(evts *syncApplyStackChangesServer, mainStackSource sourceaddrs.FinalSource) *stackruntime.Hooks { + return stackChangeHooks( + func(scp *terraform1.StackChangeProgress) error { + return evts.Send(&terraform1.ApplyStackChanges_Event{ + Event: &terraform1.ApplyStackChanges_Event_Progress{ + Progress: scp, + }, + }) + }, + mainStackSource, + ) +} + +// stackChangeHooks is the shared hook-handling logic for both [stackPlanHooks] +// and [stackApplyHooks]. Each phase emits a different subset of the events +// handled here. +func stackChangeHooks(send func(*terraform1.StackChangeProgress) error, mainStackSource sourceaddrs.FinalSource) *stackruntime.Hooks { return &stackruntime.Hooks{ // For any BeginFunc-shaped hook that returns an OpenTelemetry tracing // span, we'll wrap it in a context so that the runtime's downstream @@ -475,6 +504,21 @@ func stackPlanHooks(evts *syncPlanStackChangesServer, mainStackSource sourceaddr return nil }, + // For the overall apply operation we don't emit any events to the client, + // since it already knows it has asked us to apply, but we do establish + // a root tracing span for all of the downstream planning operations to + // attach themselves to. + BeginApply: func(ctx context.Context, s struct{}) any { + _, span := tracer.Start(ctx, "applying", trace.WithAttributes( + attribute.String("main_stack_source", mainStackSource.String()), + )) + return span + }, + EndApply: func(ctx context.Context, span any, s struct{}) any { + span.(trace.Span).End() + return nil + }, + // After expanding a component, we emit an event to the client to // list all of the resulting instances. In the common case of an // unexpanded component, this will be a single address. @@ -483,15 +527,11 @@ func stackPlanHooks(evts *syncPlanStackChangesServer, mainStackSource sourceaddr for _, ia := range ce.InstanceAddrs { ias = append(ias, ia.String()) } - evts.Send(&terraform1.PlanStackChanges_Event{ - Event: &terraform1.PlanStackChanges_Event_Progress{ - Progress: &terraform1.StackChangeProgress{ - Event: &terraform1.StackChangeProgress_ComponentInstances_{ - ComponentInstances: &terraform1.StackChangeProgress_ComponentInstances{ - ComponentAddr: ce.ComponentAddr.String(), - InstanceAddrs: ias, - }, - }, + send(&terraform1.StackChangeProgress{ + Event: &terraform1.StackChangeProgress_ComponentInstances_{ + ComponentInstances: &terraform1.StackChangeProgress_ComponentInstances{ + ComponentAddr: ce.ComponentAddr.String(), + InstanceAddrs: ias, }, }, }) @@ -501,38 +541,42 @@ func stackPlanHooks(evts *syncPlanStackChangesServer, mainStackSource sourceaddr // client, reporting the status of the plan operation. We also create a // nested tracing span for the component instance. PendingComponentInstancePlan: func(ctx context.Context, ci stackaddrs.AbsComponentInstance) { - evts.Send(&terraform1.PlanStackChanges_Event{ - Event: &terraform1.PlanStackChanges_Event_Progress{ - Progress: evtComponentInstanceStatus(ci, hooks.ComponentInstancePending), - }, - }) + send(evtComponentInstanceStatus(ci, hooks.ComponentInstancePending)) }, BeginComponentInstancePlan: func(ctx context.Context, ci stackaddrs.AbsComponentInstance) any { - evts.Send(&terraform1.PlanStackChanges_Event{ - Event: &terraform1.PlanStackChanges_Event_Progress{ - Progress: evtComponentInstanceStatus(ci, hooks.ComponentInstancePlanning), - }, - }) + send(evtComponentInstanceStatus(ci, hooks.ComponentInstancePlanning)) _, span := tracer.Start(ctx, "planning", trace.WithAttributes( attribute.String("component_instance", ci.String()), )) return span }, EndComponentInstancePlan: func(ctx context.Context, span any, ci stackaddrs.AbsComponentInstance) any { - evts.Send(&terraform1.PlanStackChanges_Event{ - Event: &terraform1.PlanStackChanges_Event_Progress{ - Progress: evtComponentInstanceStatus(ci, hooks.ComponentInstancePlanned), - }, - }) + send(evtComponentInstanceStatus(ci, hooks.ComponentInstancePlanned)) span.(trace.Span).End() return nil }, ErrorComponentInstancePlan: func(ctx context.Context, span any, ci stackaddrs.AbsComponentInstance) any { - evts.Send(&terraform1.PlanStackChanges_Event{ - Event: &terraform1.PlanStackChanges_Event_Progress{ - Progress: evtComponentInstanceStatus(ci, hooks.ComponentInstanceErrored), - }, - }) + send(evtComponentInstanceStatus(ci, hooks.ComponentInstanceErrored)) + span.(trace.Span).End() + return nil + }, + PendingComponentInstanceApply: func(ctx context.Context, ci stackaddrs.AbsComponentInstance) { + send(evtComponentInstanceStatus(ci, hooks.ComponentInstancePending)) + }, + BeginComponentInstanceApply: func(ctx context.Context, ci stackaddrs.AbsComponentInstance) any { + send(evtComponentInstanceStatus(ci, hooks.ComponentInstanceApplying)) + _, span := tracer.Start(ctx, "applying", trace.WithAttributes( + attribute.String("component_instance", ci.String()), + )) + return span + }, + EndComponentInstanceApply: func(ctx context.Context, span any, ci stackaddrs.AbsComponentInstance) any { + send(evtComponentInstanceStatus(ci, hooks.ComponentInstanceApplied)) + span.(trace.Span).End() + return nil + }, + ErrorComponentInstanceApply: func(ctx context.Context, span any, ci stackaddrs.AbsComponentInstance) any { + send(evtComponentInstanceStatus(ci, hooks.ComponentInstanceErrored)) span.(trace.Span).End() return nil }, @@ -540,18 +584,14 @@ func stackPlanHooks(evts *syncPlanStackChangesServer, mainStackSource sourceaddr // When Terraform core reports a resource instance plan status, we // forward it to the events client. ReportResourceInstanceStatus: func(ctx context.Context, span any, rihd *hooks.ResourceInstanceStatusHookData) any { - evts.Send(&terraform1.PlanStackChanges_Event{ - Event: &terraform1.PlanStackChanges_Event_Progress{ - Progress: &terraform1.StackChangeProgress{ - Event: &terraform1.StackChangeProgress_ResourceInstanceStatus_{ - ResourceInstanceStatus: &terraform1.StackChangeProgress_ResourceInstanceStatus{ - Addr: &terraform1.ResourceInstanceInStackAddr{ - ComponentInstanceAddr: rihd.Addr.Component.String(), - ResourceInstanceAddr: rihd.Addr.Item.String(), - }, - Status: rihd.Status.ForProtobuf(), - }, + send(&terraform1.StackChangeProgress{ + Event: &terraform1.StackChangeProgress_ResourceInstanceStatus_{ + ResourceInstanceStatus: &terraform1.StackChangeProgress_ResourceInstanceStatus{ + Addr: &terraform1.ResourceInstanceInStackAddr{ + ComponentInstanceAddr: rihd.Addr.Component.String(), + ResourceInstanceAddr: rihd.Addr.Item.String(), }, + Status: rihd.Status.ForProtobuf(), }, }, }) @@ -580,20 +620,16 @@ func stackPlanHooks(evts *syncPlanStackChangesServer, mainStackSource sourceaddr imported.ImportId = ric.Change.Importing.ID } - evts.Send(&terraform1.PlanStackChanges_Event{ - Event: &terraform1.PlanStackChanges_Event_Progress{ - Progress: &terraform1.StackChangeProgress{ - Event: &terraform1.StackChangeProgress_ResourceInstancePlannedChange_{ - ResourceInstancePlannedChange: &terraform1.StackChangeProgress_ResourceInstancePlannedChange{ - Addr: &terraform1.ResourceInstanceInStackAddr{ - ComponentInstanceAddr: ric.Addr.Component.String(), - ResourceInstanceAddr: ric.Addr.Item.String(), - }, - Actions: actions, - Moved: moved, - Imported: imported, - }, + send(&terraform1.StackChangeProgress{ + Event: &terraform1.StackChangeProgress_ResourceInstancePlannedChange_{ + ResourceInstancePlannedChange: &terraform1.StackChangeProgress_ResourceInstancePlannedChange{ + Addr: &terraform1.ResourceInstanceInStackAddr{ + ComponentInstanceAddr: ric.Addr.Component.String(), + ResourceInstanceAddr: ric.Addr.Item.String(), }, + Actions: actions, + Moved: moved, + Imported: imported, }, }, }) @@ -601,61 +637,49 @@ func stackPlanHooks(evts *syncPlanStackChangesServer, mainStackSource sourceaddr }, // We also report a roll-up of planned resource action counts after each - // component instance plan completes. + // component instance plan or apply completes. ReportComponentInstancePlanned: func(ctx context.Context, span any, cic *hooks.ComponentInstanceChange) any { - evts.Send(&terraform1.PlanStackChanges_Event{ - Event: &terraform1.PlanStackChanges_Event_Progress{ - Progress: &terraform1.StackChangeProgress{ - Event: &terraform1.StackChangeProgress_ComponentInstanceChanges_{ - ComponentInstanceChanges: &terraform1.StackChangeProgress_ComponentInstanceChanges{ - Addr: &terraform1.ComponentInstanceInStackAddr{ - ComponentAddr: stackaddrs.ConfigComponentForAbsInstance(cic.Addr).String(), - ComponentInstanceAddr: cic.Addr.String(), - }, - Total: int32(cic.Total()), - Add: int32(cic.Add), - Change: int32(cic.Change), - Import: int32(cic.Import), - Remove: int32(cic.Remove), - }, + send(&terraform1.StackChangeProgress{ + Event: &terraform1.StackChangeProgress_ComponentInstanceChanges_{ + ComponentInstanceChanges: &terraform1.StackChangeProgress_ComponentInstanceChanges{ + Addr: &terraform1.ComponentInstanceInStackAddr{ + ComponentAddr: stackaddrs.ConfigComponentForAbsInstance(cic.Addr).String(), + ComponentInstanceAddr: cic.Addr.String(), }, + Total: int32(cic.Total()), + Add: int32(cic.Add), + Change: int32(cic.Change), + Import: int32(cic.Import), + Remove: int32(cic.Remove), }, }, }) return span }, - } -} - -func stackApplyHooks(evts *syncApplyStackChangesServer, mainStackSource sourceaddrs.FinalSource) *stackruntime.Hooks { - return &stackruntime.Hooks{ - // For any BeginFunc-shaped hook that returns an OpenTelemetry tracing - // span, we'll wrap it in a context so that the runtime's downstream - // operations will appear as children of it. - ContextAttach: func(parent context.Context, tracking any) context.Context { - span, ok := tracking.(trace.Span) - if !ok { - return parent - } - return trace.ContextWithSpan(parent, span) - }, - - // For the overall apply operation we don't emit any events to the client, - // since it already knows it has asked us to apply, but we do establish - // a root tracing span for all of the downstream planning operations to - // attach themselves to. - BeginApply: func(ctx context.Context, s struct{}) any { - _, span := tracer.Start(ctx, "applying", trace.WithAttributes( - attribute.String("main_stack_source", mainStackSource.String()), - )) + // The apply rollup should typically report the same information as + // the plan one did earlier, but could vary in some situations if + // e.g. a planned update turned out to be a no-op once some unknown + // values were known, or if the apply phase is handled by a different + // version of the agent than the plan phase which has support for + // a different set of possible change types. + ReportComponentInstanceApplied: func(ctx context.Context, span any, cic *hooks.ComponentInstanceChange) any { + send(&terraform1.StackChangeProgress{ + Event: &terraform1.StackChangeProgress_ComponentInstanceChanges_{ + ComponentInstanceChanges: &terraform1.StackChangeProgress_ComponentInstanceChanges{ + Addr: &terraform1.ComponentInstanceInStackAddr{ + ComponentAddr: stackaddrs.ConfigComponentForAbsInstance(cic.Addr).String(), + ComponentInstanceAddr: cic.Addr.String(), + }, + Total: int32(cic.Total()), + Add: int32(cic.Add), + Change: int32(cic.Change), + Import: int32(cic.Import), + Remove: int32(cic.Remove), + }, + }, + }) return span }, - EndApply: func(ctx context.Context, span any, s struct{}) any { - span.(trace.Span).End() - return nil - }, - - // TODO: other event types } }