From 6161635f658bb0978cb0d3267e7276833c58401e Mon Sep 17 00:00:00 2001 From: Irena Rindos Date: Wed, 13 Sep 2023 13:08:16 -0400 Subject: [PATCH] feat(event): add register pipeline (#3714) --- internal/event/eventer.go | 7 +++++++ internal/event/eventer_test.go | 2 ++ 2 files changed, 9 insertions(+) diff --git a/internal/event/eventer.go b/internal/event/eventer.go index 21aae0f8e4..57ea8b82e7 100644 --- a/internal/event/eventer.go +++ b/internal/event/eventer.go @@ -74,6 +74,7 @@ type queuedEvent struct { // Eventer provides a method to send events to pipelines of sinks type Eventer struct { broker broker + serverName string flushableNodes []flushable conf EventerConfig logger hclog.Logger @@ -90,6 +91,11 @@ type Eventer struct { gatedQueueLock *sync.Mutex } +type node struct { + node eventlogger.Node + id eventlogger.NodeID +} + type pipeline struct { eventType Type fmtId eventlogger.NodeID @@ -250,6 +256,7 @@ func NewEventer(log hclog.Logger, serializationLock *sync.Mutex, serverName stri conf: c, broker: b, auditWrapperNodes: []any{}, + serverName: serverName, } if !opts.withNow.IsZero() { diff --git a/internal/event/eventer_test.go b/internal/event/eventer_test.go index 3b811e6470..f1c377c3f0 100644 --- a/internal/event/eventer_test.go +++ b/internal/event/eventer_test.go @@ -157,6 +157,7 @@ func Test_InitSysEventer(t *testing.T) { tt.want.errPipelines = got.errPipelines tt.want.observationPipelines = got.observationPipelines tt.want.auditWrapperNodes = got.auditWrapperNodes + tt.want.serverName = got.serverName assert.Equal(tt.want, got) }) } @@ -690,6 +691,7 @@ func Test_NewEventer(t *testing.T) { tt.want.errPipelines = got.errPipelines tt.want.observationPipelines = got.observationPipelines tt.want.auditWrapperNodes = got.auditWrapperNodes + tt.want.serverName = got.serverName assert.Equal(tt.want, got) assert.Lenf(testBroker.registeredNodeIds, len(tt.wantRegistered), "got nodes: %q", testBroker.registeredNodeIds)