internal/event: fix log processor race (#5401)

The cloud events formatter Process function would call
the signer without protecting it, which was racy with the Rotate
function.
pull/5414/head
Johan Brandhorst-Satzkorn 1 year ago committed by GitHub
parent ae916727e5
commit 1cbe1ca463
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -113,6 +113,15 @@ func (f *cloudEventsFormatterFilter) Rotate(w wrapping.Wrapper, _ ...Option) err
return nil
}
func (f *cloudEventsFormatterFilter) Process(ctx context.Context, e *eventlogger.Event) (*eventlogger.Event, error) {
// The embedded FormatterFilter's Process function calls the signer, but doesn't know
// about the lock, leading to a potential race condition. We take the lock here to ensure
// that the signer is only accessed by one goroutine at a time.
f.l.RLock()
defer f.l.RUnlock()
return f.FormatterFilter.Process(ctx, e)
}
func newPredicate(allow, deny []*filter) func(ctx context.Context, ce any) (bool, error) {
return func(ctx context.Context, ce any) (bool, error) {
if len(allow) == 0 && len(deny) == 0 {

@ -8,6 +8,7 @@ import (
"encoding/json"
"fmt"
"net/url"
"sync"
"testing"
"time"
@ -395,3 +396,36 @@ func Test_cloudEventsFormatter_Rotate(t *testing.T) {
})
}
}
func Test_cloudEventsFormatter_Race(t *testing.T) {
testUrl, err := url.Parse("https://[::1]")
require.NoError(t, err)
testNode, err := newCloudEventsFormatterFilter(testUrl, cloudevents.FormatJSON, WithSchema(testUrl))
require.NoError(t, err)
start := make(chan struct{})
wg := new(sync.WaitGroup)
wg.Add(1)
go func() {
defer wg.Done()
<-start
for i := 0; i < 10; i++ {
_ = testNode.Rotate(testWrapper(t))
}
}()
wg.Add(1)
go func() {
defer wg.Done()
<-start
for i := 0; i < 10; i++ {
_, _ = testNode.Process(context.Background(), &eventlogger.Event{
Type: "test",
CreatedAt: time.Now(),
Payload: "test-data",
})
}
}()
close(start)
wg.Wait()
}

Loading…
Cancel
Save