support new event formats of: hclog-text and hclog-json (#1440)

pull/1445/head
Jim 5 years ago committed by GitHub
parent e3012afb4f
commit 3a3f956615
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -85,6 +85,7 @@ Canonical reference for changes, improvements, and bugfixes for Boundary.
PRs:
[hclog json,text formats](https://github.com/hashicorp/boundary/pull/1440),
[log adapters](https://github.com/hashicorp/boundary/pull/1434),
[unneeded log deps](https://github.com/hashicorp/boundary/pull/1433),
[update eventlogger](https://github.com/hashicorp/boundary/pull/1411),

@ -11,6 +11,7 @@ require (
github.com/bufbuild/buf v0.37.0
github.com/dhui/dktest v0.3.4
github.com/fatih/color v1.12.0
github.com/fatih/structs v1.1.0 // indirect
github.com/favadi/protoc-go-inject-tag v1.1.0
github.com/golang-migrate/migrate/v4 v4.14.1
github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe

@ -276,7 +276,7 @@ func (c *Command) Flags() *base.FlagSets {
f.StringVar(&base.StringVar{
Name: "event-format",
Target: &c.flagEventFormat,
Completion: complete.PredictSet("cloudevents-json", "cloudevents-text"),
Completion: complete.PredictSet("cloudevents-json", "cloudevents-text", "hclog-text", "hclog-json"),
Usage: `Event format. Supported values are "cloudevents-json" and "cloudevents-text".`,
})
f.StringVar(&base.StringVar{

@ -11,17 +11,17 @@ import (
"github.com/hashicorp/go-bexpr"
)
// Node represents an eventlogger.Node which filters events based on allow and
// cloudEventsFormatterFilter represents an eventlogger.cloudEventsFormatterFilter which filters events based on allow and
// deny bexpr filters
type Node struct {
type cloudEventsFormatterFilter struct {
*cloudevents.FormatterFilter
allow []*filter
deny []*filter
}
// NewCloudEventsNode creates a new filter node using the optional allow and deny filters
// newCloudEventsFormatterFilter creates a new filter node using the optional allow and deny filters
// provided. Support for WithAllow and WithDeny options.
func NewCloudEventsNode(source *url.URL, format cloudevents.Format, opt ...Option) (*Node, error) {
func newCloudEventsFormatterFilter(source *url.URL, format cloudevents.Format, opt ...Option) (*cloudEventsFormatterFilter, error) {
const op = "event.NewCloudEventsNode"
if source == nil {
return nil, fmt.Errorf("%s: missing source: %w", op, ErrInvalidParameter)
@ -32,7 +32,7 @@ func NewCloudEventsNode(source *url.URL, format cloudevents.Format, opt ...Optio
return nil, fmt.Errorf("%s: invalid format '%s': %w", op, format, ErrInvalidParameter)
}
opts := getOpts(opt...)
n := Node{
n := cloudEventsFormatterFilter{
FormatterFilter: &cloudevents.FormatterFilter{
Source: source,
Schema: opts.withSchema,
@ -91,7 +91,7 @@ func newPredicate(allow, deny []*filter) func(ctx context.Context, ce interface{
}
}
var _ eventlogger.Node = &Node{}
var _ eventlogger.Node = &cloudEventsFormatterFilter{}
type filter struct {
raw string

@ -14,7 +14,7 @@ import (
"github.com/stretchr/testify/require"
)
func Test_NewCloudEventsNode(t *testing.T) {
func Test_newCloudEventsFormatterFilter(t *testing.T) {
t.Parallel()
testSource, err := url.Parse("https://localhost:9200")
require.NoError(t, err)
@ -111,7 +111,7 @@ func Test_NewCloudEventsNode(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert, require := assert.New(t), require.New(t)
got, err := NewCloudEventsNode(tt.source, tt.format, tt.opt...)
got, err := newCloudEventsFormatterFilter(tt.source, tt.format, tt.opt...)
if tt.wantErr {
require.Error(err)
assert.Nil(got)
@ -144,7 +144,7 @@ func TestNode_Process(t *testing.T) {
require.NoError(t, err)
now := time.Now()
testNode, err := NewCloudEventsNode(testUrl, cloudevents.FormatJSON, WithSchema(testUrl))
testNode, err := newCloudEventsFormatterFilter(testUrl, cloudevents.FormatJSON, WithSchema(testUrl))
require.NoError(t, err)
f, err := newFilter(`Data == "match-filter"`)
@ -152,7 +152,7 @@ func TestNode_Process(t *testing.T) {
tests := []struct {
name string
n *Node
n *cloudEventsFormatterFilter
e *eventlogger.Event
format cloudevents.Format
predicate func(ctx context.Context, ce interface{}) (bool, error)

@ -473,7 +473,6 @@ func Test_WriteObservation(t *testing.T) {
actualJson, err := json.Marshal(gotObservation)
require.NoError(err)
fmt.Println(string(actualJson))
wantJson := testObservationJsonFromCtx(t, tt.ctx, event.Op(tt.name), gotObservation, tt.header, tt.details)
assert.JSONEq(string(wantJson), string(actualJson))
@ -1051,9 +1050,6 @@ func Test_WriteError(t *testing.T) {
gotError := &cloudevents.Event{}
err = json.Unmarshal(b, gotError)
require.NoErrorf(err, "json: %s", string(b))
fmt.Println("raw: ", string(b))
require.NoError(err)
if _, ok := gotError.Data.(map[string]interface{})["error_fields"].(map[string]interface{})["Msg"]; ok {
actualError := fakeError{

@ -417,29 +417,47 @@ func newFmtFilterNode(serverName string, c SinkConfig) (eventlogger.NodeID, even
if serverName == "" {
return "", nil, fmt.Errorf("%s: missing server name: %w", op, ErrInvalidParameter)
}
id, err := NewId("cloudevents")
if err != nil {
return "", nil, fmt.Errorf("%s: unable to generate id: %w", op, err)
}
var sourceUrl *url.URL
switch {
case c.EventSourceUrl != "":
sourceUrl, err = url.Parse(c.EventSourceUrl)
var fmtId eventlogger.NodeID
var fmtNode eventlogger.Node
switch c.Format {
case TextHclogSinkFormat, JSONHclogSinkFormat:
id, err := NewId(string(c.Format))
if err != nil {
return "", nil, fmt.Errorf("%s: invalid event source URL (%s): %w", op, c.EventSourceUrl, err)
return "", nil, fmt.Errorf("%s: unable to generate id: %w", op, err)
}
fmtId = eventlogger.NodeID(id)
fmtNode, err = newHclogFormatterFilter(c.Format == JSONHclogSinkFormat, WithAllow(c.AllowFilters...), WithDeny(c.DenyFilters...))
if err != nil {
return "", nil, fmt.Errorf("%s: %w", op, err)
}
default:
s := fmt.Sprintf("https://hashicorp.com/boundary/%s", serverName)
sourceUrl, err = url.Parse(s)
id, err := NewId("cloudevents")
if err != nil {
return "", nil, fmt.Errorf("%s: invalid event source URL (%s): %w", op, s, err)
return "", nil, fmt.Errorf("%s: unable to generate id: %w", op, err)
}
fmtId = eventlogger.NodeID(id)
var sourceUrl *url.URL
switch {
case c.EventSourceUrl != "":
sourceUrl, err = url.Parse(c.EventSourceUrl)
if err != nil {
return "", nil, fmt.Errorf("%s: invalid event source URL (%s): %w", op, c.EventSourceUrl, err)
}
default:
s := fmt.Sprintf("https://hashicorp.com/boundary/%s", serverName)
sourceUrl, err = url.Parse(s)
if err != nil {
return "", nil, fmt.Errorf("%s: invalid event source URL (%s): %w", op, s, err)
}
}
fmtNode, err = newCloudEventsFormatterFilter(sourceUrl, cloudevents.Format(c.Format), WithAllow(c.AllowFilters...), WithDeny(c.DenyFilters...))
if err != nil {
return "", nil, fmt.Errorf("%s: %w", op, err)
}
}
fmtNode, err := NewCloudEventsNode(sourceUrl, cloudevents.Format(c.Format), WithAllow(c.AllowFilters...), WithDeny(c.DenyFilters...))
if err != nil {
return "", nil, fmt.Errorf("%s: %w", op, err)
}
return eventlogger.NodeID(id), fmtNode, nil
return fmtId, fmtNode, nil
}
func DefaultEventerConfig() *EventerConfig {

@ -354,6 +354,8 @@ func Test_NewEventer(t *testing.T) {
testSetupWithOpts := TestEventerConfig(t, "Test_NewEventer", TestWithAuditSink(t), TestWithObservationSink(t), testWithSysSink(t))
testHclogSetup := TestEventerConfig(t, "Test_NewEventer", testWithSinkFormat(t, TextHclogSinkFormat))
testLock := &sync.Mutex{}
testLogger := hclog.New(&hclog.LoggerOptions{
Mutex: testLock,
@ -523,6 +525,46 @@ func Test_NewEventer(t *testing.T) {
"audit": 3,
},
},
{
name: "testSetup-with-hclog",
config: testHclogSetup.EventerConfig,
logger: testLogger,
lock: testLock,
serverName: "testSetup",
want: &Eventer{
logger: testLogger,
conf: testHclogSetup.EventerConfig,
},
wantRegistered: []string{
"hclog-text", // stderr
"stderr", // stderr
"gated-observation", // stderr
"gated-audit", // stderr
"hclog-text", // every-type-file-sync
"tmp-all-events", // every-type-file-sync
"gated-observation", // every-type-file-sync
"gated-audit", // every-type-file-sync
"hclog-text", // error-file-sink
"tmp-errors", // error-file-sink
},
wantPipelines: []string{
"audit", // every-type-file-sync
"audit", // stderr
"observation", // every-type-file-sync
"observation", // stderr
"error", // every-type-file-sync
"error", // stderr
"error", // error-file-sink
"system", // stderr
"system", // stderr
},
wantThresholds: map[eventlogger.EventType]int{
"error": 3,
"system": 2,
"observation": 2,
"audit": 2,
},
},
}
for _, tt := range tests {

@ -0,0 +1,179 @@
package event
import (
"bytes"
"context"
"errors"
"fmt"
"reflect"
"github.com/fatih/structs"
"github.com/hashicorp/eventlogger"
"github.com/hashicorp/go-hclog"
)
const (
infoField = "Info"
errorFields = "ErrorFields"
requestInfoField = "RequestInfo"
wrappedField = "Wrapped"
hclogNodeName = "hclog-formatter-filter"
)
// hclogFormatterFilter will format a boundary event an an hclog entry.
type hclogFormatterFilter struct {
// jsonFormat allows you to specify that the hclog entry should be in JSON
// fmt.
jsonFormat bool
predicate func(ctx context.Context, i interface{}) (bool, error)
allow []*filter
deny []*filter
}
func newHclogFormatterFilter(jsonFormat bool, opt ...Option) (*hclogFormatterFilter, error) {
const op = "event.NewHclogFormatter"
n := hclogFormatterFilter{
jsonFormat: jsonFormat,
}
opts := getOpts(opt...)
// intentionally not checking if allow and/or deny optional filters were
// supplied since having a filter node with no filters is okay.
if len(opts.withAllow) > 0 {
n.allow = make([]*filter, 0, len((opts.withAllow)))
for i := range opts.withAllow {
f, err := newFilter(opts.withAllow[i])
if err != nil {
return nil, fmt.Errorf("%s: invalid allow filter '%s': %w", op, opts.withAllow[i], err)
}
n.allow = append(n.allow, f)
}
}
if len(opts.withDeny) > 0 {
n.deny = make([]*filter, 0, len((opts.withDeny)))
for i := range opts.withDeny {
f, err := newFilter(opts.withDeny[i])
if err != nil {
return nil, fmt.Errorf("%s: invalid deny filter '%s': %w", op, opts.withDeny[i], err)
}
n.deny = append(n.deny, f)
}
}
n.predicate = newPredicate(n.allow, n.deny)
return &n, nil
}
// Reopen is a no op
func (_ *hclogFormatterFilter) Reopen() error { return nil }
// Type describes the type of the node as a Formatter.
func (_ *hclogFormatterFilter) Type() eventlogger.NodeType {
return eventlogger.NodeTypeFormatterFilter
}
// Name returns a representation of the HclogFormatter's name
func (_ *hclogFormatterFilter) Name() string {
return hclogNodeName
}
// Process formats the Boundary event as an hclog entry and stores that
// formatted data in Event.Formatted with a key of either "hclog-text"
// (TextHclogSinkFormat) or "hclog-json" (JSONHclogSinkFormat) based on the
// HclogFormatter.JSONFormat value.
//
// If the node has a Predicate, then the filter will be applied to event.Payload
func (f *hclogFormatterFilter) Process(ctx context.Context, e *eventlogger.Event) (*eventlogger.Event, error) {
const op = "event.(HclogFormatter).Process"
if e == nil {
return nil, errors.New("event is nil")
}
if f.predicate != nil {
// Use the predicate to see if we want to keep the event using it's
// formatted struct as a parmeter to the predicate.
keep, err := f.predicate(ctx, e.Payload)
if err != nil {
return nil, fmt.Errorf("%s: unable to filter: %w", op, err)
}
if !keep {
// Return nil to signal that the event should be discarded.
return nil, nil
}
}
var m map[string]interface{}
switch string(e.Type) {
case string(ErrorType), string(AuditType), string(SystemType):
m = structs.Map(e.Payload)
case string(ObservationType):
m = e.Payload.(map[string]interface{})
default:
return nil, fmt.Errorf("%s: unknown event type %s", op, e.Type)
}
args := make([]interface{}, 0, len(m))
for k, v := range m {
if k == requestInfoField && v == nil {
continue
}
if !f.jsonFormat && v != nil {
valueKind := reflect.TypeOf(v).Kind()
if valueKind == reflect.Ptr {
valueKind = reflect.TypeOf(v).Elem().Kind()
}
switch {
case valueKind == reflect.Map:
for sk, sv := range v.(map[string]interface{}) {
args = append(args, k+":"+sk, sv)
}
continue
case valueKind == reflect.Struct && v != nil && !reflect.ValueOf(v).IsNil():
for sk, sv := range structs.Map(v) {
args = append(args, k+":"+sk, sv)
}
continue
}
}
switch string(e.Type) {
case string(ErrorType):
switch {
case k == errorFields && v == nil:
continue
case k == infoField && len(v.(map[string]interface{})) == 0:
continue
case k == wrappedField && v == nil:
continue
}
}
args = append(args, k, v)
}
var buf bytes.Buffer
logger := hclog.New(&hclog.LoggerOptions{
Output: &buf,
Level: hclog.Trace,
JSONFormat: f.jsonFormat,
})
const eventMarker = " event"
switch string(e.Type) {
case string(ErrorType):
logger.Error(string(e.Type)+eventMarker, args...)
case string(ObservationType), string(SystemType), string(AuditType):
logger.Info(string(e.Type)+eventMarker, args...)
default:
// well, we should ever hit this, since we should be specific about the
// event type we're processing, but adding this default to just be sure
// we haven't missed anything.
logger.Trace(string(e.Type)+eventMarker, args...)
}
switch f.jsonFormat {
case true:
e.FormattedAs(string(JSONHclogSinkFormat), buf.Bytes())
case false:
e.FormattedAs(string(TextHclogSinkFormat), buf.Bytes())
}
return e, nil
}

@ -0,0 +1,368 @@
package event
import (
"context"
"testing"
"github.com/hashicorp/eventlogger"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestHclogFormatter_Process(t *testing.T) {
t.Parallel()
ctx := context.Background()
f, e := newFilter(`Op == "match-filter"`)
require.NoError(t, e)
testPredicate := newPredicate([]*filter{f}, nil)
tests := []struct {
name string
formatter *hclogFormatterFilter
e *eventlogger.Event
wantErrContains string
want []string
}{
{
name: "nil event",
formatter: &hclogFormatterFilter{
jsonFormat: false,
},
wantErrContains: "event is nil",
},
{
name: "invalid-event-type",
formatter: &hclogFormatterFilter{
jsonFormat: false,
},
e: &eventlogger.Event{Type: eventlogger.EventType("invalid-type")},
wantErrContains: "unknown event type invalid-type",
},
{
name: "sys-text",
formatter: &hclogFormatterFilter{
jsonFormat: false,
},
e: &eventlogger.Event{
Type: eventlogger.EventType(SystemType),
Payload: &sysEvent{
Id: "1",
Version: errorVersion,
Op: Op("text"),
Data: map[string]interface{}{
"msg": "hello",
},
},
},
want: []string{
"[INFO] system event:",
"Data:msg=hello",
"Id=1",
"Version=v0.1",
"Op=text",
},
},
{
name: "observation-text",
formatter: &hclogFormatterFilter{
jsonFormat: false,
},
e: &eventlogger.Event{
Type: eventlogger.EventType(ObservationType),
Payload: map[string]interface{}{
"id": "1",
"version": observationVersion,
"latency-ms": 10,
},
},
want: []string{
"[INFO] observation event:",
"latency-ms=10",
"id=1",
"version=v0.1",
},
},
{
name: "observation-json",
formatter: &hclogFormatterFilter{
jsonFormat: true,
},
e: &eventlogger.Event{
Type: eventlogger.EventType(ObservationType),
Payload: map[string]interface{}{
"id": "1",
"version": observationVersion,
"latency-ms": 10,
},
},
want: []string{
"{\"@level\":\"info\",\"@message\":\"observation event\"",
"\"latency-ms\":10",
"\"id\":\"1\"",
"\"version\":\"v0.1\"}\n",
},
},
{
name: "err-text",
formatter: &hclogFormatterFilter{
jsonFormat: false,
},
e: &eventlogger.Event{
Type: eventlogger.EventType(ErrorType),
Payload: &err{
Id: "1",
Version: errorVersion,
Error: ErrInvalidParameter.Error(),
Op: Op("text"),
},
},
want: []string{
"[ERROR] error event:",
"Error=\"invalid parameter\"",
"Id=1",
"Version=v0.1",
"Op=text",
},
},
{
name: "err-json",
formatter: &hclogFormatterFilter{
jsonFormat: true,
},
e: &eventlogger.Event{
Type: eventlogger.EventType(ErrorType),
Payload: &err{
Id: "1",
Version: errorVersion,
Error: ErrInvalidParameter.Error(),
Op: Op("text"),
},
},
want: []string{
"{\"@level\":\"error\",\"@message\":\"error event\"",
"\"Error\":\"invalid parameter\"",
"\"Id\":\"1\"",
"\"Version\":\"v0.1\"",
"\"Op\":\"text\""},
},
{
name: "err-text-with-optional",
formatter: &hclogFormatterFilter{
jsonFormat: false,
},
e: &eventlogger.Event{
Type: eventlogger.EventType(ErrorType),
Payload: &err{
Id: "1",
Version: errorVersion,
Error: ErrInvalidParameter.Error(),
Op: Op("text"),
Info: map[string]interface{}{"name": "alice"},
},
},
want: []string{
"[ERROR] error event:",
"Error=\"invalid parameter\"",
"Id=1",
"Version=v0.1",
"Info:name=alice",
"Op=text",
},
},
{
name: "filter-match",
formatter: &hclogFormatterFilter{
jsonFormat: false,
predicate: testPredicate,
},
e: &eventlogger.Event{
Type: eventlogger.EventType(SystemType),
Payload: &sysEvent{
Id: "1",
Version: errorVersion,
Op: Op("match-filter"),
Data: map[string]interface{}{
"msg": "hello",
},
},
},
want: []string{
"[INFO] system event:",
"Data:msg=hello",
"Id=1",
"Version=v0.1",
"Op=match-filter",
},
},
{
name: "filter-no-match",
formatter: &hclogFormatterFilter{
jsonFormat: false,
predicate: testPredicate,
},
e: &eventlogger.Event{
Type: eventlogger.EventType(SystemType),
Payload: &sysEvent{
Id: "1",
Version: errorVersion,
Op: Op("doesn't match"),
Data: map[string]interface{}{
"msg": "hello",
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert, require := assert.New(t), require.New(t)
e, err := tt.formatter.Process(ctx, tt.e)
if tt.wantErrContains != "" {
require.Error(err)
assert.Contains(err.Error(), tt.wantErrContains)
return
}
require.NoError(err)
if len(tt.want) == 0 {
assert.Nil(e)
return
}
assert.NotNil(e)
var b []byte
var ok bool
switch tt.formatter.jsonFormat {
case true:
b, ok = e.Format(string(JSONHclogSinkFormat))
case false:
b, ok = e.Format(string(TextHclogSinkFormat))
}
t.Log(string(b))
require.True(ok)
for _, txt := range tt.want {
assert.Contains(string(b), txt)
}
})
}
}
func Test_hclogFormatterFilter_Name(t *testing.T) {
t.Parallel()
t.Run("simple", func(t *testing.T) {
ff := &hclogFormatterFilter{}
assert.Equal(t, hclogNodeName, ff.Name())
})
}
func Test_hclogFormatterFilter_Reopen(t *testing.T) {
t.Parallel()
t.Run("simple", func(t *testing.T) {
ff := &hclogFormatterFilter{}
assert.Equal(t, nil, ff.Reopen())
})
}
func Test_hclogFormatterFilter_Type(t *testing.T) {
t.Parallel()
t.Run("simple", func(t *testing.T) {
ff := &hclogFormatterFilter{}
assert.Equal(t, eventlogger.NodeTypeFormatterFilter, ff.Type())
})
}
func Test_newHclogFormatterFilter(t *testing.T) {
t.Parallel()
tests := []struct {
name string
jsonFormat bool
opt []Option
wantErr bool
wantIsError error
wantErrContains string
wantAllow []string
wantDeny []string
}{
{
name: "no-opts",
},
{
name: "bad-allow-filter",
jsonFormat: true,
opt: []Option{
WithAllow("foo=;22", "foo==bar"),
},
wantErr: true,
wantErrContains: "invalid allow filter 'foo=;22'",
},
{
name: "bad-deny-filter",
jsonFormat: true,
opt: []Option{
WithDeny("foo=;22", "foo==bar"),
},
wantErr: true,
wantErrContains: "invalid deny filter 'foo=;22'",
},
{
name: "empty-allow-filter",
jsonFormat: true,
opt: []Option{
WithAllow(""),
},
wantErr: true,
wantErrContains: "missing filter",
},
{
name: "empty-deny-filter",
jsonFormat: true,
opt: []Option{
WithDeny(""),
},
wantErr: true,
wantErrContains: "missing filter",
},
{
name: "valid-filters",
jsonFormat: true,
opt: []Option{
WithAllow("alice==friend", "bob==friend"),
WithDeny("eve==acquaintance", "fido!=dog"),
},
wantAllow: []string{"alice==friend", "bob==friend"},
wantDeny: []string{"eve==acquaintance", "fido!=dog"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert, require := assert.New(t), require.New(t)
got, err := newHclogFormatterFilter(tt.jsonFormat, tt.opt...)
if tt.wantErr {
require.Error(err)
assert.Nil(got)
if tt.wantIsError != nil {
assert.ErrorIs(err, tt.wantIsError)
}
if tt.wantErrContains != "" {
assert.Contains(err.Error(), tt.wantErrContains)
}
return
}
require.NoError(err)
assert.NotNil(got)
assert.Equal(tt.jsonFormat, got.jsonFormat)
assert.Len(got.allow, len(tt.wantAllow))
for _, f := range got.allow {
assert.Contains(tt.wantAllow, f.raw)
}
assert.Len(got.deny, len(tt.wantDeny))
for _, f := range got.deny {
assert.Contains(tt.wantDeny, f.raw)
}
})
}
}

@ -39,10 +39,11 @@ type options struct {
withDeny []string
withSchema *url.URL
withBroker broker // test only option
withAuditSink bool // test only option
withObservationSink bool // test only option
withSysSink bool // test only option
withBroker broker // test only option
withAuditSink bool // test only option
withObservationSink bool // test only option
withSysSink bool // test only option
withSinkFormat SinkFormat // test only option
}
func getDefaultOptions() options {

@ -5,8 +5,10 @@ import (
)
const (
JSONSinkFormat SinkFormat = "cloudevents-json" // JSONSinkFormat means the event is formatted as JSON
TextSinkFormat SinkFormat = "cloudevents-text" // TextSinkFormat means the event is formmatted as text
JSONSinkFormat SinkFormat = "cloudevents-json" // JSONSinkFormat means the event is formatted as JSON
TextSinkFormat SinkFormat = "cloudevents-text" // TextSinkFormat means the event is formmatted as text
TextHclogSinkFormat SinkFormat = "hclog-text" // TextHclogSinkFormat means the event is formatted as an hclog text entry
JSONHclogSinkFormat SinkFormat = "hclog-json" // JSONHclogSinkFormat means the event is formated as an hclog json entry
)
type SinkFormat string // SinkFormat defines the formatting for a sink in a config file stanza (json)
@ -16,6 +18,8 @@ func (f SinkFormat) Validate() error {
switch f {
case JSONSinkFormat, TextSinkFormat:
return nil
case TextHclogSinkFormat, JSONHclogSinkFormat:
return nil
default:
return fmt.Errorf("%s: '%s' is not a valid sink format: %w", op, f, ErrInvalidParameter)
}

@ -67,6 +67,11 @@ func TestEventerConfig(t *testing.T, testName string, opt ...Option) TestConfig
os.Remove(tmpErrFile.Name())
})
opts := getOpts(opt...)
if opts.withSinkFormat == "" {
opts.withSinkFormat = JSONSinkFormat
}
c := TestConfig{
EventerConfig: EventerConfig{
ObservationsEnabled: true,
@ -76,7 +81,7 @@ func TestEventerConfig(t *testing.T, testName string, opt ...Option) TestConfig
Name: "every-type-file-sink",
SinkType: FileSink,
EventTypes: []Type{EveryType},
Format: JSONSinkFormat,
Format: opts.withSinkFormat,
Path: "./",
FileName: tmpAllFile.Name(),
},
@ -84,13 +89,13 @@ func TestEventerConfig(t *testing.T, testName string, opt ...Option) TestConfig
Name: "stderr",
SinkType: StderrSink,
EventTypes: []Type{EveryType},
Format: JSONSinkFormat,
Format: opts.withSinkFormat,
},
{
Name: "err-file-sink",
SinkType: FileSink,
EventTypes: []Type{ErrorType},
Format: JSONSinkFormat,
Format: opts.withSinkFormat,
Path: "./",
FileName: tmpErrFile.Name(),
},
@ -99,7 +104,6 @@ func TestEventerConfig(t *testing.T, testName string, opt ...Option) TestConfig
AllEvents: tmpAllFile,
ErrorEvents: tmpErrFile,
}
opts := getOpts(opt...)
if opts.withAuditSink {
tmpFile, err := ioutil.TempFile("./", "tmp-audit-"+testName)
require.NoError(err)
@ -110,7 +114,7 @@ func TestEventerConfig(t *testing.T, testName string, opt ...Option) TestConfig
Name: "audit-file-sink",
SinkType: FileSink,
EventTypes: []Type{AuditType},
Format: JSONSinkFormat,
Format: opts.withSinkFormat,
Path: "./",
FileName: tmpFile.Name(),
})
@ -126,7 +130,7 @@ func TestEventerConfig(t *testing.T, testName string, opt ...Option) TestConfig
Name: "err-observation-sink",
SinkType: FileSink,
EventTypes: []Type{ObservationType},
Format: JSONSinkFormat,
Format: opts.withSinkFormat,
Path: "./",
FileName: tmpFile.Name(),
})
@ -142,7 +146,7 @@ func TestEventerConfig(t *testing.T, testName string, opt ...Option) TestConfig
Name: "err-sysevents-sink",
SinkType: FileSink,
EventTypes: []Type{SystemType},
Format: JSONSinkFormat,
Format: opts.withSinkFormat,
Path: "./",
FileName: tmpFile.Name(),
})
@ -229,6 +233,14 @@ func testWithSysSink(t *testing.T) Option {
}
}
// testWithSinkFormat is an unexported and a test option
func testWithSinkFormat(t *testing.T, fmt SinkFormat) Option {
t.Helper()
return func(o *options) {
o.withSinkFormat = fmt
}
}
type testMockBroker struct {
reopened bool
stopTimeAt time.Time

Loading…
Cancel
Save