Add hclog adapter and gating to eventer (#1915)

This contains to related things:

1. An adapter allowing an hclog interface to take in information and send them to the eventer
2. A gate on the eventer, which is the analogue to the behavior prior to eventing where we'd ensure logs were not printed prior to the startup information. Without this, plugin logs using the hclog adapter will start showing up too early.
pull/1928/head
Jeff Mitchell 4 years ago committed by GitHub
parent 7eb29261b2
commit e596daf34d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -39,6 +39,7 @@ type Options struct {
withAttributeFieldPrefix string
withStatusCode int
withHostPlugin func() (string, plugin.HostPluginServiceClient)
withEventGating bool
}
func getDefaultOptions() Options {
@ -180,3 +181,10 @@ func WithHostPlugin(pluginId string, plg plugin.HostPluginServiceClient) Option
}
}
}
// WithEventGating starts the eventer in gated mode
func WithEventGating(with bool) Option {
return func(o *Options) {
o.withEventGating = with
}
}

@ -117,4 +117,11 @@ func Test_GetOpts(t *testing.T) {
testOpts.withDialect = "test-dialect"
assert.Equal(opts, testOpts)
})
t.Run("withEventGating", func(t *testing.T) {
assert := assert.New(t)
testOpts := getDefaultOptions()
assert.False(testOpts.withEventGating)
opts := getOpts(WithEventGating(true))
assert.True(opts.withEventGating)
})
}

