support for NewOplogMsgs() option when calling CreateItems and DeleteItems (#158)

pull/160/head
Jim 6 years ago committed by GitHub
parent c965cf1bc5
commit d0a846823a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -29,7 +29,9 @@ type Options struct {
// WithNullPaths must be accessible from other packages.
WithNullPaths []string
newOplogMsg *oplog.Message
newOplogMsg *oplog.Message
newOplogMsgs *[]*oplog.Message
// WithVersion must be accessible from other packages
WithVersion int
}
@ -82,6 +84,17 @@ func NewOplogMsg(msg *oplog.Message) Option {
}
}
// NewOplogMsgs provides an option to ask for multiple new in-memory oplog
// messages. The new msgs will be returned in the provided *[]oplog.Message
// parameter. NewOplogMsgs can only be used with write functions that operate on
// multiple items(CreateItems, DeleteItems). WithOplog and NewOplogMsgs cannot
// be used together.
func NewOplogMsgs(msgs *[]*oplog.Message) Option {
return func(o *Options) {
o.newOplogMsgs = msgs
}
}
// WithFieldMaskPaths provides an option to provide field mask paths.
func WithFieldMaskPaths(paths []string) Option {
return func(o *Options) {

@ -96,19 +96,30 @@ func Test_getOpts(t *testing.T) {
})
t.Run("NewOplogMsg", func(t *testing.T) {
assert := assert.New(t)
// test default of false
opts := GetOpts()
testOpts := getDefaultOptions()
testOpts.newOplogMsg = nil
assert.Equal(opts, testOpts)
msg := oplog.Message{}
// try setting to true
opts = GetOpts(NewOplogMsg(&msg))
testOpts = getDefaultOptions()
testOpts.newOplogMsg = &msg
assert.Equal(opts, testOpts)
})
t.Run("NewOplogMsgs", func(t *testing.T) {
assert := assert.New(t)
opts := GetOpts()
testOpts := getDefaultOptions()
testOpts.newOplogMsgs = nil
assert.Equal(opts, testOpts)
msgs := []*oplog.Message{}
opts = GetOpts(NewOplogMsgs(&msgs))
testOpts = getDefaultOptions()
testOpts.newOplogMsgs = &msgs
assert.Equal(opts, testOpts)
})
t.Run("WithVersion", func(t *testing.T) {
assert := assert.New(t)
// test default of 0

@ -77,10 +77,12 @@ type Writer interface {
Create(ctx context.Context, i interface{}, opt ...Option) error
// CreateItems will create multiple items of the same type.
// Supported options: WithOplog. WithLookup is not a supported option.
// The caller is responsible for the transaction life cycle of the writer
// and if an error is returned the caller must decide what to do with
// the transaction, which almost always should be to rollback.
// Supported options: WithOplog and WithOplogMsgs. WithOplog and
// WithOplogMsgs may not be used together. WithLookup is not a
// supported option. The caller is responsible for the transaction life
// cycle of the writer and if an error is returned the caller must decide
// what to do with the transaction, which almost always should be to
// rollback.
CreateItems(ctx context.Context, createItems []interface{}, opt ...Option) error
// Delete an object in the db with options: WithOplog
@ -91,7 +93,8 @@ type Writer interface {
Delete(ctx context.Context, i interface{}, opt ...Option) (int, error)
// DeleteItems will delete multiple items of the same type.
// Supported options: WithOplog. The caller is responsible for the
// Supported options: WithOplog and WithOplogMsgs. WithOplog and
// WithOplogMsgs may not be used together. The caller is responsible for the
// transaction life cycle of the writer and if an error is returned the
// caller must decide what to do with the transaction, which almost always
// should be to rollback. Delete returns the number of rows deleted or an error.
@ -272,8 +275,9 @@ func (rw *Db) Create(ctx context.Context, i interface{}, opt ...Option) error {
return nil
}
// CreateItems will create multiple items of the same type.
// Supported options: WithOplog. WithLookup is not a supported option.
// CreateItems will create multiple items of the same type. Supported options:
// WithOplog and WithOplogMsgs. WithOplog and WithOplogMsgs may not be used
// together. WithLookup is not a supported option.
func (rw *Db) CreateItems(ctx context.Context, createItems []interface{}, opt ...Option) error {
if rw.underlying == nil {
return fmt.Errorf("create items: missing underlying db: %w", ErrNilParameter)
@ -283,7 +287,13 @@ func (rw *Db) CreateItems(ctx context.Context, createItems []interface{}, opt ..
}
opts := GetOpts(opt...)
if opts.withLookup {
return fmt.Errorf("create items: withLookup not a supported option: %w", ErrInvalidParameter)
return fmt.Errorf("create items: with lookup not a supported option: %w", ErrInvalidParameter)
}
if opts.newOplogMsg != nil {
return fmt.Errorf("create items: new oplog msg (singular) is not a supported option: %w", ErrInvalidParameter)
}
if opts.withOplog && opts.newOplogMsgs != nil {
return fmt.Errorf("create items: both WithOplog and NewOplogMsgs options have been specified: %w", ErrInvalidParameter)
}
// verify that createItems are all the same type.
var foundType reflect.Type
@ -318,6 +328,13 @@ func (rw *Db) CreateItems(ctx context.Context, createItems []interface{}, opt ..
return fmt.Errorf("create items: unable to add oplog: %w", err)
}
}
if opts.newOplogMsgs != nil {
msgs, err := rw.oplogMsgsForItems(ctx, CreateOp, opts, createItems)
if err != nil {
return fmt.Errorf("create items: returning oplog msgs failed %w", err)
}
*opts.newOplogMsgs = append(*opts.newOplogMsgs, msgs...)
}
return nil
}
@ -513,8 +530,9 @@ func (rw *Db) Delete(ctx context.Context, i interface{}, opt ...Option) (int, er
return rowsDeleted, nil
}
// DeleteItems will delete multiple items of the same type.
// Supported options: WithOplog.
// DeleteItems will delete multiple items of the same type. Supported options:
// WithOplog and WithOplogMsgs. WithOplog and WithOplogMsgs may not be used
// together.
func (rw *Db) DeleteItems(ctx context.Context, deleteItems []interface{}, opt ...Option) (int, error) {
if rw.underlying == nil {
return NoRowsAffected, fmt.Errorf("delete items: missing underlying db: %w", ErrNilParameter)
@ -522,6 +540,13 @@ func (rw *Db) DeleteItems(ctx context.Context, deleteItems []interface{}, opt ..
if len(deleteItems) == 0 {
return NoRowsAffected, fmt.Errorf("delete items: no interfaces to delete: %w", ErrInvalidParameter)
}
opts := GetOpts(opt...)
if opts.newOplogMsg != nil {
return NoRowsAffected, fmt.Errorf("delete items: new oplog msg (singular) is not a supported option: %w", ErrInvalidParameter)
}
if opts.withOplog && opts.newOplogMsgs != nil {
return NoRowsAffected, fmt.Errorf("delete items: both WithOplog and NewOplogMsgs options have been specified: %w", ErrInvalidParameter)
}
// verify that createItems are all the same type.
var foundType reflect.Type
for i, v := range deleteItems {
@ -533,7 +558,7 @@ func (rw *Db) DeleteItems(ctx context.Context, deleteItems []interface{}, opt ..
return NoRowsAffected, fmt.Errorf("delete items: items contain disparate types. item %d is not a %s: %w", i, foundType.Name(), ErrInvalidParameter)
}
}
opts := GetOpts(opt...)
var ticket *store.Ticket
if opts.withOplog {
_, err := validateOplogArgs(deleteItems[0], opts)
@ -556,9 +581,18 @@ func (rw *Db) DeleteItems(ctx context.Context, deleteItems []interface{}, opt ..
}
rowsDeleted += int(underlying.RowsAffected)
}
if opts.withOplog && rowsDeleted > 0 {
if err := rw.addOplogForItems(ctx, DeleteOp, opts, ticket, deleteItems); err != nil {
return rowsDeleted, fmt.Errorf("delete items: unable to add oplog: %w", err)
if rowsDeleted > 0 && (opts.withOplog || opts.newOplogMsgs != nil) {
if opts.withOplog {
if err := rw.addOplogForItems(ctx, DeleteOp, opts, ticket, deleteItems); err != nil {
return rowsDeleted, fmt.Errorf("delete items: unable to add oplog: %w", err)
}
}
if opts.newOplogMsgs != nil {
msgs, err := rw.oplogMsgsForItems(ctx, DeleteOp, opts, deleteItems)
if err != nil {
return rowsDeleted, fmt.Errorf("delete items: returning oplog msgs failed %w", err)
}
*opts.newOplogMsgs = append(*opts.newOplogMsgs, msgs...)
}
}
return rowsDeleted, nil
@ -610,33 +644,62 @@ func (rw *Db) GetTicket(i interface{}) (*store.Ticket, error) {
return rw.getTicketFor(replayable.TableName())
}
func (rw *Db) oplogMsgsForItems(ctx context.Context, opType OpType, opts Options, items []interface{}) ([]*oplog.Message, error) {
if len(items) == 0 {
return nil, fmt.Errorf("oplog msgs for items: items is empty: %w", ErrInvalidParameter)
}
oplogMsgs := []*oplog.Message{}
var foundType reflect.Type
for i, item := range items {
if i == 0 {
foundType = reflect.TypeOf(item)
}
currentType := reflect.TypeOf(item)
if foundType != currentType {
return nil, fmt.Errorf("oplog msgs for items: items contains disparate types. item (%d) %s is not a %s: %w", i, currentType, foundType, ErrInvalidParameter)
}
msg, err := rw.newOplogMessage(ctx, opType, item, WithFieldMaskPaths(opts.WithFieldMaskPaths), WithNullPaths(opts.WithNullPaths))
if err != nil {
return nil, fmt.Errorf("oplog msgs for items: %w", err)
}
oplogMsgs = append(oplogMsgs, msg)
}
return oplogMsgs, nil
}
// addOplogForItems will add a multi-message oplog entry with one msg for each
// item. Items must all be of the same type. Only CreateOp and DeleteOp are
// currently supported operations.
func (rw *Db) addOplogForItems(ctx context.Context, opType OpType, opts Options, ticket *store.Ticket, items []interface{}) error {
oplogArgs := opts.oplogOpts
if ticket == nil {
return fmt.Errorf("oplog many: ticket is missing: %w", ErrNilParameter)
return fmt.Errorf("oplog for items: ticket is missing: %w", ErrNilParameter)
}
if items == nil {
return fmt.Errorf("oplog many: items are missing: %w", ErrNilParameter)
return fmt.Errorf("oplog for items: items are missing: %w", ErrNilParameter)
}
if len(items) == 0 {
return fmt.Errorf("oplog many: items is empty: %w", ErrInvalidParameter)
return fmt.Errorf("oplog for items: items is empty: %w", ErrInvalidParameter)
}
if oplogArgs.metadata == nil {
return fmt.Errorf("oplog many: metadata is missing: %w", ErrNilParameter)
return fmt.Errorf("oplog for items: metadata is missing: %w", ErrNilParameter)
}
if oplogArgs.wrapper == nil {
return fmt.Errorf("oplog many: wrapper is missing: %w", ErrNilParameter)
return fmt.Errorf("oplog for items: wrapper is missing: %w", ErrNilParameter)
}
oplogMsgs, err := rw.oplogMsgsForItems(ctx, opType, opts, items)
if err != nil {
return fmt.Errorf("oplog for items: %w", err)
}
replayable, err := validateOplogArgs(items[0], opts)
if err != nil {
return fmt.Errorf("oplog many: oplog validation failed %w", err)
return fmt.Errorf("oplog for items: oplog validation failed %w", err)
}
ticketer, err := oplog.NewGormTicketer(rw.underlying, oplog.WithAggregateNames(true))
if err != nil {
return fmt.Errorf("oplog many: unable to get Ticketer %w", err)
return fmt.Errorf("oplog for items: unable to get Ticketer %w", err)
}
entry, err := oplog.NewEntry(
replayable.TableName(),
@ -645,35 +708,7 @@ func (rw *Db) addOplogForItems(ctx context.Context, opType OpType, opts Options,
ticketer,
)
if err != nil {
return fmt.Errorf("oplog many: unable to create oplog entry %w", err)
}
oplogMsgs := []*oplog.Message{}
var foundType reflect.Type
for i, item := range items {
if i == 0 {
foundType = reflect.TypeOf(item)
}
currentType := reflect.TypeOf(item)
if foundType != currentType {
return fmt.Errorf("oplog many: items contains disparate types. item %d is not a %s", i, foundType.Name())
}
replayable, ok := item.(oplog.ReplayableMessage)
if !ok {
return fmt.Errorf("oplog many: item %d not a replayable oplog message %w", i, ErrInvalidParameter)
}
msg := &oplog.Message{
Message: item.(proto.Message),
TypeName: replayable.TableName(),
}
switch opType {
case CreateOp:
msg.OpType = oplog.OpType_OP_TYPE_CREATE
case DeleteOp:
msg.OpType = oplog.OpType_OP_TYPE_DELETE
default:
return fmt.Errorf("oplog many: operation type %v is not supported", opType)
}
oplogMsgs = append(oplogMsgs, msg)
return fmt.Errorf("oplog for items: unable to create oplog entry %w", err)
}
if err := entry.WriteEntryWith(
ctx,
@ -681,10 +716,11 @@ func (rw *Db) addOplogForItems(ctx context.Context, opType OpType, opts Options,
ticket,
oplogMsgs...,
); err != nil {
return fmt.Errorf("oplog many: unable to write oplog entry %w", err)
return fmt.Errorf("oplog for items: unable to write oplog entry %w", err)
}
return nil
}
func (rw *Db) addOplog(ctx context.Context, opType OpType, opts Options, ticket *store.Ticket, i interface{}) error {
oplogArgs := opts.oplogOpts
replayable, err := validateOplogArgs(i, opts)
@ -707,27 +743,15 @@ func (rw *Db) addOplog(ctx context.Context, opType OpType, opts Options, ticket
if err != nil {
return err
}
msg := oplog.Message{
Message: i.(proto.Message),
TypeName: replayable.TableName(),
}
switch opType {
case CreateOp:
msg.OpType = oplog.OpType_OP_TYPE_CREATE
case UpdateOp:
msg.OpType = oplog.OpType_OP_TYPE_UPDATE
msg.FieldMaskPaths = opts.WithFieldMaskPaths
msg.SetToNullPaths = opts.WithNullPaths
case DeleteOp:
msg.OpType = oplog.OpType_OP_TYPE_DELETE
default:
return fmt.Errorf("add oplog: operation type %v is not supported", opType)
msg, err := rw.newOplogMessage(ctx, opType, i, WithFieldMaskPaths(opts.WithFieldMaskPaths), WithNullPaths(opts.WithNullPaths))
if err != nil {
return fmt.Errorf("add oplog: %w", err)
}
err = entry.WriteEntryWith(
ctx,
&oplog.GormWriter{Tx: rw.underlying},
ticket,
&msg,
msg,
)
if err != nil {
return fmt.Errorf("add oplog: unable to write oplog entry: %w", err)
@ -803,7 +827,7 @@ func (rw *Db) newOplogMessage(ctx context.Context, opType OpType, i interface{},
case DeleteOp:
msg.OpType = oplog.OpType_OP_TYPE_DELETE
default:
return nil, fmt.Errorf("add oplog: operation type %v is not supported", opType)
return nil, fmt.Errorf("operation type %v is not supported", opType)
}
return &msg, nil
}

@ -1478,19 +1478,22 @@ func TestDb_CreateItems(t *testing.T) {
u,
c,
}
}
returnedMsgs := []*oplog.Message{}
type args struct {
createItems []interface{}
opt []Option
}
tests := []struct {
name string
underlying *gorm.DB
args args
wantOplogId string
wantErr bool
wantErrIs error
name string
underlying *gorm.DB
args args
wantOplogId string
wantOplogMsgs bool
wantErr bool
wantErrIs error
}{
{
name: "simple",
@ -1518,6 +1521,37 @@ func TestDb_CreateItems(t *testing.T) {
wantOplogId: testOplogResourceId,
wantErr: false,
},
{
name: "NewOplogMsgs",
underlying: db,
args: args{
createItems: createFn(),
opt: []Option{
NewOplogMsgs(&returnedMsgs),
},
},
wantOplogMsgs: true,
wantErr: false,
},
{
name: "withOplog and NewOplogMsgs",
underlying: db,
args: args{
createItems: createFn(),
opt: []Option{
NewOplogMsgs(&[]*oplog.Message{}),
WithOplog(
TestWrapper(t),
oplog.Metadata{
"resource-public-id": []string{testOplogResourceId},
"op-type": []string{oplog.OpType_OP_TYPE_CREATE.String()},
},
),
},
},
wantErrIs: ErrInvalidParameter,
wantErr: true,
},
{
name: "mixed items",
underlying: db,
@ -1626,6 +1660,12 @@ func TestDb_CreateItems(t *testing.T) {
err = TestVerifyOplog(t, rw, tt.wantOplogId, WithOperation(oplog.OpType_OP_TYPE_CREATE), WithCreateNotBefore(10*time.Second))
assert.NoError(err)
}
if tt.wantOplogMsgs {
assert.Equal(len(tt.args.createItems), len(returnedMsgs))
for _, m := range returnedMsgs {
assert.Equal(m.OpType, oplog.OpType_OP_TYPE_CREATE)
}
}
})
}
}
@ -1649,6 +1689,9 @@ func TestDb_DeleteItems(t *testing.T) {
}
return results
}
returnedMsgs := []*oplog.Message{}
type args struct {
deleteItems []interface{}
opt []Option
@ -1659,6 +1702,7 @@ func TestDb_DeleteItems(t *testing.T) {
args args
wantRowsDeleted int
wantOplogId string
wantOplogMsgs bool
wantErr bool
wantErrIs error
}{
@ -1671,6 +1715,37 @@ func TestDb_DeleteItems(t *testing.T) {
wantRowsDeleted: 10,
wantErr: false,
},
{
name: "NewOplogMsgs",
underlying: db,
args: args{
deleteItems: createFn(),
opt: []Option{
NewOplogMsgs(&returnedMsgs),
},
},
wantRowsDeleted: 10,
wantErr: false,
},
{
name: "withOplog and NewOplogMsgs",
underlying: db,
args: args{
deleteItems: createFn(),
opt: []Option{
NewOplogMsgs(&[]*oplog.Message{}),
WithOplog(
TestWrapper(t),
oplog.Metadata{
"resource-public-id": []string{testOplogResourceId},
"op-type": []string{oplog.OpType_OP_TYPE_DELETE.String()},
},
),
},
},
wantErr: true,
wantErrIs: ErrInvalidParameter,
},
{
name: "withOplog",
underlying: db,
@ -1788,6 +1863,12 @@ func TestDb_DeleteItems(t *testing.T) {
err = TestVerifyOplog(t, rw, tt.wantOplogId, WithOperation(oplog.OpType_OP_TYPE_DELETE), WithCreateNotBefore(10*time.Second))
assert.NoError(err)
}
if tt.wantOplogMsgs {
assert.Equal(len(tt.args.deleteItems), len(returnedMsgs))
for _, m := range returnedMsgs {
assert.Equal(m.OpType, oplog.OpType_OP_TYPE_DELETE)
}
}
})
}
}
@ -2676,3 +2757,106 @@ func TestClear_SetFieldsToNil(t *testing.T) {
})
}
}
func TestDb_oplogMsgsForItems(t *testing.T) {
t.Parallel()
// underlying isn't used at this point, so it can just be nil
rw := Db{underlying: nil}
var users []interface{}
var wantUsrMsgs []*oplog.Message
for i := 0; i < 5; i++ {
publicId, err := base62.Random(20)
require.NoError(t, err)
u := &db_test.TestUser{StoreTestUser: &db_test.StoreTestUser{PublicId: publicId}}
users = append(users, u)
wantUsrMsgs = append(
wantUsrMsgs,
&oplog.Message{
Message: users[i].(proto.Message),
TypeName: u.TableName(),
OpType: oplog.OpType_OP_TYPE_CREATE,
},
)
}
publicId, err := base62.Random(20)
require.NoError(t, err)
mixed := []interface{}{
&db_test.TestUser{StoreTestUser: &db_test.StoreTestUser{PublicId: publicId}},
&db_test.TestCar{StoreTestCar: &db_test.StoreTestCar{PublicId: publicId}},
}
type args struct {
opType OpType
opts Options
items []interface{}
}
tests := []struct {
name string
args args
want []*oplog.Message
wantErr bool
wantIsErr error
}{
{
name: "valid",
args: args{
opType: CreateOp,
items: users,
},
wantErr: false,
want: wantUsrMsgs,
},
{
name: "nil items",
args: args{
opType: CreateOp,
items: nil,
},
wantErr: true,
wantIsErr: ErrInvalidParameter,
},
{
name: "zero items",
args: args{
opType: CreateOp,
items: []interface{}{},
},
wantErr: true,
wantIsErr: ErrInvalidParameter,
},
{
name: "mixed items",
args: args{
opType: CreateOp,
items: mixed,
},
wantErr: true,
wantIsErr: ErrInvalidParameter,
},
{
name: "bad op",
args: args{
opType: UnknownOp,
items: users,
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert, require := assert.New(t), require.New(t)
got, err := rw.oplogMsgsForItems(context.Background(), tt.args.opType, tt.args.opts, tt.args.items)
if tt.wantErr {
require.Error(err)
if tt.wantIsErr != nil {
assert.Truef(errors.Is(err, tt.wantIsErr), "unexpected error %s", err.Error())
}
return
}
require.NoError(err)
assert.Equal(tt.want, got)
})
}
}

Loading…
Cancel
Save