@ -3,7 +3,6 @@ package db
import (
"context"
"database/sql"
stderrors "errors"
"fmt"
"reflect"
"strings"
@ -180,13 +179,14 @@ func New(underlying *gorm.DB) *Db {
// Exec will execute the sql with the values as parameters. The int returned
// is the number of rows affected by the sql. No options are currently
// supported.
func ( rw * Db ) Exec ( ctx context . Context , sql string , values [ ] interface { } , opt ... Option ) ( int , error ) {
func ( rw * Db ) Exec ( _ context . Context , sql string , values [ ] interface { } , _ ... Option ) ( int , error ) {
const op = "db.Exec"
if sql == "" {
return NoRowsAffected , fmt. Errorf ( "missing sql: %w" , errors . ErrInvalidParameter )
return NoRowsAffected , errors. New ( errors . InvalidParameter , op , "missing sql" )
}
gormDb := rw . underlying . Exec ( sql , values ... )
if gormDb . Error != nil {
return NoRowsAffected , fmt. Errorf ( "exec: failed: %w" , gormDb . Error )
return NoRowsAffected , errors. Wrap ( gormDb . Error , op )
}
return int ( gormDb . RowsAffected ) , nil
}
@ -195,29 +195,32 @@ func (rw *Db) Exec(ctx context.Context, sql string, values []interface{}, opt ..
// operate within the context of any ongoing transaction for the db.Reader. The
// caller must close the returned *sql.Rows. Query can/should be used in
// combination with ScanRows.
func ( rw * Db ) Query ( ctx context . Context , sql string , values [ ] interface { } , opt ... Option ) ( * sql . Rows , error ) {
func ( rw * Db ) Query ( _ context . Context , sql string , values [ ] interface { } , _ ... Option ) ( * sql . Rows , error ) {
const op = "db.Query"
if sql == "" {
return nil , fmt. Errorf ( "raw missing sql: %w" , errors . ErrInvalidParameter )
return nil , errors. New ( errors . InvalidParameter , op , "missing sql" )
}
gormDb := rw . underlying . Raw ( sql , values ... )
if gormDb . Error != nil {
return nil , fmt. Errorf ( "exec: failed: %w" , gormDb . Error )
return nil , errors. Wrap ( gormDb . Error , op )
}
return gormDb . Rows ( )
}
// Scan rows will scan the rows into the interface
func ( rw * Db ) ScanRows ( rows * sql . Rows , result interface { } ) error {
const op = "db.ScanRows"
if rw . underlying == nil {
return fmt. Errorf ( "scan rows: missing underlying db %w" , errors . ErrInvalidParameter )
return errors. New ( errors . InvalidParameter , op , "missing underlying db" )
}
if isNil ( result ) {
return fmt. Errorf ( "scan rows: result is missing %w" , errors . ErrInvalidParameter )
return errors. New ( errors . InvalidParameter , op , "missing result" )
}
return rw . underlying . ScanRows ( rows , result )
}
func ( rw * Db ) lookupAfterWrite ( ctx context . Context , i interface { } , opt ... Option ) error {
const op = "db.lookupAfterWrite"
opts := GetOpts ( opt ... )
withLookup := opts . withLookup
@ -225,7 +228,7 @@ func (rw *Db) lookupAfterWrite(ctx context.Context, i interface{}, opt ...Option
return nil
}
if err := rw . LookupById ( ctx , i , opt ... ) ; err != nil {
return fmt. Errorf ( "lookup after write: %w" , err )
return errors. Wrap ( err , op )
}
return nil
}
@ -235,22 +238,23 @@ func (rw *Db) lookupAfterWrite(ctx context.Context, i interface{}, opt ...Option
// NewOplogMsg will return in-memory oplog message. WithOplog and NewOplogMsg
// cannot be used together. WithLookup with to force a lookup after create.
func ( rw * Db ) Create ( ctx context . Context , i interface { } , opt ... Option ) error {
const op = "db.Create"
if rw . underlying == nil {
return fmt. Errorf ( "create: missing underlying db: %w" , errors . ErrInvalidParameter )
return errors. New ( errors . InvalidParameter , op , "missing underlying db" )
}
if isNil ( i ) {
return fmt. Errorf ( "create: interface is missing: %w" , errors . ErrInvalidParameter )
return errors. New ( errors . InvalidParameter , op , "missing interface" )
}
opts := GetOpts ( opt ... )
withOplog := opts . withOplog
if withOplog && opts . newOplogMsg != nil {
return fmt. Errorf ( "create: both WithOplog and NewOplogMsg options have been specified: %w ", errors . ErrInvalidParameter )
return errors. New ( errors . InvalidParameter , op , " both WithOplog and NewOplogMsg options have been specified")
}
if withOplog {
// let's validate oplog options before we start writing to the database
_ , err := validateOplogArgs ( i , opts )
if err != nil {
return fmt. Errorf ( "create: oplog validation failed: %w" , err )
return errors. Wrap ( err , op , errors . WithMsg ( "oplog validation failed" ) )
}
}
// these fields should be nil, since they are not writeable and we want the
@ -260,7 +264,7 @@ func (rw *Db) Create(ctx context.Context, i interface{}, opt ...Option) error {
if ! opts . withSkipVetForWrite {
if vetter , ok := i . ( VetForWriter ) ; ok {
if err := vetter . VetForWrite ( ctx , rw , CreateOp ) ; err != nil {
return fmt. Errorf ( "create: vet for write failed: %w" , err )
return errors. Wrap ( err , op , errors . WithMsg ( "vet for write failed" ) )
}
}
}
@ -269,26 +273,26 @@ func (rw *Db) Create(ctx context.Context, i interface{}, opt ...Option) error {
var err error
ticket , err = rw . GetTicket ( i )
if err != nil {
return fmt. Errorf ( "create: unable to get ticket: %w" , err )
return errors. Wrap ( err , op , errors . WithMsg ( "unable to get ticket" ) )
}
}
if err := rw . underlying . Create ( i ) . Error ; err != nil {
return fmt. Errorf ( "create: failed: %w" , err )
return errors. Wrap ( err , op , errors . WithMsg ( "create failed" ) )
}
if withOplog {
if err := rw . addOplog ( ctx , CreateOp , opts , ticket , i ) ; err != nil {
return err
return err ors. Wrap ( err , op )
}
}
if opts . newOplogMsg != nil {
msg , err := rw . newOplogMessage ( ctx , CreateOp , i )
if err != nil {
return fmt. Errorf ( "create: returning oplog failed: %w" , err )
return errors. Wrap ( err , op , errors . WithMsg ( "returning oplog failed" ) )
}
* opts . newOplogMsg = * msg
}
if err := rw . lookupAfterWrite ( ctx , i , opt ... ) ; err != nil {
return fmt. Errorf ( "create: %w" , err )
return errors. Wrap ( err , op )
}
return nil
}
@ -297,21 +301,22 @@ func (rw *Db) Create(ctx context.Context, i interface{}, opt ...Option) error {
// WithOplog and WithOplogMsgs. WithOplog and WithOplogMsgs may not be used
// together. WithLookup is not a supported option.
func ( rw * Db ) CreateItems ( ctx context . Context , createItems [ ] interface { } , opt ... Option ) error {
const op = "db.CreateItems"
if rw . underlying == nil {
return fmt. Errorf ( "create items: missing underlying db: %w" , errors . ErrInvalidParameter )
return errors. New ( errors . InvalidParameter , op , "missing underlying db" )
}
if len ( createItems ) == 0 {
return fmt. Errorf ( "create items: no interfaces to create: %w" , errors . ErrInvalidParameter )
return errors. New ( errors . InvalidParameter , op , "missing interfaces" )
}
opts := GetOpts ( opt ... )
if opts . withLookup {
return fmt. Errorf ( "create items: with lookup not a supported option: %w" , errors . ErrInvalidParameter )
return errors. New ( errors . InvalidParameter , op , "with lookup not a supported option" )
}
if opts . newOplogMsg != nil {
return fmt. Errorf ( "create items: new oplog msg (singular) is not a supported option: %w ", errors . ErrInvalidParameter )
return errors. New ( errors . InvalidParameter , op , " new oplog msg (singular) is not a supported option")
}
if opts . withOplog && opts . newOplogMsgs != nil {
return fmt. Errorf ( "create items: both WithOplog and NewOplogMsgs options have been specified: %w ", errors . ErrInvalidParameter )
return errors. New ( errors . InvalidParameter , op , " both WithOplog and NewOplogMsgs options have been specified")
}
// verify that createItems are all the same type.
var foundType reflect . Type
@ -321,35 +326,35 @@ func (rw *Db) CreateItems(ctx context.Context, createItems []interface{}, opt ..
}
currentType := reflect . TypeOf ( v )
if foundType != currentType {
return fmt. Errorf ( "create items: create items contains disparate types. item %d is not a %s: %w ", i , foundType . Name ( ) , errors . ErrInvalidParameter )
return errors. New ( errors . InvalidParameter , op , fmt . Sprintf ( " create items contains disparate types. item %d is not a %s", i , foundType . Name ( ) ) )
}
}
var ticket * store . Ticket
if opts . withOplog {
_ , err := validateOplogArgs ( createItems [ 0 ] , opts )
if err != nil {
return fmt. Errorf ( "create items: oplog validation failed: %w" , err )
return errors. Wrap ( err , op , errors . WithMsg ( "oplog validation failed" ) )
}
ticket , err = rw . GetTicket ( createItems [ 0 ] )
if err != nil {
return fmt. Errorf ( "create items: unable to get ticket: %w" , err )
return errors. Wrap ( err , op , errors . WithMsg ( "unable to get ticket" ) )
}
}
for _ , item := range createItems {
if err := rw . Create ( ctx , item ) ; err != nil {
return fmt. Errorf ( "create items: %w" , err )
return errors. Wrap ( err , op )
}
}
if opts . withOplog {
if err := rw . addOplogForItems ( ctx , CreateOp , opts , ticket , createItems ) ; err != nil {
return fmt. Errorf ( "create items: unable to add oplog: %w" , err )
return errors. Wrap ( err , op , errors . WithMsg ( "unable to add oplog" ) )
}
}
if opts . newOplogMsgs != nil {
msgs , err := rw . oplogMsgsForItems ( ctx , CreateOp , opts , createItems )
if err != nil {
return fmt. Errorf ( "create items: returning oplog msgs failed %w" , err )
return errors. Wrap ( err , op , errors . WithMsg ( "returning oplog msgs failed" ) )
}
* opts . newOplogMsgs = append ( * opts . newOplogMsgs , msgs ... )
}
@ -376,46 +381,47 @@ func (rw *Db) CreateItems(ctx context.Context, createItems []interface{}, opt ..
// version matches the WithVersion option. Zero is not a valid value for the
// WithVersion option and will return an error.
func ( rw * Db ) Update ( ctx context . Context , i interface { } , fieldMaskPaths [ ] string , setToNullPaths [ ] string , opt ... Option ) ( int , error ) {
const op = "db.Update"
if rw . underlying == nil {
return NoRowsAffected , fmt. Errorf ( "update: missing underlying db %w" , errors . ErrInvalidParameter )
return NoRowsAffected , errors. New ( errors . InvalidParameter , op , "missing underlying db" )
}
if isNil ( i ) {
return NoRowsAffected , fmt. Errorf ( "update: interface is missing %w" , errors . ErrInvalidParameter )
return NoRowsAffected , errors. New ( errors . InvalidParameter , op , "missing interface" )
}
if len ( fieldMaskPaths ) == 0 && len ( setToNullPaths ) == 0 {
return NoRowsAffected , stderrors. New ( "update: both fieldMaskPaths and setToNullPaths are missing")
return NoRowsAffected , errors. New ( errors . InvalidParameter , op , " both fieldMaskPaths and setToNullPaths are missing")
}
opts := GetOpts ( opt ... )
withOplog := opts . withOplog
if withOplog && opts . newOplogMsg != nil {
return NoRowsAffected , fmt. Errorf ( "update: both WithOplog and NewOplogMsg options have been specified: %w ", errors . ErrInvalidParameter )
return NoRowsAffected , errors. New ( errors . InvalidParameter , op , " both WithOplog and NewOplogMsg options have been specified")
}
// we need to filter out some non-updatable fields (like: CreateTime, etc)
fieldMaskPaths = filterPaths ( fieldMaskPaths )
setToNullPaths = filterPaths ( setToNullPaths )
if len ( fieldMaskPaths ) == 0 && len ( setToNullPaths ) == 0 {
return NoRowsAffected , fmt. Errorf ( "update: after filtering non-updated fields, there are no fields left in fieldMaskPaths or setToNullPaths")
return NoRowsAffected , errors. New ( errors . InvalidParameter , op , " after filtering non-updated fields, there are no fields left in fieldMaskPaths or setToNullPaths")
}
updateFields , err := common . UpdateFields ( i , fieldMaskPaths , setToNullPaths )
if err != nil {
return NoRowsAffected , fmt. Errorf ( "update: getting update fields failed: %w" , err )
return NoRowsAffected , errors. Wrap ( err , op , errors . WithMsg ( "getting update fields failed" ) )
}
if len ( updateFields ) == 0 {
return NoRowsAffected , fmt. Errorf ( "update: no fields matched using fieldMaskPaths %s", fieldMaskPaths )
return NoRowsAffected , errors. New ( errors . InvalidParameter , op , fmt . Sprintf ( " no fields matched using fieldMaskPaths %s", fieldMaskPaths ) )
}
// This is not a boundary scope, but rather a gorm Scope:
// https://godoc.org/github.com/jinzhu/gorm#DB.NewScope
scope := rw . underlying . NewScope ( i )
if scope . PrimaryKeyZero ( ) {
return NoRowsAffected , fmt. Errorf ( "update: primary key is not set")
return NoRowsAffected , errors. New ( errors . InvalidParameter , op , " primary key is not set")
}
for _ , f := range scope . PrimaryFields ( ) {
if contains ( fieldMaskPaths , f . Name ) {
return NoRowsAffected , fmt. Errorf ( "update: not allowed on primary key field %s: %w" , f . Name , errors . ErrInvalidFieldMask )
return NoRowsAffected , errors. New ( errors . InvalidFieldMask , op , fmt . Sprintf ( "not allowed on primary key field %s" , f . Name ) )
}
}
@ -423,13 +429,13 @@ func (rw *Db) Update(ctx context.Context, i interface{}, fieldMaskPaths []string
// let's validate oplog options before we start writing to the database
_ , err := validateOplogArgs ( i , opts )
if err != nil {
return NoRowsAffected , fmt. Errorf ( "update: oplog validation failed: %w" , err )
return NoRowsAffected , errors. Wrap ( err , op , errors . WithMsg ( "oplog validation failed" ) )
}
}
if ! opts . withSkipVetForWrite {
if vetter , ok := i . ( VetForWriter ) ; ok {
if err := vetter . VetForWrite ( ctx , rw , UpdateOp , WithFieldMaskPaths ( fieldMaskPaths ) , WithNullPaths ( setToNullPaths ) ) ; err != nil {
return NoRowsAffected , fmt. Errorf ( "update: vet for write failed: %w" , err )
return NoRowsAffected , errors. Wrap ( err , op , errors . WithMsg ( "vet for write failed" ) )
}
}
}
@ -438,7 +444,7 @@ func (rw *Db) Update(ctx context.Context, i interface{}, fieldMaskPaths []string
var err error
ticket , err = rw . GetTicket ( i )
if err != nil {
return NoRowsAffected , fmt. Errorf ( "update: unable to get ticket: %w" , err )
return NoRowsAffected , errors. Wrap ( err , op , errors . WithMsg ( "unable to get ticket" ) )
}
}
var underlying * gorm . DB
@ -448,10 +454,10 @@ func (rw *Db) Update(ctx context.Context, i interface{}, fieldMaskPaths []string
var args [ ] interface { }
if opts . WithVersion != nil {
if * opts . WithVersion == 0 {
return NoRowsAffected , fmt. Errorf ( "update: with version option is zero: %w" , errors . ErrInvalidParameter )
return NoRowsAffected , errors. New ( errors . InvalidParameter , op , "with version option is zero" )
}
if _ , ok := scope . FieldByName ( "version" ) ; ! ok {
return NoRowsAffected , fmt. Errorf ( "update: %s does not have a version field", scope . TableName ( ) )
return NoRowsAffected , errors. New ( errors . InvalidParameter , op , fmt . Sprintf ( " %s does not have a version field", scope . TableName ( ) ) )
}
where , args = append ( where , "version = ?" ) , append ( args , opts . WithVersion )
}
@ -463,10 +469,10 @@ func (rw *Db) Update(ctx context.Context, i interface{}, fieldMaskPaths []string
underlying = rw . underlying . Model ( i ) . Updates ( updateFields )
}
if underlying . Error != nil {
if err == gorm . ErrRecordNotFound {
return NoRowsAffected , fmt. Errorf ( "update: failed: %w" , errors . ErrRecordNotFound )
if und erlying. Erro r == gorm . ErrRecordNotFound {
return NoRowsAffected , errors. E ( errors . WithCode ( errors . RecordNotFound ) , errors . WithOp ( op ) )
}
return NoRowsAffected , fmt. Errorf ( "update: failed: %w" , underlying . Error )
return NoRowsAffected , errors. Wrap ( underlying . Error , op )
}
rowsUpdated := int ( underlying . RowsAffected )
if rowsUpdated > 0 && ( withOplog || opts . newOplogMsg != nil ) {
@ -484,13 +490,13 @@ func (rw *Db) Update(ctx context.Context, i interface{}, fieldMaskPaths []string
}
if withOplog {
if err := rw . addOplog ( ctx , UpdateOp , oplogOpts , ticket , i ) ; err != nil {
return rowsUpdated , fmt. Errorf ( "update: add oplog failed %w" , err )
return rowsUpdated , errors. Wrap ( err , op , errors . WithMsg ( "add oplog failed" ) )
}
}
if opts . newOplogMsg != nil {
msg , err := rw . newOplogMessage ( ctx , UpdateOp , i , WithFieldMaskPaths ( oplogFieldMasks ) , WithNullPaths ( oplogNullPaths ) )
if err != nil {
return rowsUpdated , fmt. Errorf ( "update: returning oplog failed %w" , err )
return rowsUpdated , errors. Wrap ( err , op , errors . WithMsg ( "returning oplog failed" ) )
}
* opts . newOplogMsg = * msg
}
@ -499,7 +505,7 @@ func (rw *Db) Update(ctx context.Context, i interface{}, fieldMaskPaths []string
// from the db
opt = append ( opt , WithLookup ( true ) )
if err := rw . lookupAfterWrite ( ctx , i , opt ... ) ; err != nil {
return NoRowsAffected , fmt. Errorf ( "update: %w" , err )
return NoRowsAffected , errors. Wrap ( err , op )
}
return rowsUpdated , nil
}
@ -510,29 +516,30 @@ func (rw *Db) Update(ctx context.Context, i interface{}, fieldMaskPaths []string
// WithWhere allows specifying a constraint. Delete returns the number of rows
// deleted and any errors.
func ( rw * Db ) Delete ( ctx context . Context , i interface { } , opt ... Option ) ( int , error ) {
const op = "db.Delete"
if rw . underlying == nil {
return NoRowsAffected , fmt. Errorf ( "delete: missing underlying db %w" , errors . ErrInvalidParameter )
return NoRowsAffected , errors. New ( errors . InvalidParameter , op , "missing underlying db" )
}
if isNil ( i ) {
return NoRowsAffected , fmt. Errorf ( "delete: interface is missing %w" , errors . ErrInvalidParameter )
return NoRowsAffected , errors. New ( errors . InvalidParameter , op , "missing interface" )
}
opts := GetOpts ( opt ... )
withOplog := opts . withOplog
if withOplog && opts . newOplogMsg != nil {
return NoRowsAffected , fmt. Errorf ( "delete: both WithOplog and NewOplogMsg options have been specified: %w ", errors . ErrInvalidParameter )
return NoRowsAffected , errors. New ( errors . InvalidParameter , op , " both WithOplog and NewOplogMsg options have been specified")
}
// This is not a boundary scope, but rather a gorm Scope:
// https://godoc.org/github.com/jinzhu/gorm#DB.NewScope
scope := rw . underlying . NewScope ( i )
if opts . withWhereClause == "" {
if scope . PrimaryKeyZero ( ) {
return NoRowsAffected , fmt. Errorf ( "delete: primary key is not set")
return NoRowsAffected , errors. New ( errors . InvalidParameter , op , " primary key is not set")
}
}
if withOplog {
_ , err := validateOplogArgs ( i , opts )
if err != nil {
return NoRowsAffected , fmt. Errorf ( "delete: oplog validation failed %w" , err )
return NoRowsAffected , errors. Wrap ( err , op , errors . WithMsg ( "oplog validation failed" ) )
}
}
var ticket * store . Ticket
@ -540,7 +547,7 @@ func (rw *Db) Delete(ctx context.Context, i interface{}, opt ...Option) (int, er
var err error
ticket , err = rw . GetTicket ( i )
if err != nil {
return NoRowsAffected , fmt. Errorf ( "delete: unable to get ticket: %w" , err )
return NoRowsAffected , errors. Wrap ( err , op , errors . WithMsg ( "unable to get ticket" ) )
}
}
db := rw . underlying
@ -549,19 +556,19 @@ func (rw *Db) Delete(ctx context.Context, i interface{}, opt ...Option) (int, er
}
db = db . Delete ( i )
if db . Error != nil {
return NoRowsAffected , fmt. Errorf ( "delete: failed %w" , db . Error )
return NoRowsAffected , errors. Wrap ( db . Error , op )
}
rowsDeleted := int ( db . RowsAffected )
if rowsDeleted > 0 && ( withOplog || opts . newOplogMsg != nil ) {
if withOplog {
if err := rw . addOplog ( ctx , DeleteOp , opts , ticket , i ) ; err != nil {
return rowsDeleted , fmt. Errorf ( "delete: add oplog failed %w" , err )
return rowsDeleted , errors. Wrap ( db . Error , op , errors . WithMsg ( "add oplog failed" ) )
}
}
if opts . newOplogMsg != nil {
msg , err := rw . newOplogMessage ( ctx , DeleteOp , i )
if err != nil {
return rowsDeleted , fmt. Errorf ( "delete: returning oplog failed %w" , err )
return rowsDeleted , errors. Wrap ( db . Error , op , errors . WithMsg ( "returning oplog failed" ) )
}
* opts . newOplogMsg = * msg
}
@ -573,18 +580,19 @@ func (rw *Db) Delete(ctx context.Context, i interface{}, opt ...Option) (int, er
// WithOplog and WithOplogMsgs. WithOplog and WithOplogMsgs may not be used
// together.
func ( rw * Db ) DeleteItems ( ctx context . Context , deleteItems [ ] interface { } , opt ... Option ) ( int , error ) {
const op = "db.DeleteItems"
if rw . underlying == nil {
return NoRowsAffected , fmt. Errorf ( "delete items: missing underlying db: %w" , errors . ErrInvalidParameter )
return NoRowsAffected , errors. New ( errors . InvalidParameter , op , "missing underlying db" )
}
if len ( deleteItems ) == 0 {
return NoRowsAffected , fmt. Errorf ( "delete items: no interfaces to delete: %w" , errors . ErrInvalidParameter )
return NoRowsAffected , errors. New ( errors . InvalidParameter , op , "no interfaces to delete" )
}
opts := GetOpts ( opt ... )
if opts . newOplogMsg != nil {
return NoRowsAffected , fmt. Errorf ( "delete items: new oplog msg (singular) is not a supported option: %w ", errors . ErrInvalidParameter )
return NoRowsAffected , errors. New ( errors . InvalidParameter , op , " new oplog msg (singular) is not a supported option")
}
if opts . withOplog && opts . newOplogMsgs != nil {
return NoRowsAffected , fmt. Errorf ( "delete items: both WithOplog and NewOplogMsgs options have been specified: %w ", errors . ErrInvalidParameter )
return NoRowsAffected , errors. New ( errors . InvalidParameter , op , " both WithOplog and NewOplogMsgs options have been specified")
}
// verify that createItems are all the same type.
var foundType reflect . Type
@ -594,7 +602,7 @@ func (rw *Db) DeleteItems(ctx context.Context, deleteItems []interface{}, opt ..
}
currentType := reflect . TypeOf ( v )
if foundType != currentType {
return NoRowsAffected , fmt. Errorf ( "delete items: items contain disparate types. item %d is not a %s: %w ", i , foundType . Name ( ) , errors . ErrInvalidParameter )
return NoRowsAffected , errors. New ( errors . InvalidParameter , op , fmt . Sprintf ( " items contain disparate types. item %d is not a %s", i , foundType . Name ( ) ) )
}
}
@ -602,11 +610,11 @@ func (rw *Db) DeleteItems(ctx context.Context, deleteItems []interface{}, opt ..
if opts . withOplog {
_ , err := validateOplogArgs ( deleteItems [ 0 ] , opts )
if err != nil {
return NoRowsAffected , fmt. Errorf ( "delete items: oplog validation failed: %w" , err )
return NoRowsAffected , errors. Wrap ( err , op , errors . WithMsg ( "oplog validation failed" ) )
}
ticket , err = rw . GetTicket ( deleteItems [ 0 ] )
if err != nil {
return NoRowsAffected , fmt. Errorf ( "delete items: unable to get ticket: %w" , err )
return NoRowsAffected , errors. Wrap ( err , op , errors . WithMsg ( "unable to get ticket" ) )
}
}
rowsDeleted := 0
@ -616,20 +624,20 @@ func (rw *Db) DeleteItems(ctx context.Context, deleteItems []interface{}, opt ..
// relationship between Create and CreateItems).
underlying := rw . underlying . Delete ( item )
if underlying . Error != nil {
return rowsDeleted , fmt. Errorf ( "delete: failed: %w" , underlying . Error )
return rowsDeleted , errors. Wrap ( underlying . Error , op )
}
rowsDeleted += int ( underlying . RowsAffected )
}
if rowsDeleted > 0 && ( opts . withOplog || opts . newOplogMsgs != nil ) {
if opts . withOplog {
if err := rw . addOplogForItems ( ctx , DeleteOp , opts , ticket , deleteItems ) ; err != nil {
return rowsDeleted , fmt. Errorf ( "delete items: unable to add oplog: %w" , err )
return rowsDeleted , errors. Wrap ( err , op , errors . WithMsg ( "unable to add oplog" ) )
}
}
if opts . newOplogMsgs != nil {
msgs , err := rw . oplogMsgsForItems ( ctx , DeleteOp , opts , deleteItems )
if err != nil {
return rowsDeleted , fmt. Errorf ( "delete items: returning oplog msgs failed %w" , err )
return rowsDeleted , errors. Wrap ( err , op , errors . WithMsg ( "returning oplog msgs failed" ) )
}
* opts . newOplogMsgs = append ( * opts . newOplogMsgs , msgs ... )
}
@ -638,31 +646,33 @@ func (rw *Db) DeleteItems(ctx context.Context, deleteItems []interface{}, opt ..
}
func validateOplogArgs ( i interface { } , opts Options ) ( oplog . ReplayableMessage , error ) {
const op = "db.validateOplogArgs"
oplogArgs := opts . oplogOpts
if oplogArgs . wrapper == nil {
return nil , fmt. Errorf ( "error no wrapper WithOplog: %w" , errors . ErrInvalidParameter )
return nil , errors. New ( errors . InvalidParameter , op , "missing wrapper" )
}
if len ( oplogArgs . metadata ) == 0 {
return nil , fmt. Errorf ( "error no metadata for WithOplog: %w" , errors . ErrInvalidParameter )
return nil , errors. New ( errors . InvalidParameter , op , "missing metadata" )
}
replayable , ok := i . ( oplog . ReplayableMessage )
if ! ok {
return nil , stderrors. New ( "error not a replayable message for WithOplog" )
return nil , errors. E ( errors . WithOp ( op ) , errors . WithMsg ( "not a replayable message" ) )
}
return replayable , nil
}
func ( rw * Db ) getTicketFor ( aggregateName string ) ( * store . Ticket , error ) {
const op = "db.getTicketFor"
if rw . underlying == nil {
return nil , fmt. Errorf ( "get ticket for %s: underlying db missing: %w" , aggregateName , errors . ErrInvalidParameter )
return nil , errors. New ( errors . InvalidParameter , op , fmt . Sprintf ( "%s: underlying db missing" , aggregateName ) )
}
ticketer , err := oplog . NewGormTicketer ( rw . underlying , oplog . WithAggregateNames ( true ) )
if err != nil {
return nil , fmt. Errorf ( "get ticket for %s: unable to get Ticketer %w" , aggregateName , err )
return nil , errors. Wrap ( err , op , errors . WithMsg ( fmt . Sprintf ( "%s: unable to get Ticketer" , aggregateName ) ) )
}
ticket , err := ticketer . GetTicket ( aggregateName )
if err != nil {
return nil , fmt. Errorf ( "get ticket for %s: unable to get ticket %w" , aggregateName , err )
return nil , errors. Wrap ( err , op , errors . WithMsg ( fmt . Sprintf ( "%s: unable to get ticket" , aggregateName ) ) )
}
return ticket , nil
}
@ -670,22 +680,24 @@ func (rw *Db) getTicketFor(aggregateName string) (*store.Ticket, error) {
// GetTicket returns an oplog ticket for the aggregate root of "i" which can
// be used to WriteOplogEntryWith for that aggregate root.
func ( rw * Db ) GetTicket ( i interface { } ) ( * store . Ticket , error ) {
const op = "db.GetTicket"
if rw . underlying == nil {
return nil , fmt. Errorf ( "get ticket: underlying db missing: %w" , errors . ErrInvalidParameter )
return nil , errors. New ( errors . InvalidParameter , op , "missing underlying db" )
}
if isNil ( i ) {
return nil , fmt. Errorf ( "get ticket: interface is missing %w" , errors . ErrInvalidParameter )
return nil , errors. New ( errors . InvalidParameter , op , "missing interface" )
}
replayable , ok := i . ( oplog . ReplayableMessage )
if ! ok {
return nil , fmt. Errorf ( "get ticket: not a replayable message %w" , errors . ErrInvalidParameter )
return nil , errors. New ( errors . InvalidParameter , op , "not a replayable message" )
}
return rw . getTicketFor ( replayable . TableName ( ) )
}
func ( rw * Db ) oplogMsgsForItems ( ctx context . Context , opType OpType , opts Options , items [ ] interface { } ) ( [ ] * oplog . Message , error ) {
const op = "db.oplogMsgsForItems"
if len ( items ) == 0 {
return nil , fmt. Errorf ( "oplog msgs for items: items is empty: %w" , errors . ErrInvalidParameter )
return nil , errors. New ( errors . InvalidParameter , op , "missing items" )
}
oplogMsgs := [ ] * oplog . Message { }
var foundType reflect . Type
@ -695,11 +707,11 @@ func (rw *Db) oplogMsgsForItems(ctx context.Context, opType OpType, opts Options
}
currentType := reflect . TypeOf ( item )
if foundType != currentType {
return nil , fmt. Errorf ( "oplog msgs for items: items contains disparate types. item (%d) %s is not a %s: %w" , i , currentType , foundType , errors . ErrInvalidParameter )
return nil , errors. New ( errors . InvalidParameter , op , fmt . Sprintf ( "items contains disparate types. item (%d) %s is not a %s" , i , currentType , foundType ) )
}
msg , err := rw . newOplogMessage ( ctx , opType , item , WithFieldMaskPaths ( opts . WithFieldMaskPaths ) , WithNullPaths ( opts . WithNullPaths ) )
if err != nil {
return nil , fmt. Errorf ( "oplog msgs for items: %w" , err )
return nil , errors. Wrap ( err , op )
}
oplogMsgs = append ( oplogMsgs , msg )
}
@ -710,35 +722,33 @@ func (rw *Db) oplogMsgsForItems(ctx context.Context, opType OpType, opts Options
// item. Items must all be of the same type. Only CreateOp and DeleteOp are
// currently supported operations.
func ( rw * Db ) addOplogForItems ( ctx context . Context , opType OpType , opts Options , ticket * store . Ticket , items [ ] interface { } ) error {
const op = "db.addOplogForItems"
oplogArgs := opts . oplogOpts
if ticket == nil {
return fmt . Errorf ( "oplog for items: ticket is missing: %w" , errors . ErrInvalidParameter )
}
if items == nil {
return fmt . Errorf ( "oplog for items: items are missing: %w" , errors . ErrInvalidParameter )
return errors . New ( errors . InvalidParameter , op , "missing ticket" )
}
if len ( items ) == 0 {
return fmt. Errorf ( "oplog for items: items is empty: %w" , errors . ErrInvalidParameter )
return errors . New ( errors . InvalidParameter , op , "missing items" )
}
if oplogArgs . metadata == nil {
return fmt. Errorf ( "oplog for items: metadata is missing: %w" , errors . ErrInvalidParameter )
return errors. New ( errors . InvalidParameter , op , "missing metadata" )
}
if oplogArgs . wrapper == nil {
return fmt. Errorf ( "oplog for items: wrapper is missing: %w" , errors . ErrInvalidParameter )
return errors. New ( errors . InvalidParameter , op , "missing wrapper" )
}
oplogMsgs , err := rw . oplogMsgsForItems ( ctx , opType , opts , items )
if err != nil {
return fmt. Errorf ( "oplog for items: %w" , err )
return errors. Wrap ( err , op )
}
replayable , err := validateOplogArgs ( items [ 0 ] , opts )
if err != nil {
return fmt. Errorf ( "oplog for items: oplog validation failed %w" , err )
return errors. Wrap ( err , op , errors . WithMsg ( "oplog validation failed" ) )
}
ticketer , err := oplog . NewGormTicketer ( rw . underlying , oplog . WithAggregateNames ( true ) )
if err != nil {
return fmt. Errorf ( "oplog for items: unable to get Ticketer %w" , err )
return errors. Wrap ( err , op , errors . WithMsg ( "unable to get Ticketer" ) )
}
entry , err := oplog . NewEntry (
replayable . TableName ( ) ,
@ -747,7 +757,7 @@ func (rw *Db) addOplogForItems(ctx context.Context, opType OpType, opts Options,
ticketer ,
)
if err != nil {
return fmt. Errorf ( "oplog for items: unable to create oplog entry %w" , err )
return errors. Wrap ( err , op , errors . WithMsg ( "unable to create oplog entry" ) )
}
if err := entry . WriteEntryWith (
ctx ,
@ -755,23 +765,24 @@ func (rw *Db) addOplogForItems(ctx context.Context, opType OpType, opts Options,
ticket ,
oplogMsgs ... ,
) ; err != nil {
return fmt. Errorf ( "oplog for items: unable to write oplog entry %w" , err )
return errors. Wrap ( err , op , errors . WithMsg ( "unable to write oplog entry" ) )
}
return nil
}
func ( rw * Db ) addOplog ( ctx context . Context , opType OpType , opts Options , ticket * store . Ticket , i interface { } ) error {
const op = "db.addOplog"
oplogArgs := opts . oplogOpts
replayable , err := validateOplogArgs ( i , opts )
if err != nil {
return err
return err ors. Wrap ( err , op )
}
if ticket == nil {
return fmt. Errorf ( "add oplog: missing ticket %w" , errors . ErrInvalidParameter )
return errors. New ( errors . InvalidParameter , op , "missing ticket" )
}
ticketer , err := oplog . NewGormTicketer ( rw . underlying , oplog . WithAggregateNames ( true ) )
if err != nil {
return fmt. Errorf ( "add oplog: unable to get Ticketer %w" , err )
return errors. Wrap ( err , op , errors . WithMsg ( "unable to get Ticketer" ) )
}
entry , err := oplog . NewEntry (
replayable . TableName ( ) ,
@ -784,7 +795,7 @@ func (rw *Db) addOplog(ctx context.Context, opType OpType, opts Options, ticket
}
msg , err := rw . newOplogMessage ( ctx , opType , i , WithFieldMaskPaths ( opts . WithFieldMaskPaths ) , WithNullPaths ( opts . WithNullPaths ) )
if err != nil {
return fmt. Errorf ( "add oplog: %w" , err )
return errors. Wrap ( err , op )
}
err = entry . WriteEntryWith (
ctx ,
@ -793,36 +804,34 @@ func (rw *Db) addOplog(ctx context.Context, opType OpType, opts Options, ticket
msg ,
)
if err != nil {
return fmt. Errorf ( "add oplog: unable to write oplog entry: %w" , err )
return errors. Wrap ( err , op , errors . WithMsg ( "unable to write oplog entry" ) )
}
return nil
}
// WriteOplogEntryWith will write an oplog entry with the msgs provided for
// the ticket's aggregateName. No options are currently supported.
func ( rw * Db ) WriteOplogEntryWith ( ctx context . Context , wrapper wrapping . Wrapper , ticket * store . Ticket , metadata oplog . Metadata , msgs [ ] * oplog . Message , opt ... Option ) error {
func ( rw * Db ) WriteOplogEntryWith ( ctx context . Context , wrapper wrapping . Wrapper , ticket * store . Ticket , metadata oplog . Metadata , msgs [ ] * oplog . Message , _ ... Option ) error {
const op = "db.WriteOplogEntryWith"
if wrapper == nil {
return fmt. Errorf ( "write oplog: wrapper is unset %w" , errors . ErrInvalidParameter )
return errors. New ( errors . InvalidParameter , op , "missing wrapper" )
}
if ticket == nil {
return fmt. Errorf ( "write oplog: ticket is unset %w" , errors . ErrInvalidParameter )
return errors. New ( errors . InvalidParameter , op , "missing ticket" )
}
if len ( msgs ) == 0 {
return fmt. Errorf ( "write oplog: msgs are empty %w" , errors . ErrInvalidParameter )
return errors. New ( errors . InvalidParameter , op , "missing msgs" )
}
if rw . underlying == nil {
return fmt . Errorf ( "write oplog: underlying is unset %w" , errors . ErrInvalidParameter )
}
if metadata == nil {
return fmt . Errorf ( "write oplog: metadata is unset %w" , errors . ErrInvalidParameter )
return errors . New ( errors . InvalidParameter , op , "missing underlying db" )
}
if len ( metadata ) == 0 {
return fmt. Errorf ( "write oplog: metadata is empty %w" , errors . ErrInvalidParameter )
return errors . New ( errors . InvalidParameter , op , "missing metadata" )
}
ticketer , err := oplog . NewGormTicketer ( rw . underlying , oplog . WithAggregateNames ( true ) )
if err != nil {
return fmt. Errorf ( "write oplog: unable to get Ticketer %w" , err )
return errors. Wrap ( err , op , errors . WithMsg ( "unable to get Ticketer" ) )
}
entry , err := oplog . NewEntry (
@ -832,7 +841,7 @@ func (rw *Db) WriteOplogEntryWith(ctx context.Context, wrapper wrapping.Wrapper,
ticketer ,
)
if err != nil {
return fmt. Errorf ( "write oplog: unable to create oplog entry: %w" , err )
return errors. Wrap ( err , op , errors . WithMsg ( "unable to create oplog entry" ) )
}
err = entry . WriteEntryWith (
ctx ,
@ -841,16 +850,17 @@ func (rw *Db) WriteOplogEntryWith(ctx context.Context, wrapper wrapping.Wrapper,
msgs ... ,
)
if err != nil {
return fmt. Errorf ( "write oplog: unable to write oplog entry: %w" , err )
return errors. Wrap ( err , op , errors . WithMsg ( "unable to write oplog entry" ) )
}
return nil
}
func ( rw * Db ) newOplogMessage ( ctx context . Context , opType OpType , i interface { } , opt ... Option ) ( * oplog . Message , error ) {
func ( rw * Db ) newOplogMessage ( _ context . Context , opType OpType , i interface { } , opt ... Option ) ( * oplog . Message , error ) {
const op = "db.newOplogMessage"
opts := GetOpts ( opt ... )
replayable , ok := i . ( oplog . ReplayableMessage )
if ! ok {
return nil , stderrors. New ( "error not a replayable interface")
return nil , errors. New ( errors . InvalidParameter , op , " not a replayable interface")
}
msg := oplog . Message {
Message : i . ( proto . Message ) ,
@ -866,7 +876,7 @@ func (rw *Db) newOplogMessage(ctx context.Context, opType OpType, i interface{},
case DeleteOp :
msg . OpType = oplog . OpType_OP_TYPE_DELETE
default :
return nil , fmt. Error f( "operation type %v is not supported" , opType )
return nil , errors. New ( errors . InvalidParameter , op , fmt . Sprint f( "operation type %v is not supported" , opType ) )
}
return & msg , nil
}
@ -876,13 +886,14 @@ func (rw *Db) newOplogMessage(ctx context.Context, opType OpType, i interface{},
// means that the object may be sent to the db several times (retried), so things like the primary key must
// be reset before retry
func ( w * Db ) DoTx ( ctx context . Context , retries uint , backOff Backoff , Handler TxHandler ) ( RetryInfo , error ) {
const op = "db.DoTx"
if w . underlying == nil {
return RetryInfo { } , stderrors. New ( "do underlying db is nil ")
return RetryInfo { } , errors. New ( errors . InvalidParameter , op , "missing underlying db ")
}
info := RetryInfo { }
for attempts := uint ( 1 ) ; ; attempts ++ {
if attempts > retries + 1 {
return info , fmt. Error f( "Too many retries: %d of %d" , attempts - 1 , retries + 1 )
return info , errors. New ( errors . MaxRetries , op , fmt . Sprint f( "Too many retries: %d of %d" , attempts - 1 , retries + 1 ) )
}
// step one of this, start a transaction...
@ -891,7 +902,7 @@ func (w *Db) DoTx(ctx context.Context, retries uint, backOff Backoff, Handler Tx
rw := & Db { newTx }
if err := Handler ( rw , rw ) ; err != nil {
if err := newTx . Rollback ( ) . Error ; err != nil {
return info , err
return info , err ors. Wrap ( err , op )
}
if errors . Is ( err , oplog . ErrTicketAlreadyRedeemed ) {
d := backOff . Duration ( attempts )
@ -900,14 +911,14 @@ func (w *Db) DoTx(ctx context.Context, retries uint, backOff Backoff, Handler Tx
time . Sleep ( d )
continue
}
return info , err
return info , err ors. Wrap ( err , op )
}
if err := newTx . Commit ( ) . Error ; err != nil {
if err := newTx . Rollback ( ) . Error ; err != nil {
return info , err
return info , err ors. Wrap ( err , op )
}
return info , err
return info , err ors. Wrap ( err , op )
}
return info , nil // it all worked!!!
}
@ -915,27 +926,29 @@ func (w *Db) DoTx(ctx context.Context, retries uint, backOff Backoff, Handler Tx
// LookupByPublicId will lookup resource by its public_id or private_id, which
// must be unique. Options are ignored.
func ( rw * Db ) LookupById ( ctx context . Context , resourceWithIder interface { } , opt ... Option ) error {
func ( rw * Db ) LookupById ( _ context . Context , resourceWithIder interface { } , _ ... Option ) error {
const op = "db.LookupById"
if rw . underlying == nil {
return fmt. Errorf ( "lookup by id: underlying db nil %w" , errors . ErrInvalidParameter )
return errors. New ( errors . InvalidParameter , op , "missing underlying db" )
}
if reflect . ValueOf ( resourceWithIder ) . Kind ( ) != reflect . Ptr {
return fmt. Errorf ( "lookup by id: interface parameter must to be a pointer: %w" , errors . ErrInvalidParameter )
return errors. New ( errors . InvalidParameter , op , "interface parameter must to be a pointer" )
}
primaryKey , where , err := primaryKeyWhere ( resourceWithIder )
if err != nil {
return fmt. Errorf ( "lookup by id: %w" , err )
return errors. Wrap ( err , op )
}
if err := rw . underlying . Where ( where , primaryKey ) . First ( resourceWithIder ) . Error ; err != nil {
if err == gorm . ErrRecordNotFound {
return errors . ErrRecordNotFound
return errors . E ( errors . WithCode ( errors . RecordNotFound ) , errors . WithOp ( op ) )
}
return err
return err ors. Wrap ( err , op )
}
return nil
}
func primaryKeyWhere ( resourceWithIder interface { } ) ( pkey string , w string , e error ) {
const op = "db.primaryKeyWhere"
var primaryKey , where string
switch resourceType := resourceWithIder . ( type ) {
case ResourcePublicIder :
@ -945,10 +958,10 @@ func primaryKeyWhere(resourceWithIder interface{}) (pkey string, w string, e err
primaryKey = resourceType . GetPrivateId ( )
where = "private_id = ?"
default :
return "" , "" , fmt. Errorf ( "unsupported interface type %w" , errors . ErrInvalidParameter )
return "" , "" , errors. New ( errors . InvalidParameter , op , fmt . Sprintf ( "unsupported interface type %T" , resourceWithIder ) )
}
if primaryKey == "" {
return "" , "" , fmt. Errorf ( "primary key unset %w" , errors . ErrInvalidParameter )
return "" , "" , errors. New ( errors . InvalidParameter , op , "missing primary key" )
}
return primaryKey , where , nil
}
@ -960,16 +973,17 @@ func (rw *Db) LookupByPublicId(ctx context.Context, resource ResourcePublicIder,
}
// LookupWhere will lookup the first resource using a where clause with parameters (it only returns the first one)
func ( rw * Db ) LookupWhere ( ctx context . Context , resource interface { } , where string , args ... interface { } ) error {
func ( rw * Db ) LookupWhere ( _ context . Context , resource interface { } , where string , args ... interface { } ) error {
const op = "db.LookupWhere"
if rw . underlying == nil {
return stderrors. New ( "error underlying db nil for lookup by ")
return errors. New ( errors . InvalidParameter , op , "missing underlying db ")
}
if reflect . ValueOf ( resource ) . Kind ( ) != reflect . Ptr {
return stderrors. New ( "error interface parameter must to be a pointer for lookup by ")
return errors. New ( errors . InvalidParameter , op , "interface parameter must to be a pointer ")
}
if err := rw . underlying . Where ( where , args ... ) . First ( resource ) . Error ; err != nil {
if err == gorm . ErrRecordNotFound {
return errors . ErrRecordNotFound
return errors . E ( errors . WithCode ( errors . RecordNotFound ) , errors . WithOp ( op ) )
}
return err
}
@ -980,13 +994,14 @@ func (rw *Db) LookupWhere(ctx context.Context, resource interface{}, where strin
// clause with parameters. Supports the WithLimit option. If
// WithLimit < 0, then unlimited results are returned. If WithLimit == 0, then
// default limits are used for results. Supports the WithOrder option.
func ( rw * Db ) SearchWhere ( ctx context . Context , resources interface { } , where string , args [ ] interface { } , opt ... Option ) error {
func ( rw * Db ) SearchWhere ( _ context . Context , resources interface { } , where string , args [ ] interface { } , opt ... Option ) error {
const op = "db.SearchWhere"
opts := GetOpts ( opt ... )
if rw . underlying == nil {
return stderrors. New ( "error underlying db nil for search by ")
return errors. New ( errors . InvalidParameter , op , "missing underlying db ")
}
if reflect . ValueOf ( resources ) . Kind ( ) != reflect . Ptr {
return stderrors. New ( "error interface parameter must to be a pointer for search by ")
return errors. New ( errors . InvalidParameter , op , "interface parameter must to be a pointer ")
}
var err error
db := rw . underlying . Order ( opts . withOrder )
@ -1011,7 +1026,7 @@ func (rw *Db) SearchWhere(ctx context.Context, resources interface{}, where stri
err = db . Find ( resources ) . Error
if err != nil {
// searching with a slice parameter does not return a gorm.ErrRecordNotFound
return err
return err ors. Wrap ( err , op )
}
return nil
}
@ -1070,6 +1085,7 @@ func contains(ss []string, t string) bool {
// A depth of 2 will change i and i's children. A depth of 1 will change i
// but no children of i. A depth of 0 will return with no changes to i.
func Clear ( i interface { } , fields [ ] string , depth int ) error {
const op = "db.Clear"
if len ( fields ) == 0 || depth == 0 {
return nil
}
@ -1081,13 +1097,13 @@ func Clear(i interface{}, fields []string, depth int) error {
v := reflect . ValueOf ( i )
switch v . Kind ( ) {
default :
return errors . ErrInvalidParameter
case reflect . Ptr :
if v . IsNil ( ) || v . Elem ( ) . Kind ( ) != reflect . Struct {
return errors . ErrInvalidParameter
return errors . E ( errors . WithCode ( errors . InvalidParameter ) , errors . WithOp ( op ) )
}
clear ( v , fm , depth )
default :
return errors . E ( errors . WithCode ( errors . InvalidParameter ) , errors . WithOp ( op ) )
}
return nil
}