@ -59,6 +59,8 @@ func TestServer_SetupKMSes(t *testing.T) {
purposes: []string{globals.KmsPurposeRoot, globals.KmsPurposeRecovery, globals.KmsPurposeWorkerAuth, globals.KmsPurposeConfig},
},
}
logger := hclog.Default()
serLock := new(sync.Mutex)
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert, require := assert.New(t), require.New(t)
@ -70,8 +72,9 @@ func TestServer_SetupKMSes(t *testing.T) {
},
},
}
s := NewServer(&Command{})
err := s.SetupKMSes(context.Background(), cli.NewMockUi(), &config.Config{SharedConfig: conf})
s := NewServer(&Command{Context: context.Background()})
require.NoError(s.SetupEventing(logger, serLock, "setup-kms-testing"))
err := s.SetupKMSes(s.Context, cli.NewMockUi(), &config.Config{SharedConfig: conf})
if tt.wantErrContains != "" {
require.Error(err)

@ -205,7 +205,8 @@ func (b *Server) SetupEventing(logger hclog.Logger, serializationLock *sync.Mute
serializationLock,
serverName,
*opts.withEventerConfig,
event.WithAuditWrapper(opts.withEventWrapper))
event.WithAuditWrapper(opts.withEventWrapper),
event.WithGating(opts.withEventGating))
if err != nil {
return berrors.WrapDeprecated(err, op, berrors.WithMsg("unable to create eventer"))
}
@ -283,11 +284,15 @@ func (b *Server) SetupLogging(flagLogLevel, flagLogFormat, configLogLevel, confi
return nil
}
func (b *Server) ReleaseLogGate() {
func (b *Server) ReleaseLogGate() error {
// Release the log gate.
b.Logger.(hclog.OutputResettable).ResetOutputWithFlush(&hclog.LoggerOptions{
Output: b.logOutput,
}, b.GatedWriter)
if b.Eventer != nil {
return b.Eventer.ReleaseGate()
}
return nil
}
func (b *Server) StorePidFile(pidPath string) error {
@ -482,6 +487,7 @@ func (b *Server) SetupListeners(ui cli.Ui, config *configutil.SharedConfig, allo
// and sends each off to configutil to instantiate a wrapper.
func (b *Server) SetupKMSes(ctx context.Context, ui cli.Ui, config *config.Config) error {
sharedConfig := config.SharedConfig
var pluginLogger hclog.Logger
var err error
for _, kms := range sharedConfig.Seals {
for _, purpose := range kms.Purpose {
@ -498,6 +504,13 @@ func (b *Server) SetupKMSes(ctx context.Context, ui cli.Ui, config *config.Confi
return fmt.Errorf("Unknown KMS purpose %q", kms.Purpose)
}
if pluginLogger == nil {
pluginLogger, err = event.NewHclogLogger(b.Context, b.Eventer)
if err != nil {
return fmt.Errorf("Error creating KMS plugin logger: %w", err)
}
}
// This can be modified by configutil so store the original value
origPurpose := kms.Purpose
kms.Purpose = []string{purpose}
@ -509,7 +522,9 @@ func (b *Server) SetupKMSes(ctx context.Context, ui cli.Ui, config *config.Confi
&b.Info,
configutil.WithPluginOptions(
pluginutil.WithPluginsMap(kms_plugin_assets.BuiltinKmsPlugins()),
pluginutil.WithPluginsFilesystem(kms_plugin_assets.KmsPluginPrefix, kms_plugin_assets.FileSystem())),
pluginutil.WithPluginsFilesystem(kms_plugin_assets.KmsPluginPrefix, kms_plugin_assets.FileSystem()),
),
configutil.WithLogger(pluginLogger.Named(kms.Type).With("purpose", purpose)),
)
if wrapperConfigError != nil {
return fmt.Errorf(
@ -551,7 +566,6 @@ func (b *Server) SetupKMSes(ctx context.Context, ui cli.Ui, config *config.Confi
default:
return fmt.Errorf("KMS purpose of %q is unknown", purpose)
}
}
}

@ -7,6 +7,7 @@ import (
"github.com/hashicorp/boundary/internal/cmd/base"
"github.com/hashicorp/boundary/internal/cmd/config"
"github.com/hashicorp/boundary/internal/errors"
"github.com/hashicorp/boundary/internal/observability/event"
host_plugin_assets "github.com/hashicorp/boundary/plugins/host"
kms_plugin_assets "github.com/hashicorp/boundary/plugins/kms"
external_host_plugins "github.com/hashicorp/boundary/sdk/plugins/host"
@ -138,13 +139,49 @@ func (c *MigrateCommand) Run(args []string) (retCode int) {
}()
}
dialect := "postgres"
c.srv = base.NewServer(&base.Command{UI: c.UI})
if err := c.srv.SetupLogging(c.flagLogLevel, c.flagLogFormat, c.Config.LogLevel, c.Config.LogFormat); err != nil {
c.UI.Error(err.Error())
return base.CommandCliError
}
var serverName string
switch {
case c.Config.Controller == nil:
serverName = "boundary-database-migrate"
default:
if _, err := c.Config.Controller.InitNameIfEmpty(); err != nil {
c.UI.Error(err.Error())
return base.CommandCliError
}
serverName = c.Config.Controller.Name + "/boundary-database-migrate"
}
if err := c.srv.SetupEventing(
c.srv.Logger,
c.srv.StderrLock,
serverName,
base.WithEventerConfig(c.Config.Eventing)); err != nil {
c.UI.Error(err.Error())
return base.CommandCliError
}
pluginLogger, err := event.NewHclogLogger(c.Context, c.srv.Eventer)
if err != nil {
c.UI.Error(fmt.Sprintf("Error creating host catalog plugin logger: %v", err))
return base.CommandCliError
}
_, awsCleanup, err := external_host_plugins.CreateHostPlugin(
c.Context,
"aws",
external_host_plugins.WithPluginOptions(
pluginutil.WithPluginExecutionDirectory(c.Config.Plugins.ExecutionDir),
pluginutil.WithPluginsFilesystem(host_plugin_assets.HostPluginPrefix, host_plugin_assets.FileSystem()),
))
),
external_host_plugins.WithLogger(pluginLogger.Named("aws")),
)
if err != nil {
c.UI.Error(fmt.Errorf("Error creating dynamic host plugin: %w", err).Error())
c.UI.Warn(base.WrapAtLength(
@ -176,30 +213,6 @@ plugins {
return base.CommandCliError
}
dialect := "postgres"
c.srv = base.NewServer(&base.Command{UI: c.UI})
if err := c.srv.SetupLogging(c.flagLogLevel, c.flagLogFormat, c.Config.LogLevel, c.Config.LogFormat); err != nil {
c.UI.Error(err.Error())
return base.CommandCliError
}
var serverName string
switch {
case c.Config.Controller == nil:
serverName = "boundary-database-migrate"
default:
if _, err := c.Config.Controller.InitNameIfEmpty(); err != nil {
c.UI.Error(err.Error())
return base.CommandCliError
}
serverName = c.Config.Controller.Name + "/boundary-database-migrate"
}
if err := c.srv.SetupEventing(c.srv.Logger, c.srv.StderrLock, serverName, base.WithEventerConfig(c.Config.Eventing)); err != nil {
c.UI.Error(err.Error())
return base.CommandCliError
}
// If mlockall(2) isn't supported, show a warning. We disable this in dev
// because it is quite scary to see when first using Boundary. We also disable
// this if the user has explicitly disabled mlock in configuration.

@ -514,7 +514,8 @@ func (c *Command) Run(args []string) int {
c.StderrLock,
serverName,
base.WithEventerConfig(c.Config.Eventing),
base.WithEventFlags(eventFlags)); err != nil {
base.WithEventFlags(eventFlags),
base.WithEventGating(true)); err != nil {
c.UI.Error(err.Error())
return base.CommandCliError
}
@ -596,7 +597,10 @@ func (c *Command) Run(args []string) int {
}
c.PrintInfo(c.UI)
c.ReleaseLogGate()
if err := c.ReleaseLogGate(); err != nil {
c.UI.Error(fmt.Errorf("Error releasing event gate: %w", err).Error())
return base.CommandCliError
}
{
c.EnabledPlugins = append(c.EnabledPlugins, base.EnabledPluginHostAws, base.EnabledPluginHostAzure)

@ -164,7 +164,8 @@ func (c *Command) Run(args []string) int {
if err := c.SetupEventing(c.Logger,
c.StderrLock,
strings.Join(serverNames, "/"),
base.WithEventerConfig(c.Config.Eventing)); err != nil {
base.WithEventerConfig(c.Config.Eventing),
base.WithEventGating(true)); err != nil {
c.UI.Error(err.Error())
return base.CommandUserError
}
@ -447,7 +448,10 @@ func (c *Command) Run(args []string) int {
}()
c.PrintInfo(c.UI)
c.ReleaseLogGate()
if err := c.ReleaseLogGate(); err != nil {
c.UI.Error(fmt.Errorf("Error releasing event gate: %w", err).Error())
return base.CommandCliError
}
if c.Config.Controller != nil {
c.EnabledPlugins = append(c.EnabledPlugins, base.EnabledPluginHostAws, base.EnabledPluginHostAzure)

@ -22,6 +22,8 @@ import (
"github.com/hashicorp/eventlogger/sinks/writer"
"github.com/hashicorp/go-hclog"
wrapping "github.com/hashicorp/go-kms-wrapping/v2"
"github.com/hashicorp/go-multierror"
"go.uber.org/atomic"
"google.golang.org/protobuf/types/known/fieldmaskpb"
)
@ -59,6 +61,13 @@ type broker interface {
RegisterPipeline(def eventlogger.Pipeline) error
}
// queuedEvent stores an event and the context that was associated with it when
// gating
type queuedEvent struct {
ctx context.Context
event interface{}
}
// Eventer provides a method to send events to pipelines of sinks
type Eventer struct {
broker broker
@ -69,6 +78,13 @@ type Eventer struct {
observationPipelines []pipeline
errPipelines []pipeline
auditWrapperNodes []interface{}
// Gating is used to delay output of events until after we have a chance to
// render startup info, similar to what was done for hclog before eventing
// supplanted it. It affects only error and system events.
gated atomic.Bool
gatedQueue []*queuedEvent
gatedQueueLock *sync.Mutex
}
type pipeline struct {
@ -189,6 +205,7 @@ func NewEventer(log hclog.Logger, serializationLock *sync.Mutex, serverName stri
}
e := &Eventer{
gatedQueueLock: new(sync.Mutex),
logger: log,
conf: c,
broker: b,
@ -199,6 +216,10 @@ func NewEventer(log hclog.Logger, serializationLock *sync.Mutex, serverName stri
e.broker.StopTimeAt(opts.withNow)
}
if opts.withGating {
e.gated.Store(true)
}
// serializedStderr will be shared among all StderrSinks so their output is not
// interwoven
serializedStderr := serializedWriter{
@ -253,6 +274,17 @@ func NewEventer(log hclog.Logger, serializationLock *sync.Mutex, serverName stri
return nil, fmt.Errorf("%s: %w", op, err)
}
sinkId = eventlogger.NodeID(id)
case WriterSink:
wsc := s.WriterConfig
sinkNode = &writer.Sink{
Format: string(s.Format),
Writer: wsc.Writer,
}
id, err := NewId("writer")
if err != nil {
return nil, fmt.Errorf("%s: %w", op, err)
}
sinkId = eventlogger.NodeID(id)
default:
return nil, fmt.Errorf("%s: unknown sink type %s", op, s.Type)
}
@ -558,7 +590,7 @@ func (e *Eventer) RotateAuditWrapper(ctx context.Context, newWrapper wrapping.Wr
}
// writeObservation writes/sends an Observation event.
func (e *Eventer) writeObservation(ctx context.Context, event *observation) error {
func (e *Eventer) writeObservation(ctx context.Context, event *observation, _ ...Option) error {
const op = "event.(Eventer).writeObservation"
if event == nil {
return fmt.Errorf("%s: missing event: %w", op, ErrInvalidParameter)
@ -584,11 +616,23 @@ func (e *Eventer) writeObservation(ctx context.Context, event *observation) erro
}
// writeError writes/sends an Err event
func (e *Eventer) writeError(ctx context.Context, event *err) error {
func (e *Eventer) writeError(ctx context.Context, event *err, opt ...Option) error {
const op = "event.(Eventer).writeError"
if event == nil {
return fmt.Errorf("%s: missing event: %w", op, ErrInvalidParameter)
}
opts := getOpts(opt...)
if e.gated.Load() && !opts.withNoGateLocking {
e.gatedQueueLock.Lock()
defer e.gatedQueueLock.Unlock()
if e.gated.Load() { // validate that it's still gated and we haven't just drained the queue
e.gatedQueue = append(e.gatedQueue, &queuedEvent{
ctx: ctx,
event: event,
})
return nil
}
}
err := e.retrySend(ctx, stdRetryCount, expBackoff{}, func() (eventlogger.Status, error) {
return e.broker.Send(ctx, eventlogger.EventType(ErrorType), event)
})
@ -600,11 +644,23 @@ func (e *Eventer) writeError(ctx context.Context, event *err) error {
}
// writeSysEvent writes/sends an sysEvent event
func (e *Eventer) writeSysEvent(ctx context.Context, event *sysEvent) error {
func (e *Eventer) writeSysEvent(ctx context.Context, event *sysEvent, opt ...Option) error {
const op = "event.(Eventer).writeSysEvent"
if event == nil {
return fmt.Errorf("%s: missing event: %w", op, ErrInvalidParameter)
}
opts := getOpts(opt...)
if e.gated.Load() && !opts.withNoGateLocking {
e.gatedQueueLock.Lock()
defer e.gatedQueueLock.Unlock()
if e.gated.Load() { // validate that it's still gated and we haven't just drained the queue
e.gatedQueue = append(e.gatedQueue, &queuedEvent{
ctx: ctx,
event: event,
})
return nil
}
}
err := e.retrySend(ctx, stdRetryCount, expBackoff{}, func() (eventlogger.Status, error) {
return e.broker.Send(ctx, eventlogger.EventType(SystemType), event)
})
@ -616,7 +672,7 @@ func (e *Eventer) writeSysEvent(ctx context.Context, event *sysEvent) error {
}
// writeAudit writes/send an audit event
func (e *Eventer) writeAudit(ctx context.Context, event *audit) error {
func (e *Eventer) writeAudit(ctx context.Context, event *audit, _ ...Option) error {
const op = "event.(Eventer).writeAudit"
if event == nil {
return fmt.Errorf("%s: missing event: %w", op, ErrInvalidParameter)
@ -655,6 +711,52 @@ func (e *Eventer) FlushNodes(ctx context.Context) error {
return nil
}
// ReleaseGate releases queued events. If any event isn't successfully written,
// it remains in the queue and we could try a flush later.
func (e *Eventer) ReleaseGate() error {
const op = "event.(Eventer).ReleaseGate"
e.gatedQueueLock.Lock()
defer e.gatedQueueLock.Unlock()
// Don't gate anymore. By the time we hit this we will have the lock and be
// guaranteed that nothing else will add onto the queue while we drain it.
// Logic in the functions that add to the queue validate this boolean again
// after they acquire the lock so running this function should fully drain
// the events except in the case of an error.
e.gated.Store(false)
// Don't do anything if we're not gated and are fully drained
if len(e.gatedQueue) == 0 && !e.gated.Load() {
return nil
}
var totalErrs *multierror.Error
for _, qe := range e.gatedQueue {
var writeErr error
if qe == nil {
continue // we may have already sent this but gotten errors later
}
var queuedOp string
switch t := qe.event.(type) {
case *sysEvent:
queuedOp = "system"
writeErr = e.writeSysEvent(qe.ctx, t, WithNoGateLocking(true))
case *err:
queuedOp = "error"
writeErr = e.writeError(qe.ctx, t, WithNoGateLocking(true))
default:
// Have no idea what this is and shouldn't have gotten in here to
// begin with, so just continue, and log it
writeErr = fmt.Errorf("unknown event type %T", t)
}
if writeErr != nil {
totalErrs = multierror.Append(fmt.Errorf("in %s, error sending queued %s event: %w", op, queuedOp, writeErr))
}
}
e.gatedQueue = nil
return totalErrs.ErrorOrNil()
}
// StandardLogger will create *log.Logger that will emit events through this
// Logger. This allows packages that require the stdlib log to emit events
// instead.

@ -0,0 +1,166 @@
package event
import (
"bytes"
"context"
"fmt"
"os"
"strings"
"sync"
"testing"
"github.com/hashicorp/boundary/globals"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-secure-stdlib/strutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestEventer_Gating(t *testing.T) {
t.Parallel()
require := require.New(t)
// FIXME: Remove this when no longer needed:
os.Setenv(globals.BOUNDARY_DEVELOPER_ENABLE_EVENTS, "true")
buffer := new(bytes.Buffer)
eventerConfig := EventerConfig{
AuditEnabled: true,
ObservationsEnabled: true,
SysEventsEnabled: true,
Sinks: []*SinkConfig{
{
Name: "test-sink",
EventTypes: []Type{EveryType},
Format: TextHclogSinkFormat,
Type: WriterSink,
WriterConfig: &WriterSinkTypeConfig{
Writer: buffer,
},
},
},
}
testLock := &sync.Mutex{}
testLogger := hclog.New(&hclog.LoggerOptions{
Mutex: testLock,
Name: "test",
})
eventer, err := NewEventer(
testLogger,
testLock,
"TestEventer_Gating",
eventerConfig,
WithGating(true),
)
require.NoError(err)
ctx, err := NewEventerContext(context.Background(), eventer)
require.NoError(err)
// This test sends a series of events of different types. The tests are
// meant to be in order as we want to send some that should be gated and
// some that shouldn't and ensure correct behavior at each step.
var totalEvents int
tests := []struct {
name string
eventFn func()
expectedGatedEvents int
}{
{
name: "system-event-1",
eventFn: func() {
WriteSysEvent(ctx, "system-event-1", "system-event-1")
totalEvents++
},
expectedGatedEvents: 1,
},
{
name: "system-event-2",
eventFn: func() {
WriteSysEvent(ctx, "system-event-2", "system-event-2")
totalEvents++
},
expectedGatedEvents: 2,
},
{
name: "audit-1",
eventFn: func() {
require.NoError(WriteAudit(ctx, "audit-1"))
totalEvents++
},
expectedGatedEvents: 2,
},
{
name: "observation-1",
eventFn: func() {
require.NoError(WriteObservation(ctx, "observation-1", WithId("observation-1"), WithHeader("foo", "bar")))
totalEvents++
},
expectedGatedEvents: 2,
},
{
name: "error-1",
eventFn: func() {
WriteError(ctx, "error-1", fmt.Errorf("error-1"))
totalEvents++
},
expectedGatedEvents: 3,
},
{
name: "error-2",
eventFn: func() {
WriteError(ctx, "error-2", fmt.Errorf("error-2"))
totalEvents++
},
expectedGatedEvents: 4,
},
// This should result in all events being flushed so none gated
{
name: "release-gate",
eventFn: func() {
require.NoError(eventer.ReleaseGate())
},
expectedGatedEvents: 0,
},
// From here on out we're verifying that all events of all types go through
{
name: "system-event-3",
eventFn: func() {
WriteSysEvent(ctx, "system-event-3", "system-event-3")
totalEvents++
},
expectedGatedEvents: 0,
},
{
name: "audit-2",
eventFn: func() {
require.NoError(WriteAudit(ctx, "audit-2"))
totalEvents++
},
expectedGatedEvents: 0,
},
{
name: "observation-2",
eventFn: func() {
require.NoError(WriteObservation(ctx, "observation-2", WithId("observation-2"), WithHeader("foo", "bar")))
totalEvents++
},
expectedGatedEvents: 0,
},
{
name: "error-2",
eventFn: func() {
WriteError(ctx, "error-2", fmt.Errorf("error-2"))
totalEvents++
},
expectedGatedEvents: 0,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert := assert.New(t)
tt.eventFn()
assert.Len(strutil.RemoveEmpty(strings.Split(buffer.String(), "\n")), totalEvents-tt.expectedGatedEvents, buffer.String())
})
}
}

@ -97,8 +97,9 @@ func Test_InitSysEventer(t *testing.T) {
lock: testLock,
serverName: "success-with-config",
want: &Eventer{
logger: testLogger,
conf: testConfig.EventerConfig,
logger: testLogger,
gatedQueueLock: new(sync.Mutex),
conf: testConfig.EventerConfig,
},
},
{
@ -108,7 +109,8 @@ func Test_InitSysEventer(t *testing.T) {
lock: testLock,
serverName: "success-with-default-config",
want: &Eventer{
logger: testLogger,
logger: testLogger,
gatedQueueLock: new(sync.Mutex),
conf: EventerConfig{
Sinks: []*SinkConfig{
{
@ -435,7 +437,8 @@ func Test_NewEventer(t *testing.T) {
logger: testLogger,
serverName: "valid-audit-config-with-wrapper",
want: &Eventer{
logger: testLogger,
logger: testLogger,
gatedQueueLock: new(sync.Mutex),
conf: EventerConfig{
AuditEnabled: true,
Sinks: []*SinkConfig{
@ -521,7 +524,8 @@ func Test_NewEventer(t *testing.T) {
lock: testLock,
serverName: "success-with-default-config",
want: &Eventer{
logger: testLogger,
logger: testLogger,
gatedQueueLock: new(sync.Mutex),
conf: EventerConfig{
Sinks: []*SinkConfig{
{
@ -561,8 +565,9 @@ func Test_NewEventer(t *testing.T) {
lock: testLock,
serverName: "testSetup",
want: &Eventer{
logger: testLogger,
conf: testSetup.EventerConfig,
logger: testLogger,
gatedQueueLock: new(sync.Mutex),
conf: testSetup.EventerConfig,
},
wantRegistered: []string{
"cloudevents", // stderr
@ -603,8 +608,9 @@ func Test_NewEventer(t *testing.T) {
lock: testLock,
serverName: "testSetup-with-all-opts",
want: &Eventer{
logger: testLogger,
conf: testSetupWithOpts.EventerConfig,
logger: testLogger,
gatedQueueLock: new(sync.Mutex),
conf: testSetupWithOpts.EventerConfig,
},
wantRegistered: []string{
"cloudevents", // stderr
@ -657,8 +663,9 @@ func Test_NewEventer(t *testing.T) {
lock: testLock,
serverName: "testSetup",
want: &Eventer{
logger: testLogger,
conf: testHclogSetup.EventerConfig,
logger: testLogger,
gatedQueueLock: new(sync.Mutex),
conf: testHclogSetup.EventerConfig,
},
wantRegistered: []string{
"hclog-text", // stderr

@ -0,0 +1,237 @@
package event
import (
"context"
"fmt"
"io"
"log"
"sync"
"github.com/hashicorp/go-hclog"
)
// HclogLoggerAdapter is used to provide an hclog-style interface to code that
// cannot natively handle eventing. Currently, all log lines are written as
// system events. Note that this is not meant for high throughput; some
// potential optimizations (such as using atomic values for name and such) are
// not current implemented. Additionally, some functions (such as fetching a
// stdlib logger/writer) are simply not supported right now.
type HclogLoggerAdapter struct {
eventCtx context.Context
l *sync.RWMutex
level hclog.Level
name string
withArgs []interface{}
}
// Ensure that we are implementing Logger
var _ hclog.Logger = (*HclogLoggerAdapter)(nil)
// NewHclogLogger creates a new hclog.Logger-compatible implementation that
// outputs to events
func NewHclogLogger(ctx context.Context, e *Eventer, opt ...Option) (hclog.Logger, error) {
const op = "event.HclogLogger"
eventCtx, err := NewEventerContext(ctx, e)
if err != nil {
return nil, fmt.Errorf("%s: %w", op, err)
}
opts := getOpts(opt...)
return &HclogLoggerAdapter{
eventCtx: eventCtx,
l: new(sync.RWMutex),
level: opts.withHclogLevel,
}, nil
}
// Args are alternating key, val pairs
// keys must be strings
// vals can be any type, but display is implementation specific
// Emit a message and key/value pairs at a provided log level
func (h *HclogLoggerAdapter) Log(level hclog.Level, msg string, args ...interface{}) {
switch {
case h.level == hclog.NoLevel: // If logger is not set to any level, accept it
case h.level <= level: // Otherwise if logger is same or more verbose, accept
default:
return
}
h.writeEvent("", msg, args)
}
// Emit a message and key/value pairs at the TRACE level
func (h *HclogLoggerAdapter) Trace(msg string, args ...interface{}) {
if h.level > hclog.Trace {
return
}
h.writeEvent("", msg, args)
}
// Emit a message and key/value pairs at the DEBUG level
func (h *HclogLoggerAdapter) Debug(msg string, args ...interface{}) {
if h.level > hclog.Debug {
return
}
h.writeEvent("", msg, args)
}
// Emit a message and key/value pairs at the INFO level
func (h *HclogLoggerAdapter) Info(msg string, args ...interface{}) {
if h.level > hclog.Info {
return
}
h.writeEvent("", msg, args)
}
// Emit a message and key/value pairs at the WARN level
func (h *HclogLoggerAdapter) Warn(msg string, args ...interface{}) {
if h.level > hclog.Warn {
return
}
h.writeEvent("", msg, args)
}
// Emit a message and key/value pairs at the ERROR level
func (h *HclogLoggerAdapter) Error(msg string, args ...interface{}) {
if h.level > hclog.Error {
return
}
h.writeEvent("", msg, args)
}
func (h *HclogLoggerAdapter) writeEvent(caller Op, msg string, args []interface{}) {
h.l.RLock()
defer h.l.RUnlock()
var allArgs []interface{}
if len(h.withArgs)+len(args) > 0 {
allArgs = append(h.withArgs, args...)
}
if h.name != "" {
allArgs = append(allArgs, "@original-log-name", h.name)
}
allArgs = append(allArgs, "@original-log-level", h.level.String())
WriteSysEvent(h.eventCtx, "", msg, allArgs...)
}
// Indicate if TRACE logs would be emitted. This and the other Is* guards
// are used to elide expensive logging code based on the current level.
func (h *HclogLoggerAdapter) IsTrace() bool {
return h.level <= hclog.Trace
}
// Indicate if DEBUG logs would be emitted. This and the other Is* guards
func (h *HclogLoggerAdapter) IsDebug() bool {
return h.level <= hclog.Debug
}
// Indicate if INFO logs would be emitted. This and the other Is* guards
func (h *HclogLoggerAdapter) IsInfo() bool {
return h.level <= hclog.Info
}
// Indicate if WARN logs would be emitted. This and the other Is* guards
func (h *HclogLoggerAdapter) IsWarn() bool {
return h.level <= hclog.Warn
}
// Indicate if ERROR logs would be emitted. This and the other Is* guards
func (h *HclogLoggerAdapter) IsError() bool {
return h.level <= hclog.Error
}
// ImpliedArgs returns With key/value pairs
func (h *HclogLoggerAdapter) ImpliedArgs() []interface{} {
return h.withArgs
}
// Creates a sublogger that will always have the given key/value pairs
func (h *HclogLoggerAdapter) With(args ...interface{}) hclog.Logger {
h.l.Lock()
defer h.l.Unlock()
newArgs := args
if len(h.withArgs) > 0 {
newArgs = make([]interface{}, len(h.withArgs), len(h.withArgs)+len(args))
copy(newArgs, h.withArgs)
newArgs = append(newArgs, args...)
}
return &HclogLoggerAdapter{
eventCtx: h.eventCtx,
l: new(sync.RWMutex),
level: h.level,
name: h.name,
withArgs: newArgs,
}
}
// Returns the Name of the logger
func (h *HclogLoggerAdapter) Name() string {
h.l.RLock()
defer h.l.RUnlock()
return h.name
}
// Create a logger that will prepend the name string on the front of all messages.
// If the logger already has a name, the new value will be appended to the current
// name. That way, a major subsystem can use this to decorate all it's own logs
// without losing context.
func (h *HclogLoggerAdapter) Named(name string) hclog.Logger {
h.l.Lock()
defer h.l.Unlock()
var newArgs []interface{}
if len(h.withArgs) > 0 {
newArgs = make([]interface{}, len(h.withArgs))
copy(newArgs, h.withArgs)
}
newName := name
if h.name != "" {
newName = fmt.Sprintf("%s.%s", h.name, name)
}
return &HclogLoggerAdapter{
eventCtx: h.eventCtx,
l: new(sync.RWMutex),
level: h.level,
name: newName,
withArgs: newArgs,
}
}
// Create a logger that will prepend the name string on the front of all messages.
// This sets the name of the logger to the value directly, unlike Named which honor
// the current name as well.
func (h *HclogLoggerAdapter) ResetNamed(name string) hclog.Logger {
h.l.Lock()
defer h.l.Unlock()
var newArgs []interface{}
if len(h.withArgs) > 0 {
newArgs = make([]interface{}, len(h.withArgs))
copy(newArgs, h.withArgs)
}
return &HclogLoggerAdapter{
eventCtx: h.eventCtx,
l: new(sync.RWMutex),
level: h.level,
name: name,
withArgs: newArgs,
}
}
// Updates the level. This should affect all related loggers as well,
// unless they were created with IndependentLevels. If an
// implementation cannot update the level on the fly, it should no-op.
//
// This implementation is a no-op currently.
func (h *HclogLoggerAdapter) SetLevel(_ hclog.Level) {}
// Return a value that conforms to the stdlib log.Logger interface
//
// This implementation does not currently support this and returns nil.
func (h *HclogLoggerAdapter) StandardLogger(opts *hclog.StandardLoggerOptions) *log.Logger {
return nil
}
// Return a value that conforms to io.Writer, which can be passed into log.SetOutput()
//
// This implementation does not currently support this and returns nil.
func (h *HclogLoggerAdapter) StandardWriter(opts *hclog.StandardLoggerOptions) io.Writer {
return nil
}

@ -0,0 +1,186 @@
package event
import (
"bytes"
"context"
"sync"
"testing"
"github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestEventer_HclogLoggerAdapter(t *testing.T) {
t.Parallel()
require := require.New(t)
ctx := context.Background()
buffer := new(bytes.Buffer)
eventerConfig := EventerConfig{
AuditEnabled: true,
ObservationsEnabled: true,
SysEventsEnabled: true,
Sinks: []*SinkConfig{
{
Name: "test-sink",
EventTypes: []Type{EveryType},
Format: TextHclogSinkFormat,
Type: WriterSink,
WriterConfig: &WriterSinkTypeConfig{
Writer: buffer,
},
},
},
}
testLock := &sync.Mutex{}
testLogger := hclog.New(&hclog.LoggerOptions{
Mutex: testLock,
Name: "test",
})
eventer, err := NewEventer(
testLogger,
testLock,
"TestEventer_HclogLoggerAdapter",
eventerConfig,
)
require.NoError(err)
// This test sends a series of events through the hclog adapter and
// validates that we see the ones we expect to see on the other side. It
// also tests various features such as Named and With to ensure they turn
// into values on the other side.
logger, err := NewHclogLogger(ctx, eventer, WithHclogLevel(hclog.Info))
require.NoError(err)
tests := []struct {
name string
plainLog bool
level hclog.Level
shouldNotLog bool
logOverride hclog.Logger
input string
outputSubstrs []string
}{
{
name: "over-level-error",
level: hclog.Error,
input: "over-error",
outputSubstrs: []string{"msg=over-error"},
},
{
name: "over-level-warn",
level: hclog.Warn,
input: "over-warn",
outputSubstrs: []string{"msg=over-warn"},
},
{
name: "at-level",
level: hclog.Info,
input: "at",
outputSubstrs: []string{"msg=at"},
},
{
name: "under-level-debug",
level: hclog.Debug,
input: "under-debug",
shouldNotLog: true,
},
{
name: "under-level-trace",
level: hclog.Trace,
input: "under-trace",
shouldNotLog: true,
},
{
name: "plain-under-trace",
plainLog: true,
level: hclog.Trace,
input: "plain-under-trace",
shouldNotLog: true,
},
{
name: "plain-at",
plainLog: true,
level: hclog.Info,
input: "plain-at",
outputSubstrs: []string{"msg=plain-at"},
},
{
name: "plain-over-warn",
plainLog: true,
level: hclog.Warn,
input: "plain-over-warn",
outputSubstrs: []string{"msg=plain-over-warn"},
},
{
name: "with-named",
level: hclog.Info,
logOverride: logger.Named("named-logger"),
input: "named-input",
outputSubstrs: []string{"msg=named-input", "@original-log-name=named-logger"},
},
{
name: "sub-named",
level: hclog.Info,
logOverride: logger.Named("named-logger").Named("subnamed-logger"),
input: "subnamed-input",
outputSubstrs: []string{"msg=subnamed-input", "@original-log-name=named-logger.subnamed-logger"},
},
{
name: "reset-named",
level: hclog.Info,
logOverride: logger.Named("named-logger").ResetNamed("reset-logger"),
input: "reset-input",
outputSubstrs: []string{"msg=reset-input", "@original-log-name=reset-logger"},
},
{
name: "with-params",
level: hclog.Info,
logOverride: logger.With("with", "params"),
input: "with-params",
outputSubstrs: []string{"msg=with-params", "with=params"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert := assert.New(t)
buffer.Reset()
loggerToUse := logger
if tt.logOverride != nil {
loggerToUse = tt.logOverride
}
switch tt.plainLog {
case false:
switch tt.level {
case hclog.Error:
assert.True(loggerToUse.IsError() == !tt.shouldNotLog)
loggerToUse.Error(tt.input)
case hclog.Warn:
assert.True(loggerToUse.IsWarn() == !tt.shouldNotLog)
loggerToUse.Warn(tt.input)
case hclog.Info:
assert.True(loggerToUse.IsInfo() == !tt.shouldNotLog)
loggerToUse.Info(tt.input)
case hclog.Debug:
assert.True(loggerToUse.IsDebug() == !tt.shouldNotLog)
loggerToUse.Debug(tt.input)
case hclog.Trace:
assert.True(loggerToUse.IsTrace() == !tt.shouldNotLog)
loggerToUse.Trace(tt.input)
}
default:
loggerToUse.Log(tt.level, tt.input)
}
switch tt.shouldNotLog {
case true:
assert.Len(buffer.String(), 0)
default:
for _, substr := range tt.outputSubstrs {
assert.Contains(buffer.String(), substr)
}
}
})
}
}

@ -4,6 +4,7 @@ import (
"net/url"
"time"
"github.com/hashicorp/go-hclog"
wrapping "github.com/hashicorp/go-kms-wrapping/v2"
)
@ -42,6 +43,11 @@ type options struct {
withSchema *url.URL
withAuditWrapper wrapping.Wrapper
withFilterOperations AuditFilterOperations
withGating bool
withNoGateLocking bool
// These options are related to the hclog adapter
withHclogLevel hclog.Level
withBroker broker // test only option
withAuditSink bool // test only option
@ -196,3 +202,24 @@ func WithFilterOperations(fop AuditFilterOperations) Option {
o.withFilterOperations = fop
}
}
// WithHclogLevel is an option to specify a log level if using the adapter
func WithHclogLevel(with hclog.Level) Option {
return func(o *options) {
o.withHclogLevel = with
}
}
// WithGating starts the eventer in gated mode
func WithGating(with bool) Option {
return func(o *options) {
o.withGating = with
}
}
// WithNoGateLocking is used when trawling through the existing queue to ensure we don't deadlock
func WithNoGateLocking(with bool) Option {
return func(o *options) {
o.withNoGateLocking = with
}
}

@ -8,6 +8,7 @@ import (
"testing"
"time"
"github.com/hashicorp/go-hclog"
wrapping "github.com/hashicorp/go-kms-wrapping/v2"
"github.com/hashicorp/go-kms-wrapping/v2/aead"
"github.com/stretchr/testify/assert"
@ -171,6 +172,27 @@ func Test_GetOpts(t *testing.T) {
testOpts.withFilterOperations = overrides
assert.Equal(opts, testOpts)
})
t.Run("WithHclogLevel", func(t *testing.T) {
assert := assert.New(t)
opts := getOpts(WithHclogLevel(hclog.Info))
testOpts := getDefaultOptions()
testOpts.withHclogLevel = hclog.Info
assert.Equal(opts, testOpts)
})
t.Run("withEventGating", func(t *testing.T) {
assert := assert.New(t)
testOpts := getDefaultOptions()
assert.False(testOpts.withGating)
opts := getOpts(WithGating(true))
assert.True(opts.withGating)
})
t.Run("withNoGateLocking", func(t *testing.T) {
assert := assert.New(t)
testOpts := getDefaultOptions()
assert.False(testOpts.withNoGateLocking)
opts := getOpts(WithNoGateLocking(true))
assert.True(opts.withNoGateLocking)
})
}
// testWrapper initializes an AEAD wrapping.Wrapper for testing. Note: this

@ -2,6 +2,7 @@ package event
import (
"fmt"
"io"
"time"
)
@ -14,9 +15,10 @@ type SinkConfig struct {
AllowFilters []string `hcl:"allow_filters"` // AllowFilters define a set predicates for including an event in the sink. If any filter matches, the event will be included. The filter should be in a format supported by hashicorp/go-bexpr.
DenyFilters []string `hcl:"deny_filters"` // DenyFilters define a set predicates for excluding an event in the sink. If any filter matches, the event will be excluded. The filter should be in a format supported by hashicorp/go-bexpr.
Format SinkFormat `hcl:"format"` // Format defines the format for the sink (JSONSinkFormat or TextSinkFormat).
Type SinkType `hcl:"type"` // Type defines the type of sink (StderrSink or FileSink).
Type SinkType `hcl:"type"` // Type defines the type of sink (StderrSink, FileSink, or WriterSink).
StderrConfig *StderrSinkTypeConfig `hcl:"stderr"` // StderrConfig defines parameters for a stderr output.
FileConfig *FileSinkTypeConfig `hcl:"file"` // FileConfig defines parameters for a file output.
WriterConfig *WriterSinkTypeConfig `hcl:"-"` // WriterConfig defines parameters for an io.Writer output. This is not available via HCL.
AuditConfig *AuditConfig `hcl:"audit_config"` // AuditConfig defines optional parameters for audit events (if EventTypes contains audit)
}
@ -36,6 +38,9 @@ func (sc *SinkConfig) Validate() error {
if sc.FileConfig != nil {
foundSinkTypeConfigs++
}
if sc.WriterConfig != nil {
foundSinkTypeConfigs++
}
if foundSinkTypeConfigs > 1 {
return fmt.Errorf("%s: too many sink type config blocks: %w", op, ErrInvalidParameter)
}
@ -56,6 +61,13 @@ func (sc *SinkConfig) Validate() error {
if sc.FileConfig.FileName == "" {
return fmt.Errorf("%s: missing file name: %w", op, ErrInvalidParameter)
}
case WriterSink:
if sc.WriterConfig == nil {
return fmt.Errorf(`%s: missing writer config: %w`, op, ErrInvalidParameter)
}
if sc.WriterConfig.Writer == nil {
return fmt.Errorf("%s: missing writer: %w", op, ErrInvalidParameter)
}
}
if sc.Name == "" {
return fmt.Errorf("%s: missing sink name: %w", op, ErrInvalidParameter)
@ -95,6 +107,11 @@ type FileSinkTypeConfig struct {
RotateMaxFiles int `hcl:"rotate_max_files" mapstructure:"rotate_max_files"` // RotateMaxFiles defines how may historical rotated files should be kept for a FileSink
}
// WriterSinkTypeConfig contains configuration structures for writer sink types
type WriterSinkTypeConfig struct {
Writer io.Writer `hcl:"-" mapstructure:"-"` // The writer to write to
}
// FilterType defines a type for filters (allow or deny)
type FilterType string

@ -7,14 +7,15 @@ import (
const (
StderrSink SinkType = "stderr" // StderrSink is written to stderr
FileSink SinkType = "file" // FileSink is written to a file
WriterSink SinkType = "writer" // WriterSink is written to an io.Writer
)
type SinkType string // SinkType defines the type of sink in a config stanza (file, stderr)
type SinkType string // SinkType defines the type of sink in a config stanza (file, stderr, writer)
func (t SinkType) Validate() error {
const op = "event.(SinkType).validate"
switch t {
case StderrSink, FileSink:
case StderrSink, FileSink, WriterSink:
return nil
default:
return fmt.Errorf("%s: '%s' is not a valid sink type: %w", op, t, ErrInvalidParameter)

@ -125,7 +125,14 @@ func New(ctx context.Context, conf *Config) (*Controller, error) {
}
}
var pluginLogger hclog.Logger
for _, enabledPlugin := range c.enabledPlugins {
if pluginLogger == nil {
pluginLogger, err = event.NewHclogLogger(ctx, c.conf.Server.Eventer)
if err != nil {
return nil, fmt.Errorf("error creating host catalog plugin logger: %w", err)
}
}
switch enabledPlugin {
case base.EnabledPluginHostLoopback:
plg := pluginhost.NewWrappingPluginClient(pluginhost.NewLoopbackPlugin())
@ -145,6 +152,7 @@ func New(ctx context.Context, conf *Config) (*Controller, error) {
pluginutil.WithPluginExecutionDirectory(conf.RawConfig.Plugins.ExecutionDir),
pluginutil.WithPluginsFilesystem(host_plugin_assets.HostPluginPrefix, host_plugin_assets.FileSystem()),
),
external_host_plugins.WithLogger(pluginLogger.Named(pluginType)),
)
if err != nil {
return nil, fmt.Errorf("error creating %s host plugin: %w", pluginType, err)

Loading…
Cancel
Save