diff --git a/internal/event/event_observation.go b/internal/event/event_observation.go index 86368272a3..c346d49182 100644 --- a/internal/event/event_observation.go +++ b/internal/event/event_observation.go @@ -4,9 +4,7 @@ package event import ( - "encoding/json" "fmt" - "reflect" "github.com/hashicorp/eventlogger" "github.com/hashicorp/eventlogger/filters/gated" @@ -53,10 +51,14 @@ func newObservation(fromOperation Op, opt ...Option) (*observation, error) { Flush: opts.withFlush, Op: fromOperation, RequestInfo: opts.withRequestInfo, - Request: opts.withRequest, - Response: opts.withResponse, Version: observationVersion, } + + if opts.withTelemetry { + i.Request = opts.withRequest + i.Response = opts.withResponse + } + if err := i.validate(); err != nil { return nil, fmt.Errorf("%s: %w", op, err) } @@ -117,16 +119,11 @@ func (o *observation) ComposeFrom(events []*eventlogger.Event) (eventlogger.Even } } if g.Request.Details != nil { - res := recurseStructureWithTagFilter( - g.Request.Details, - map[string]string{ - "eventstream": "observation", - }, - false, - ) - data, _ := json.Marshal(res) - fmt.Printf("RESULT request = %s", string(data)) - msgReq.Details = g.Request.Details + filteredRequest, err := filterProtoMessage(g.Request.Details, telemetryFilter) + if err != nil { + continue + } + msgReq.Details = filteredRequest } if g.Request.Operation != "" { msgReq.Operation = g.Request.Operation @@ -151,17 +148,11 @@ func (o *observation) ComposeFrom(events []*eventlogger.Event) (eventlogger.Even msgRes.StatusCode = g.Response.StatusCode } if g.Response.Details != nil { - res := recurseStructureWithTagFilter( - g.Response.Details, - map[string]string{ - "eventstream": "observation", - }, - false, - ) - data, _ := json.Marshal(res) - fmt.Printf("RESULT response = %s \n", string(data)) - - msgRes.Details = g.Response.Details + filteredResponse, err := filterProtoMessage(g.Response.Details, telemetryFilter) + if err != nil { + continue + } + msgRes.Details = filteredResponse } if g.Response.DetailsUpstreamMessage != nil { msgRes.DetailsUpstreamMessage = g.Response.DetailsUpstreamMessage @@ -184,92 +175,3 @@ func (o *observation) GetID() string { func (o *observation) FlushEvent() bool { return o.Flush } - -func recurseStructureWithTagFilter(value any, allowedStructTags map[string]string, allowLevel bool) any { - if allowedStructTags == nil { - allowedStructTags = map[string]string{} - } - - kind := reflect.ValueOf(value).Kind() - - switch kind { - case reflect.Interface, reflect.Ptr: - value = reflect.ValueOf(value).Elem().Interface() - return recurseStructureWithTagFilter(value, allowedStructTags, allowLevel) - case reflect.Map: - out := map[string]any{} - m := reflect.ValueOf(value) - for _, k := range m.MapKeys() { - mVal := m.MapIndex(k).Interface() - if vv := recurseStructureWithTagFilter(mVal, allowedStructTags, allowLevel); vv != nil { - if strKey, ok := k.Interface().(string); ok { - out[strKey] = vv - } else { - out[fmt.Sprintf("%v", k)] = vv - } - } - } - if allowLevel { - return out - } - return nil - case reflect.Array, reflect.Slice: - out := []any{} - s := reflect.ValueOf(value) - for i := 0; i < s.Len(); i++ { - sVal := s.Index(i).Interface() - if vv := recurseStructureWithTagFilter(sVal, allowedStructTags, allowLevel); vv != nil { - out = append(out, vv) - } - } - if allowLevel { - return out - } - return nil - case reflect.Struct: - // traverse the struct recursively, and apply any tag filters - fields := reflect.TypeOf(value) - values := reflect.ValueOf(value) - num := fields.NumField() - out := map[string]any{} - for i := 0; i < num; i++ { - field := fields.Field(i) - name := field.Name - allowLevel := false - if len(allowedStructTags) == 0 { - allowLevel = true - } else { - for tag, tagValue := range allowedStructTags { - if field.Tag.Get(tag) == tagValue { - // log.Printf("%s is allowed by Tags (%s:\"%s\")", name, tag, tagValue) - allowLevel = true - break - } - } - } - if !field.IsExported() { - continue - } - if values.Field(i).IsZero() { - if allowLevel { - out[name] = "" - } - continue - } - if !values.Field(i).IsValid() { - continue - } - value := values.Field(i).Interface() - if vv := recurseStructureWithTagFilter(value, allowedStructTags, allowLevel); vv != nil { - out[name] = vv - } - } - return out - default: - // any other non structured type, we will just barf out as is - if allowLevel { - return value - } - return nil - } -} diff --git a/internal/event/event_observation_test.go b/internal/event/event_observation_test.go index c766c6c18e..d46d4f68a4 100644 --- a/internal/event/event_observation_test.go +++ b/internal/event/event_observation_test.go @@ -4,14 +4,13 @@ package event import ( - "encoding/json" "testing" "time" - "github.com/hashicorp/boundary/internal/gen/controller/servers" - "github.com/hashicorp/boundary/internal/gen/controller/servers/services" + "github.com/hashicorp/eventlogger" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" ) func Test_newObservation(t *testing.T) { @@ -143,48 +142,236 @@ func Test_observationEventType(t *testing.T) { assert.Equal(t, string(ObservationType), e.EventType()) } -func Test_iterateProto(t *testing.T) { - assert, _ := assert.New(t), require.New(t) - input := Request{ - Operation: "", - Endpoint: "", - Details: &services.StatusRequest{ - Jobs: []*services.JobStatus{ - {Job: &services.Job{ - Type: 1, - JobInfo: nil, - }}, - }, - UpdateTags: false, - WorkerStatus: &servers.ServerWorkerStatus{ - PublicId: "testID", - Name: "w_1234567890", - Description: "A default worker created in", - Address: "127.0.0.1:9202", - Tags: []*servers.TagPair{ - { - Key: "type", - Value: "dev", - }, +func Test_composeFromTelemetryFiltering(t *testing.T) { + t.Parallel() + + now := time.Now() + + tests := []struct { + name string + fromOp Op + opts []Option + wantObservation *observation + wantErrIs error + wantErrContains string + wantFilteredRequest *Request + wantFilteredResponse *Response + }{ + { + name: "with-request-no-telemetry", + fromOp: Op("with-request-no-telemetry"), + opts: []Option{ + WithId("with-request-no-telemetry"), + WithRequestInfo(TestRequestInfo(t)), + WithFlush(), + WithRequest(&Request{ + Operation: "op", + Endpoint: "/worker-status/", + Details: testWorkerStatus(t), + }), + }, + wantObservation: &observation{ + ID: "with-request-no-telemetry", + Flush: true, + Version: errorVersion, + Op: Op("with-request-no-telemetry"), + RequestInfo: TestRequestInfo(t), + }, + }, + { + name: "with-response-no-telemetry", + fromOp: Op("with-response-no-telemetry"), + opts: []Option{ + WithId("with-response-no-telemetry"), + WithRequestInfo(TestRequestInfo(t)), + WithFlush(), + WithResponse(&Response{ + StatusCode: 200, + Details: testWorkerStatus(t), + }), + }, + wantObservation: &observation{ + ID: "with-response-no-telemetry", + Flush: true, + Version: errorVersion, + Op: Op("with-response-no-telemetry"), + RequestInfo: TestRequestInfo(t), + }, + }, + { + name: "with-request-with-telemetry", + fromOp: Op("with-request-with-telemetry"), + opts: []Option{ + WithId("with-request-with-telemetry"), + WithRequestInfo(TestRequestInfo(t)), + WithFlush(), + WithRequest(&Request{ + Operation: "op", + Endpoint: "/worker-status/", + Details: testWorkerStatus(t), + }), + WithTelemetry(), + }, + wantObservation: &observation{ + ID: "with-request-with-telemetry", + Flush: true, + Version: errorVersion, + Op: Op("with-request-with-telemetry"), + RequestInfo: TestRequestInfo(t), + Request: &Request{ + Operation: "op", + Endpoint: "/worker-status/", + Details: testWorkerStatus(t), }, - KeyId: "ovary-valid-curler-scrambled-glutinous-alias-rework-debit", - ReleaseVersion: "Boundary v0.13.1", - OperationalState: "active", }, - ConnectedWorkerKeyIdentifiers: nil, - ConnectedUnmappedWorkerKeyIdentifiers: nil, - ConnectedWorkerPublicIds: nil, + wantFilteredRequest: &Request{ + Operation: "op", + Endpoint: "/worker-status/", + Details: testWorkerStatusObservable(t), + }, }, - DetailsUpstreamMessage: nil, - } - - res := recurseStructureWithTagFilter( - input.Details, - map[string]string{ - "eventstream": "observation", + { + name: "with-response-with-telemetry", + fromOp: Op("with-response-with-telemetry"), + opts: []Option{ + WithId("with-response-with-telemetry"), + WithRequestInfo(TestRequestInfo(t)), + WithFlush(), + WithResponse(&Response{ + StatusCode: 200, + Details: testWorkerStatus(t), + }), + WithTelemetry(), + }, + wantObservation: &observation{ + ID: "with-response-with-telemetry", + Flush: true, + Version: errorVersion, + Op: Op("with-response-with-telemetry"), + RequestInfo: TestRequestInfo(t), + Response: &Response{ + StatusCode: 200, + Details: testWorkerStatus(t), + }, + }, + wantFilteredResponse: &Response{ + StatusCode: 200, + Details: testWorkerStatusObservable(t), + }, }, - false, - ) - data, _ := json.Marshal(res) - assert.NotNil(data) + { + name: "nil-request-details-with-telemetry", + fromOp: Op("nil-request-details-with-telemetry"), + opts: []Option{ + WithId("nil-request-details-with-telemetry"), + WithRequestInfo(TestRequestInfo(t)), + WithFlush(), + WithRequest(&Request{ + Operation: "op", + Endpoint: "/worker-status/", + Details: nil, + }), + WithTelemetry(), + }, + wantObservation: &observation{ + ID: "nil-request-details-with-telemetry", + Flush: true, + Version: errorVersion, + Op: Op("nil-request-details-with-telemetry"), + RequestInfo: TestRequestInfo(t), + Request: &Request{ + Operation: "op", + Endpoint: "/worker-status/", + Details: nil, + }, + }, + wantFilteredRequest: &Request{ + Operation: "op", + Endpoint: "/worker-status/", + Details: nil, + }, + }, + { + name: "nil-response-details-with-telemetry", + fromOp: Op("nil-response-details-with-telemetry"), + opts: []Option{ + WithId("nil-response-details-with-telemetry"), + WithRequestInfo(TestRequestInfo(t)), + WithFlush(), + WithResponse(&Response{ + StatusCode: 200, + Details: nil, + }), + WithTelemetry(), + }, + wantObservation: &observation{ + ID: "nil-response-details-with-telemetry", + Flush: true, + Version: errorVersion, + Op: Op("nil-response-details-with-telemetry"), + RequestInfo: TestRequestInfo(t), + Response: &Response{ + StatusCode: 200, + Details: nil, + }, + }, + wantFilteredResponse: &Response{ + StatusCode: 200, + Details: nil, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert, require := assert.New(t), require.New(t) + got, err := newObservation(tt.fromOp, tt.opts...) + require.NoError(err) + require.NotNil(got) + opts := getOpts(tt.opts...) + if opts.withId == "" { + tt.wantObservation.ID = got.ID + } + assert.Equal(tt.wantObservation, got) + // feed the event through ComposeFrom which will filter any request/response data + _, ev, err := got.ComposeFrom( + []*eventlogger.Event{ + { + Type: "observation", + CreatedAt: now, + Payload: got, + }, + }, + ) + if tt.wantErrIs != nil { + require.Error(err) + assert.Nil(got) + assert.ErrorIs(err, tt.wantErrIs) + if tt.wantErrContains != "" { + assert.Contains(err.Error(), tt.wantErrContains) + } + return + } + payload, ok := ev.(map[string]any) + assert.True(ok) + assert.NotNil(payload) + if tt.wantFilteredRequest != nil { + req, ok := payload["request"] + assert.True(ok) + assert.NotNil(req) + pmsg, ok := req.(*Request) + assert.True(ok) + assert.NotNil(pmsg) + assert.True(proto.Equal(tt.wantFilteredRequest.Details, pmsg.Details)) + } + if tt.wantFilteredResponse != nil { + req, ok := payload["response"] + assert.True(ok) + assert.NotNil(req) + pmsg, ok := req.(*Response) + assert.True(ok) + assert.NotNil(pmsg) + assert.True(proto.Equal(tt.wantFilteredResponse.Details, pmsg.Details)) + } + }) + } } diff --git a/internal/event/options.go b/internal/event/options.go index 33e79ba456..fed89f333d 100644 --- a/internal/event/options.go +++ b/internal/event/options.go @@ -48,6 +48,7 @@ type options struct { withFilterOperations AuditFilterOperations withGating bool withNoGateLocking bool + withTelemetry bool // These options are related to the hclog adapter withHclogLevel hclog.Level @@ -228,3 +229,10 @@ func WithNoGateLocking(with bool) Option { o.withNoGateLocking = with } } + +// WithTelemetry allows an optional telemetry option. +func WithTelemetry() Option { + return func(o *options) { + o.withTelemetry = true + } +} diff --git a/internal/event/options_test.go b/internal/event/options_test.go index 34dbd5153b..80bd482bc8 100644 --- a/internal/event/options_test.go +++ b/internal/event/options_test.go @@ -196,6 +196,13 @@ func Test_GetOpts(t *testing.T) { opts := getOpts(WithNoGateLocking(true)) assert.True(opts.withNoGateLocking) }) + t.Run("WithTelemetry", func(t *testing.T) { + assert := assert.New(t) + opts := getOpts(WithTelemetry()) + testOpts := getDefaultOptions() + testOpts.withTelemetry = true + assert.Equal(opts, testOpts) + }) } // testWrapper initializes an AEAD wrapping.Wrapper for testing. Note: this diff --git a/internal/event/telemetry_filter.go b/internal/event/telemetry_filter.go new file mode 100644 index 0000000000..45e2cbd5cc --- /dev/null +++ b/internal/event/telemetry_filter.go @@ -0,0 +1,119 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package event + +import ( + "errors" + "reflect" + + "google.golang.org/protobuf/proto" +) + +// protoFilter is a signature for a struct field validation test +type protoFilter func(field reflect.StructField) bool + +// telemetryFilter checks a struct field should be included in observation telemetry data +func telemetryFilter(field reflect.StructField) bool { + if field.Tag.Get("eventstream") == "observation" { + // log.Printf("%s is allowed by Tags (%s:\"%s\")", name, tag, tagValue) + return true + } + return false +} + +// filterValue will preserve or zero a value based on if it is classed as observable +func filterValue(fv reflect.Value, isObservable bool) { + if isObservable { + return // let data persist to telemetry + } + + // check for nil value (prevent panics) + if fv == reflect.ValueOf(nil) { + return + } + + if fv.Kind() == reflect.Ptr { + fv = fv.Elem() + } + + // check to see if it's an exported struct field + if !fv.CanSet() { + return + } + + fv.SetZero() + + return +} + +func recurseStructureWithProtoFilter(value reflect.Value, filterFunc protoFilter, isObservable bool) error { + kind := value.Kind() + + switch kind { + case reflect.Interface, reflect.Ptr: + value = value.Elem() + return recurseStructureWithProtoFilter(value, filterFunc, isObservable) + case reflect.Map: + m := reflect.ValueOf(value) + for _, k := range m.MapKeys() { + mVal := m.MapIndex(k) + if err := recurseStructureWithProtoFilter(mVal, filterFunc, isObservable); err != nil { + return err + } + } + return nil + case reflect.Array, reflect.Slice: + if isObservable { + for i := 0; i < value.Len(); i++ { + sVal := value.Index(i) + if err := recurseStructureWithProtoFilter(sVal, filterFunc, isObservable); err != nil { + return err + } + } + } else { + if kind == reflect.Slice { + value.SetLen(0) // truncate + } else { + // fixed size, so we zero + for i := 0; i < value.Len(); i++ { + value.Index(i).SetZero() + } + } + } + case reflect.Struct: + fields := value.Type() + num := fields.NumField() + for i := 0; i < num; i++ { + field := fields.Field(i) + v := value.Field(i) + if !field.IsExported() { + continue + } + isObservable := true + if filterFunc != nil { + isObservable = filterFunc(field) + } + if err := recurseStructureWithProtoFilter(v, filterFunc, isObservable); err != nil { + return err + } + } + return nil + default: + // any other non structured type, we will output or not via filterValue + filterValue(value, isObservable) + return nil + } + + return nil +} + +func filterProtoMessage(msg proto.Message, filterFunc protoFilter) (proto.Message, error) { + if msg == nil { + return nil, errors.New("nil message") + } + + cloneMsg := proto.Clone(msg) + err := recurseStructureWithProtoFilter(reflect.ValueOf(cloneMsg), filterFunc, false) + return cloneMsg, err +} diff --git a/internal/event/telemetry_filter_test.go b/internal/event/telemetry_filter_test.go new file mode 100644 index 0000000000..f31e2423ba --- /dev/null +++ b/internal/event/telemetry_filter_test.go @@ -0,0 +1,105 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package event + +import ( + "testing" + + "github.com/hashicorp/boundary/internal/gen/controller/servers" + "github.com/hashicorp/boundary/internal/gen/controller/servers/services" + tassert "github.com/stretchr/testify/assert" +) + +func Test_OnlyObservationTaggedFieldsPopulated(t *testing.T) { + assert := tassert.New(t) + + input := &services.StatusRequest{ + Jobs: []*services.JobStatus{ + {Job: &services.Job{ + Type: 1, + JobInfo: nil, + }}, + }, + UpdateTags: false, + WorkerStatus: &servers.ServerWorkerStatus{ + PublicId: "testID", + Name: "w_1234567890", + Description: "A default worker created in", + Address: "127.0.0.1:9202", + Tags: []*servers.TagPair{ + { + Key: "type", + Value: "dev", + }, + }, + KeyId: "ovary-valid-curler-scrambled-glutinous-alias-rework-debit", + ReleaseVersion: "Boundary v0.13.1", + OperationalState: "active", + }, + } + + filtered, err := filterProtoMessage(input, telemetryFilter) + assert.NoError(err) + + output, ok := filtered.(*services.StatusRequest) + assert.True(ok) + + // expected content + assert.NotNil(output.WorkerStatus) + assert.Equal(input.WorkerStatus.PublicId, output.WorkerStatus.PublicId) + assert.Equal(input.WorkerStatus.ReleaseVersion, output.WorkerStatus.ReleaseVersion) + assert.Equal(input.WorkerStatus.OperationalState, output.WorkerStatus.OperationalState) + + // non expected content + assert.Zero(output.WorkerStatus.Name) + assert.Zero(output.WorkerStatus.Address) + assert.Zero(output.WorkerStatus.Description) + assert.Zero(output.WorkerStatus.KeyId) + assert.Len(output.WorkerStatus.Tags, 0) + assert.Len(output.Jobs, 0) +} + +func Test_AllFieldsPopulatedWithoutFilter(t *testing.T) { + assert := tassert.New(t) + input := &services.StatusRequest{ + Jobs: []*services.JobStatus{ + {Job: &services.Job{ + Type: 1, + JobInfo: nil, + }}, + }, + UpdateTags: false, + WorkerStatus: &servers.ServerWorkerStatus{ + PublicId: "testID", + Name: "w_1234567890", + Description: "A default worker created in", + Address: "127.0.0.1:9202", + Tags: []*servers.TagPair{ + { + Key: "type", + Value: "dev", + }, + }, + KeyId: "ovary-valid-curler-scrambled-glutinous-alias-rework-debit", + ReleaseVersion: "Boundary v0.13.1", + OperationalState: "active", + }, + } + + filtered, err := filterProtoMessage(input, nil) + assert.NoError(err) + + output, ok := filtered.(*services.StatusRequest) + assert.True(ok) + + // expected content + assert.Equal(input, output) +} + +func Test_NilMessageWillError(t *testing.T) { + assert := tassert.New(t) + + _, err := filterProtoMessage(nil, nil) + assert.Error(err) +} diff --git a/internal/event/testing.go b/internal/event/testing.go index bf4158a06b..8ad5bb8130 100644 --- a/internal/event/testing.go +++ b/internal/event/testing.go @@ -13,6 +13,8 @@ import ( "time" pbs "github.com/hashicorp/boundary/internal/gen/controller/api/services" + "github.com/hashicorp/boundary/internal/gen/controller/servers" + "github.com/hashicorp/boundary/internal/gen/controller/servers/services" "github.com/hashicorp/boundary/sdk/pbs/controller/api/resources/groups" "github.com/hashicorp/eventlogger" "github.com/hashicorp/eventlogger/formatter_filters/cloudevents" @@ -381,3 +383,44 @@ func testLogger(t *testing.T, testLock hclog.Locker) hclog.Logger { JSONFormat: true, }) } + +func testWorkerStatus(t testing.TB) *services.StatusRequest { + t.Helper() + return &services.StatusRequest{ + Jobs: []*services.JobStatus{ + {Job: &services.Job{ + Type: 1, + JobInfo: nil, + }}, + }, + UpdateTags: false, + WorkerStatus: &servers.ServerWorkerStatus{ + PublicId: "testID", + Name: "w_1234567890", + Description: "A default worker created in", + Address: "127.0.0.1:9202", + Tags: []*servers.TagPair{ + { + Key: "type", + Value: "dev", + }, + }, + KeyId: "ovary-valid-curler-scrambled-glutinous-alias-rework-debit", + ReleaseVersion: "Boundary v0.13.1", + OperationalState: "active", + }, + } +} + +func testWorkerStatusObservable(t testing.TB) *services.StatusRequest { + t.Helper() + return &services.StatusRequest{ + Jobs: []*services.JobStatus{}, + WorkerStatus: &servers.ServerWorkerStatus{ + PublicId: "testID", + Tags: []*servers.TagPair{}, + ReleaseVersion: "Boundary v0.13.1", + OperationalState: "active", + }, + } +}