observation(event): refine telemetry filter, and WithTelemetry()

pull/3753/head
April Ayres 3 years ago committed by Jim
parent fcb29ed919
commit 49a1563f6a
No known key found for this signature in database

@ -4,9 +4,7 @@
package event
import (
"encoding/json"
"fmt"
"reflect"
"github.com/hashicorp/eventlogger"
"github.com/hashicorp/eventlogger/filters/gated"
@ -53,10 +51,14 @@ func newObservation(fromOperation Op, opt ...Option) (*observation, error) {
Flush: opts.withFlush,
Op: fromOperation,
RequestInfo: opts.withRequestInfo,
Request: opts.withRequest,
Response: opts.withResponse,
Version: observationVersion,
}
if opts.withTelemetry {
i.Request = opts.withRequest
i.Response = opts.withResponse
}
if err := i.validate(); err != nil {
return nil, fmt.Errorf("%s: %w", op, err)
}
@ -117,16 +119,11 @@ func (o *observation) ComposeFrom(events []*eventlogger.Event) (eventlogger.Even
}
}
if g.Request.Details != nil {
res := recurseStructureWithTagFilter(
g.Request.Details,
map[string]string{
"eventstream": "observation",
},
false,
)
data, _ := json.Marshal(res)
fmt.Printf("RESULT request = %s", string(data))
msgReq.Details = g.Request.Details
filteredRequest, err := filterProtoMessage(g.Request.Details, telemetryFilter)
if err != nil {
continue
}
msgReq.Details = filteredRequest
}
if g.Request.Operation != "" {
msgReq.Operation = g.Request.Operation
@ -151,17 +148,11 @@ func (o *observation) ComposeFrom(events []*eventlogger.Event) (eventlogger.Even
msgRes.StatusCode = g.Response.StatusCode
}
if g.Response.Details != nil {
res := recurseStructureWithTagFilter(
g.Response.Details,
map[string]string{
"eventstream": "observation",
},
false,
)
data, _ := json.Marshal(res)
fmt.Printf("RESULT response = %s \n", string(data))
msgRes.Details = g.Response.Details
filteredResponse, err := filterProtoMessage(g.Response.Details, telemetryFilter)
if err != nil {
continue
}
msgRes.Details = filteredResponse
}
if g.Response.DetailsUpstreamMessage != nil {
msgRes.DetailsUpstreamMessage = g.Response.DetailsUpstreamMessage
@ -184,92 +175,3 @@ func (o *observation) GetID() string {
func (o *observation) FlushEvent() bool {
return o.Flush
}
func recurseStructureWithTagFilter(value any, allowedStructTags map[string]string, allowLevel bool) any {
if allowedStructTags == nil {
allowedStructTags = map[string]string{}
}
kind := reflect.ValueOf(value).Kind()
switch kind {
case reflect.Interface, reflect.Ptr:
value = reflect.ValueOf(value).Elem().Interface()
return recurseStructureWithTagFilter(value, allowedStructTags, allowLevel)
case reflect.Map:
out := map[string]any{}
m := reflect.ValueOf(value)
for _, k := range m.MapKeys() {
mVal := m.MapIndex(k).Interface()
if vv := recurseStructureWithTagFilter(mVal, allowedStructTags, allowLevel); vv != nil {
if strKey, ok := k.Interface().(string); ok {
out[strKey] = vv
} else {
out[fmt.Sprintf("%v", k)] = vv
}
}
}
if allowLevel {
return out
}
return nil
case reflect.Array, reflect.Slice:
out := []any{}
s := reflect.ValueOf(value)
for i := 0; i < s.Len(); i++ {
sVal := s.Index(i).Interface()
if vv := recurseStructureWithTagFilter(sVal, allowedStructTags, allowLevel); vv != nil {
out = append(out, vv)
}
}
if allowLevel {
return out
}
return nil
case reflect.Struct:
// traverse the struct recursively, and apply any tag filters
fields := reflect.TypeOf(value)
values := reflect.ValueOf(value)
num := fields.NumField()
out := map[string]any{}
for i := 0; i < num; i++ {
field := fields.Field(i)
name := field.Name
allowLevel := false
if len(allowedStructTags) == 0 {
allowLevel = true
} else {
for tag, tagValue := range allowedStructTags {
if field.Tag.Get(tag) == tagValue {
// log.Printf("%s is allowed by Tags (%s:\"%s\")", name, tag, tagValue)
allowLevel = true
break
}
}
}
if !field.IsExported() {
continue
}
if values.Field(i).IsZero() {
if allowLevel {
out[name] = ""
}
continue
}
if !values.Field(i).IsValid() {
continue
}
value := values.Field(i).Interface()
if vv := recurseStructureWithTagFilter(value, allowedStructTags, allowLevel); vv != nil {
out[name] = vv
}
}
return out
default:
// any other non structured type, we will just barf out as is
if allowLevel {
return value
}
return nil
}
}

@ -4,14 +4,13 @@
package event
import (
"encoding/json"
"testing"
"time"
"github.com/hashicorp/boundary/internal/gen/controller/servers"
"github.com/hashicorp/boundary/internal/gen/controller/servers/services"
"github.com/hashicorp/eventlogger"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
)
func Test_newObservation(t *testing.T) {
@ -143,48 +142,236 @@ func Test_observationEventType(t *testing.T) {
assert.Equal(t, string(ObservationType), e.EventType())
}
func Test_iterateProto(t *testing.T) {
assert, _ := assert.New(t), require.New(t)
input := Request{
Operation: "",
Endpoint: "",
Details: &services.StatusRequest{
Jobs: []*services.JobStatus{
{Job: &services.Job{
Type: 1,
JobInfo: nil,
}},
},
UpdateTags: false,
WorkerStatus: &servers.ServerWorkerStatus{
PublicId: "testID",
Name: "w_1234567890",
Description: "A default worker created in",
Address: "127.0.0.1:9202",
Tags: []*servers.TagPair{
{
Key: "type",
Value: "dev",
},
func Test_composeFromTelemetryFiltering(t *testing.T) {
t.Parallel()
now := time.Now()
tests := []struct {
name string
fromOp Op
opts []Option
wantObservation *observation
wantErrIs error
wantErrContains string
wantFilteredRequest *Request
wantFilteredResponse *Response
}{
{
name: "with-request-no-telemetry",
fromOp: Op("with-request-no-telemetry"),
opts: []Option{
WithId("with-request-no-telemetry"),
WithRequestInfo(TestRequestInfo(t)),
WithFlush(),
WithRequest(&Request{
Operation: "op",
Endpoint: "/worker-status/<id>",
Details: testWorkerStatus(t),
}),
},
wantObservation: &observation{
ID: "with-request-no-telemetry",
Flush: true,
Version: errorVersion,
Op: Op("with-request-no-telemetry"),
RequestInfo: TestRequestInfo(t),
},
},
{
name: "with-response-no-telemetry",
fromOp: Op("with-response-no-telemetry"),
opts: []Option{
WithId("with-response-no-telemetry"),
WithRequestInfo(TestRequestInfo(t)),
WithFlush(),
WithResponse(&Response{
StatusCode: 200,
Details: testWorkerStatus(t),
}),
},
wantObservation: &observation{
ID: "with-response-no-telemetry",
Flush: true,
Version: errorVersion,
Op: Op("with-response-no-telemetry"),
RequestInfo: TestRequestInfo(t),
},
},
{
name: "with-request-with-telemetry",
fromOp: Op("with-request-with-telemetry"),
opts: []Option{
WithId("with-request-with-telemetry"),
WithRequestInfo(TestRequestInfo(t)),
WithFlush(),
WithRequest(&Request{
Operation: "op",
Endpoint: "/worker-status/<id>",
Details: testWorkerStatus(t),
}),
WithTelemetry(),
},
wantObservation: &observation{
ID: "with-request-with-telemetry",
Flush: true,
Version: errorVersion,
Op: Op("with-request-with-telemetry"),
RequestInfo: TestRequestInfo(t),
Request: &Request{
Operation: "op",
Endpoint: "/worker-status/<id>",
Details: testWorkerStatus(t),
},
KeyId: "ovary-valid-curler-scrambled-glutinous-alias-rework-debit",
ReleaseVersion: "Boundary v0.13.1",
OperationalState: "active",
},
ConnectedWorkerKeyIdentifiers: nil,
ConnectedUnmappedWorkerKeyIdentifiers: nil,
ConnectedWorkerPublicIds: nil,
wantFilteredRequest: &Request{
Operation: "op",
Endpoint: "/worker-status/<id>",
Details: testWorkerStatusObservable(t),
},
},
DetailsUpstreamMessage: nil,
}
res := recurseStructureWithTagFilter(
input.Details,
map[string]string{
"eventstream": "observation",
{
name: "with-response-with-telemetry",
fromOp: Op("with-response-with-telemetry"),
opts: []Option{
WithId("with-response-with-telemetry"),
WithRequestInfo(TestRequestInfo(t)),
WithFlush(),
WithResponse(&Response{
StatusCode: 200,
Details: testWorkerStatus(t),
}),
WithTelemetry(),
},
wantObservation: &observation{
ID: "with-response-with-telemetry",
Flush: true,
Version: errorVersion,
Op: Op("with-response-with-telemetry"),
RequestInfo: TestRequestInfo(t),
Response: &Response{
StatusCode: 200,
Details: testWorkerStatus(t),
},
},
wantFilteredResponse: &Response{
StatusCode: 200,
Details: testWorkerStatusObservable(t),
},
},
false,
)
data, _ := json.Marshal(res)
assert.NotNil(data)
{
name: "nil-request-details-with-telemetry",
fromOp: Op("nil-request-details-with-telemetry"),
opts: []Option{
WithId("nil-request-details-with-telemetry"),
WithRequestInfo(TestRequestInfo(t)),
WithFlush(),
WithRequest(&Request{
Operation: "op",
Endpoint: "/worker-status/<id>",
Details: nil,
}),
WithTelemetry(),
},
wantObservation: &observation{
ID: "nil-request-details-with-telemetry",
Flush: true,
Version: errorVersion,
Op: Op("nil-request-details-with-telemetry"),
RequestInfo: TestRequestInfo(t),
Request: &Request{
Operation: "op",
Endpoint: "/worker-status/<id>",
Details: nil,
},
},
wantFilteredRequest: &Request{
Operation: "op",
Endpoint: "/worker-status/<id>",
Details: nil,
},
},
{
name: "nil-response-details-with-telemetry",
fromOp: Op("nil-response-details-with-telemetry"),
opts: []Option{
WithId("nil-response-details-with-telemetry"),
WithRequestInfo(TestRequestInfo(t)),
WithFlush(),
WithResponse(&Response{
StatusCode: 200,
Details: nil,
}),
WithTelemetry(),
},
wantObservation: &observation{
ID: "nil-response-details-with-telemetry",
Flush: true,
Version: errorVersion,
Op: Op("nil-response-details-with-telemetry"),
RequestInfo: TestRequestInfo(t),
Response: &Response{
StatusCode: 200,
Details: nil,
},
},
wantFilteredResponse: &Response{
StatusCode: 200,
Details: nil,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert, require := assert.New(t), require.New(t)
got, err := newObservation(tt.fromOp, tt.opts...)
require.NoError(err)
require.NotNil(got)
opts := getOpts(tt.opts...)
if opts.withId == "" {
tt.wantObservation.ID = got.ID
}
assert.Equal(tt.wantObservation, got)
// feed the event through ComposeFrom which will filter any request/response data
_, ev, err := got.ComposeFrom(
[]*eventlogger.Event{
{
Type: "observation",
CreatedAt: now,
Payload: got,
},
},
)
if tt.wantErrIs != nil {
require.Error(err)
assert.Nil(got)
assert.ErrorIs(err, tt.wantErrIs)
if tt.wantErrContains != "" {
assert.Contains(err.Error(), tt.wantErrContains)
}
return
}
payload, ok := ev.(map[string]any)
assert.True(ok)
assert.NotNil(payload)
if tt.wantFilteredRequest != nil {
req, ok := payload["request"]
assert.True(ok)
assert.NotNil(req)
pmsg, ok := req.(*Request)
assert.True(ok)
assert.NotNil(pmsg)
assert.True(proto.Equal(tt.wantFilteredRequest.Details, pmsg.Details))
}
if tt.wantFilteredResponse != nil {
req, ok := payload["response"]
assert.True(ok)
assert.NotNil(req)
pmsg, ok := req.(*Response)
assert.True(ok)
assert.NotNil(pmsg)
assert.True(proto.Equal(tt.wantFilteredResponse.Details, pmsg.Details))
}
})
}
}

@ -48,6 +48,7 @@ type options struct {
withFilterOperations AuditFilterOperations
withGating bool
withNoGateLocking bool
withTelemetry bool
// These options are related to the hclog adapter
withHclogLevel hclog.Level
@ -228,3 +229,10 @@ func WithNoGateLocking(with bool) Option {
o.withNoGateLocking = with
}
}
// WithTelemetry allows an optional telemetry option.
func WithTelemetry() Option {
return func(o *options) {
o.withTelemetry = true
}
}

@ -196,6 +196,13 @@ func Test_GetOpts(t *testing.T) {
opts := getOpts(WithNoGateLocking(true))
assert.True(opts.withNoGateLocking)
})
t.Run("WithTelemetry", func(t *testing.T) {
assert := assert.New(t)
opts := getOpts(WithTelemetry())
testOpts := getDefaultOptions()
testOpts.withTelemetry = true
assert.Equal(opts, testOpts)
})
}
// testWrapper initializes an AEAD wrapping.Wrapper for testing. Note: this

@ -0,0 +1,119 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package event
import (
"errors"
"reflect"
"google.golang.org/protobuf/proto"
)
// protoFilter is a signature for a struct field validation test
type protoFilter func(field reflect.StructField) bool
// telemetryFilter checks a struct field should be included in observation telemetry data
func telemetryFilter(field reflect.StructField) bool {
if field.Tag.Get("eventstream") == "observation" {
// log.Printf("%s is allowed by Tags (%s:\"%s\")", name, tag, tagValue)
return true
}
return false
}
// filterValue will preserve or zero a value based on if it is classed as observable
func filterValue(fv reflect.Value, isObservable bool) {
if isObservable {
return // let data persist to telemetry
}
// check for nil value (prevent panics)
if fv == reflect.ValueOf(nil) {
return
}
if fv.Kind() == reflect.Ptr {
fv = fv.Elem()
}
// check to see if it's an exported struct field
if !fv.CanSet() {
return
}
fv.SetZero()
return
}
func recurseStructureWithProtoFilter(value reflect.Value, filterFunc protoFilter, isObservable bool) error {
kind := value.Kind()
switch kind {
case reflect.Interface, reflect.Ptr:
value = value.Elem()
return recurseStructureWithProtoFilter(value, filterFunc, isObservable)
case reflect.Map:
m := reflect.ValueOf(value)
for _, k := range m.MapKeys() {
mVal := m.MapIndex(k)
if err := recurseStructureWithProtoFilter(mVal, filterFunc, isObservable); err != nil {
return err
}
}
return nil
case reflect.Array, reflect.Slice:
if isObservable {
for i := 0; i < value.Len(); i++ {
sVal := value.Index(i)
if err := recurseStructureWithProtoFilter(sVal, filterFunc, isObservable); err != nil {
return err
}
}
} else {
if kind == reflect.Slice {
value.SetLen(0) // truncate
} else {
// fixed size, so we zero
for i := 0; i < value.Len(); i++ {
value.Index(i).SetZero()
}
}
}
case reflect.Struct:
fields := value.Type()
num := fields.NumField()
for i := 0; i < num; i++ {
field := fields.Field(i)
v := value.Field(i)
if !field.IsExported() {
continue
}
isObservable := true
if filterFunc != nil {
isObservable = filterFunc(field)
}
if err := recurseStructureWithProtoFilter(v, filterFunc, isObservable); err != nil {
return err
}
}
return nil
default:
// any other non structured type, we will output or not via filterValue
filterValue(value, isObservable)
return nil
}
return nil
}
func filterProtoMessage(msg proto.Message, filterFunc protoFilter) (proto.Message, error) {
if msg == nil {
return nil, errors.New("nil message")
}
cloneMsg := proto.Clone(msg)
err := recurseStructureWithProtoFilter(reflect.ValueOf(cloneMsg), filterFunc, false)
return cloneMsg, err
}

@ -0,0 +1,105 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package event
import (
"testing"
"github.com/hashicorp/boundary/internal/gen/controller/servers"
"github.com/hashicorp/boundary/internal/gen/controller/servers/services"
tassert "github.com/stretchr/testify/assert"
)
func Test_OnlyObservationTaggedFieldsPopulated(t *testing.T) {
assert := tassert.New(t)
input := &services.StatusRequest{
Jobs: []*services.JobStatus{
{Job: &services.Job{
Type: 1,
JobInfo: nil,
}},
},
UpdateTags: false,
WorkerStatus: &servers.ServerWorkerStatus{
PublicId: "testID",
Name: "w_1234567890",
Description: "A default worker created in",
Address: "127.0.0.1:9202",
Tags: []*servers.TagPair{
{
Key: "type",
Value: "dev",
},
},
KeyId: "ovary-valid-curler-scrambled-glutinous-alias-rework-debit",
ReleaseVersion: "Boundary v0.13.1",
OperationalState: "active",
},
}
filtered, err := filterProtoMessage(input, telemetryFilter)
assert.NoError(err)
output, ok := filtered.(*services.StatusRequest)
assert.True(ok)
// expected content
assert.NotNil(output.WorkerStatus)
assert.Equal(input.WorkerStatus.PublicId, output.WorkerStatus.PublicId)
assert.Equal(input.WorkerStatus.ReleaseVersion, output.WorkerStatus.ReleaseVersion)
assert.Equal(input.WorkerStatus.OperationalState, output.WorkerStatus.OperationalState)
// non expected content
assert.Zero(output.WorkerStatus.Name)
assert.Zero(output.WorkerStatus.Address)
assert.Zero(output.WorkerStatus.Description)
assert.Zero(output.WorkerStatus.KeyId)
assert.Len(output.WorkerStatus.Tags, 0)
assert.Len(output.Jobs, 0)
}
func Test_AllFieldsPopulatedWithoutFilter(t *testing.T) {
assert := tassert.New(t)
input := &services.StatusRequest{
Jobs: []*services.JobStatus{
{Job: &services.Job{
Type: 1,
JobInfo: nil,
}},
},
UpdateTags: false,
WorkerStatus: &servers.ServerWorkerStatus{
PublicId: "testID",
Name: "w_1234567890",
Description: "A default worker created in",
Address: "127.0.0.1:9202",
Tags: []*servers.TagPair{
{
Key: "type",
Value: "dev",
},
},
KeyId: "ovary-valid-curler-scrambled-glutinous-alias-rework-debit",
ReleaseVersion: "Boundary v0.13.1",
OperationalState: "active",
},
}
filtered, err := filterProtoMessage(input, nil)
assert.NoError(err)
output, ok := filtered.(*services.StatusRequest)
assert.True(ok)
// expected content
assert.Equal(input, output)
}
func Test_NilMessageWillError(t *testing.T) {
assert := tassert.New(t)
_, err := filterProtoMessage(nil, nil)
assert.Error(err)
}

@ -13,6 +13,8 @@ import (
"time"
pbs "github.com/hashicorp/boundary/internal/gen/controller/api/services"
"github.com/hashicorp/boundary/internal/gen/controller/servers"
"github.com/hashicorp/boundary/internal/gen/controller/servers/services"
"github.com/hashicorp/boundary/sdk/pbs/controller/api/resources/groups"
"github.com/hashicorp/eventlogger"
"github.com/hashicorp/eventlogger/formatter_filters/cloudevents"
@ -381,3 +383,44 @@ func testLogger(t *testing.T, testLock hclog.Locker) hclog.Logger {
JSONFormat: true,
})
}
func testWorkerStatus(t testing.TB) *services.StatusRequest {
t.Helper()
return &services.StatusRequest{
Jobs: []*services.JobStatus{
{Job: &services.Job{
Type: 1,
JobInfo: nil,
}},
},
UpdateTags: false,
WorkerStatus: &servers.ServerWorkerStatus{
PublicId: "testID",
Name: "w_1234567890",
Description: "A default worker created in",
Address: "127.0.0.1:9202",
Tags: []*servers.TagPair{
{
Key: "type",
Value: "dev",
},
},
KeyId: "ovary-valid-curler-scrambled-glutinous-alias-rework-debit",
ReleaseVersion: "Boundary v0.13.1",
OperationalState: "active",
},
}
}
func testWorkerStatusObservable(t testing.TB) *services.StatusRequest {
t.Helper()
return &services.StatusRequest{
Jobs: []*services.JobStatus{},
WorkerStatus: &servers.ServerWorkerStatus{
PublicId: "testID",
Tags: []*servers.TagPair{},
ReleaseVersion: "Boundary v0.13.1",
OperationalState: "active",
},
}
}

Loading…
Cancel
Save