|
|
|
|
@ -21,6 +21,8 @@ var (
|
|
|
|
|
ErrRecordNotFound = errors.New("record not found")
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
const NoRowsAffected = 0
|
|
|
|
|
|
|
|
|
|
// Reader interface defines lookups/searching for resources
|
|
|
|
|
type Reader interface {
|
|
|
|
|
// LookupByName will lookup resource by its friendly name which must be unique
|
|
|
|
|
@ -47,12 +49,13 @@ type Writer interface {
|
|
|
|
|
// DoTx will wrap the TxHandler in a retryable transaction
|
|
|
|
|
DoTx(ctx context.Context, retries uint, backOff Backoff, Handler TxHandler) (RetryInfo, error)
|
|
|
|
|
|
|
|
|
|
// Update an object in the db, if there's a fieldMask then only the field_mask.proto paths are updated, otherwise
|
|
|
|
|
// it will send every field to the DB. options: WithOplog
|
|
|
|
|
// the caller is responsible for the transaction life cycle of the writer
|
|
|
|
|
// and if an error is returned the caller must decide what to do with
|
|
|
|
|
// the transaction, which almost always should be to rollback.
|
|
|
|
|
Update(ctx context.Context, i interface{}, fieldMaskPaths []string, opt ...Option) error
|
|
|
|
|
// Update an object in the db, if there's a fieldMask then only the
|
|
|
|
|
// field_mask.proto paths are updated, otherwise it will send every field to
|
|
|
|
|
// the DB. options: WithOplog the caller is responsible for the transaction
|
|
|
|
|
// life cycle of the writer and if an error is returned the caller must
|
|
|
|
|
// decide what to do with the transaction, which almost always should be to
|
|
|
|
|
// rollback. Update returns the number of rows updated or an error.
|
|
|
|
|
Update(ctx context.Context, i interface{}, fieldMaskPaths []string, opt ...Option) (int, error)
|
|
|
|
|
|
|
|
|
|
// Create an object in the db with options: WithOplog
|
|
|
|
|
// the caller is responsible for the transaction life cycle of the writer
|
|
|
|
|
@ -63,8 +66,9 @@ type Writer interface {
|
|
|
|
|
// Delete an object in the db with options: WithOplog
|
|
|
|
|
// the caller is responsible for the transaction life cycle of the writer
|
|
|
|
|
// and if an error is returned the caller must decide what to do with
|
|
|
|
|
// the transaction, which almost always should be to rollback.
|
|
|
|
|
Delete(ctx context.Context, i interface{}, opt ...Option) error
|
|
|
|
|
// the transaction, which almost always should be to rollback. Delete
|
|
|
|
|
// returns the number of rows deleted or an error.
|
|
|
|
|
Delete(ctx context.Context, i interface{}, opt ...Option) (int, error)
|
|
|
|
|
|
|
|
|
|
// DB returns the sql.DB
|
|
|
|
|
DB() (*sql.DB, error)
|
|
|
|
|
@ -159,6 +163,13 @@ func (rw *Db) Create(ctx context.Context, i interface{}, opt ...Option) error {
|
|
|
|
|
opts := GetOpts(opt...)
|
|
|
|
|
withOplog := opts.withOplog
|
|
|
|
|
withDebug := opts.withDebug
|
|
|
|
|
if withOplog {
|
|
|
|
|
// let's validate oplog options before we start writing to the database
|
|
|
|
|
_, err := validateOplogArgs(i, opts)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if withDebug {
|
|
|
|
|
rw.underlying.LogMode(true)
|
|
|
|
|
defer rw.underlying.LogMode(false)
|
|
|
|
|
@ -185,30 +196,39 @@ func (rw *Db) Create(ctx context.Context, i interface{}, opt ...Option) error {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Update an object in the db, if there's a fieldMask then only the field_mask.proto paths are updated, otherwise
|
|
|
|
|
// it will send every field to the DB. Update supports embedding a struct (or structPtr) one level deep for updating
|
|
|
|
|
func (rw *Db) Update(ctx context.Context, i interface{}, fieldMaskPaths []string, opt ...Option) error {
|
|
|
|
|
// Update an object in the db, if there's a fieldMask then only the
|
|
|
|
|
// field_mask.proto paths are updated, otherwise it will send every field to the
|
|
|
|
|
// DB. Update supports embedding a struct (or structPtr) one level deep for
|
|
|
|
|
// updating. Update returns the number of rows updated and any errors.
|
|
|
|
|
func (rw *Db) Update(ctx context.Context, i interface{}, fieldMaskPaths []string, opt ...Option) (int, error) {
|
|
|
|
|
if rw.underlying == nil {
|
|
|
|
|
return errors.New("update underlying db is nil")
|
|
|
|
|
return NoRowsAffected, errors.New("update underlying db is nil")
|
|
|
|
|
}
|
|
|
|
|
opts := GetOpts(opt...)
|
|
|
|
|
withDebug := opts.withDebug
|
|
|
|
|
withOplog := opts.withOplog
|
|
|
|
|
if withOplog {
|
|
|
|
|
// let's validate oplog options before we start writing to the database
|
|
|
|
|
_, err := validateOplogArgs(i, opts)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return NoRowsAffected, err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if withDebug {
|
|
|
|
|
rw.underlying.LogMode(true)
|
|
|
|
|
defer rw.underlying.LogMode(false)
|
|
|
|
|
}
|
|
|
|
|
if i == nil {
|
|
|
|
|
return errors.New("update interface is nil")
|
|
|
|
|
return NoRowsAffected, errors.New("update interface is nil")
|
|
|
|
|
}
|
|
|
|
|
if vetter, ok := i.(VetForWriter); ok {
|
|
|
|
|
if err := vetter.VetForWrite(ctx, rw, UpdateOp, WithFieldMaskPaths(fieldMaskPaths)); err != nil {
|
|
|
|
|
return fmt.Errorf("error on update %w", err)
|
|
|
|
|
return NoRowsAffected, fmt.Errorf("error on update %w", err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if len(fieldMaskPaths) == 0 {
|
|
|
|
|
if err := rw.underlying.Save(i).Error; err != nil {
|
|
|
|
|
return fmt.Errorf("error updating: %w", err)
|
|
|
|
|
return NoRowsAffected, fmt.Errorf("error updating: %w", err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
updateFields := map[string]interface{}{}
|
|
|
|
|
@ -246,62 +266,83 @@ func (rw *Db) Update(ctx context.Context, i interface{}, fieldMaskPaths []string
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if len(updateFields) == 0 {
|
|
|
|
|
return fmt.Errorf("error no update fields matched using fieldMaskPaths: %s", fieldMaskPaths)
|
|
|
|
|
return NoRowsAffected, fmt.Errorf("error no update fields matched using fieldMaskPaths: %s", fieldMaskPaths)
|
|
|
|
|
}
|
|
|
|
|
if err := rw.underlying.Model(i).Updates(updateFields).Error; err != nil {
|
|
|
|
|
return fmt.Errorf("error updating: %w", err)
|
|
|
|
|
underlying := rw.underlying.Model(i).Updates(updateFields)
|
|
|
|
|
if underlying.Error != nil {
|
|
|
|
|
return NoRowsAffected, fmt.Errorf("error updating: %w", underlying.Error)
|
|
|
|
|
}
|
|
|
|
|
rowsUpdated := int(underlying.RowsAffected)
|
|
|
|
|
if withOplog {
|
|
|
|
|
if err := rw.addOplog(ctx, UpdateOp, opts, i); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
return rowsUpdated, err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
// we need to force a lookupAfterWrite so the resource returned is correctly initialized
|
|
|
|
|
// from the db
|
|
|
|
|
opt = append(opt, WithLookup(true))
|
|
|
|
|
if err := rw.lookupAfterWrite(ctx, i, opt...); err != nil {
|
|
|
|
|
return fmt.Errorf("lookup error after update: %w", err)
|
|
|
|
|
return NoRowsAffected, fmt.Errorf("lookup error after update: %w", err)
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
return rowsUpdated, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Delete an object in the db with options: WithOplog (which requires WithMetadata, WithWrapper)
|
|
|
|
|
func (rw *Db) Delete(ctx context.Context, i interface{}, opt ...Option) error {
|
|
|
|
|
// Delete an object in the db with options: WithOplog (which requires
|
|
|
|
|
// WithMetadata, WithWrapper). Delete returns the number of rows deleted and
|
|
|
|
|
// any errors.
|
|
|
|
|
func (rw *Db) Delete(ctx context.Context, i interface{}, opt ...Option) (int, error) {
|
|
|
|
|
if rw.underlying == nil {
|
|
|
|
|
return errors.New("delete underlying db is nil")
|
|
|
|
|
return NoRowsAffected, errors.New("delete underlying db is nil")
|
|
|
|
|
}
|
|
|
|
|
if i == nil {
|
|
|
|
|
return errors.New("delete interface is nil")
|
|
|
|
|
return NoRowsAffected, errors.New("delete interface is nil")
|
|
|
|
|
}
|
|
|
|
|
opts := GetOpts(opt...)
|
|
|
|
|
withDebug := opts.withDebug
|
|
|
|
|
withOplog := opts.withOplog
|
|
|
|
|
if withOplog {
|
|
|
|
|
_, err := validateOplogArgs(i, opts)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return NoRowsAffected, err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if withDebug {
|
|
|
|
|
rw.underlying.LogMode(true)
|
|
|
|
|
defer rw.underlying.LogMode(false)
|
|
|
|
|
}
|
|
|
|
|
if err := rw.underlying.Delete(i).Error; err != nil {
|
|
|
|
|
return fmt.Errorf("error deleting: %w", err)
|
|
|
|
|
underlying := rw.underlying.Delete(i)
|
|
|
|
|
if underlying.Error != nil {
|
|
|
|
|
return NoRowsAffected, fmt.Errorf("error deleting: %w", underlying.Error)
|
|
|
|
|
}
|
|
|
|
|
rowsDeleted := int(underlying.RowsAffected)
|
|
|
|
|
if withOplog {
|
|
|
|
|
if err := rw.addOplog(ctx, DeleteOp, opts, i); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
return rowsDeleted, err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
return rowsDeleted, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (rw *Db) addOplog(ctx context.Context, opType OpType, opts Options, i interface{}) error {
|
|
|
|
|
func validateOplogArgs(i interface{}, opts Options) (oplog.ReplayableMessage, error) {
|
|
|
|
|
oplogArgs := opts.oplogOpts
|
|
|
|
|
if oplogArgs.wrapper == nil {
|
|
|
|
|
return errors.New("error wrapper is nil for WithWrapper")
|
|
|
|
|
return nil, errors.New("error no wrapper WithOplog")
|
|
|
|
|
}
|
|
|
|
|
if len(oplogArgs.metadata) == 0 {
|
|
|
|
|
return errors.New("error no metadata for WithOplog")
|
|
|
|
|
return nil, errors.New("error no metadata for WithOplog")
|
|
|
|
|
}
|
|
|
|
|
replayable, ok := i.(oplog.ReplayableMessage)
|
|
|
|
|
if !ok {
|
|
|
|
|
return errors.New("error not a replayable message for WithOplog")
|
|
|
|
|
return nil, errors.New("error not a replayable message for WithOplog")
|
|
|
|
|
}
|
|
|
|
|
return replayable, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (rw *Db) addOplog(ctx context.Context, opType OpType, opts Options, i interface{}) error {
|
|
|
|
|
oplogArgs := opts.oplogOpts
|
|
|
|
|
replayable, err := validateOplogArgs(i, opts)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
gdb := rw.underlying
|
|
|
|
|
withDebug := opts.withDebug
|
|
|
|
|
|