diff --git a/internal/cmd/base/option.go b/internal/cmd/base/option.go index 4ab3b754bf..d5c4735e00 100644 --- a/internal/cmd/base/option.go +++ b/internal/cmd/base/option.go @@ -39,6 +39,7 @@ type Options struct { withAttributeFieldPrefix string withStatusCode int withHostPlugin func() (string, plugin.HostPluginServiceClient) + withEventGating bool } func getDefaultOptions() Options { @@ -180,3 +181,10 @@ func WithHostPlugin(pluginId string, plg plugin.HostPluginServiceClient) Option } } } + +// WithEventGating starts the eventer in gated mode +func WithEventGating(with bool) Option { + return func(o *Options) { + o.withEventGating = with + } +} diff --git a/internal/cmd/base/option_test.go b/internal/cmd/base/option_test.go index cd082ea3ca..ef4d913fe0 100644 --- a/internal/cmd/base/option_test.go +++ b/internal/cmd/base/option_test.go @@ -117,4 +117,11 @@ func Test_GetOpts(t *testing.T) { testOpts.withDialect = "test-dialect" assert.Equal(opts, testOpts) }) + t.Run("withEventGating", func(t *testing.T) { + assert := assert.New(t) + testOpts := getDefaultOptions() + assert.False(testOpts.withEventGating) + opts := getOpts(WithEventGating(true)) + assert.True(opts.withEventGating) + }) } diff --git a/internal/cmd/base/server_test.go b/internal/cmd/base/server_test.go index 37dd0885a6..a973036792 100644 --- a/internal/cmd/base/server_test.go +++ b/internal/cmd/base/server_test.go @@ -59,6 +59,8 @@ func TestServer_SetupKMSes(t *testing.T) { purposes: []string{globals.KmsPurposeRoot, globals.KmsPurposeRecovery, globals.KmsPurposeWorkerAuth, globals.KmsPurposeConfig}, }, } + logger := hclog.Default() + serLock := new(sync.Mutex) for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { assert, require := assert.New(t), require.New(t) @@ -70,8 +72,9 @@ func TestServer_SetupKMSes(t *testing.T) { }, }, } - s := NewServer(&Command{}) - err := s.SetupKMSes(context.Background(), cli.NewMockUi(), &config.Config{SharedConfig: conf}) + s := NewServer(&Command{Context: context.Background()}) + require.NoError(s.SetupEventing(logger, serLock, "setup-kms-testing")) + err := s.SetupKMSes(s.Context, cli.NewMockUi(), &config.Config{SharedConfig: conf}) if tt.wantErrContains != "" { require.Error(err) diff --git a/internal/cmd/base/servers.go b/internal/cmd/base/servers.go index 9e21f3cfbc..738fd92c76 100644 --- a/internal/cmd/base/servers.go +++ b/internal/cmd/base/servers.go @@ -205,7 +205,8 @@ func (b *Server) SetupEventing(logger hclog.Logger, serializationLock *sync.Mute serializationLock, serverName, *opts.withEventerConfig, - event.WithAuditWrapper(opts.withEventWrapper)) + event.WithAuditWrapper(opts.withEventWrapper), + event.WithGating(opts.withEventGating)) if err != nil { return berrors.WrapDeprecated(err, op, berrors.WithMsg("unable to create eventer")) } @@ -283,11 +284,15 @@ func (b *Server) SetupLogging(flagLogLevel, flagLogFormat, configLogLevel, confi return nil } -func (b *Server) ReleaseLogGate() { +func (b *Server) ReleaseLogGate() error { // Release the log gate. b.Logger.(hclog.OutputResettable).ResetOutputWithFlush(&hclog.LoggerOptions{ Output: b.logOutput, }, b.GatedWriter) + if b.Eventer != nil { + return b.Eventer.ReleaseGate() + } + return nil } func (b *Server) StorePidFile(pidPath string) error { @@ -482,6 +487,7 @@ func (b *Server) SetupListeners(ui cli.Ui, config *configutil.SharedConfig, allo // and sends each off to configutil to instantiate a wrapper. func (b *Server) SetupKMSes(ctx context.Context, ui cli.Ui, config *config.Config) error { sharedConfig := config.SharedConfig + var pluginLogger hclog.Logger var err error for _, kms := range sharedConfig.Seals { for _, purpose := range kms.Purpose { @@ -498,6 +504,13 @@ func (b *Server) SetupKMSes(ctx context.Context, ui cli.Ui, config *config.Confi return fmt.Errorf("Unknown KMS purpose %q", kms.Purpose) } + if pluginLogger == nil { + pluginLogger, err = event.NewHclogLogger(b.Context, b.Eventer) + if err != nil { + return fmt.Errorf("Error creating KMS plugin logger: %w", err) + } + } + // This can be modified by configutil so store the original value origPurpose := kms.Purpose kms.Purpose = []string{purpose} @@ -509,7 +522,9 @@ func (b *Server) SetupKMSes(ctx context.Context, ui cli.Ui, config *config.Confi &b.Info, configutil.WithPluginOptions( pluginutil.WithPluginsMap(kms_plugin_assets.BuiltinKmsPlugins()), - pluginutil.WithPluginsFilesystem(kms_plugin_assets.KmsPluginPrefix, kms_plugin_assets.FileSystem())), + pluginutil.WithPluginsFilesystem(kms_plugin_assets.KmsPluginPrefix, kms_plugin_assets.FileSystem()), + ), + configutil.WithLogger(pluginLogger.Named(kms.Type).With("purpose", purpose)), ) if wrapperConfigError != nil { return fmt.Errorf( @@ -551,7 +566,6 @@ func (b *Server) SetupKMSes(ctx context.Context, ui cli.Ui, config *config.Confi default: return fmt.Errorf("KMS purpose of %q is unknown", purpose) } - } } diff --git a/internal/cmd/commands/database/migrate.go b/internal/cmd/commands/database/migrate.go index 7db8e2ccd5..2330d2343d 100644 --- a/internal/cmd/commands/database/migrate.go +++ b/internal/cmd/commands/database/migrate.go @@ -7,6 +7,7 @@ import ( "github.com/hashicorp/boundary/internal/cmd/base" "github.com/hashicorp/boundary/internal/cmd/config" "github.com/hashicorp/boundary/internal/errors" + "github.com/hashicorp/boundary/internal/observability/event" host_plugin_assets "github.com/hashicorp/boundary/plugins/host" kms_plugin_assets "github.com/hashicorp/boundary/plugins/kms" external_host_plugins "github.com/hashicorp/boundary/sdk/plugins/host" @@ -138,13 +139,49 @@ func (c *MigrateCommand) Run(args []string) (retCode int) { }() } + dialect := "postgres" + + c.srv = base.NewServer(&base.Command{UI: c.UI}) + + if err := c.srv.SetupLogging(c.flagLogLevel, c.flagLogFormat, c.Config.LogLevel, c.Config.LogFormat); err != nil { + c.UI.Error(err.Error()) + return base.CommandCliError + } + var serverName string + switch { + case c.Config.Controller == nil: + serverName = "boundary-database-migrate" + default: + if _, err := c.Config.Controller.InitNameIfEmpty(); err != nil { + c.UI.Error(err.Error()) + return base.CommandCliError + } + serverName = c.Config.Controller.Name + "/boundary-database-migrate" + } + if err := c.srv.SetupEventing( + c.srv.Logger, + c.srv.StderrLock, + serverName, + base.WithEventerConfig(c.Config.Eventing)); err != nil { + c.UI.Error(err.Error()) + return base.CommandCliError + } + + pluginLogger, err := event.NewHclogLogger(c.Context, c.srv.Eventer) + if err != nil { + c.UI.Error(fmt.Sprintf("Error creating host catalog plugin logger: %v", err)) + return base.CommandCliError + } + _, awsCleanup, err := external_host_plugins.CreateHostPlugin( c.Context, "aws", external_host_plugins.WithPluginOptions( pluginutil.WithPluginExecutionDirectory(c.Config.Plugins.ExecutionDir), pluginutil.WithPluginsFilesystem(host_plugin_assets.HostPluginPrefix, host_plugin_assets.FileSystem()), - )) + ), + external_host_plugins.WithLogger(pluginLogger.Named("aws")), + ) if err != nil { c.UI.Error(fmt.Errorf("Error creating dynamic host plugin: %w", err).Error()) c.UI.Warn(base.WrapAtLength( @@ -176,30 +213,6 @@ plugins { return base.CommandCliError } - dialect := "postgres" - - c.srv = base.NewServer(&base.Command{UI: c.UI}) - - if err := c.srv.SetupLogging(c.flagLogLevel, c.flagLogFormat, c.Config.LogLevel, c.Config.LogFormat); err != nil { - c.UI.Error(err.Error()) - return base.CommandCliError - } - var serverName string - switch { - case c.Config.Controller == nil: - serverName = "boundary-database-migrate" - default: - if _, err := c.Config.Controller.InitNameIfEmpty(); err != nil { - c.UI.Error(err.Error()) - return base.CommandCliError - } - serverName = c.Config.Controller.Name + "/boundary-database-migrate" - } - if err := c.srv.SetupEventing(c.srv.Logger, c.srv.StderrLock, serverName, base.WithEventerConfig(c.Config.Eventing)); err != nil { - c.UI.Error(err.Error()) - return base.CommandCliError - } - // If mlockall(2) isn't supported, show a warning. We disable this in dev // because it is quite scary to see when first using Boundary. We also disable // this if the user has explicitly disabled mlock in configuration. diff --git a/internal/cmd/commands/dev/dev.go b/internal/cmd/commands/dev/dev.go index a1a5647509..80a22b60ef 100644 --- a/internal/cmd/commands/dev/dev.go +++ b/internal/cmd/commands/dev/dev.go @@ -514,7 +514,8 @@ func (c *Command) Run(args []string) int { c.StderrLock, serverName, base.WithEventerConfig(c.Config.Eventing), - base.WithEventFlags(eventFlags)); err != nil { + base.WithEventFlags(eventFlags), + base.WithEventGating(true)); err != nil { c.UI.Error(err.Error()) return base.CommandCliError } @@ -596,7 +597,10 @@ func (c *Command) Run(args []string) int { } c.PrintInfo(c.UI) - c.ReleaseLogGate() + if err := c.ReleaseLogGate(); err != nil { + c.UI.Error(fmt.Errorf("Error releasing event gate: %w", err).Error()) + return base.CommandCliError + } { c.EnabledPlugins = append(c.EnabledPlugins, base.EnabledPluginHostAws, base.EnabledPluginHostAzure) diff --git a/internal/cmd/commands/server/server.go b/internal/cmd/commands/server/server.go index 15910852ab..5cc12688ee 100644 --- a/internal/cmd/commands/server/server.go +++ b/internal/cmd/commands/server/server.go @@ -164,7 +164,8 @@ func (c *Command) Run(args []string) int { if err := c.SetupEventing(c.Logger, c.StderrLock, strings.Join(serverNames, "/"), - base.WithEventerConfig(c.Config.Eventing)); err != nil { + base.WithEventerConfig(c.Config.Eventing), + base.WithEventGating(true)); err != nil { c.UI.Error(err.Error()) return base.CommandUserError } @@ -447,7 +448,10 @@ func (c *Command) Run(args []string) int { }() c.PrintInfo(c.UI) - c.ReleaseLogGate() + if err := c.ReleaseLogGate(); err != nil { + c.UI.Error(fmt.Errorf("Error releasing event gate: %w", err).Error()) + return base.CommandCliError + } if c.Config.Controller != nil { c.EnabledPlugins = append(c.EnabledPlugins, base.EnabledPluginHostAws, base.EnabledPluginHostAzure) diff --git a/internal/observability/event/eventer.go b/internal/observability/event/eventer.go index 663eba6cce..f12cd9d360 100644 --- a/internal/observability/event/eventer.go +++ b/internal/observability/event/eventer.go @@ -22,6 +22,8 @@ import ( "github.com/hashicorp/eventlogger/sinks/writer" "github.com/hashicorp/go-hclog" wrapping "github.com/hashicorp/go-kms-wrapping/v2" + "github.com/hashicorp/go-multierror" + "go.uber.org/atomic" "google.golang.org/protobuf/types/known/fieldmaskpb" ) @@ -59,6 +61,13 @@ type broker interface { RegisterPipeline(def eventlogger.Pipeline) error } +// queuedEvent stores an event and the context that was associated with it when +// gating +type queuedEvent struct { + ctx context.Context + event interface{} +} + // Eventer provides a method to send events to pipelines of sinks type Eventer struct { broker broker @@ -69,6 +78,13 @@ type Eventer struct { observationPipelines []pipeline errPipelines []pipeline auditWrapperNodes []interface{} + + // Gating is used to delay output of events until after we have a chance to + // render startup info, similar to what was done for hclog before eventing + // supplanted it. It affects only error and system events. + gated atomic.Bool + gatedQueue []*queuedEvent + gatedQueueLock *sync.Mutex } type pipeline struct { @@ -189,6 +205,7 @@ func NewEventer(log hclog.Logger, serializationLock *sync.Mutex, serverName stri } e := &Eventer{ + gatedQueueLock: new(sync.Mutex), logger: log, conf: c, broker: b, @@ -199,6 +216,10 @@ func NewEventer(log hclog.Logger, serializationLock *sync.Mutex, serverName stri e.broker.StopTimeAt(opts.withNow) } + if opts.withGating { + e.gated.Store(true) + } + // serializedStderr will be shared among all StderrSinks so their output is not // interwoven serializedStderr := serializedWriter{ @@ -253,6 +274,17 @@ func NewEventer(log hclog.Logger, serializationLock *sync.Mutex, serverName stri return nil, fmt.Errorf("%s: %w", op, err) } sinkId = eventlogger.NodeID(id) + case WriterSink: + wsc := s.WriterConfig + sinkNode = &writer.Sink{ + Format: string(s.Format), + Writer: wsc.Writer, + } + id, err := NewId("writer") + if err != nil { + return nil, fmt.Errorf("%s: %w", op, err) + } + sinkId = eventlogger.NodeID(id) default: return nil, fmt.Errorf("%s: unknown sink type %s", op, s.Type) } @@ -558,7 +590,7 @@ func (e *Eventer) RotateAuditWrapper(ctx context.Context, newWrapper wrapping.Wr } // writeObservation writes/sends an Observation event. -func (e *Eventer) writeObservation(ctx context.Context, event *observation) error { +func (e *Eventer) writeObservation(ctx context.Context, event *observation, _ ...Option) error { const op = "event.(Eventer).writeObservation" if event == nil { return fmt.Errorf("%s: missing event: %w", op, ErrInvalidParameter) @@ -584,11 +616,23 @@ func (e *Eventer) writeObservation(ctx context.Context, event *observation) erro } // writeError writes/sends an Err event -func (e *Eventer) writeError(ctx context.Context, event *err) error { +func (e *Eventer) writeError(ctx context.Context, event *err, opt ...Option) error { const op = "event.(Eventer).writeError" if event == nil { return fmt.Errorf("%s: missing event: %w", op, ErrInvalidParameter) } + opts := getOpts(opt...) + if e.gated.Load() && !opts.withNoGateLocking { + e.gatedQueueLock.Lock() + defer e.gatedQueueLock.Unlock() + if e.gated.Load() { // validate that it's still gated and we haven't just drained the queue + e.gatedQueue = append(e.gatedQueue, &queuedEvent{ + ctx: ctx, + event: event, + }) + return nil + } + } err := e.retrySend(ctx, stdRetryCount, expBackoff{}, func() (eventlogger.Status, error) { return e.broker.Send(ctx, eventlogger.EventType(ErrorType), event) }) @@ -600,11 +644,23 @@ func (e *Eventer) writeError(ctx context.Context, event *err) error { } // writeSysEvent writes/sends an sysEvent event -func (e *Eventer) writeSysEvent(ctx context.Context, event *sysEvent) error { +func (e *Eventer) writeSysEvent(ctx context.Context, event *sysEvent, opt ...Option) error { const op = "event.(Eventer).writeSysEvent" if event == nil { return fmt.Errorf("%s: missing event: %w", op, ErrInvalidParameter) } + opts := getOpts(opt...) + if e.gated.Load() && !opts.withNoGateLocking { + e.gatedQueueLock.Lock() + defer e.gatedQueueLock.Unlock() + if e.gated.Load() { // validate that it's still gated and we haven't just drained the queue + e.gatedQueue = append(e.gatedQueue, &queuedEvent{ + ctx: ctx, + event: event, + }) + return nil + } + } err := e.retrySend(ctx, stdRetryCount, expBackoff{}, func() (eventlogger.Status, error) { return e.broker.Send(ctx, eventlogger.EventType(SystemType), event) }) @@ -616,7 +672,7 @@ func (e *Eventer) writeSysEvent(ctx context.Context, event *sysEvent) error { } // writeAudit writes/send an audit event -func (e *Eventer) writeAudit(ctx context.Context, event *audit) error { +func (e *Eventer) writeAudit(ctx context.Context, event *audit, _ ...Option) error { const op = "event.(Eventer).writeAudit" if event == nil { return fmt.Errorf("%s: missing event: %w", op, ErrInvalidParameter) @@ -655,6 +711,52 @@ func (e *Eventer) FlushNodes(ctx context.Context) error { return nil } +// ReleaseGate releases queued events. If any event isn't successfully written, +// it remains in the queue and we could try a flush later. +func (e *Eventer) ReleaseGate() error { + const op = "event.(Eventer).ReleaseGate" + e.gatedQueueLock.Lock() + defer e.gatedQueueLock.Unlock() + + // Don't gate anymore. By the time we hit this we will have the lock and be + // guaranteed that nothing else will add onto the queue while we drain it. + // Logic in the functions that add to the queue validate this boolean again + // after they acquire the lock so running this function should fully drain + // the events except in the case of an error. + e.gated.Store(false) + + // Don't do anything if we're not gated and are fully drained + if len(e.gatedQueue) == 0 && !e.gated.Load() { + return nil + } + + var totalErrs *multierror.Error + for _, qe := range e.gatedQueue { + var writeErr error + if qe == nil { + continue // we may have already sent this but gotten errors later + } + var queuedOp string + switch t := qe.event.(type) { + case *sysEvent: + queuedOp = "system" + writeErr = e.writeSysEvent(qe.ctx, t, WithNoGateLocking(true)) + case *err: + queuedOp = "error" + writeErr = e.writeError(qe.ctx, t, WithNoGateLocking(true)) + default: + // Have no idea what this is and shouldn't have gotten in here to + // begin with, so just continue, and log it + writeErr = fmt.Errorf("unknown event type %T", t) + } + if writeErr != nil { + totalErrs = multierror.Append(fmt.Errorf("in %s, error sending queued %s event: %w", op, queuedOp, writeErr)) + } + } + e.gatedQueue = nil + return totalErrs.ErrorOrNil() +} + // StandardLogger will create *log.Logger that will emit events through this // Logger. This allows packages that require the stdlib log to emit events // instead. diff --git a/internal/observability/event/eventer_gate_test.go b/internal/observability/event/eventer_gate_test.go new file mode 100644 index 0000000000..a2e4c5a362 --- /dev/null +++ b/internal/observability/event/eventer_gate_test.go @@ -0,0 +1,166 @@ +package event + +import ( + "bytes" + "context" + "fmt" + "os" + "strings" + "sync" + "testing" + + "github.com/hashicorp/boundary/globals" + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-secure-stdlib/strutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestEventer_Gating(t *testing.T) { + t.Parallel() + require := require.New(t) + + // FIXME: Remove this when no longer needed: + os.Setenv(globals.BOUNDARY_DEVELOPER_ENABLE_EVENTS, "true") + + buffer := new(bytes.Buffer) + eventerConfig := EventerConfig{ + AuditEnabled: true, + ObservationsEnabled: true, + SysEventsEnabled: true, + Sinks: []*SinkConfig{ + { + Name: "test-sink", + EventTypes: []Type{EveryType}, + Format: TextHclogSinkFormat, + Type: WriterSink, + WriterConfig: &WriterSinkTypeConfig{ + Writer: buffer, + }, + }, + }, + } + testLock := &sync.Mutex{} + testLogger := hclog.New(&hclog.LoggerOptions{ + Mutex: testLock, + Name: "test", + }) + eventer, err := NewEventer( + testLogger, + testLock, + "TestEventer_Gating", + eventerConfig, + WithGating(true), + ) + require.NoError(err) + + ctx, err := NewEventerContext(context.Background(), eventer) + require.NoError(err) + + // This test sends a series of events of different types. The tests are + // meant to be in order as we want to send some that should be gated and + // some that shouldn't and ensure correct behavior at each step. + var totalEvents int + tests := []struct { + name string + eventFn func() + expectedGatedEvents int + }{ + { + name: "system-event-1", + eventFn: func() { + WriteSysEvent(ctx, "system-event-1", "system-event-1") + totalEvents++ + }, + expectedGatedEvents: 1, + }, + { + name: "system-event-2", + eventFn: func() { + WriteSysEvent(ctx, "system-event-2", "system-event-2") + totalEvents++ + }, + expectedGatedEvents: 2, + }, + { + name: "audit-1", + eventFn: func() { + require.NoError(WriteAudit(ctx, "audit-1")) + totalEvents++ + }, + expectedGatedEvents: 2, + }, + { + name: "observation-1", + eventFn: func() { + require.NoError(WriteObservation(ctx, "observation-1", WithId("observation-1"), WithHeader("foo", "bar"))) + totalEvents++ + }, + expectedGatedEvents: 2, + }, + { + name: "error-1", + eventFn: func() { + WriteError(ctx, "error-1", fmt.Errorf("error-1")) + totalEvents++ + }, + expectedGatedEvents: 3, + }, + { + name: "error-2", + eventFn: func() { + WriteError(ctx, "error-2", fmt.Errorf("error-2")) + totalEvents++ + }, + expectedGatedEvents: 4, + }, + // This should result in all events being flushed so none gated + { + name: "release-gate", + eventFn: func() { + require.NoError(eventer.ReleaseGate()) + }, + expectedGatedEvents: 0, + }, + // From here on out we're verifying that all events of all types go through + { + name: "system-event-3", + eventFn: func() { + WriteSysEvent(ctx, "system-event-3", "system-event-3") + totalEvents++ + }, + expectedGatedEvents: 0, + }, + { + name: "audit-2", + eventFn: func() { + require.NoError(WriteAudit(ctx, "audit-2")) + totalEvents++ + }, + expectedGatedEvents: 0, + }, + { + name: "observation-2", + eventFn: func() { + require.NoError(WriteObservation(ctx, "observation-2", WithId("observation-2"), WithHeader("foo", "bar"))) + totalEvents++ + }, + expectedGatedEvents: 0, + }, + { + name: "error-2", + eventFn: func() { + WriteError(ctx, "error-2", fmt.Errorf("error-2")) + totalEvents++ + }, + expectedGatedEvents: 0, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert := assert.New(t) + tt.eventFn() + assert.Len(strutil.RemoveEmpty(strings.Split(buffer.String(), "\n")), totalEvents-tt.expectedGatedEvents, buffer.String()) + }) + } +} diff --git a/internal/observability/event/eventer_test.go b/internal/observability/event/eventer_test.go index e04ddae5d3..7146289c68 100644 --- a/internal/observability/event/eventer_test.go +++ b/internal/observability/event/eventer_test.go @@ -97,8 +97,9 @@ func Test_InitSysEventer(t *testing.T) { lock: testLock, serverName: "success-with-config", want: &Eventer{ - logger: testLogger, - conf: testConfig.EventerConfig, + logger: testLogger, + gatedQueueLock: new(sync.Mutex), + conf: testConfig.EventerConfig, }, }, { @@ -108,7 +109,8 @@ func Test_InitSysEventer(t *testing.T) { lock: testLock, serverName: "success-with-default-config", want: &Eventer{ - logger: testLogger, + logger: testLogger, + gatedQueueLock: new(sync.Mutex), conf: EventerConfig{ Sinks: []*SinkConfig{ { @@ -435,7 +437,8 @@ func Test_NewEventer(t *testing.T) { logger: testLogger, serverName: "valid-audit-config-with-wrapper", want: &Eventer{ - logger: testLogger, + logger: testLogger, + gatedQueueLock: new(sync.Mutex), conf: EventerConfig{ AuditEnabled: true, Sinks: []*SinkConfig{ @@ -521,7 +524,8 @@ func Test_NewEventer(t *testing.T) { lock: testLock, serverName: "success-with-default-config", want: &Eventer{ - logger: testLogger, + logger: testLogger, + gatedQueueLock: new(sync.Mutex), conf: EventerConfig{ Sinks: []*SinkConfig{ { @@ -561,8 +565,9 @@ func Test_NewEventer(t *testing.T) { lock: testLock, serverName: "testSetup", want: &Eventer{ - logger: testLogger, - conf: testSetup.EventerConfig, + logger: testLogger, + gatedQueueLock: new(sync.Mutex), + conf: testSetup.EventerConfig, }, wantRegistered: []string{ "cloudevents", // stderr @@ -603,8 +608,9 @@ func Test_NewEventer(t *testing.T) { lock: testLock, serverName: "testSetup-with-all-opts", want: &Eventer{ - logger: testLogger, - conf: testSetupWithOpts.EventerConfig, + logger: testLogger, + gatedQueueLock: new(sync.Mutex), + conf: testSetupWithOpts.EventerConfig, }, wantRegistered: []string{ "cloudevents", // stderr @@ -657,8 +663,9 @@ func Test_NewEventer(t *testing.T) { lock: testLock, serverName: "testSetup", want: &Eventer{ - logger: testLogger, - conf: testHclogSetup.EventerConfig, + logger: testLogger, + gatedQueueLock: new(sync.Mutex), + conf: testHclogSetup.EventerConfig, }, wantRegistered: []string{ "hclog-text", // stderr diff --git a/internal/observability/event/hclog_event_adapter.go b/internal/observability/event/hclog_event_adapter.go new file mode 100644 index 0000000000..3162768cd4 --- /dev/null +++ b/internal/observability/event/hclog_event_adapter.go @@ -0,0 +1,237 @@ +package event + +import ( + "context" + "fmt" + "io" + "log" + "sync" + + "github.com/hashicorp/go-hclog" +) + +// HclogLoggerAdapter is used to provide an hclog-style interface to code that +// cannot natively handle eventing. Currently, all log lines are written as +// system events. Note that this is not meant for high throughput; some +// potential optimizations (such as using atomic values for name and such) are +// not current implemented. Additionally, some functions (such as fetching a +// stdlib logger/writer) are simply not supported right now. +type HclogLoggerAdapter struct { + eventCtx context.Context + l *sync.RWMutex + level hclog.Level + name string + withArgs []interface{} +} + +// Ensure that we are implementing Logger +var _ hclog.Logger = (*HclogLoggerAdapter)(nil) + +// NewHclogLogger creates a new hclog.Logger-compatible implementation that +// outputs to events +func NewHclogLogger(ctx context.Context, e *Eventer, opt ...Option) (hclog.Logger, error) { + const op = "event.HclogLogger" + eventCtx, err := NewEventerContext(ctx, e) + if err != nil { + return nil, fmt.Errorf("%s: %w", op, err) + } + opts := getOpts(opt...) + return &HclogLoggerAdapter{ + eventCtx: eventCtx, + l: new(sync.RWMutex), + level: opts.withHclogLevel, + }, nil +} + +// Args are alternating key, val pairs +// keys must be strings +// vals can be any type, but display is implementation specific +// Emit a message and key/value pairs at a provided log level +func (h *HclogLoggerAdapter) Log(level hclog.Level, msg string, args ...interface{}) { + switch { + case h.level == hclog.NoLevel: // If logger is not set to any level, accept it + case h.level <= level: // Otherwise if logger is same or more verbose, accept + default: + return + } + h.writeEvent("", msg, args) +} + +// Emit a message and key/value pairs at the TRACE level +func (h *HclogLoggerAdapter) Trace(msg string, args ...interface{}) { + if h.level > hclog.Trace { + return + } + h.writeEvent("", msg, args) +} + +// Emit a message and key/value pairs at the DEBUG level +func (h *HclogLoggerAdapter) Debug(msg string, args ...interface{}) { + if h.level > hclog.Debug { + return + } + h.writeEvent("", msg, args) +} + +// Emit a message and key/value pairs at the INFO level +func (h *HclogLoggerAdapter) Info(msg string, args ...interface{}) { + if h.level > hclog.Info { + return + } + h.writeEvent("", msg, args) +} + +// Emit a message and key/value pairs at the WARN level +func (h *HclogLoggerAdapter) Warn(msg string, args ...interface{}) { + if h.level > hclog.Warn { + return + } + h.writeEvent("", msg, args) +} + +// Emit a message and key/value pairs at the ERROR level +func (h *HclogLoggerAdapter) Error(msg string, args ...interface{}) { + if h.level > hclog.Error { + return + } + h.writeEvent("", msg, args) +} + +func (h *HclogLoggerAdapter) writeEvent(caller Op, msg string, args []interface{}) { + h.l.RLock() + defer h.l.RUnlock() + var allArgs []interface{} + if len(h.withArgs)+len(args) > 0 { + allArgs = append(h.withArgs, args...) + } + if h.name != "" { + allArgs = append(allArgs, "@original-log-name", h.name) + } + allArgs = append(allArgs, "@original-log-level", h.level.String()) + WriteSysEvent(h.eventCtx, "", msg, allArgs...) +} + +// Indicate if TRACE logs would be emitted. This and the other Is* guards +// are used to elide expensive logging code based on the current level. +func (h *HclogLoggerAdapter) IsTrace() bool { + return h.level <= hclog.Trace +} + +// Indicate if DEBUG logs would be emitted. This and the other Is* guards +func (h *HclogLoggerAdapter) IsDebug() bool { + return h.level <= hclog.Debug +} + +// Indicate if INFO logs would be emitted. This and the other Is* guards +func (h *HclogLoggerAdapter) IsInfo() bool { + return h.level <= hclog.Info +} + +// Indicate if WARN logs would be emitted. This and the other Is* guards +func (h *HclogLoggerAdapter) IsWarn() bool { + return h.level <= hclog.Warn +} + +// Indicate if ERROR logs would be emitted. This and the other Is* guards +func (h *HclogLoggerAdapter) IsError() bool { + return h.level <= hclog.Error +} + +// ImpliedArgs returns With key/value pairs +func (h *HclogLoggerAdapter) ImpliedArgs() []interface{} { + return h.withArgs +} + +// Creates a sublogger that will always have the given key/value pairs +func (h *HclogLoggerAdapter) With(args ...interface{}) hclog.Logger { + h.l.Lock() + defer h.l.Unlock() + newArgs := args + if len(h.withArgs) > 0 { + newArgs = make([]interface{}, len(h.withArgs), len(h.withArgs)+len(args)) + copy(newArgs, h.withArgs) + newArgs = append(newArgs, args...) + } + return &HclogLoggerAdapter{ + eventCtx: h.eventCtx, + l: new(sync.RWMutex), + level: h.level, + name: h.name, + withArgs: newArgs, + } +} + +// Returns the Name of the logger +func (h *HclogLoggerAdapter) Name() string { + h.l.RLock() + defer h.l.RUnlock() + return h.name +} + +// Create a logger that will prepend the name string on the front of all messages. +// If the logger already has a name, the new value will be appended to the current +// name. That way, a major subsystem can use this to decorate all it's own logs +// without losing context. +func (h *HclogLoggerAdapter) Named(name string) hclog.Logger { + h.l.Lock() + defer h.l.Unlock() + var newArgs []interface{} + if len(h.withArgs) > 0 { + newArgs = make([]interface{}, len(h.withArgs)) + copy(newArgs, h.withArgs) + } + + newName := name + if h.name != "" { + newName = fmt.Sprintf("%s.%s", h.name, name) + } + + return &HclogLoggerAdapter{ + eventCtx: h.eventCtx, + l: new(sync.RWMutex), + level: h.level, + name: newName, + withArgs: newArgs, + } +} + +// Create a logger that will prepend the name string on the front of all messages. +// This sets the name of the logger to the value directly, unlike Named which honor +// the current name as well. +func (h *HclogLoggerAdapter) ResetNamed(name string) hclog.Logger { + h.l.Lock() + defer h.l.Unlock() + var newArgs []interface{} + if len(h.withArgs) > 0 { + newArgs = make([]interface{}, len(h.withArgs)) + copy(newArgs, h.withArgs) + } + return &HclogLoggerAdapter{ + eventCtx: h.eventCtx, + l: new(sync.RWMutex), + level: h.level, + name: name, + withArgs: newArgs, + } +} + +// Updates the level. This should affect all related loggers as well, +// unless they were created with IndependentLevels. If an +// implementation cannot update the level on the fly, it should no-op. +// +// This implementation is a no-op currently. +func (h *HclogLoggerAdapter) SetLevel(_ hclog.Level) {} + +// Return a value that conforms to the stdlib log.Logger interface +// +// This implementation does not currently support this and returns nil. +func (h *HclogLoggerAdapter) StandardLogger(opts *hclog.StandardLoggerOptions) *log.Logger { + return nil +} + +// Return a value that conforms to io.Writer, which can be passed into log.SetOutput() +// +// This implementation does not currently support this and returns nil. +func (h *HclogLoggerAdapter) StandardWriter(opts *hclog.StandardLoggerOptions) io.Writer { + return nil +} diff --git a/internal/observability/event/hclog_event_adapter_test.go b/internal/observability/event/hclog_event_adapter_test.go new file mode 100644 index 0000000000..c432c17a57 --- /dev/null +++ b/internal/observability/event/hclog_event_adapter_test.go @@ -0,0 +1,186 @@ +package event + +import ( + "bytes" + "context" + "sync" + "testing" + + "github.com/hashicorp/go-hclog" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestEventer_HclogLoggerAdapter(t *testing.T) { + t.Parallel() + require := require.New(t) + ctx := context.Background() + buffer := new(bytes.Buffer) + eventerConfig := EventerConfig{ + AuditEnabled: true, + ObservationsEnabled: true, + SysEventsEnabled: true, + Sinks: []*SinkConfig{ + { + Name: "test-sink", + EventTypes: []Type{EveryType}, + Format: TextHclogSinkFormat, + Type: WriterSink, + WriterConfig: &WriterSinkTypeConfig{ + Writer: buffer, + }, + }, + }, + } + testLock := &sync.Mutex{} + testLogger := hclog.New(&hclog.LoggerOptions{ + Mutex: testLock, + Name: "test", + }) + eventer, err := NewEventer( + testLogger, + testLock, + "TestEventer_HclogLoggerAdapter", + eventerConfig, + ) + require.NoError(err) + + // This test sends a series of events through the hclog adapter and + // validates that we see the ones we expect to see on the other side. It + // also tests various features such as Named and With to ensure they turn + // into values on the other side. + logger, err := NewHclogLogger(ctx, eventer, WithHclogLevel(hclog.Info)) + require.NoError(err) + + tests := []struct { + name string + plainLog bool + level hclog.Level + shouldNotLog bool + logOverride hclog.Logger + input string + outputSubstrs []string + }{ + { + name: "over-level-error", + level: hclog.Error, + input: "over-error", + outputSubstrs: []string{"msg=over-error"}, + }, + { + name: "over-level-warn", + level: hclog.Warn, + input: "over-warn", + outputSubstrs: []string{"msg=over-warn"}, + }, + { + name: "at-level", + level: hclog.Info, + input: "at", + outputSubstrs: []string{"msg=at"}, + }, + { + name: "under-level-debug", + level: hclog.Debug, + input: "under-debug", + shouldNotLog: true, + }, + { + name: "under-level-trace", + level: hclog.Trace, + input: "under-trace", + shouldNotLog: true, + }, + { + name: "plain-under-trace", + plainLog: true, + level: hclog.Trace, + input: "plain-under-trace", + shouldNotLog: true, + }, + { + name: "plain-at", + plainLog: true, + level: hclog.Info, + input: "plain-at", + outputSubstrs: []string{"msg=plain-at"}, + }, + { + name: "plain-over-warn", + plainLog: true, + level: hclog.Warn, + input: "plain-over-warn", + outputSubstrs: []string{"msg=plain-over-warn"}, + }, + { + name: "with-named", + level: hclog.Info, + logOverride: logger.Named("named-logger"), + input: "named-input", + outputSubstrs: []string{"msg=named-input", "@original-log-name=named-logger"}, + }, + { + name: "sub-named", + level: hclog.Info, + logOverride: logger.Named("named-logger").Named("subnamed-logger"), + input: "subnamed-input", + outputSubstrs: []string{"msg=subnamed-input", "@original-log-name=named-logger.subnamed-logger"}, + }, + { + name: "reset-named", + level: hclog.Info, + logOverride: logger.Named("named-logger").ResetNamed("reset-logger"), + input: "reset-input", + outputSubstrs: []string{"msg=reset-input", "@original-log-name=reset-logger"}, + }, + { + name: "with-params", + level: hclog.Info, + logOverride: logger.With("with", "params"), + input: "with-params", + outputSubstrs: []string{"msg=with-params", "with=params"}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert := assert.New(t) + buffer.Reset() + loggerToUse := logger + if tt.logOverride != nil { + loggerToUse = tt.logOverride + } + + switch tt.plainLog { + case false: + switch tt.level { + case hclog.Error: + assert.True(loggerToUse.IsError() == !tt.shouldNotLog) + loggerToUse.Error(tt.input) + case hclog.Warn: + assert.True(loggerToUse.IsWarn() == !tt.shouldNotLog) + loggerToUse.Warn(tt.input) + case hclog.Info: + assert.True(loggerToUse.IsInfo() == !tt.shouldNotLog) + loggerToUse.Info(tt.input) + case hclog.Debug: + assert.True(loggerToUse.IsDebug() == !tt.shouldNotLog) + loggerToUse.Debug(tt.input) + case hclog.Trace: + assert.True(loggerToUse.IsTrace() == !tt.shouldNotLog) + loggerToUse.Trace(tt.input) + } + default: + loggerToUse.Log(tt.level, tt.input) + } + + switch tt.shouldNotLog { + case true: + assert.Len(buffer.String(), 0) + default: + for _, substr := range tt.outputSubstrs { + assert.Contains(buffer.String(), substr) + } + } + }) + } +} diff --git a/internal/observability/event/options.go b/internal/observability/event/options.go index 6be6ad437c..44740c458f 100644 --- a/internal/observability/event/options.go +++ b/internal/observability/event/options.go @@ -4,6 +4,7 @@ import ( "net/url" "time" + "github.com/hashicorp/go-hclog" wrapping "github.com/hashicorp/go-kms-wrapping/v2" ) @@ -42,6 +43,11 @@ type options struct { withSchema *url.URL withAuditWrapper wrapping.Wrapper withFilterOperations AuditFilterOperations + withGating bool + withNoGateLocking bool + + // These options are related to the hclog adapter + withHclogLevel hclog.Level withBroker broker // test only option withAuditSink bool // test only option @@ -196,3 +202,24 @@ func WithFilterOperations(fop AuditFilterOperations) Option { o.withFilterOperations = fop } } + +// WithHclogLevel is an option to specify a log level if using the adapter +func WithHclogLevel(with hclog.Level) Option { + return func(o *options) { + o.withHclogLevel = with + } +} + +// WithGating starts the eventer in gated mode +func WithGating(with bool) Option { + return func(o *options) { + o.withGating = with + } +} + +// WithNoGateLocking is used when trawling through the existing queue to ensure we don't deadlock +func WithNoGateLocking(with bool) Option { + return func(o *options) { + o.withNoGateLocking = with + } +} diff --git a/internal/observability/event/options_test.go b/internal/observability/event/options_test.go index 13852a5170..43aed98150 100644 --- a/internal/observability/event/options_test.go +++ b/internal/observability/event/options_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "github.com/hashicorp/go-hclog" wrapping "github.com/hashicorp/go-kms-wrapping/v2" "github.com/hashicorp/go-kms-wrapping/v2/aead" "github.com/stretchr/testify/assert" @@ -171,6 +172,27 @@ func Test_GetOpts(t *testing.T) { testOpts.withFilterOperations = overrides assert.Equal(opts, testOpts) }) + t.Run("WithHclogLevel", func(t *testing.T) { + assert := assert.New(t) + opts := getOpts(WithHclogLevel(hclog.Info)) + testOpts := getDefaultOptions() + testOpts.withHclogLevel = hclog.Info + assert.Equal(opts, testOpts) + }) + t.Run("withEventGating", func(t *testing.T) { + assert := assert.New(t) + testOpts := getDefaultOptions() + assert.False(testOpts.withGating) + opts := getOpts(WithGating(true)) + assert.True(opts.withGating) + }) + t.Run("withNoGateLocking", func(t *testing.T) { + assert := assert.New(t) + testOpts := getDefaultOptions() + assert.False(testOpts.withNoGateLocking) + opts := getOpts(WithNoGateLocking(true)) + assert.True(opts.withNoGateLocking) + }) } // testWrapper initializes an AEAD wrapping.Wrapper for testing. Note: this diff --git a/internal/observability/event/sink_config.go b/internal/observability/event/sink_config.go index 5d97b7b3bd..5d2ec8bfc1 100644 --- a/internal/observability/event/sink_config.go +++ b/internal/observability/event/sink_config.go @@ -2,6 +2,7 @@ package event import ( "fmt" + "io" "time" ) @@ -14,9 +15,10 @@ type SinkConfig struct { AllowFilters []string `hcl:"allow_filters"` // AllowFilters define a set predicates for including an event in the sink. If any filter matches, the event will be included. The filter should be in a format supported by hashicorp/go-bexpr. DenyFilters []string `hcl:"deny_filters"` // DenyFilters define a set predicates for excluding an event in the sink. If any filter matches, the event will be excluded. The filter should be in a format supported by hashicorp/go-bexpr. Format SinkFormat `hcl:"format"` // Format defines the format for the sink (JSONSinkFormat or TextSinkFormat). - Type SinkType `hcl:"type"` // Type defines the type of sink (StderrSink or FileSink). + Type SinkType `hcl:"type"` // Type defines the type of sink (StderrSink, FileSink, or WriterSink). StderrConfig *StderrSinkTypeConfig `hcl:"stderr"` // StderrConfig defines parameters for a stderr output. FileConfig *FileSinkTypeConfig `hcl:"file"` // FileConfig defines parameters for a file output. + WriterConfig *WriterSinkTypeConfig `hcl:"-"` // WriterConfig defines parameters for an io.Writer output. This is not available via HCL. AuditConfig *AuditConfig `hcl:"audit_config"` // AuditConfig defines optional parameters for audit events (if EventTypes contains audit) } @@ -36,6 +38,9 @@ func (sc *SinkConfig) Validate() error { if sc.FileConfig != nil { foundSinkTypeConfigs++ } + if sc.WriterConfig != nil { + foundSinkTypeConfigs++ + } if foundSinkTypeConfigs > 1 { return fmt.Errorf("%s: too many sink type config blocks: %w", op, ErrInvalidParameter) } @@ -56,6 +61,13 @@ func (sc *SinkConfig) Validate() error { if sc.FileConfig.FileName == "" { return fmt.Errorf("%s: missing file name: %w", op, ErrInvalidParameter) } + case WriterSink: + if sc.WriterConfig == nil { + return fmt.Errorf(`%s: missing writer config: %w`, op, ErrInvalidParameter) + } + if sc.WriterConfig.Writer == nil { + return fmt.Errorf("%s: missing writer: %w", op, ErrInvalidParameter) + } } if sc.Name == "" { return fmt.Errorf("%s: missing sink name: %w", op, ErrInvalidParameter) @@ -95,6 +107,11 @@ type FileSinkTypeConfig struct { RotateMaxFiles int `hcl:"rotate_max_files" mapstructure:"rotate_max_files"` // RotateMaxFiles defines how may historical rotated files should be kept for a FileSink } +// WriterSinkTypeConfig contains configuration structures for writer sink types +type WriterSinkTypeConfig struct { + Writer io.Writer `hcl:"-" mapstructure:"-"` // The writer to write to +} + // FilterType defines a type for filters (allow or deny) type FilterType string diff --git a/internal/observability/event/sink_type.go b/internal/observability/event/sink_type.go index a622e0c3e5..71782a4e2f 100644 --- a/internal/observability/event/sink_type.go +++ b/internal/observability/event/sink_type.go @@ -7,14 +7,15 @@ import ( const ( StderrSink SinkType = "stderr" // StderrSink is written to stderr FileSink SinkType = "file" // FileSink is written to a file + WriterSink SinkType = "writer" // WriterSink is written to an io.Writer ) -type SinkType string // SinkType defines the type of sink in a config stanza (file, stderr) +type SinkType string // SinkType defines the type of sink in a config stanza (file, stderr, writer) func (t SinkType) Validate() error { const op = "event.(SinkType).validate" switch t { - case StderrSink, FileSink: + case StderrSink, FileSink, WriterSink: return nil default: return fmt.Errorf("%s: '%s' is not a valid sink type: %w", op, t, ErrInvalidParameter) diff --git a/internal/servers/controller/controller.go b/internal/servers/controller/controller.go index 9c48b5bbd5..9c81c2f5a8 100644 --- a/internal/servers/controller/controller.go +++ b/internal/servers/controller/controller.go @@ -125,7 +125,14 @@ func New(ctx context.Context, conf *Config) (*Controller, error) { } } + var pluginLogger hclog.Logger for _, enabledPlugin := range c.enabledPlugins { + if pluginLogger == nil { + pluginLogger, err = event.NewHclogLogger(ctx, c.conf.Server.Eventer) + if err != nil { + return nil, fmt.Errorf("error creating host catalog plugin logger: %w", err) + } + } switch enabledPlugin { case base.EnabledPluginHostLoopback: plg := pluginhost.NewWrappingPluginClient(pluginhost.NewLoopbackPlugin()) @@ -145,6 +152,7 @@ func New(ctx context.Context, conf *Config) (*Controller, error) { pluginutil.WithPluginExecutionDirectory(conf.RawConfig.Plugins.ExecutionDir), pluginutil.WithPluginsFilesystem(host_plugin_assets.HostPluginPrefix, host_plugin_assets.FileSystem()), ), + external_host_plugins.WithLogger(pluginLogger.Named(pluginType)), ) if err != nil { return nil, fmt.Errorf("error creating %s host plugin: %w", pluginType, err)