From faa7097e701ee377b515fba88f781fe808399518 Mon Sep 17 00:00:00 2001 From: Johan Brandhorst-Satzkorn Date: Fri, 7 Feb 2025 10:04:21 -0800 Subject: [PATCH 1/2] internal/db: add TransactionManager The TransactionManager serves as the entrypoint to the Database, allowing the user to create new transactions or single reader or writer operations. --- internal/db/db.go | 1 - internal/db/read_writer.go | 9 +- internal/db/transaction_manager.go | 267 +++++++++++++++++++++++++++++ 3 files changed, 272 insertions(+), 5 deletions(-) create mode 100644 internal/db/transaction_manager.go diff --git a/internal/db/db.go b/internal/db/db.go index 1f3a9a58cc..09f0849869 100644 --- a/internal/db/db.go +++ b/internal/db/db.go @@ -14,7 +14,6 @@ import ( "github.com/hashicorp/boundary/internal/event" "github.com/hashicorp/go-dbw" _ "github.com/jackc/pgx/v5" - "gorm.io/driver/postgres" ) diff --git a/internal/db/read_writer.go b/internal/db/read_writer.go index 91d63e7b8c..a5b1d42fb6 100644 --- a/internal/db/read_writer.go +++ b/internal/db/read_writer.go @@ -41,7 +41,11 @@ const ( DescendingOrderBy ) -// Reader interface defines lookups/searching for resources +// TxHandler defines a handler for a func that writes a transaction for use with DoTx +type TxHandler func(Reader, Writer) error + +// Reader interface defines lookups/searching for resources. It does +// not allow for writing to the db. type Reader interface { // LookupById will lookup a resource by its primary key id, which must be // unique. If the resource implements either ResourcePublicIder or @@ -170,9 +174,6 @@ type RetryInfo struct { Backoff time.Duration } -// TxHandler defines a handler for a func that writes a transaction for use with DoTx -type TxHandler func(Reader, Writer) error - // ResourcePublicIder defines an interface that LookupByPublicId() can use to // get the resource's public id. type ResourcePublicIder interface { diff --git a/internal/db/transaction_manager.go b/internal/db/transaction_manager.go new file mode 100644 index 0000000000..05465f8163 --- /dev/null +++ b/internal/db/transaction_manager.go @@ -0,0 +1,267 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package db + +import ( + "context" + stderrors "errors" + "fmt" + "sync/atomic" + "time" + + "github.com/hashicorp/boundary/internal/errors" + "github.com/hashicorp/boundary/internal/oplog" + "github.com/hashicorp/boundary/internal/oplog/store" + "github.com/hashicorp/boundary/internal/util" + "github.com/hashicorp/go-dbw" + wrapping "github.com/hashicorp/go-kms-wrapping/v2" +) + +// RoTxHandler is a function that will be executed within a read-only transaction. +type RoTxHandler func(Reader) error + +// RwTxHandler is a function that will be executed within a read-write transaction. +type RwTxHandler func(NewWriter) error + +type txOptions struct { + retries uint + backOff Backoff +} + +func getTxOptions(opts []TxOption) *txOptions { + txOpts := txOptions{ + retries: StdRetryCnt, + backOff: ExpBackoff{}, + } + for _, opt := range opts { + opt(&txOpts) + } + return &txOpts +} + +// TxOption is used to configure the transaction. +type TxOption func(*txOptions) + +// TransactionManager defines the interface for interacting with the database. +// All database operations require an explicit transaction with a defined backoff +// and retry mechanism. +type TransactionManager interface { + // DoRoTx will start a read-only transaction and execute the handler. Retries + // will be attempted based on the backoff strategy and number of retries. + DoRoTx(ctx context.Context, handler RoTxHandler, opts ...TxOption) (RetryInfo, error) + // DoRwTx will start a read-write transaction and execute the handler. Retries + // will be attempted based on the backoff strategy and number of retries. + DoRwTx(ctx context.Context, handler RwTxHandler, opts ...TxOption) (RetryInfo, error) + + // Writer returns a writer suitable for a single read-write operation. + Writer() NewWriter + // Reader returns a reader suitable for a single read-only operation. + Reader() Reader +} + +// NewWriter interface defines read/write operations for resources. +// TODO: Remove the old Writer interface and rename this Writer. +type NewWriter interface { + Reader + + // Update an object in the db, fieldMask is required and provides + // field_mask.proto paths for fields that should be updated. The i interface + // parameter is the type the caller wants to update in the db and its + // fields are set to the update values. setToNullPaths is optional and + // provides field_mask.proto paths for the fields that should be set to + // null. fieldMaskPaths and setToNullPaths must not intersect. The caller + // is responsible for the transaction life cycle of the writer and if an + // error is returned the caller must decide what to do with the transaction, + // which almost always should be to rollback. Update returns the number of + // rows updated or an error. Supported options: WithOplog. + Update(ctx context.Context, i any, fieldMaskPaths []string, setToNullPaths []string, opt ...Option) (int, error) + + // Create an object in the db with options: WithDebug, WithOplog, NewOplogMsg, + // WithLookup, WithReturnRowsAffected, OnConflict, WithVersion, and + // WithWhere. The caller is responsible for the transaction life cycle of + // the writer and if an error is returned the caller must decide what to do + // with the transaction, which almost always should be to rollback. + Create(ctx context.Context, i any, opt ...Option) error + + // CreateItems will create multiple items of the same type. + // Supported options: WithDebug, WithOplog, WithOplogMsgs, + // WithReturnRowsAffected, OnConflict, WithVersion, and WithWhere. + /// WithOplog and WithOplogMsgs may not be used together. WithLookup is not + // a supported option. The caller is responsible for the transaction life + // cycle of the writer and if an error is returned the caller must decide + // what to do with the transaction, which almost always should be to + // rollback. + CreateItems(ctx context.Context, createItems any, opt ...Option) error + + // Delete an object in the db with options: WithOplog, WithDebug. + // The caller is responsible for the transaction life cycle of the writer + // and if an error is returned the caller must decide what to do with + // the transaction, which almost always should be to rollback. Delete + // returns the number of rows deleted or an error. + Delete(ctx context.Context, i any, opt ...Option) (int, error) + + // DeleteItems will delete multiple items of the same type. + // Supported options: WithOplog and WithOplogMsgs. WithOplog and + // WithOplogMsgs may not be used together. The caller is responsible for the + // transaction life cycle of the writer and if an error is returned the + // caller must decide what to do with the transaction, which almost always + // should be to rollback. Delete returns the number of rows deleted or an error. + DeleteItems(ctx context.Context, deleteItems any, opt ...Option) (int, error) + + // Exec will execute the sql with the values as parameters. The int returned + // is the number of rows affected by the sql. No options are currently + // supported. + Exec(ctx context.Context, sql string, values []any, opt ...Option) (int, error) + + // GetTicket returns an oplog ticket for the aggregate root of "i" which can + // be used to WriteOplogEntryWith for that aggregate root. + GetTicket(ctx context.Context, i any) (*store.Ticket, error) + + // WriteOplogEntryWith will write an oplog entry with the msgs provided for + // the ticket's aggregateName. No options are currently supported. + WriteOplogEntryWith( + ctx context.Context, + wrapper wrapping.Wrapper, + ticket *store.Ticket, + metadata oplog.Metadata, + msgs []*oplog.Message, + opt ...Option, + ) error +} + +type transactionManager struct { + underlying *DB +} + +// NewTransactionManager creates a new transaction manager. +func NewTransactionManager(db *DB) TransactionManager { + return &transactionManager{ + underlying: db, + } +} + +func (t *transactionManager) DoRoTx(ctx context.Context, handler RoTxHandler, opts ...TxOption) (RetryInfo, error) { + const op = "db.(*transactionManager).DoRoTx" + txOpts := getTxOptions(opts) + switch { + case util.IsNil(t.underlying): + return RetryInfo{}, errors.New(ctx, errors.InvalidParameter, op, "missing db") + case util.IsNil(t.underlying): + return RetryInfo{}, errors.New(ctx, errors.InvalidParameter, op, "missing underlying db") + case util.IsNil(txOpts.backOff): + return RetryInfo{}, errors.New(ctx, errors.InvalidParameter, op, "missing backoff") + case util.IsNil(handler): + return RetryInfo{}, errors.New(ctx, errors.InvalidParameter, op, "missing handler") + } + info := RetryInfo{} + for attempts := uint(1); ; attempts++ { + if attempts > txOpts.retries+1 { + return info, errors.New(ctx, errors.MaxRetries, op, fmt.Sprintf("Too many retries: %d of %d", attempts-1, txOpts.retries+1), errors.WithoutEvent()) + } + + beginTx, err := dbw.New(t.underlying.wrapped.Load()).Begin(ctx) + if err != nil { + return info, wrapError(ctx, err, op) + } + + // TODO: In the future, if we want to support read-only replicas, + // this would create a new transaction to (one of?) the read-only + // replica(s). + newTxDb := &DB{wrapped: new(atomic.Pointer[dbw.DB])} + newTxDb.wrapped.Store(beginTx.DB()) + newRW := New(newTxDb) + + if err := handler(newRW); err != nil { + if err := beginTx.Rollback(ctx); err != nil { + return info, wrapError(ctx, err, op) + } + if errors.Match(errors.T(errors.TicketAlreadyRedeemed), err) { + d := txOpts.backOff.Duration(attempts) + info.Retries++ + info.Backoff = info.Backoff + d + time.Sleep(d) + continue + } + return info, errors.Wrap(ctx, err, op, errors.WithoutEvent()) + } + + var txnErr error + if commitErr := beginTx.Commit(ctx); commitErr != nil { + txnErr = stderrors.Join(txnErr, errors.Wrap(ctx, commitErr, op, errors.WithMsg("commit error"))) + if err := beginTx.Rollback(ctx); err != nil { + return info, stderrors.Join(txnErr, errors.Wrap(ctx, err, op, errors.WithMsg("rollback error"))) + } + return info, txnErr + } + return info, nil + } +} + +func (t *transactionManager) DoRwTx(ctx context.Context, handler RwTxHandler, opts ...TxOption) (RetryInfo, error) { + const op = "db.(*transactionManager).DoRwTx" + txOpts := getTxOptions(opts) + switch { + case util.IsNil(t.underlying): + return RetryInfo{}, errors.New(ctx, errors.InvalidParameter, op, "missing db") + case util.IsNil(t.underlying): + return RetryInfo{}, errors.New(ctx, errors.InvalidParameter, op, "missing underlying db") + case util.IsNil(txOpts.backOff): + return RetryInfo{}, errors.New(ctx, errors.InvalidParameter, op, "missing backoff") + case util.IsNil(handler): + return RetryInfo{}, errors.New(ctx, errors.InvalidParameter, op, "missing handler") + } + info := RetryInfo{} + for attempts := uint(1); ; attempts++ { + if attempts > txOpts.retries+1 { + return info, errors.New(ctx, errors.MaxRetries, op, fmt.Sprintf("Too many retries: %d of %d", attempts-1, txOpts.retries+1), errors.WithoutEvent()) + } + + beginTx, err := dbw.New(t.underlying.wrapped.Load()).Begin(ctx) + if err != nil { + return info, wrapError(ctx, err, op) + } + + // TODO: In the future, if we want to support read-only replicas, + // this would create a new transaction to the primary DB (not a replica) + newTxDb := &DB{wrapped: new(atomic.Pointer[dbw.DB])} + newTxDb.wrapped.Store(beginTx.DB()) + newRW := New(newTxDb) + + if err := handler(newRW); err != nil { + if err := beginTx.Rollback(ctx); err != nil { + return info, wrapError(ctx, err, op) + } + if errors.Match(errors.T(errors.TicketAlreadyRedeemed), err) { + d := txOpts.backOff.Duration(attempts) + info.Retries++ + info.Backoff = info.Backoff + d + time.Sleep(d) + continue + } + return info, errors.Wrap(ctx, err, op, errors.WithoutEvent()) + } + + var txnErr error + if commitErr := beginTx.Commit(ctx); commitErr != nil { + txnErr = stderrors.Join(txnErr, errors.Wrap(ctx, commitErr, op, errors.WithMsg("commit error"))) + if err := beginTx.Rollback(ctx); err != nil { + return info, stderrors.Join(txnErr, errors.Wrap(ctx, err, op, errors.WithMsg("rollback error"))) + } + return info, txnErr + } + return info, nil + } +} + +func (t *transactionManager) Writer() NewWriter { + // TODO: In the future, if we want to support read-only replicas, + // this would use the primary DB (not a replica) + return New(t.underlying) +} + +func (t *transactionManager) Reader() Reader { + // TODO: In the future, if we want to support read-only replicas, + // this would use (one of?) the read-only replica(s). + return New(t.underlying) +} From be73390634fdf2bc51930734bd9895440cd8d9cb Mon Sep 17 00:00:00 2001 From: Johan Brandhorst-Satzkorn Date: Fri, 7 Feb 2025 10:08:11 -0800 Subject: [PATCH 2/2] internal/alias: switch to use Transaction Manager --- internal/alias/repository.go | 21 ++++++++----------- internal/alias/repository_alias.go | 11 ++++++---- internal/alias/target/testing.go | 13 ++++++++++++ internal/daemon/controller/controller.go | 3 ++- .../daemon/controller/interceptor_test.go | 8 +++---- 5 files changed, 35 insertions(+), 21 deletions(-) diff --git a/internal/alias/repository.go b/internal/alias/repository.go index b3e9e1fead..68257ad7b7 100644 --- a/internal/alias/repository.go +++ b/internal/alias/repository.go @@ -9,33 +9,30 @@ import ( "github.com/hashicorp/boundary/internal/db" "github.com/hashicorp/boundary/internal/errors" "github.com/hashicorp/boundary/internal/kms" + "github.com/hashicorp/boundary/internal/util" ) // A Repository stores and retrieves the persistent types in the alias // package. It is not safe to use a repository concurrently. type Repository struct { - reader db.Reader - writer db.Writer - kms *kms.Kms + txm db.TransactionManager + kms *kms.Kms } // NewRepository creates a new Repository. The returned repository should // only be used for one transaction and it is not safe for concurrent go // routines to access it. -func NewRepository(ctx context.Context, r db.Reader, w db.Writer, kms *kms.Kms) (*Repository, error) { +func NewRepository(ctx context.Context, txm db.TransactionManager, kms *kms.Kms) (*Repository, error) { const op = "alias.NewRepository" switch { - case r == nil: - return nil, errors.New(ctx, errors.InvalidParameter, op, "db.Reader") - case w == nil: - return nil, errors.New(ctx, errors.InvalidParameter, op, "db.Writer") + case util.IsNil(txm): + return nil, errors.New(ctx, errors.InvalidParameter, op, "missing transaction manager") case kms == nil: - return nil, errors.New(ctx, errors.InvalidParameter, op, "kms") + return nil, errors.New(ctx, errors.InvalidParameter, op, "missing kms") } return &Repository{ - reader: r, - writer: w, - kms: kms, + txm: txm, + kms: kms, }, nil } diff --git a/internal/alias/repository_alias.go b/internal/alias/repository_alias.go index 611a9c0cba..37e1795a05 100644 --- a/internal/alias/repository_alias.go +++ b/internal/alias/repository_alias.go @@ -7,6 +7,7 @@ import ( "context" "fmt" + "github.com/hashicorp/boundary/internal/db" "github.com/hashicorp/boundary/internal/errors" ) @@ -18,11 +19,13 @@ func (r *Repository) lookupAliasByValue(ctx context.Context, value string) (*Ali return nil, errors.New(ctx, errors.InvalidParameter, op, "value is empty") } a := allocAlias() - if err := r.reader.LookupWhere(ctx, a, "value = $1", []any{value}); err != nil { - if errors.IsNotFoundError(err) { - return nil, nil + if _, err := r.txm.DoRoTx(ctx, func(reader db.Reader) error { + if err := reader.LookupWhere(ctx, a, "value = $1", []any{value}); err != nil && !errors.IsNotFoundError(err) { + return errors.Wrap(ctx, err, op, errors.WithMsg(fmt.Sprintf("failed for %q", value))) } - return nil, errors.Wrap(ctx, err, op, errors.WithMsg(fmt.Sprintf("failed for %q", value))) + return nil + }); err != nil { + return nil, err } return a, nil } diff --git a/internal/alias/target/testing.go b/internal/alias/target/testing.go index b2b0bf5405..2b050ac184 100644 --- a/internal/alias/target/testing.go +++ b/internal/alias/target/testing.go @@ -22,3 +22,16 @@ func TestAlias(t *testing.T, rw *db.Db, alias string, opt ...Option) *Alias { require.NoError(t, rw.Create(ctx, a, db.WithDebug(true))) return a } + +// TODO: Replace TestAlias with this when migrating to the TransactionManager +func TestNewAlias(t *testing.T, txm db.TransactionManager, alias string, opt ...Option) *Alias { + t.Helper() + ctx := context.Background() + + a, err := NewAlias(ctx, "global", alias, opt...) + require.NoError(t, err) + a.PublicId, err = newAliasId(ctx) + require.NoError(t, err) + require.NoError(t, txm.Writer().Create(ctx, a, db.WithDebug(true))) + return a +} diff --git a/internal/daemon/controller/controller.go b/internal/daemon/controller/controller.go index 57acc13605..ff335f755f 100644 --- a/internal/daemon/controller/controller.go +++ b/internal/daemon/controller/controller.go @@ -355,6 +355,7 @@ func New(ctx context.Context, conf *Config) (*Controller, error) { // Set up repo stuff dbase := db.New(c.conf.Database) + txManager := db.NewTransactionManager(c.conf.Database) c.kms, err = kms.New(ctx, dbase, dbase) if err != nil { return nil, fmt.Errorf("error creating kms cache: %w", err) @@ -478,7 +479,7 @@ func New(ctx context.Context, conf *Config) (*Controller, error) { return billing.NewRepository(ctx, dbase) } c.AliasRepoFn = func() (*alias.Repository, error) { - return alias.NewRepository(ctx, dbase, dbase, c.kms) + return alias.NewRepository(ctx, txManager, c.kms) } c.TargetAliasRepoFn = func() (*talias.Repository, error) { return talias.NewRepository(ctx, dbase, dbase, c.kms) diff --git a/internal/daemon/controller/interceptor_test.go b/internal/daemon/controller/interceptor_test.go index 56a14fbedc..692e7a80f1 100644 --- a/internal/daemon/controller/interceptor_test.go +++ b/internal/daemon/controller/interceptor_test.go @@ -667,18 +667,18 @@ func (m *streamMock) RecvToClient() (*httpbody.HttpBody, error) { func Test_aliasResolutionInterceptor(t *testing.T) { ctx := context.Background() conn, _ := db.TestSetup(t, "postgres") - rw := db.New(conn) + txm := db.NewTransactionManager(conn) wrapper := db.TestWrapper(t) kmsCache := kms.TestKms(t, conn, wrapper) aliasRepoFn := func() (*alias.Repository, error) { - return alias.NewRepository(context.Background(), rw, rw, kmsCache) + return alias.NewRepository(context.Background(), txm, kmsCache) } _, proj := iam.TestScopes(t, iam.TestRepo(t, conn, wrapper)) tar := tcp.TestTarget(ctx, t, conn, proj.GetPublicId(), "test-target") - al := talias.TestAlias(t, rw, "test-alias.example", talias.WithDestinationId(tar.GetPublicId())) - alWithoutDest := talias.TestAlias(t, rw, "no-destination.alias") + al := talias.TestNewAlias(t, txm, "test-alias.example", talias.WithDestinationId(tar.GetPublicId())) + alWithoutDest := talias.TestNewAlias(t, txm, "no-destination.alias") interceptor := aliasResolutionInterceptor(ctx, aliasRepoFn) require.NotNil(t, interceptor)