diff --git a/internal/event/event_observation.go b/internal/event/event_observation.go index 796541605e..86368272a3 100644 --- a/internal/event/event_observation.go +++ b/internal/event/event_observation.go @@ -4,7 +4,9 @@ package event import ( + "encoding/json" "fmt" + "reflect" "github.com/hashicorp/eventlogger" "github.com/hashicorp/eventlogger/filters/gated" @@ -22,6 +24,8 @@ type observation struct { Flush bool `json:"-"` Header map[string]any `json:"header,omitempty"` Detail map[string]any `json:"detail,omitempty"` + Request *Request `json:"request,omitempty"` + Response *Response `json:"response,omitempty"` } func newObservation(fromOperation Op, opt ...Option) (*observation, error) { @@ -49,6 +53,8 @@ func newObservation(fromOperation Op, opt ...Option) (*observation, error) { Flush: opts.withFlush, Op: fromOperation, RequestInfo: opts.withRequestInfo, + Request: opts.withRequest, + Response: opts.withResponse, Version: observationVersion, } if err := i.validate(); err != nil { @@ -102,6 +108,66 @@ func (o *observation) ComposeFrom(events []*eventlogger.Event) (eventlogger.Even Payload: g.Detail, }) } + if g.Request != nil { + msgReq := &Request{} + if v, ok := payload["request"]; ok { + msgReq, ok = v.(*Request) + if !ok { + return "", nil, fmt.Errorf("%s: request %d is not an observation request: %w", op, i, eventlogger.ErrInvalidParameter) + } + } + if g.Request.Details != nil { + res := recurseStructureWithTagFilter( + g.Request.Details, + map[string]string{ + "eventstream": "observation", + }, + false, + ) + data, _ := json.Marshal(res) + fmt.Printf("RESULT request = %s", string(data)) + msgReq.Details = g.Request.Details + } + if g.Request.Operation != "" { + msgReq.Operation = g.Request.Operation + } + if g.Request.Endpoint != "" { + msgReq.Endpoint = g.Request.Endpoint + } + if g.Request.DetailsUpstreamMessage != nil { + msgReq.DetailsUpstreamMessage = g.Request.DetailsUpstreamMessage + } + payload["request"] = msgReq + } + if g.Response != nil { + msgRes := &Response{} + if v, ok := payload["response"]; ok { + msgRes, ok = v.(*Response) + if !ok { + return "", nil, fmt.Errorf("%s: response %d is not an observation response: %w", op, i, eventlogger.ErrInvalidParameter) + } + } + if g.Response.StatusCode != 0 { + msgRes.StatusCode = g.Response.StatusCode + } + if g.Response.Details != nil { + res := recurseStructureWithTagFilter( + g.Response.Details, + map[string]string{ + "eventstream": "observation", + }, + false, + ) + data, _ := json.Marshal(res) + fmt.Printf("RESULT response = %s \n", string(data)) + + msgRes.Details = g.Response.Details + } + if g.Response.DetailsUpstreamMessage != nil { + msgRes.DetailsUpstreamMessage = g.Response.DetailsUpstreamMessage + } + payload["response"] = msgRes + } } return events[0].Type, payload, nil } @@ -118,3 +184,92 @@ func (o *observation) GetID() string { func (o *observation) FlushEvent() bool { return o.Flush } + +func recurseStructureWithTagFilter(value any, allowedStructTags map[string]string, allowLevel bool) any { + if allowedStructTags == nil { + allowedStructTags = map[string]string{} + } + + kind := reflect.ValueOf(value).Kind() + + switch kind { + case reflect.Interface, reflect.Ptr: + value = reflect.ValueOf(value).Elem().Interface() + return recurseStructureWithTagFilter(value, allowedStructTags, allowLevel) + case reflect.Map: + out := map[string]any{} + m := reflect.ValueOf(value) + for _, k := range m.MapKeys() { + mVal := m.MapIndex(k).Interface() + if vv := recurseStructureWithTagFilter(mVal, allowedStructTags, allowLevel); vv != nil { + if strKey, ok := k.Interface().(string); ok { + out[strKey] = vv + } else { + out[fmt.Sprintf("%v", k)] = vv + } + } + } + if allowLevel { + return out + } + return nil + case reflect.Array, reflect.Slice: + out := []any{} + s := reflect.ValueOf(value) + for i := 0; i < s.Len(); i++ { + sVal := s.Index(i).Interface() + if vv := recurseStructureWithTagFilter(sVal, allowedStructTags, allowLevel); vv != nil { + out = append(out, vv) + } + } + if allowLevel { + return out + } + return nil + case reflect.Struct: + // traverse the struct recursively, and apply any tag filters + fields := reflect.TypeOf(value) + values := reflect.ValueOf(value) + num := fields.NumField() + out := map[string]any{} + for i := 0; i < num; i++ { + field := fields.Field(i) + name := field.Name + allowLevel := false + if len(allowedStructTags) == 0 { + allowLevel = true + } else { + for tag, tagValue := range allowedStructTags { + if field.Tag.Get(tag) == tagValue { + // log.Printf("%s is allowed by Tags (%s:\"%s\")", name, tag, tagValue) + allowLevel = true + break + } + } + } + if !field.IsExported() { + continue + } + if values.Field(i).IsZero() { + if allowLevel { + out[name] = "" + } + continue + } + if !values.Field(i).IsValid() { + continue + } + value := values.Field(i).Interface() + if vv := recurseStructureWithTagFilter(value, allowedStructTags, allowLevel); vv != nil { + out[name] = vv + } + } + return out + default: + // any other non structured type, we will just barf out as is + if allowLevel { + return value + } + return nil + } +} diff --git a/internal/event/event_observation_test.go b/internal/event/event_observation_test.go index 8e1c45d2b7..c766c6c18e 100644 --- a/internal/event/event_observation_test.go +++ b/internal/event/event_observation_test.go @@ -4,9 +4,12 @@ package event import ( + "encoding/json" "testing" "time" + "github.com/hashicorp/boundary/internal/gen/controller/servers" + "github.com/hashicorp/boundary/internal/gen/controller/servers/services" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -139,3 +142,49 @@ func Test_observationEventType(t *testing.T) { e := &observation{} assert.Equal(t, string(ObservationType), e.EventType()) } + +func Test_iterateProto(t *testing.T) { + assert, _ := assert.New(t), require.New(t) + input := Request{ + Operation: "", + Endpoint: "", + Details: &services.StatusRequest{ + Jobs: []*services.JobStatus{ + {Job: &services.Job{ + Type: 1, + JobInfo: nil, + }}, + }, + UpdateTags: false, + WorkerStatus: &servers.ServerWorkerStatus{ + PublicId: "testID", + Name: "w_1234567890", + Description: "A default worker created in", + Address: "127.0.0.1:9202", + Tags: []*servers.TagPair{ + { + Key: "type", + Value: "dev", + }, + }, + KeyId: "ovary-valid-curler-scrambled-glutinous-alias-rework-debit", + ReleaseVersion: "Boundary v0.13.1", + OperationalState: "active", + }, + ConnectedWorkerKeyIdentifiers: nil, + ConnectedUnmappedWorkerKeyIdentifiers: nil, + ConnectedWorkerPublicIds: nil, + }, + DetailsUpstreamMessage: nil, + } + + res := recurseStructureWithTagFilter( + input.Details, + map[string]string{ + "eventstream": "observation", + }, + false, + ) + data, _ := json.Marshal(res) + assert.NotNil(data) +}