From 1cbe1ca463c9878fb277b035d4c8b2871dc160da Mon Sep 17 00:00:00 2001 From: Johan Brandhorst-Satzkorn Date: Tue, 7 Jan 2025 09:11:52 -0800 Subject: [PATCH] internal/event: fix log processor race (#5401) The cloud events formatter Process function would call the signer without protecting it, which was racy with the Rotate function. --- internal/event/cloudevents_formatter_node.go | 9 +++++ .../event/cloudevents_formatter_node_test.go | 34 +++++++++++++++++++ 2 files changed, 43 insertions(+) diff --git a/internal/event/cloudevents_formatter_node.go b/internal/event/cloudevents_formatter_node.go index d4dd6c7eb9..9a24c78f28 100644 --- a/internal/event/cloudevents_formatter_node.go +++ b/internal/event/cloudevents_formatter_node.go @@ -113,6 +113,15 @@ func (f *cloudEventsFormatterFilter) Rotate(w wrapping.Wrapper, _ ...Option) err return nil } +func (f *cloudEventsFormatterFilter) Process(ctx context.Context, e *eventlogger.Event) (*eventlogger.Event, error) { + // The embedded FormatterFilter's Process function calls the signer, but doesn't know + // about the lock, leading to a potential race condition. We take the lock here to ensure + // that the signer is only accessed by one goroutine at a time. + f.l.RLock() + defer f.l.RUnlock() + return f.FormatterFilter.Process(ctx, e) +} + func newPredicate(allow, deny []*filter) func(ctx context.Context, ce any) (bool, error) { return func(ctx context.Context, ce any) (bool, error) { if len(allow) == 0 && len(deny) == 0 { diff --git a/internal/event/cloudevents_formatter_node_test.go b/internal/event/cloudevents_formatter_node_test.go index e96101af9e..58f2a1e159 100644 --- a/internal/event/cloudevents_formatter_node_test.go +++ b/internal/event/cloudevents_formatter_node_test.go @@ -8,6 +8,7 @@ import ( "encoding/json" "fmt" "net/url" + "sync" "testing" "time" @@ -395,3 +396,36 @@ func Test_cloudEventsFormatter_Rotate(t *testing.T) { }) } } + +func Test_cloudEventsFormatter_Race(t *testing.T) { + testUrl, err := url.Parse("https://[::1]") + require.NoError(t, err) + + testNode, err := newCloudEventsFormatterFilter(testUrl, cloudevents.FormatJSON, WithSchema(testUrl)) + require.NoError(t, err) + + start := make(chan struct{}) + wg := new(sync.WaitGroup) + wg.Add(1) + go func() { + defer wg.Done() + <-start + for i := 0; i < 10; i++ { + _ = testNode.Rotate(testWrapper(t)) + } + }() + wg.Add(1) + go func() { + defer wg.Done() + <-start + for i := 0; i < 10; i++ { + _, _ = testNode.Process(context.Background(), &eventlogger.Event{ + Type: "test", + CreatedAt: time.Now(), + Payload: "test-data", + }) + } + }() + close(start) + wg.Wait() +}