From c965cf1bc57363336491b51a1d6232fdda39e96b Mon Sep 17 00:00:00 2001 From: Jeff Mitchell Date: Tue, 30 Jun 2020 17:37:24 -0400 Subject: [PATCH 1/2] Attempt to fix content-length header in index (#159) --- internal/servers/controller/index_response_writer.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/internal/servers/controller/index_response_writer.go b/internal/servers/controller/index_response_writer.go index f78eed7af2..44b9279703 100644 --- a/internal/servers/controller/index_response_writer.go +++ b/internal/servers/controller/index_response_writer.go @@ -2,6 +2,7 @@ package controller import ( "bytes" + "fmt" "net/http" "strings" ) @@ -42,10 +43,8 @@ func (i *indexResponseWriter) writeToWriter(w http.ResponseWriter) { w.Header().Add(k, h) } } + newBody := []byte(strings.Replace(i.body.String(), magicValue, i.defaultOrgId, 1)) + w.Header().Set("content-length", fmt.Sprintf("%d", len(newBody))) w.WriteHeader(i.statusCode) - w.Write( - []byte( - strings.Replace(i.body.String(), magicValue, i.defaultOrgId, 1), - ), - ) + w.Write(newBody) } From d0a846823a47a379fabf1358d3fa487b71d2e36e Mon Sep 17 00:00:00 2001 From: Jim Date: Wed, 1 Jul 2020 05:40:02 -0400 Subject: [PATCH 2/2] support for NewOplogMsgs() option when calling CreateItems and DeleteItems (#158) --- internal/db/option.go | 15 ++- internal/db/option_test.go | 15 ++- internal/db/read_writer.go | 160 +++++++++++++++----------- internal/db/read_writer_test.go | 198 ++++++++++++++++++++++++++++++-- 4 files changed, 310 insertions(+), 78 deletions(-) diff --git a/internal/db/option.go b/internal/db/option.go index 06cb957458..ef47e58bdc 100644 --- a/internal/db/option.go +++ b/internal/db/option.go @@ -29,7 +29,9 @@ type Options struct { // WithNullPaths must be accessible from other packages. WithNullPaths []string - newOplogMsg *oplog.Message + newOplogMsg *oplog.Message + newOplogMsgs *[]*oplog.Message + // WithVersion must be accessible from other packages WithVersion int } @@ -82,6 +84,17 @@ func NewOplogMsg(msg *oplog.Message) Option { } } +// NewOplogMsgs provides an option to ask for multiple new in-memory oplog +// messages. The new msgs will be returned in the provided *[]oplog.Message +// parameter. NewOplogMsgs can only be used with write functions that operate on +// multiple items(CreateItems, DeleteItems). WithOplog and NewOplogMsgs cannot +// be used together. +func NewOplogMsgs(msgs *[]*oplog.Message) Option { + return func(o *Options) { + o.newOplogMsgs = msgs + } +} + // WithFieldMaskPaths provides an option to provide field mask paths. func WithFieldMaskPaths(paths []string) Option { return func(o *Options) { diff --git a/internal/db/option_test.go b/internal/db/option_test.go index 0c0eb0ab5b..c1832e1084 100644 --- a/internal/db/option_test.go +++ b/internal/db/option_test.go @@ -96,19 +96,30 @@ func Test_getOpts(t *testing.T) { }) t.Run("NewOplogMsg", func(t *testing.T) { assert := assert.New(t) - // test default of false opts := GetOpts() testOpts := getDefaultOptions() testOpts.newOplogMsg = nil assert.Equal(opts, testOpts) msg := oplog.Message{} - // try setting to true opts = GetOpts(NewOplogMsg(&msg)) testOpts = getDefaultOptions() testOpts.newOplogMsg = &msg assert.Equal(opts, testOpts) }) + t.Run("NewOplogMsgs", func(t *testing.T) { + assert := assert.New(t) + opts := GetOpts() + testOpts := getDefaultOptions() + testOpts.newOplogMsgs = nil + assert.Equal(opts, testOpts) + + msgs := []*oplog.Message{} + opts = GetOpts(NewOplogMsgs(&msgs)) + testOpts = getDefaultOptions() + testOpts.newOplogMsgs = &msgs + assert.Equal(opts, testOpts) + }) t.Run("WithVersion", func(t *testing.T) { assert := assert.New(t) // test default of 0 diff --git a/internal/db/read_writer.go b/internal/db/read_writer.go index 42ea20f0ac..d3794e96f1 100644 --- a/internal/db/read_writer.go +++ b/internal/db/read_writer.go @@ -77,10 +77,12 @@ type Writer interface { Create(ctx context.Context, i interface{}, opt ...Option) error // CreateItems will create multiple items of the same type. - // Supported options: WithOplog. WithLookup is not a supported option. - // The caller is responsible for the transaction life cycle of the writer - // and if an error is returned the caller must decide what to do with - // the transaction, which almost always should be to rollback. + // Supported options: WithOplog and WithOplogMsgs. WithOplog and + // WithOplogMsgs may not be used together. WithLookup is not a + // supported option. The caller is responsible for the transaction life + // cycle of the writer and if an error is returned the caller must decide + // what to do with the transaction, which almost always should be to + // rollback. CreateItems(ctx context.Context, createItems []interface{}, opt ...Option) error // Delete an object in the db with options: WithOplog @@ -91,7 +93,8 @@ type Writer interface { Delete(ctx context.Context, i interface{}, opt ...Option) (int, error) // DeleteItems will delete multiple items of the same type. - // Supported options: WithOplog. The caller is responsible for the + // Supported options: WithOplog and WithOplogMsgs. WithOplog and + // WithOplogMsgs may not be used together. The caller is responsible for the // transaction life cycle of the writer and if an error is returned the // caller must decide what to do with the transaction, which almost always // should be to rollback. Delete returns the number of rows deleted or an error. @@ -272,8 +275,9 @@ func (rw *Db) Create(ctx context.Context, i interface{}, opt ...Option) error { return nil } -// CreateItems will create multiple items of the same type. -// Supported options: WithOplog. WithLookup is not a supported option. +// CreateItems will create multiple items of the same type. Supported options: +// WithOplog and WithOplogMsgs. WithOplog and WithOplogMsgs may not be used +// together. WithLookup is not a supported option. func (rw *Db) CreateItems(ctx context.Context, createItems []interface{}, opt ...Option) error { if rw.underlying == nil { return fmt.Errorf("create items: missing underlying db: %w", ErrNilParameter) @@ -283,7 +287,13 @@ func (rw *Db) CreateItems(ctx context.Context, createItems []interface{}, opt .. } opts := GetOpts(opt...) if opts.withLookup { - return fmt.Errorf("create items: withLookup not a supported option: %w", ErrInvalidParameter) + return fmt.Errorf("create items: with lookup not a supported option: %w", ErrInvalidParameter) + } + if opts.newOplogMsg != nil { + return fmt.Errorf("create items: new oplog msg (singular) is not a supported option: %w", ErrInvalidParameter) + } + if opts.withOplog && opts.newOplogMsgs != nil { + return fmt.Errorf("create items: both WithOplog and NewOplogMsgs options have been specified: %w", ErrInvalidParameter) } // verify that createItems are all the same type. var foundType reflect.Type @@ -318,6 +328,13 @@ func (rw *Db) CreateItems(ctx context.Context, createItems []interface{}, opt .. return fmt.Errorf("create items: unable to add oplog: %w", err) } } + if opts.newOplogMsgs != nil { + msgs, err := rw.oplogMsgsForItems(ctx, CreateOp, opts, createItems) + if err != nil { + return fmt.Errorf("create items: returning oplog msgs failed %w", err) + } + *opts.newOplogMsgs = append(*opts.newOplogMsgs, msgs...) + } return nil } @@ -513,8 +530,9 @@ func (rw *Db) Delete(ctx context.Context, i interface{}, opt ...Option) (int, er return rowsDeleted, nil } -// DeleteItems will delete multiple items of the same type. -// Supported options: WithOplog. +// DeleteItems will delete multiple items of the same type. Supported options: +// WithOplog and WithOplogMsgs. WithOplog and WithOplogMsgs may not be used +// together. func (rw *Db) DeleteItems(ctx context.Context, deleteItems []interface{}, opt ...Option) (int, error) { if rw.underlying == nil { return NoRowsAffected, fmt.Errorf("delete items: missing underlying db: %w", ErrNilParameter) @@ -522,6 +540,13 @@ func (rw *Db) DeleteItems(ctx context.Context, deleteItems []interface{}, opt .. if len(deleteItems) == 0 { return NoRowsAffected, fmt.Errorf("delete items: no interfaces to delete: %w", ErrInvalidParameter) } + opts := GetOpts(opt...) + if opts.newOplogMsg != nil { + return NoRowsAffected, fmt.Errorf("delete items: new oplog msg (singular) is not a supported option: %w", ErrInvalidParameter) + } + if opts.withOplog && opts.newOplogMsgs != nil { + return NoRowsAffected, fmt.Errorf("delete items: both WithOplog and NewOplogMsgs options have been specified: %w", ErrInvalidParameter) + } // verify that createItems are all the same type. var foundType reflect.Type for i, v := range deleteItems { @@ -533,7 +558,7 @@ func (rw *Db) DeleteItems(ctx context.Context, deleteItems []interface{}, opt .. return NoRowsAffected, fmt.Errorf("delete items: items contain disparate types. item %d is not a %s: %w", i, foundType.Name(), ErrInvalidParameter) } } - opts := GetOpts(opt...) + var ticket *store.Ticket if opts.withOplog { _, err := validateOplogArgs(deleteItems[0], opts) @@ -556,9 +581,18 @@ func (rw *Db) DeleteItems(ctx context.Context, deleteItems []interface{}, opt .. } rowsDeleted += int(underlying.RowsAffected) } - if opts.withOplog && rowsDeleted > 0 { - if err := rw.addOplogForItems(ctx, DeleteOp, opts, ticket, deleteItems); err != nil { - return rowsDeleted, fmt.Errorf("delete items: unable to add oplog: %w", err) + if rowsDeleted > 0 && (opts.withOplog || opts.newOplogMsgs != nil) { + if opts.withOplog { + if err := rw.addOplogForItems(ctx, DeleteOp, opts, ticket, deleteItems); err != nil { + return rowsDeleted, fmt.Errorf("delete items: unable to add oplog: %w", err) + } + } + if opts.newOplogMsgs != nil { + msgs, err := rw.oplogMsgsForItems(ctx, DeleteOp, opts, deleteItems) + if err != nil { + return rowsDeleted, fmt.Errorf("delete items: returning oplog msgs failed %w", err) + } + *opts.newOplogMsgs = append(*opts.newOplogMsgs, msgs...) } } return rowsDeleted, nil @@ -610,33 +644,62 @@ func (rw *Db) GetTicket(i interface{}) (*store.Ticket, error) { return rw.getTicketFor(replayable.TableName()) } +func (rw *Db) oplogMsgsForItems(ctx context.Context, opType OpType, opts Options, items []interface{}) ([]*oplog.Message, error) { + if len(items) == 0 { + return nil, fmt.Errorf("oplog msgs for items: items is empty: %w", ErrInvalidParameter) + } + oplogMsgs := []*oplog.Message{} + var foundType reflect.Type + for i, item := range items { + if i == 0 { + foundType = reflect.TypeOf(item) + } + currentType := reflect.TypeOf(item) + if foundType != currentType { + return nil, fmt.Errorf("oplog msgs for items: items contains disparate types. item (%d) %s is not a %s: %w", i, currentType, foundType, ErrInvalidParameter) + } + msg, err := rw.newOplogMessage(ctx, opType, item, WithFieldMaskPaths(opts.WithFieldMaskPaths), WithNullPaths(opts.WithNullPaths)) + if err != nil { + return nil, fmt.Errorf("oplog msgs for items: %w", err) + } + oplogMsgs = append(oplogMsgs, msg) + } + return oplogMsgs, nil +} + // addOplogForItems will add a multi-message oplog entry with one msg for each // item. Items must all be of the same type. Only CreateOp and DeleteOp are // currently supported operations. func (rw *Db) addOplogForItems(ctx context.Context, opType OpType, opts Options, ticket *store.Ticket, items []interface{}) error { oplogArgs := opts.oplogOpts if ticket == nil { - return fmt.Errorf("oplog many: ticket is missing: %w", ErrNilParameter) + return fmt.Errorf("oplog for items: ticket is missing: %w", ErrNilParameter) } if items == nil { - return fmt.Errorf("oplog many: items are missing: %w", ErrNilParameter) + return fmt.Errorf("oplog for items: items are missing: %w", ErrNilParameter) } if len(items) == 0 { - return fmt.Errorf("oplog many: items is empty: %w", ErrInvalidParameter) + return fmt.Errorf("oplog for items: items is empty: %w", ErrInvalidParameter) } if oplogArgs.metadata == nil { - return fmt.Errorf("oplog many: metadata is missing: %w", ErrNilParameter) + return fmt.Errorf("oplog for items: metadata is missing: %w", ErrNilParameter) } if oplogArgs.wrapper == nil { - return fmt.Errorf("oplog many: wrapper is missing: %w", ErrNilParameter) + return fmt.Errorf("oplog for items: wrapper is missing: %w", ErrNilParameter) } + + oplogMsgs, err := rw.oplogMsgsForItems(ctx, opType, opts, items) + if err != nil { + return fmt.Errorf("oplog for items: %w", err) + } + replayable, err := validateOplogArgs(items[0], opts) if err != nil { - return fmt.Errorf("oplog many: oplog validation failed %w", err) + return fmt.Errorf("oplog for items: oplog validation failed %w", err) } ticketer, err := oplog.NewGormTicketer(rw.underlying, oplog.WithAggregateNames(true)) if err != nil { - return fmt.Errorf("oplog many: unable to get Ticketer %w", err) + return fmt.Errorf("oplog for items: unable to get Ticketer %w", err) } entry, err := oplog.NewEntry( replayable.TableName(), @@ -645,35 +708,7 @@ func (rw *Db) addOplogForItems(ctx context.Context, opType OpType, opts Options, ticketer, ) if err != nil { - return fmt.Errorf("oplog many: unable to create oplog entry %w", err) - } - oplogMsgs := []*oplog.Message{} - var foundType reflect.Type - for i, item := range items { - if i == 0 { - foundType = reflect.TypeOf(item) - } - currentType := reflect.TypeOf(item) - if foundType != currentType { - return fmt.Errorf("oplog many: items contains disparate types. item %d is not a %s", i, foundType.Name()) - } - replayable, ok := item.(oplog.ReplayableMessage) - if !ok { - return fmt.Errorf("oplog many: item %d not a replayable oplog message %w", i, ErrInvalidParameter) - } - msg := &oplog.Message{ - Message: item.(proto.Message), - TypeName: replayable.TableName(), - } - switch opType { - case CreateOp: - msg.OpType = oplog.OpType_OP_TYPE_CREATE - case DeleteOp: - msg.OpType = oplog.OpType_OP_TYPE_DELETE - default: - return fmt.Errorf("oplog many: operation type %v is not supported", opType) - } - oplogMsgs = append(oplogMsgs, msg) + return fmt.Errorf("oplog for items: unable to create oplog entry %w", err) } if err := entry.WriteEntryWith( ctx, @@ -681,10 +716,11 @@ func (rw *Db) addOplogForItems(ctx context.Context, opType OpType, opts Options, ticket, oplogMsgs..., ); err != nil { - return fmt.Errorf("oplog many: unable to write oplog entry %w", err) + return fmt.Errorf("oplog for items: unable to write oplog entry %w", err) } return nil } + func (rw *Db) addOplog(ctx context.Context, opType OpType, opts Options, ticket *store.Ticket, i interface{}) error { oplogArgs := opts.oplogOpts replayable, err := validateOplogArgs(i, opts) @@ -707,27 +743,15 @@ func (rw *Db) addOplog(ctx context.Context, opType OpType, opts Options, ticket if err != nil { return err } - msg := oplog.Message{ - Message: i.(proto.Message), - TypeName: replayable.TableName(), - } - switch opType { - case CreateOp: - msg.OpType = oplog.OpType_OP_TYPE_CREATE - case UpdateOp: - msg.OpType = oplog.OpType_OP_TYPE_UPDATE - msg.FieldMaskPaths = opts.WithFieldMaskPaths - msg.SetToNullPaths = opts.WithNullPaths - case DeleteOp: - msg.OpType = oplog.OpType_OP_TYPE_DELETE - default: - return fmt.Errorf("add oplog: operation type %v is not supported", opType) + msg, err := rw.newOplogMessage(ctx, opType, i, WithFieldMaskPaths(opts.WithFieldMaskPaths), WithNullPaths(opts.WithNullPaths)) + if err != nil { + return fmt.Errorf("add oplog: %w", err) } err = entry.WriteEntryWith( ctx, &oplog.GormWriter{Tx: rw.underlying}, ticket, - &msg, + msg, ) if err != nil { return fmt.Errorf("add oplog: unable to write oplog entry: %w", err) @@ -803,7 +827,7 @@ func (rw *Db) newOplogMessage(ctx context.Context, opType OpType, i interface{}, case DeleteOp: msg.OpType = oplog.OpType_OP_TYPE_DELETE default: - return nil, fmt.Errorf("add oplog: operation type %v is not supported", opType) + return nil, fmt.Errorf("operation type %v is not supported", opType) } return &msg, nil } diff --git a/internal/db/read_writer_test.go b/internal/db/read_writer_test.go index 896771bb7a..de2c9c3729 100644 --- a/internal/db/read_writer_test.go +++ b/internal/db/read_writer_test.go @@ -1478,19 +1478,22 @@ func TestDb_CreateItems(t *testing.T) { u, c, } - } + + returnedMsgs := []*oplog.Message{} + type args struct { createItems []interface{} opt []Option } tests := []struct { - name string - underlying *gorm.DB - args args - wantOplogId string - wantErr bool - wantErrIs error + name string + underlying *gorm.DB + args args + wantOplogId string + wantOplogMsgs bool + wantErr bool + wantErrIs error }{ { name: "simple", @@ -1518,6 +1521,37 @@ func TestDb_CreateItems(t *testing.T) { wantOplogId: testOplogResourceId, wantErr: false, }, + { + name: "NewOplogMsgs", + underlying: db, + args: args{ + createItems: createFn(), + opt: []Option{ + NewOplogMsgs(&returnedMsgs), + }, + }, + wantOplogMsgs: true, + wantErr: false, + }, + { + name: "withOplog and NewOplogMsgs", + underlying: db, + args: args{ + createItems: createFn(), + opt: []Option{ + NewOplogMsgs(&[]*oplog.Message{}), + WithOplog( + TestWrapper(t), + oplog.Metadata{ + "resource-public-id": []string{testOplogResourceId}, + "op-type": []string{oplog.OpType_OP_TYPE_CREATE.String()}, + }, + ), + }, + }, + wantErrIs: ErrInvalidParameter, + wantErr: true, + }, { name: "mixed items", underlying: db, @@ -1626,6 +1660,12 @@ func TestDb_CreateItems(t *testing.T) { err = TestVerifyOplog(t, rw, tt.wantOplogId, WithOperation(oplog.OpType_OP_TYPE_CREATE), WithCreateNotBefore(10*time.Second)) assert.NoError(err) } + if tt.wantOplogMsgs { + assert.Equal(len(tt.args.createItems), len(returnedMsgs)) + for _, m := range returnedMsgs { + assert.Equal(m.OpType, oplog.OpType_OP_TYPE_CREATE) + } + } }) } } @@ -1649,6 +1689,9 @@ func TestDb_DeleteItems(t *testing.T) { } return results } + + returnedMsgs := []*oplog.Message{} + type args struct { deleteItems []interface{} opt []Option @@ -1659,6 +1702,7 @@ func TestDb_DeleteItems(t *testing.T) { args args wantRowsDeleted int wantOplogId string + wantOplogMsgs bool wantErr bool wantErrIs error }{ @@ -1671,6 +1715,37 @@ func TestDb_DeleteItems(t *testing.T) { wantRowsDeleted: 10, wantErr: false, }, + { + name: "NewOplogMsgs", + underlying: db, + args: args{ + deleteItems: createFn(), + opt: []Option{ + NewOplogMsgs(&returnedMsgs), + }, + }, + wantRowsDeleted: 10, + wantErr: false, + }, + { + name: "withOplog and NewOplogMsgs", + underlying: db, + args: args{ + deleteItems: createFn(), + opt: []Option{ + NewOplogMsgs(&[]*oplog.Message{}), + WithOplog( + TestWrapper(t), + oplog.Metadata{ + "resource-public-id": []string{testOplogResourceId}, + "op-type": []string{oplog.OpType_OP_TYPE_DELETE.String()}, + }, + ), + }, + }, + wantErr: true, + wantErrIs: ErrInvalidParameter, + }, { name: "withOplog", underlying: db, @@ -1788,6 +1863,12 @@ func TestDb_DeleteItems(t *testing.T) { err = TestVerifyOplog(t, rw, tt.wantOplogId, WithOperation(oplog.OpType_OP_TYPE_DELETE), WithCreateNotBefore(10*time.Second)) assert.NoError(err) } + if tt.wantOplogMsgs { + assert.Equal(len(tt.args.deleteItems), len(returnedMsgs)) + for _, m := range returnedMsgs { + assert.Equal(m.OpType, oplog.OpType_OP_TYPE_DELETE) + } + } }) } } @@ -2676,3 +2757,106 @@ func TestClear_SetFieldsToNil(t *testing.T) { }) } } + +func TestDb_oplogMsgsForItems(t *testing.T) { + t.Parallel() + + // underlying isn't used at this point, so it can just be nil + rw := Db{underlying: nil} + var users []interface{} + var wantUsrMsgs []*oplog.Message + for i := 0; i < 5; i++ { + publicId, err := base62.Random(20) + require.NoError(t, err) + u := &db_test.TestUser{StoreTestUser: &db_test.StoreTestUser{PublicId: publicId}} + users = append(users, u) + wantUsrMsgs = append( + wantUsrMsgs, + &oplog.Message{ + Message: users[i].(proto.Message), + TypeName: u.TableName(), + OpType: oplog.OpType_OP_TYPE_CREATE, + }, + ) + } + + publicId, err := base62.Random(20) + require.NoError(t, err) + mixed := []interface{}{ + &db_test.TestUser{StoreTestUser: &db_test.StoreTestUser{PublicId: publicId}}, + &db_test.TestCar{StoreTestCar: &db_test.StoreTestCar{PublicId: publicId}}, + } + + type args struct { + opType OpType + opts Options + items []interface{} + } + tests := []struct { + name string + args args + want []*oplog.Message + wantErr bool + wantIsErr error + }{ + { + name: "valid", + args: args{ + opType: CreateOp, + items: users, + }, + wantErr: false, + want: wantUsrMsgs, + }, + { + name: "nil items", + args: args{ + opType: CreateOp, + items: nil, + }, + wantErr: true, + wantIsErr: ErrInvalidParameter, + }, + { + name: "zero items", + args: args{ + opType: CreateOp, + items: []interface{}{}, + }, + wantErr: true, + wantIsErr: ErrInvalidParameter, + }, + { + name: "mixed items", + args: args{ + opType: CreateOp, + items: mixed, + }, + wantErr: true, + wantIsErr: ErrInvalidParameter, + }, + { + name: "bad op", + args: args{ + opType: UnknownOp, + items: users, + }, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert, require := assert.New(t), require.New(t) + got, err := rw.oplogMsgsForItems(context.Background(), tt.args.opType, tt.args.opts, tt.args.items) + if tt.wantErr { + require.Error(err) + if tt.wantIsErr != nil { + assert.Truef(errors.Is(err, tt.wantIsErr), "unexpected error %s", err.Error()) + } + return + } + require.NoError(err) + assert.Equal(tt.want, got) + }) + } +}