diff --git a/internal/event/cloudevents_formatter_node.go b/internal/event/cloudevents_formatter_node.go index d4dd6c7eb9..9a24c78f28 100644 --- a/internal/event/cloudevents_formatter_node.go +++ b/internal/event/cloudevents_formatter_node.go @@ -113,6 +113,15 @@ func (f *cloudEventsFormatterFilter) Rotate(w wrapping.Wrapper, _ ...Option) err return nil } +func (f *cloudEventsFormatterFilter) Process(ctx context.Context, e *eventlogger.Event) (*eventlogger.Event, error) { + // The embedded FormatterFilter's Process function calls the signer, but doesn't know + // about the lock, leading to a potential race condition. We take the lock here to ensure + // that the signer is only accessed by one goroutine at a time. + f.l.RLock() + defer f.l.RUnlock() + return f.FormatterFilter.Process(ctx, e) +} + func newPredicate(allow, deny []*filter) func(ctx context.Context, ce any) (bool, error) { return func(ctx context.Context, ce any) (bool, error) { if len(allow) == 0 && len(deny) == 0 { diff --git a/internal/event/cloudevents_formatter_node_test.go b/internal/event/cloudevents_formatter_node_test.go index e96101af9e..58f2a1e159 100644 --- a/internal/event/cloudevents_formatter_node_test.go +++ b/internal/event/cloudevents_formatter_node_test.go @@ -8,6 +8,7 @@ import ( "encoding/json" "fmt" "net/url" + "sync" "testing" "time" @@ -395,3 +396,36 @@ func Test_cloudEventsFormatter_Rotate(t *testing.T) { }) } } + +func Test_cloudEventsFormatter_Race(t *testing.T) { + testUrl, err := url.Parse("https://[::1]") + require.NoError(t, err) + + testNode, err := newCloudEventsFormatterFilter(testUrl, cloudevents.FormatJSON, WithSchema(testUrl)) + require.NoError(t, err) + + start := make(chan struct{}) + wg := new(sync.WaitGroup) + wg.Add(1) + go func() { + defer wg.Done() + <-start + for i := 0; i < 10; i++ { + _ = testNode.Rotate(testWrapper(t)) + } + }() + wg.Add(1) + go func() { + defer wg.Done() + <-start + for i := 0; i < 10; i++ { + _, _ = testNode.Process(context.Background(), &eventlogger.Event{ + Type: "test", + CreatedAt: time.Now(), + Payload: "test-data", + }) + } + }() + close(start) + wg.Wait() +}