From e596daf34d444872f072de49a4ee2d16caa965eb Mon Sep 17 00:00:00 2001 From: Jeff Mitchell Date: Wed, 16 Mar 2022 17:25:34 -0400 Subject: [PATCH] Add hclog adapter and gating to eventer (#1915) This contains to related things: 1. An adapter allowing an hclog interface to take in information and send them to the eventer 2. A gate on the eventer, which is the analogue to the behavior prior to eventing where we'd ensure logs were not printed prior to the startup information. Without this, plugin logs using the hclog adapter will start showing up too early. --- internal/cmd/base/option.go | 8 + internal/cmd/base/option_test.go | 7 + internal/cmd/base/server_test.go | 7 +- internal/cmd/base/servers.go | 22 +- internal/cmd/commands/database/migrate.go | 63 +++-- internal/cmd/commands/dev/dev.go | 8 +- internal/cmd/commands/server/server.go | 8 +- internal/observability/event/eventer.go | 110 +++++++- .../observability/event/eventer_gate_test.go | 166 ++++++++++++ internal/observability/event/eventer_test.go | 29 ++- .../event/hclog_event_adapter.go | 237 ++++++++++++++++++ .../event/hclog_event_adapter_test.go | 186 ++++++++++++++ internal/observability/event/options.go | 27 ++ internal/observability/event/options_test.go | 22 ++ internal/observability/event/sink_config.go | 19 +- internal/observability/event/sink_type.go | 5 +- internal/servers/controller/controller.go | 8 + 17 files changed, 879 insertions(+), 53 deletions(-) create mode 100644 internal/observability/event/eventer_gate_test.go create mode 100644 internal/observability/event/hclog_event_adapter.go create mode 100644 internal/observability/event/hclog_event_adapter_test.go diff --git a/internal/cmd/base/option.go b/internal/cmd/base/option.go index 4ab3b754bf..d5c4735e00 100644 --- a/internal/cmd/base/option.go +++ b/internal/cmd/base/option.go @@ -39,6 +39,7 @@ type Options struct { withAttributeFieldPrefix string withStatusCode int withHostPlugin func() (string, plugin.HostPluginServiceClient) + withEventGating bool } func getDefaultOptions() Options { @@ -180,3 +181,10 @@ func WithHostPlugin(pluginId string, plg plugin.HostPluginServiceClient) Option } } } + +// WithEventGating starts the eventer in gated mode +func WithEventGating(with bool) Option { + return func(o *Options) { + o.withEventGating = with + } +} diff --git a/internal/cmd/base/option_test.go b/internal/cmd/base/option_test.go index cd082ea3ca..ef4d913fe0 100644 --- a/internal/cmd/base/option_test.go +++ b/internal/cmd/base/option_test.go @@ -117,4 +117,11 @@ func Test_GetOpts(t *testing.T) { testOpts.withDialect = "test-dialect" assert.Equal(opts, testOpts) }) + t.Run("withEventGating", func(t *testing.T) { + assert := assert.New(t) + testOpts := getDefaultOptions() + assert.False(testOpts.withEventGating) + opts := getOpts(WithEventGating(true)) + assert.True(opts.withEventGating) + }) } diff --git a/internal/cmd/base/server_test.go b/internal/cmd/base/server_test.go index 37dd0885a6..a973036792 100644 --- a/internal/cmd/base/server_test.go +++ b/internal/cmd/base/server_test.go @@ -59,6 +59,8 @@ func TestServer_SetupKMSes(t *testing.T) { purposes: []string{globals.KmsPurposeRoot, globals.KmsPurposeRecovery, globals.KmsPurposeWorkerAuth, globals.KmsPurposeConfig}, }, } + logger := hclog.Default() + serLock := new(sync.Mutex) for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { assert, require := assert.New(t), require.New(t) @@ -70,8 +72,9 @@ func TestServer_SetupKMSes(t *testing.T) { }, }, } - s := NewServer(&Command{}) - err := s.SetupKMSes(context.Background(), cli.NewMockUi(), &config.Config{SharedConfig: conf}) + s := NewServer(&Command{Context: context.Background()}) + require.NoError(s.SetupEventing(logger, serLock, "setup-kms-testing")) + err := s.SetupKMSes(s.Context, cli.NewMockUi(), &config.Config{SharedConfig: conf}) if tt.wantErrContains != "" { require.Error(err) diff --git a/internal/cmd/base/servers.go b/internal/cmd/base/servers.go index 9e21f3cfbc..738fd92c76 100644 --- a/internal/cmd/base/servers.go +++ b/internal/cmd/base/servers.go @@ -205,7 +205,8 @@ func (b *Server) SetupEventing(logger hclog.Logger, serializationLock *sync.Mute serializationLock, serverName, *opts.withEventerConfig, - event.WithAuditWrapper(opts.withEventWrapper)) + event.WithAuditWrapper(opts.withEventWrapper), + event.WithGating(opts.withEventGating)) if err != nil { return berrors.WrapDeprecated(err, op, berrors.WithMsg("unable to create eventer")) } @@ -283,11 +284,15 @@ func (b *Server) SetupLogging(flagLogLevel, flagLogFormat, configLogLevel, confi return nil } -func (b *Server) ReleaseLogGate() { +func (b *Server) ReleaseLogGate() error { // Release the log gate. b.Logger.(hclog.OutputResettable).ResetOutputWithFlush(&hclog.LoggerOptions{ Output: b.logOutput, }, b.GatedWriter) + if b.Eventer != nil { + return b.Eventer.ReleaseGate() + } + return nil } func (b *Server) StorePidFile(pidPath string) error { @@ -482,6 +487,7 @@ func (b *Server) SetupListeners(ui cli.Ui, config *configutil.SharedConfig, allo // and sends each off to configutil to instantiate a wrapper. func (b *Server) SetupKMSes(ctx context.Context, ui cli.Ui, config *config.Config) error { sharedConfig := config.SharedConfig + var pluginLogger hclog.Logger var err error for _, kms := range sharedConfig.Seals { for _, purpose := range kms.Purpose { @@ -498,6 +504,13 @@ func (b *Server) SetupKMSes(ctx context.Context, ui cli.Ui, config *config.Confi return fmt.Errorf("Unknown KMS purpose %q", kms.Purpose) } + if pluginLogger == nil { + pluginLogger, err = event.NewHclogLogger(b.Context, b.Eventer) + if err != nil { + return fmt.Errorf("Error creating KMS plugin logger: %w", err) + } + } + // This can be modified by configutil so store the original value origPurpose := kms.Purpose kms.Purpose = []string{purpose} @@ -509,7 +522,9 @@ func (b *Server) SetupKMSes(ctx context.Context, ui cli.Ui, config *config.Confi &b.Info, configutil.WithPluginOptions( pluginutil.WithPluginsMap(kms_plugin_assets.BuiltinKmsPlugins()), - pluginutil.WithPluginsFilesystem(kms_plugin_assets.KmsPluginPrefix, kms_plugin_assets.FileSystem())), + pluginutil.WithPluginsFilesystem(kms_plugin_assets.KmsPluginPrefix, kms_plugin_assets.FileSystem()), + ), + configutil.WithLogger(pluginLogger.Named(kms.Type).With("purpose", purpose)), ) if wrapperConfigError != nil { return fmt.Errorf( @@ -551,7 +566,6 @@ func (b *Server) SetupKMSes(ctx context.Context, ui cli.Ui, config *config.Confi default: return fmt.Errorf("KMS purpose of %q is unknown", purpose) } - } } diff --git a/internal/cmd/commands/database/migrate.go b/internal/cmd/commands/database/migrate.go index 7db8e2ccd5..2330d2343d 100644 --- a/internal/cmd/commands/database/migrate.go +++ b/internal/cmd/commands/database/migrate.go @@ -7,6 +7,7 @@ import ( "github.com/hashicorp/boundary/internal/cmd/base" "github.com/hashicorp/boundary/internal/cmd/config" "github.com/hashicorp/boundary/internal/errors" + "github.com/hashicorp/boundary/internal/observability/event" host_plugin_assets "github.com/hashicorp/boundary/plugins/host" kms_plugin_assets "github.com/hashicorp/boundary/plugins/kms" external_host_plugins "github.com/hashicorp/boundary/sdk/plugins/host" @@ -138,13 +139,49 @@ func (c *MigrateCommand) Run(args []string) (retCode int) { }() } + dialect := "postgres" + + c.srv = base.NewServer(&base.Command{UI: c.UI}) + + if err := c.srv.SetupLogging(c.flagLogLevel, c.flagLogFormat, c.Config.LogLevel, c.Config.LogFormat); err != nil { + c.UI.Error(err.Error()) + return base.CommandCliError + } + var serverName string + switch { + case c.Config.Controller == nil: + serverName = "boundary-database-migrate" + default: + if _, err := c.Config.Controller.InitNameIfEmpty(); err != nil { + c.UI.Error(err.Error()) + return base.CommandCliError + } + serverName = c.Config.Controller.Name + "/boundary-database-migrate" + } + if err := c.srv.SetupEventing( + c.srv.Logger, + c.srv.StderrLock, + serverName, + base.WithEventerConfig(c.Config.Eventing)); err != nil { + c.UI.Error(err.Error()) + return base.CommandCliError + } + + pluginLogger, err := event.NewHclogLogger(c.Context, c.srv.Eventer) + if err != nil { + c.UI.Error(fmt.Sprintf("Error creating host catalog plugin logger: %v", err)) + return base.CommandCliError + } + _, awsCleanup, err := external_host_plugins.CreateHostPlugin( c.Context, "aws", external_host_plugins.WithPluginOptions( pluginutil.WithPluginExecutionDirectory(c.Config.Plugins.ExecutionDir), pluginutil.WithPluginsFilesystem(host_plugin_assets.HostPluginPrefix, host_plugin_assets.FileSystem()), - )) + ), + external_host_plugins.WithLogger(pluginLogger.Named("aws")), + ) if err != nil { c.UI.Error(fmt.Errorf("Error creating dynamic host plugin: %w", err).Error()) c.UI.Warn(base.WrapAtLength( @@ -176,30 +213,6 @@ plugins { return base.CommandCliError } - dialect := "postgres" - - c.srv = base.NewServer(&base.Command{UI: c.UI}) - - if err := c.srv.SetupLogging(c.flagLogLevel, c.flagLogFormat, c.Config.LogLevel, c.Config.LogFormat); err != nil { - c.UI.Error(err.Error()) - return base.CommandCliError - } - var serverName string - switch { - case c.Config.Controller == nil: - serverName = "boundary-database-migrate" - default: - if _, err := c.Config.Controller.InitNameIfEmpty(); err != nil { - c.UI.Error(err.Error()) - return base.CommandCliError - } - serverName = c.Config.Controller.Name + "/boundary-database-migrate" - } - if err := c.srv.SetupEventing(c.srv.Logger, c.srv.StderrLock, serverName, base.WithEventerConfig(c.Config.Eventing)); err != nil { - c.UI.Error(err.Error()) - return base.CommandCliError - } - // If mlockall(2) isn't supported, show a warning. We disable this in dev // because it is quite scary to see when first using Boundary. We also disable // this if the user has explicitly disabled mlock in configuration. diff --git a/internal/cmd/commands/dev/dev.go b/internal/cmd/commands/dev/dev.go index a1a5647509..80a22b60ef 100644 --- a/internal/cmd/commands/dev/dev.go +++ b/internal/cmd/commands/dev/dev.go @@ -514,7 +514,8 @@ func (c *Command) Run(args []string) int { c.StderrLock, serverName, base.WithEventerConfig(c.Config.Eventing), - base.WithEventFlags(eventFlags)); err != nil { + base.WithEventFlags(eventFlags), + base.WithEventGating(true)); err != nil { c.UI.Error(err.Error()) return base.CommandCliError } @@ -596,7 +597,10 @@ func (c *Command) Run(args []string) int { } c.PrintInfo(c.UI) - c.ReleaseLogGate() + if err := c.ReleaseLogGate(); err != nil { + c.UI.Error(fmt.Errorf("Error releasing event gate: %w", err).Error()) + return base.CommandCliError + } { c.EnabledPlugins = append(c.EnabledPlugins, base.EnabledPluginHostAws, base.EnabledPluginHostAzure) diff --git a/internal/cmd/commands/server/server.go b/internal/cmd/commands/server/server.go index 15910852ab..5cc12688ee 100644 --- a/internal/cmd/commands/server/server.go +++ b/internal/cmd/commands/server/server.go @@ -164,7 +164,8 @@ func (c *Command) Run(args []string) int { if err := c.SetupEventing(c.Logger, c.StderrLock, strings.Join(serverNames, "/"), - base.WithEventerConfig(c.Config.Eventing)); err != nil { + base.WithEventerConfig(c.Config.Eventing), + base.WithEventGating(true)); err != nil { c.UI.Error(err.Error()) return base.CommandUserError } @@ -447,7 +448,10 @@ func (c *Command) Run(args []string) int { }() c.PrintInfo(c.UI) - c.ReleaseLogGate() + if err := c.ReleaseLogGate(); err != nil { + c.UI.Error(fmt.Errorf("Error releasing event gate: %w", err).Error()) + return base.CommandCliError + } if c.Config.Controller != nil { c.EnabledPlugins = append(c.EnabledPlugins, base.EnabledPluginHostAws, base.EnabledPluginHostAzure) diff --git a/internal/observability/event/eventer.go b/internal/observability/event/eventer.go index 663eba6cce..f12cd9d360 100644 --- a/internal/observability/event/eventer.go +++ b/internal/observability/event/eventer.go @@ -22,6 +22,8 @@ import ( "github.com/hashicorp/eventlogger/sinks/writer" "github.com/hashicorp/go-hclog" wrapping "github.com/hashicorp/go-kms-wrapping/v2" + "github.com/hashicorp/go-multierror" + "go.uber.org/atomic" "google.golang.org/protobuf/types/known/fieldmaskpb" ) @@ -59,6 +61,13 @@ type broker interface { RegisterPipeline(def eventlogger.Pipeline) error } +// queuedEvent stores an event and the context that was associated with it when +// gating +type queuedEvent struct { + ctx context.Context + event interface{} +} + // Eventer provides a method to send events to pipelines of sinks type Eventer struct { broker broker @@ -69,6 +78,13 @@ type Eventer struct { observationPipelines []pipeline errPipelines []pipeline auditWrapperNodes []interface{} + + // Gating is used to delay output of events until after we have a chance to + // render startup info, similar to what was done for hclog before eventing + // supplanted it. It affects only error and system events. + gated atomic.Bool + gatedQueue []*queuedEvent + gatedQueueLock *sync.Mutex } type pipeline struct { @@ -189,6 +205,7 @@ func NewEventer(log hclog.Logger, serializationLock *sync.Mutex, serverName stri } e := &Eventer{ + gatedQueueLock: new(sync.Mutex), logger: log, conf: c, broker: b, @@ -199,6 +216,10 @@ func NewEventer(log hclog.Logger, serializationLock *sync.Mutex, serverName stri e.broker.StopTimeAt(opts.withNow) } + if opts.withGating { + e.gated.Store(true) + } + // serializedStderr will be shared among all StderrSinks so their output is not // interwoven serializedStderr := serializedWriter{ @@ -253,6 +274,17 @@ func NewEventer(log hclog.Logger, serializationLock *sync.Mutex, serverName stri return nil, fmt.Errorf("%s: %w", op, err) } sinkId = eventlogger.NodeID(id) + case WriterSink: + wsc := s.WriterConfig + sinkNode = &writer.Sink{ + Format: string(s.Format), + Writer: wsc.Writer, + } + id, err := NewId("writer") + if err != nil { + return nil, fmt.Errorf("%s: %w", op, err) + } + sinkId = eventlogger.NodeID(id) default: return nil, fmt.Errorf("%s: unknown sink type %s", op, s.Type) } @@ -558,7 +590,7 @@ func (e *Eventer) RotateAuditWrapper(ctx context.Context, newWrapper wrapping.Wr } // writeObservation writes/sends an Observation event. -func (e *Eventer) writeObservation(ctx context.Context, event *observation) error { +func (e *Eventer) writeObservation(ctx context.Context, event *observation, _ ...Option) error { const op = "event.(Eventer).writeObservation" if event == nil { return fmt.Errorf("%s: missing event: %w", op, ErrInvalidParameter) @@ -584,11 +616,23 @@ func (e *Eventer) writeObservation(ctx context.Context, event *observation) erro } // writeError writes/sends an Err event -func (e *Eventer) writeError(ctx context.Context, event *err) error { +func (e *Eventer) writeError(ctx context.Context, event *err, opt ...Option) error { const op = "event.(Eventer).writeError" if event == nil { return fmt.Errorf("%s: missing event: %w", op, ErrInvalidParameter) } + opts := getOpts(opt...) + if e.gated.Load() && !opts.withNoGateLocking { + e.gatedQueueLock.Lock() + defer e.gatedQueueLock.Unlock() + if e.gated.Load() { // validate that it's still gated and we haven't just drained the queue + e.gatedQueue = append(e.gatedQueue, &queuedEvent{ + ctx: ctx, + event: event, + }) + return nil + } + } err := e.retrySend(ctx, stdRetryCount, expBackoff{}, func() (eventlogger.Status, error) { return e.broker.Send(ctx, eventlogger.EventType(ErrorType), event) }) @@ -600,11 +644,23 @@ func (e *Eventer) writeError(ctx context.Context, event *err) error { } // writeSysEvent writes/sends an sysEvent event -func (e *Eventer) writeSysEvent(ctx context.Context, event *sysEvent) error { +func (e *Eventer) writeSysEvent(ctx context.Context, event *sysEvent, opt ...Option) error { const op = "event.(Eventer).writeSysEvent" if event == nil { return fmt.Errorf("%s: missing event: %w", op, ErrInvalidParameter) } + opts := getOpts(opt...) + if e.gated.Load() && !opts.withNoGateLocking { + e.gatedQueueLock.Lock() + defer e.gatedQueueLock.Unlock() + if e.gated.Load() { // validate that it's still gated and we haven't just drained the queue + e.gatedQueue = append(e.gatedQueue, &queuedEvent{ + ctx: ctx, + event: event, + }) + return nil + } + } err := e.retrySend(ctx, stdRetryCount, expBackoff{}, func() (eventlogger.Status, error) { return e.broker.Send(ctx, eventlogger.EventType(SystemType), event) }) @@ -616,7 +672,7 @@ func (e *Eventer) writeSysEvent(ctx context.Context, event *sysEvent) error { } // writeAudit writes/send an audit event -func (e *Eventer) writeAudit(ctx context.Context, event *audit) error { +func (e *Eventer) writeAudit(ctx context.Context, event *audit, _ ...Option) error { const op = "event.(Eventer).writeAudit" if event == nil { return fmt.Errorf("%s: missing event: %w", op, ErrInvalidParameter) @@ -655,6 +711,52 @@ func (e *Eventer) FlushNodes(ctx context.Context) error { return nil } +// ReleaseGate releases queued events. If any event isn't successfully written, +// it remains in the queue and we could try a flush later. +func (e *Eventer) ReleaseGate() error { + const op = "event.(Eventer).ReleaseGate" + e.gatedQueueLock.Lock() + defer e.gatedQueueLock.Unlock() + + // Don't gate anymore. By the time we hit this we will have the lock and be + // guaranteed that nothing else will add onto the queue while we drain it. + // Logic in the functions that add to the queue validate this boolean again + // after they acquire the lock so running this function should fully drain + // the events except in the case of an error. + e.gated.Store(false) + + // Don't do anything if we're not gated and are fully drained + if len(e.gatedQueue) == 0 && !e.gated.Load() { + return nil + } + + var totalErrs *multierror.Error + for _, qe := range e.gatedQueue { + var writeErr error + if qe == nil { + continue // we may have already sent this but gotten errors later + } + var queuedOp string + switch t := qe.event.(type) { + case *sysEvent: + queuedOp = "system" + writeErr = e.writeSysEvent(qe.ctx, t, WithNoGateLocking(true)) + case *err: + queuedOp = "error" + writeErr = e.writeError(qe.ctx, t, WithNoGateLocking(true)) + default: + // Have no idea what this is and shouldn't have gotten in here to + // begin with, so just continue, and log it + writeErr = fmt.Errorf("unknown event type %T", t) + } + if writeErr != nil { + totalErrs = multierror.Append(fmt.Errorf("in %s, error sending queued %s event: %w", op, queuedOp, writeErr)) + } + } + e.gatedQueue = nil + return totalErrs.ErrorOrNil() +} + // StandardLogger will create *log.Logger that will emit events through this // Logger. This allows packages that require the stdlib log to emit events // instead. diff --git a/internal/observability/event/eventer_gate_test.go b/internal/observability/event/eventer_gate_test.go new file mode 100644 index 0000000000..a2e4c5a362 --- /dev/null +++ b/internal/observability/event/eventer_gate_test.go @@ -0,0 +1,166 @@ +package event + +import ( + "bytes" + "context" + "fmt" + "os" + "strings" + "sync" + "testing" + + "github.com/hashicorp/boundary/globals" + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-secure-stdlib/strutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestEventer_Gating(t *testing.T) { + t.Parallel() + require := require.New(t) + + // FIXME: Remove this when no longer needed: + os.Setenv(globals.BOUNDARY_DEVELOPER_ENABLE_EVENTS, "true") + + buffer := new(bytes.Buffer) + eventerConfig := EventerConfig{ + AuditEnabled: true, + ObservationsEnabled: true, + SysEventsEnabled: true, + Sinks: []*SinkConfig{ + { + Name: "test-sink", + EventTypes: []Type{EveryType}, + Format: TextHclogSinkFormat, + Type: WriterSink, + WriterConfig: &WriterSinkTypeConfig{ + Writer: buffer, + }, + }, + }, + } + testLock := &sync.Mutex{} + testLogger := hclog.New(&hclog.LoggerOptions{ + Mutex: testLock, + Name: "test", + }) + eventer, err := NewEventer( + testLogger, + testLock, + "TestEventer_Gating", + eventerConfig, + WithGating(true), + ) + require.NoError(err) + + ctx, err := NewEventerContext(context.Background(), eventer) + require.NoError(err) + + // This test sends a series of events of different types. The tests are + // meant to be in order as we want to send some that should be gated and + // some that shouldn't and ensure correct behavior at each step. + var totalEvents int + tests := []struct { + name string + eventFn func() + expectedGatedEvents int + }{ + { + name: "system-event-1", + eventFn: func() { + WriteSysEvent(ctx, "system-event-1", "system-event-1") + totalEvents++ + }, + expectedGatedEvents: 1, + }, + { + name: "system-event-2", + eventFn: func() { + WriteSysEvent(ctx, "system-event-2", "system-event-2") + totalEvents++ + }, + expectedGatedEvents: 2, + }, + { + name: "audit-1", + eventFn: func() { + require.NoError(WriteAudit(ctx, "audit-1")) + totalEvents++ + }, + expectedGatedEvents: 2, + }, + { + name: "observation-1", + eventFn: func() { + require.NoError(WriteObservation(ctx, "observation-1", WithId("observation-1"), WithHeader("foo", "bar"))) + totalEvents++ + }, + expectedGatedEvents: 2, + }, + { + name: "error-1", + eventFn: func() { + WriteError(ctx, "error-1", fmt.Errorf("error-1")) + totalEvents++ + }, + expectedGatedEvents: 3, + }, + { + name: "error-2", + eventFn: func() { + WriteError(ctx, "error-2", fmt.Errorf("error-2")) + totalEvents++ + }, + expectedGatedEvents: 4, + }, + // This should result in all events being flushed so none gated + { + name: "release-gate", + eventFn: func() { + require.NoError(eventer.ReleaseGate()) + }, + expectedGatedEvents: 0, + }, + // From here on out we're verifying that all events of all types go through + { + name: "system-event-3", + eventFn: func() { + WriteSysEvent(ctx, "system-event-3", "system-event-3") + totalEvents++ + }, + expectedGatedEvents: 0, + }, + { + name: "audit-2", + eventFn: func() { + require.NoError(WriteAudit(ctx, "audit-2")) + totalEvents++ + }, + expectedGatedEvents: 0, + }, + { + name: "observation-2", + eventFn: func() { + require.NoError(WriteObservation(ctx, "observation-2", WithId("observation-2"), WithHeader("foo", "bar"))) + totalEvents++ + }, + expectedGatedEvents: 0, + }, + { + name: "error-2", + eventFn: func() { + WriteError(ctx, "error-2", fmt.Errorf("error-2")) + totalEvents++ + }, + expectedGatedEvents: 0, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert := assert.New(t) + tt.eventFn() + assert.Len(strutil.RemoveEmpty(strings.Split(buffer.String(), "\n")), totalEvents-tt.expectedGatedEvents, buffer.String()) + }) + } +} diff --git a/internal/observability/event/eventer_test.go b/internal/observability/event/eventer_test.go index e04ddae5d3..7146289c68 100644 --- a/internal/observability/event/eventer_test.go +++ b/internal/observability/event/eventer_test.go @@ -97,8 +97,9 @@ func Test_InitSysEventer(t *testing.T) { lock: testLock, serverName: "success-with-config", want: &Eventer{ - logger: testLogger, - conf: testConfig.EventerConfig, + logger: testLogger, + gatedQueueLock: new(sync.Mutex), + conf: testConfig.EventerConfig, }, }, { @@ -108,7 +109,8 @@ func Test_InitSysEventer(t *testing.T) { lock: testLock, serverName: "success-with-default-config", want: &Eventer{ - logger: testLogger, + logger: testLogger, + gatedQueueLock: new(sync.Mutex), conf: EventerConfig{ Sinks: []*SinkConfig{ { @@ -435,7 +437,8 @@ func Test_NewEventer(t *testing.T) { logger: testLogger, serverName: "valid-audit-config-with-wrapper", want: &Eventer{ - logger: testLogger, + logger: testLogger, + gatedQueueLock: new(sync.Mutex), conf: EventerConfig{ AuditEnabled: true, Sinks: []*SinkConfig{ @@ -521,7 +524,8 @@ func Test_NewEventer(t *testing.T) { lock: testLock, serverName: "success-with-default-config", want: &Eventer{ - logger: testLogger, + logger: testLogger, + gatedQueueLock: new(sync.Mutex), conf: EventerConfig{ Sinks: []*SinkConfig{ { @@ -561,8 +565,9 @@ func Test_NewEventer(t *testing.T) { lock: testLock, serverName: "testSetup", want: &Eventer{ - logger: testLogger, - conf: testSetup.EventerConfig, + logger: testLogger, + gatedQueueLock: new(sync.Mutex), + conf: testSetup.EventerConfig, }, wantRegistered: []string{ "cloudevents", // stderr @@ -603,8 +608,9 @@ func Test_NewEventer(t *testing.T) { lock: testLock, serverName: "testSetup-with-all-opts", want: &Eventer{ - logger: testLogger, - conf: testSetupWithOpts.EventerConfig, + logger: testLogger, + gatedQueueLock: new(sync.Mutex), + conf: testSetupWithOpts.EventerConfig, }, wantRegistered: []string{ "cloudevents", // stderr @@ -657,8 +663,9 @@ func Test_NewEventer(t *testing.T) { lock: testLock, serverName: "testSetup", want: &Eventer{ - logger: testLogger, - conf: testHclogSetup.EventerConfig, + logger: testLogger, + gatedQueueLock: new(sync.Mutex), + conf: testHclogSetup.EventerConfig, }, wantRegistered: []string{ "hclog-text", // stderr diff --git a/internal/observability/event/hclog_event_adapter.go b/internal/observability/event/hclog_event_adapter.go new file mode 100644 index 0000000000..3162768cd4 --- /dev/null +++ b/internal/observability/event/hclog_event_adapter.go @@ -0,0 +1,237 @@ +package event + +import ( + "context" + "fmt" + "io" + "log" + "sync" + + "github.com/hashicorp/go-hclog" +) + +// HclogLoggerAdapter is used to provide an hclog-style interface to code that +// cannot natively handle eventing. Currently, all log lines are written as +// system events. Note that this is not meant for high throughput; some +// potential optimizations (such as using atomic values for name and such) are +// not current implemented. Additionally, some functions (such as fetching a +// stdlib logger/writer) are simply not supported right now. +type HclogLoggerAdapter struct { + eventCtx context.Context + l *sync.RWMutex + level hclog.Level + name string + withArgs []interface{} +} + +// Ensure that we are implementing Logger +var _ hclog.Logger = (*HclogLoggerAdapter)(nil) + +// NewHclogLogger creates a new hclog.Logger-compatible implementation that +// outputs to events +func NewHclogLogger(ctx context.Context, e *Eventer, opt ...Option) (hclog.Logger, error) { + const op = "event.HclogLogger" + eventCtx, err := NewEventerContext(ctx, e) + if err != nil { + return nil, fmt.Errorf("%s: %w", op, err) + } + opts := getOpts(opt...) + return &HclogLoggerAdapter{ + eventCtx: eventCtx, + l: new(sync.RWMutex), + level: opts.withHclogLevel, + }, nil +} + +// Args are alternating key, val pairs +// keys must be strings +// vals can be any type, but display is implementation specific +// Emit a message and key/value pairs at a provided log level +func (h *HclogLoggerAdapter) Log(level hclog.Level, msg string, args ...interface{}) { + switch { + case h.level == hclog.NoLevel: // If logger is not set to any level, accept it + case h.level <= level: // Otherwise if logger is same or more verbose, accept + default: + return + } + h.writeEvent("", msg, args) +} + +// Emit a message and key/value pairs at the TRACE level +func (h *HclogLoggerAdapter) Trace(msg string, args ...interface{}) { + if h.level > hclog.Trace { + return + } + h.writeEvent("", msg, args) +} + +// Emit a message and key/value pairs at the DEBUG level +func (h *HclogLoggerAdapter) Debug(msg string, args ...interface{}) { + if h.level > hclog.Debug { + return + } + h.writeEvent("", msg, args) +} + +// Emit a message and key/value pairs at the INFO level +func (h *HclogLoggerAdapter) Info(msg string, args ...interface{}) { + if h.level > hclog.Info { + return + } + h.writeEvent("", msg, args) +} + +// Emit a message and key/value pairs at the WARN level +func (h *HclogLoggerAdapter) Warn(msg string, args ...interface{}) { + if h.level > hclog.Warn { + return + } + h.writeEvent("", msg, args) +} + +// Emit a message and key/value pairs at the ERROR level +func (h *HclogLoggerAdapter) Error(msg string, args ...interface{}) { + if h.level > hclog.Error { + return + } + h.writeEvent("", msg, args) +} + +func (h *HclogLoggerAdapter) writeEvent(caller Op, msg string, args []interface{}) { + h.l.RLock() + defer h.l.RUnlock() + var allArgs []interface{} + if len(h.withArgs)+len(args) > 0 { + allArgs = append(h.withArgs, args...) + } + if h.name != "" { + allArgs = append(allArgs, "@original-log-name", h.name) + } + allArgs = append(allArgs, "@original-log-level", h.level.String()) + WriteSysEvent(h.eventCtx, "", msg, allArgs...) +} + +// Indicate if TRACE logs would be emitted. This and the other Is* guards +// are used to elide expensive logging code based on the current level. +func (h *HclogLoggerAdapter) IsTrace() bool { + return h.level <= hclog.Trace +} + +// Indicate if DEBUG logs would be emitted. This and the other Is* guards +func (h *HclogLoggerAdapter) IsDebug() bool { + return h.level <= hclog.Debug +} + +// Indicate if INFO logs would be emitted. This and the other Is* guards +func (h *HclogLoggerAdapter) IsInfo() bool { + return h.level <= hclog.Info +} + +// Indicate if WARN logs would be emitted. This and the other Is* guards +func (h *HclogLoggerAdapter) IsWarn() bool { + return h.level <= hclog.Warn +} + +// Indicate if ERROR logs would be emitted. This and the other Is* guards +func (h *HclogLoggerAdapter) IsError() bool { + return h.level <= hclog.Error +} + +// ImpliedArgs returns With key/value pairs +func (h *HclogLoggerAdapter) ImpliedArgs() []interface{} { + return h.withArgs +} + +// Creates a sublogger that will always have the given key/value pairs +func (h *HclogLoggerAdapter) With(args ...interface{}) hclog.Logger { + h.l.Lock() + defer h.l.Unlock() + newArgs := args + if len(h.withArgs) > 0 { + newArgs = make([]interface{}, len(h.withArgs), len(h.withArgs)+len(args)) + copy(newArgs, h.withArgs) + newArgs = append(newArgs, args...) + } + return &HclogLoggerAdapter{ + eventCtx: h.eventCtx, + l: new(sync.RWMutex), + level: h.level, + name: h.name, + withArgs: newArgs, + } +} + +// Returns the Name of the logger +func (h *HclogLoggerAdapter) Name() string { + h.l.RLock() + defer h.l.RUnlock() + return h.name +} + +// Create a logger that will prepend the name string on the front of all messages. +// If the logger already has a name, the new value will be appended to the current +// name. That way, a major subsystem can use this to decorate all it's own logs +// without losing context. +func (h *HclogLoggerAdapter) Named(name string) hclog.Logger { + h.l.Lock() + defer h.l.Unlock() + var newArgs []interface{} + if len(h.withArgs) > 0 { + newArgs = make([]interface{}, len(h.withArgs)) + copy(newArgs, h.withArgs) + } + + newName := name + if h.name != "" { + newName = fmt.Sprintf("%s.%s", h.name, name) + } + + return &HclogLoggerAdapter{ + eventCtx: h.eventCtx, + l: new(sync.RWMutex), + level: h.level, + name: newName, + withArgs: newArgs, + } +} + +// Create a logger that will prepend the name string on the front of all messages. +// This sets the name of the logger to the value directly, unlike Named which honor +// the current name as well. +func (h *HclogLoggerAdapter) ResetNamed(name string) hclog.Logger { + h.l.Lock() + defer h.l.Unlock() + var newArgs []interface{} + if len(h.withArgs) > 0 { + newArgs = make([]interface{}, len(h.withArgs)) + copy(newArgs, h.withArgs) + } + return &HclogLoggerAdapter{ + eventCtx: h.eventCtx, + l: new(sync.RWMutex), + level: h.level, + name: name, + withArgs: newArgs, + } +} + +// Updates the level. This should affect all related loggers as well, +// unless they were created with IndependentLevels. If an +// implementation cannot update the level on the fly, it should no-op. +// +// This implementation is a no-op currently. +func (h *HclogLoggerAdapter) SetLevel(_ hclog.Level) {} + +// Return a value that conforms to the stdlib log.Logger interface +// +// This implementation does not currently support this and returns nil. +func (h *HclogLoggerAdapter) StandardLogger(opts *hclog.StandardLoggerOptions) *log.Logger { + return nil +} + +// Return a value that conforms to io.Writer, which can be passed into log.SetOutput() +// +// This implementation does not currently support this and returns nil. +func (h *HclogLoggerAdapter) StandardWriter(opts *hclog.StandardLoggerOptions) io.Writer { + return nil +} diff --git a/internal/observability/event/hclog_event_adapter_test.go b/internal/observability/event/hclog_event_adapter_test.go new file mode 100644 index 0000000000..c432c17a57 --- /dev/null +++ b/internal/observability/event/hclog_event_adapter_test.go @@ -0,0 +1,186 @@ +package event + +import ( + "bytes" + "context" + "sync" + "testing" + + "github.com/hashicorp/go-hclog" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestEventer_HclogLoggerAdapter(t *testing.T) { + t.Parallel() + require := require.New(t) + ctx := context.Background() + buffer := new(bytes.Buffer) + eventerConfig := EventerConfig{ + AuditEnabled: true, + ObservationsEnabled: true, + SysEventsEnabled: true, + Sinks: []*SinkConfig{ + { + Name: "test-sink", + EventTypes: []Type{EveryType}, + Format: TextHclogSinkFormat, + Type: WriterSink, + WriterConfig: &WriterSinkTypeConfig{ + Writer: buffer, + }, + }, + }, + } + testLock := &sync.Mutex{} + testLogger := hclog.New(&hclog.LoggerOptions{ + Mutex: testLock, + Name: "test", + }) + eventer, err := NewEventer( + testLogger, + testLock, + "TestEventer_HclogLoggerAdapter", + eventerConfig, + ) + require.NoError(err) + + // This test sends a series of events through the hclog adapter and + // validates that we see the ones we expect to see on the other side. It + // also tests various features such as Named and With to ensure they turn + // into values on the other side. + logger, err := NewHclogLogger(ctx, eventer, WithHclogLevel(hclog.Info)) + require.NoError(err) + + tests := []struct { + name string + plainLog bool + level hclog.Level + shouldNotLog bool + logOverride hclog.Logger + input string + outputSubstrs []string + }{ + { + name: "over-level-error", + level: hclog.Error, + input: "over-error", + outputSubstrs: []string{"msg=over-error"}, + }, + { + name: "over-level-warn", + level: hclog.Warn, + input: "over-warn", + outputSubstrs: []string{"msg=over-warn"}, + }, + { + name: "at-level", + level: hclog.Info, + input: "at", + outputSubstrs: []string{"msg=at"}, + }, + { + name: "under-level-debug", + level: hclog.Debug, + input: "under-debug", + shouldNotLog: true, + }, + { + name: "under-level-trace", + level: hclog.Trace, + input: "under-trace", + shouldNotLog: true, + }, + { + name: "plain-under-trace", + plainLog: true, + level: hclog.Trace, + input: "plain-under-trace", + shouldNotLog: true, + }, + { + name: "plain-at", + plainLog: true, + level: hclog.Info, + input: "plain-at", + outputSubstrs: []string{"msg=plain-at"}, + }, + { + name: "plain-over-warn", + plainLog: true, + level: hclog.Warn, + input: "plain-over-warn", + outputSubstrs: []string{"msg=plain-over-warn"}, + }, + { + name: "with-named", + level: hclog.Info, + logOverride: logger.Named("named-logger"), + input: "named-input", + outputSubstrs: []string{"msg=named-input", "@original-log-name=named-logger"}, + }, + { + name: "sub-named", + level: hclog.Info, + logOverride: logger.Named("named-logger").Named("subnamed-logger"), + input: "subnamed-input", + outputSubstrs: []string{"msg=subnamed-input", "@original-log-name=named-logger.subnamed-logger"}, + }, + { + name: "reset-named", + level: hclog.Info, + logOverride: logger.Named("named-logger").ResetNamed("reset-logger"), + input: "reset-input", + outputSubstrs: []string{"msg=reset-input", "@original-log-name=reset-logger"}, + }, + { + name: "with-params", + level: hclog.Info, + logOverride: logger.With("with", "params"), + input: "with-params", + outputSubstrs: []string{"msg=with-params", "with=params"}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert := assert.New(t) + buffer.Reset() + loggerToUse := logger + if tt.logOverride != nil { + loggerToUse = tt.logOverride + } + + switch tt.plainLog { + case false: + switch tt.level { + case hclog.Error: + assert.True(loggerToUse.IsError() == !tt.shouldNotLog) + loggerToUse.Error(tt.input) + case hclog.Warn: + assert.True(loggerToUse.IsWarn() == !tt.shouldNotLog) + loggerToUse.Warn(tt.input) + case hclog.Info: + assert.True(loggerToUse.IsInfo() == !tt.shouldNotLog) + loggerToUse.Info(tt.input) + case hclog.Debug: + assert.True(loggerToUse.IsDebug() == !tt.shouldNotLog) + loggerToUse.Debug(tt.input) + case hclog.Trace: + assert.True(loggerToUse.IsTrace() == !tt.shouldNotLog) + loggerToUse.Trace(tt.input) + } + default: + loggerToUse.Log(tt.level, tt.input) + } + + switch tt.shouldNotLog { + case true: + assert.Len(buffer.String(), 0) + default: + for _, substr := range tt.outputSubstrs { + assert.Contains(buffer.String(), substr) + } + } + }) + } +} diff --git a/internal/observability/event/options.go b/internal/observability/event/options.go index 6be6ad437c..44740c458f 100644 --- a/internal/observability/event/options.go +++ b/internal/observability/event/options.go @@ -4,6 +4,7 @@ import ( "net/url" "time" + "github.com/hashicorp/go-hclog" wrapping "github.com/hashicorp/go-kms-wrapping/v2" ) @@ -42,6 +43,11 @@ type options struct { withSchema *url.URL withAuditWrapper wrapping.Wrapper withFilterOperations AuditFilterOperations + withGating bool + withNoGateLocking bool + + // These options are related to the hclog adapter + withHclogLevel hclog.Level withBroker broker // test only option withAuditSink bool // test only option @@ -196,3 +202,24 @@ func WithFilterOperations(fop AuditFilterOperations) Option { o.withFilterOperations = fop } } + +// WithHclogLevel is an option to specify a log level if using the adapter +func WithHclogLevel(with hclog.Level) Option { + return func(o *options) { + o.withHclogLevel = with + } +} + +// WithGating starts the eventer in gated mode +func WithGating(with bool) Option { + return func(o *options) { + o.withGating = with + } +} + +// WithNoGateLocking is used when trawling through the existing queue to ensure we don't deadlock +func WithNoGateLocking(with bool) Option { + return func(o *options) { + o.withNoGateLocking = with + } +} diff --git a/internal/observability/event/options_test.go b/internal/observability/event/options_test.go index 13852a5170..43aed98150 100644 --- a/internal/observability/event/options_test.go +++ b/internal/observability/event/options_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "github.com/hashicorp/go-hclog" wrapping "github.com/hashicorp/go-kms-wrapping/v2" "github.com/hashicorp/go-kms-wrapping/v2/aead" "github.com/stretchr/testify/assert" @@ -171,6 +172,27 @@ func Test_GetOpts(t *testing.T) { testOpts.withFilterOperations = overrides assert.Equal(opts, testOpts) }) + t.Run("WithHclogLevel", func(t *testing.T) { + assert := assert.New(t) + opts := getOpts(WithHclogLevel(hclog.Info)) + testOpts := getDefaultOptions() + testOpts.withHclogLevel = hclog.Info + assert.Equal(opts, testOpts) + }) + t.Run("withEventGating", func(t *testing.T) { + assert := assert.New(t) + testOpts := getDefaultOptions() + assert.False(testOpts.withGating) + opts := getOpts(WithGating(true)) + assert.True(opts.withGating) + }) + t.Run("withNoGateLocking", func(t *testing.T) { + assert := assert.New(t) + testOpts := getDefaultOptions() + assert.False(testOpts.withNoGateLocking) + opts := getOpts(WithNoGateLocking(true)) + assert.True(opts.withNoGateLocking) + }) } // testWrapper initializes an AEAD wrapping.Wrapper for testing. Note: this diff --git a/internal/observability/event/sink_config.go b/internal/observability/event/sink_config.go index 5d97b7b3bd..5d2ec8bfc1 100644 --- a/internal/observability/event/sink_config.go +++ b/internal/observability/event/sink_config.go @@ -2,6 +2,7 @@ package event import ( "fmt" + "io" "time" ) @@ -14,9 +15,10 @@ type SinkConfig struct { AllowFilters []string `hcl:"allow_filters"` // AllowFilters define a set predicates for including an event in the sink. If any filter matches, the event will be included. The filter should be in a format supported by hashicorp/go-bexpr. DenyFilters []string `hcl:"deny_filters"` // DenyFilters define a set predicates for excluding an event in the sink. If any filter matches, the event will be excluded. The filter should be in a format supported by hashicorp/go-bexpr. Format SinkFormat `hcl:"format"` // Format defines the format for the sink (JSONSinkFormat or TextSinkFormat). - Type SinkType `hcl:"type"` // Type defines the type of sink (StderrSink or FileSink). + Type SinkType `hcl:"type"` // Type defines the type of sink (StderrSink, FileSink, or WriterSink). StderrConfig *StderrSinkTypeConfig `hcl:"stderr"` // StderrConfig defines parameters for a stderr output. FileConfig *FileSinkTypeConfig `hcl:"file"` // FileConfig defines parameters for a file output. + WriterConfig *WriterSinkTypeConfig `hcl:"-"` // WriterConfig defines parameters for an io.Writer output. This is not available via HCL. AuditConfig *AuditConfig `hcl:"audit_config"` // AuditConfig defines optional parameters for audit events (if EventTypes contains audit) } @@ -36,6 +38,9 @@ func (sc *SinkConfig) Validate() error { if sc.FileConfig != nil { foundSinkTypeConfigs++ } + if sc.WriterConfig != nil { + foundSinkTypeConfigs++ + } if foundSinkTypeConfigs > 1 { return fmt.Errorf("%s: too many sink type config blocks: %w", op, ErrInvalidParameter) } @@ -56,6 +61,13 @@ func (sc *SinkConfig) Validate() error { if sc.FileConfig.FileName == "" { return fmt.Errorf("%s: missing file name: %w", op, ErrInvalidParameter) } + case WriterSink: + if sc.WriterConfig == nil { + return fmt.Errorf(`%s: missing writer config: %w`, op, ErrInvalidParameter) + } + if sc.WriterConfig.Writer == nil { + return fmt.Errorf("%s: missing writer: %w", op, ErrInvalidParameter) + } } if sc.Name == "" { return fmt.Errorf("%s: missing sink name: %w", op, ErrInvalidParameter) @@ -95,6 +107,11 @@ type FileSinkTypeConfig struct { RotateMaxFiles int `hcl:"rotate_max_files" mapstructure:"rotate_max_files"` // RotateMaxFiles defines how may historical rotated files should be kept for a FileSink } +// WriterSinkTypeConfig contains configuration structures for writer sink types +type WriterSinkTypeConfig struct { + Writer io.Writer `hcl:"-" mapstructure:"-"` // The writer to write to +} + // FilterType defines a type for filters (allow or deny) type FilterType string diff --git a/internal/observability/event/sink_type.go b/internal/observability/event/sink_type.go index a622e0c3e5..71782a4e2f 100644 --- a/internal/observability/event/sink_type.go +++ b/internal/observability/event/sink_type.go @@ -7,14 +7,15 @@ import ( const ( StderrSink SinkType = "stderr" // StderrSink is written to stderr FileSink SinkType = "file" // FileSink is written to a file + WriterSink SinkType = "writer" // WriterSink is written to an io.Writer ) -type SinkType string // SinkType defines the type of sink in a config stanza (file, stderr) +type SinkType string // SinkType defines the type of sink in a config stanza (file, stderr, writer) func (t SinkType) Validate() error { const op = "event.(SinkType).validate" switch t { - case StderrSink, FileSink: + case StderrSink, FileSink, WriterSink: return nil default: return fmt.Errorf("%s: '%s' is not a valid sink type: %w", op, t, ErrInvalidParameter) diff --git a/internal/servers/controller/controller.go b/internal/servers/controller/controller.go index 9c48b5bbd5..9c81c2f5a8 100644 --- a/internal/servers/controller/controller.go +++ b/internal/servers/controller/controller.go @@ -125,7 +125,14 @@ func New(ctx context.Context, conf *Config) (*Controller, error) { } } + var pluginLogger hclog.Logger for _, enabledPlugin := range c.enabledPlugins { + if pluginLogger == nil { + pluginLogger, err = event.NewHclogLogger(ctx, c.conf.Server.Eventer) + if err != nil { + return nil, fmt.Errorf("error creating host catalog plugin logger: %w", err) + } + } switch enabledPlugin { case base.EnabledPluginHostLoopback: plg := pluginhost.NewWrappingPluginClient(pluginhost.NewLoopbackPlugin()) @@ -145,6 +152,7 @@ func New(ctx context.Context, conf *Config) (*Controller, error) { pluginutil.WithPluginExecutionDirectory(conf.RawConfig.Plugins.ExecutionDir), pluginutil.WithPluginsFilesystem(host_plugin_assets.HostPluginPrefix, host_plugin_assets.FileSystem()), ), + external_host_plugins.WithLogger(pluginLogger.Named(pluginType)), ) if err != nil { return nil, fmt.Errorf("error creating %s host plugin: %w", pluginType, err)