Johan Brandhorst-Satzkorn 1 month ago committed by GitHub
commit 22f4005d32
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -9,33 +9,30 @@ import (
"github.com/hashicorp/boundary/internal/db"
"github.com/hashicorp/boundary/internal/errors"
"github.com/hashicorp/boundary/internal/kms"
"github.com/hashicorp/boundary/internal/util"
)
// A Repository stores and retrieves the persistent types in the alias
// package. It is not safe to use a repository concurrently.
type Repository struct {
reader db.Reader
writer db.Writer
kms *kms.Kms
txm db.TransactionManager
kms *kms.Kms
}
// NewRepository creates a new Repository. The returned repository should
// only be used for one transaction and it is not safe for concurrent go
// routines to access it.
func NewRepository(ctx context.Context, r db.Reader, w db.Writer, kms *kms.Kms) (*Repository, error) {
func NewRepository(ctx context.Context, txm db.TransactionManager, kms *kms.Kms) (*Repository, error) {
const op = "alias.NewRepository"
switch {
case r == nil:
return nil, errors.New(ctx, errors.InvalidParameter, op, "db.Reader")
case w == nil:
return nil, errors.New(ctx, errors.InvalidParameter, op, "db.Writer")
case util.IsNil(txm):
return nil, errors.New(ctx, errors.InvalidParameter, op, "missing transaction manager")
case kms == nil:
return nil, errors.New(ctx, errors.InvalidParameter, op, "kms")
return nil, errors.New(ctx, errors.InvalidParameter, op, "missing kms")
}
return &Repository{
reader: r,
writer: w,
kms: kms,
txm: txm,
kms: kms,
}, nil
}

@ -7,6 +7,7 @@ import (
"context"
"fmt"
"github.com/hashicorp/boundary/internal/db"
"github.com/hashicorp/boundary/internal/errors"
)
@ -18,11 +19,13 @@ func (r *Repository) lookupAliasByValue(ctx context.Context, value string) (*Ali
return nil, errors.New(ctx, errors.InvalidParameter, op, "value is empty")
}
a := allocAlias()
if err := r.reader.LookupWhere(ctx, a, "value = $1", []any{value}); err != nil {
if errors.IsNotFoundError(err) {
return nil, nil
if _, err := r.txm.DoRoTx(ctx, func(reader db.Reader) error {
if err := reader.LookupWhere(ctx, a, "value = $1", []any{value}); err != nil && !errors.IsNotFoundError(err) {
return errors.Wrap(ctx, err, op, errors.WithMsg(fmt.Sprintf("failed for %q", value)))
}
return nil, errors.Wrap(ctx, err, op, errors.WithMsg(fmt.Sprintf("failed for %q", value)))
return nil
}); err != nil {
return nil, err
}
return a, nil
}

@ -22,3 +22,16 @@ func TestAlias(t *testing.T, rw *db.Db, alias string, opt ...Option) *Alias {
require.NoError(t, rw.Create(ctx, a, db.WithDebug(true)))
return a
}
// TODO: Replace TestAlias with this when migrating to the TransactionManager
func TestNewAlias(t *testing.T, txm db.TransactionManager, alias string, opt ...Option) *Alias {
t.Helper()
ctx := context.Background()
a, err := NewAlias(ctx, "global", alias, opt...)
require.NoError(t, err)
a.PublicId, err = newAliasId(ctx)
require.NoError(t, err)
require.NoError(t, txm.Writer().Create(ctx, a, db.WithDebug(true)))
return a
}

@ -355,6 +355,7 @@ func New(ctx context.Context, conf *Config) (*Controller, error) {
// Set up repo stuff
dbase := db.New(c.conf.Database)
txManager := db.NewTransactionManager(c.conf.Database)
c.kms, err = kms.New(ctx, dbase, dbase)
if err != nil {
return nil, fmt.Errorf("error creating kms cache: %w", err)
@ -479,7 +480,7 @@ func New(ctx context.Context, conf *Config) (*Controller, error) {
return billing.NewRepository(ctx, dbase)
}
c.AliasRepoFn = func() (*alias.Repository, error) {
return alias.NewRepository(ctx, dbase, dbase, c.kms)
return alias.NewRepository(ctx, txManager, c.kms)
}
c.TargetAliasRepoFn = func() (*talias.Repository, error) {
return talias.NewRepository(ctx, dbase, dbase, c.kms)

@ -672,18 +672,18 @@ func (m *streamMock) RecvToClient() (*httpbody.HttpBody, error) {
func Test_aliasResolutionInterceptor(t *testing.T) {
ctx := context.Background()
conn, _ := db.TestSetup(t, "postgres")
rw := db.New(conn)
txm := db.NewTransactionManager(conn)
wrapper := db.TestWrapper(t)
kmsCache := kms.TestKms(t, conn, wrapper)
aliasRepoFn := func() (*alias.Repository, error) {
return alias.NewRepository(context.Background(), rw, rw, kmsCache)
return alias.NewRepository(context.Background(), txm, kmsCache)
}
_, proj := iam.TestScopes(t, iam.TestRepo(t, conn, wrapper))
tar := tcp.TestTarget(ctx, t, conn, proj.GetPublicId(), "test-target")
al := talias.TestAlias(t, rw, "test-alias.example", talias.WithDestinationId(tar.GetPublicId()))
alWithoutDest := talias.TestAlias(t, rw, "no-destination.alias")
al := talias.TestNewAlias(t, txm, "test-alias.example", talias.WithDestinationId(tar.GetPublicId()))
alWithoutDest := talias.TestNewAlias(t, txm, "no-destination.alias")
interceptor := aliasResolutionInterceptor(ctx, aliasRepoFn)
require.NotNil(t, interceptor)

@ -14,7 +14,6 @@ import (
"github.com/hashicorp/boundary/internal/event"
"github.com/hashicorp/go-dbw"
_ "github.com/jackc/pgx/v5"
"gorm.io/driver/postgres"
)

@ -41,7 +41,11 @@ const (
DescendingOrderBy
)
// Reader interface defines lookups/searching for resources
// TxHandler defines a handler for a func that writes a transaction for use with DoTx
type TxHandler func(Reader, Writer) error
// Reader interface defines lookups/searching for resources. It does
// not allow for writing to the db.
type Reader interface {
// LookupById will lookup a resource by its primary key id, which must be
// unique. If the resource implements either ResourcePublicIder or
@ -170,9 +174,6 @@ type RetryInfo struct {
Backoff time.Duration
}
// TxHandler defines a handler for a func that writes a transaction for use with DoTx
type TxHandler func(Reader, Writer) error
// ResourcePublicIder defines an interface that LookupByPublicId() can use to
// get the resource's public id.
type ResourcePublicIder interface {

@ -0,0 +1,267 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package db
import (
"context"
stderrors "errors"
"fmt"
"sync/atomic"
"time"
"github.com/hashicorp/boundary/internal/errors"
"github.com/hashicorp/boundary/internal/oplog"
"github.com/hashicorp/boundary/internal/oplog/store"
"github.com/hashicorp/boundary/internal/util"
"github.com/hashicorp/go-dbw"
wrapping "github.com/hashicorp/go-kms-wrapping/v2"
)
// RoTxHandler is a function that will be executed within a read-only transaction.
type RoTxHandler func(Reader) error
// RwTxHandler is a function that will be executed within a read-write transaction.
type RwTxHandler func(NewWriter) error
type txOptions struct {
retries uint
backOff Backoff
}
func getTxOptions(opts []TxOption) *txOptions {
txOpts := txOptions{
retries: StdRetryCnt,
backOff: ExpBackoff{},
}
for _, opt := range opts {
opt(&txOpts)
}
return &txOpts
}
// TxOption is used to configure the transaction.
type TxOption func(*txOptions)
// TransactionManager defines the interface for interacting with the database.
// All database operations require an explicit transaction with a defined backoff
// and retry mechanism.
type TransactionManager interface {
// DoRoTx will start a read-only transaction and execute the handler. Retries
// will be attempted based on the backoff strategy and number of retries.
DoRoTx(ctx context.Context, handler RoTxHandler, opts ...TxOption) (RetryInfo, error)
// DoRwTx will start a read-write transaction and execute the handler. Retries
// will be attempted based on the backoff strategy and number of retries.
DoRwTx(ctx context.Context, handler RwTxHandler, opts ...TxOption) (RetryInfo, error)
// Writer returns a writer suitable for a single read-write operation.
Writer() NewWriter
// Reader returns a reader suitable for a single read-only operation.
Reader() Reader
}
// NewWriter interface defines read/write operations for resources.
// TODO: Remove the old Writer interface and rename this Writer.
type NewWriter interface {
Reader
// Update an object in the db, fieldMask is required and provides
// field_mask.proto paths for fields that should be updated. The i interface
// parameter is the type the caller wants to update in the db and its
// fields are set to the update values. setToNullPaths is optional and
// provides field_mask.proto paths for the fields that should be set to
// null. fieldMaskPaths and setToNullPaths must not intersect. The caller
// is responsible for the transaction life cycle of the writer and if an
// error is returned the caller must decide what to do with the transaction,
// which almost always should be to rollback. Update returns the number of
// rows updated or an error. Supported options: WithOplog.
Update(ctx context.Context, i any, fieldMaskPaths []string, setToNullPaths []string, opt ...Option) (int, error)
// Create an object in the db with options: WithDebug, WithOplog, NewOplogMsg,
// WithLookup, WithReturnRowsAffected, OnConflict, WithVersion, and
// WithWhere. The caller is responsible for the transaction life cycle of
// the writer and if an error is returned the caller must decide what to do
// with the transaction, which almost always should be to rollback.
Create(ctx context.Context, i any, opt ...Option) error
// CreateItems will create multiple items of the same type.
// Supported options: WithDebug, WithOplog, WithOplogMsgs,
// WithReturnRowsAffected, OnConflict, WithVersion, and WithWhere.
/// WithOplog and WithOplogMsgs may not be used together. WithLookup is not
// a supported option. The caller is responsible for the transaction life
// cycle of the writer and if an error is returned the caller must decide
// what to do with the transaction, which almost always should be to
// rollback.
CreateItems(ctx context.Context, createItems any, opt ...Option) error
// Delete an object in the db with options: WithOplog, WithDebug.
// The caller is responsible for the transaction life cycle of the writer
// and if an error is returned the caller must decide what to do with
// the transaction, which almost always should be to rollback. Delete
// returns the number of rows deleted or an error.
Delete(ctx context.Context, i any, opt ...Option) (int, error)
// DeleteItems will delete multiple items of the same type.
// Supported options: WithOplog and WithOplogMsgs. WithOplog and
// WithOplogMsgs may not be used together. The caller is responsible for the
// transaction life cycle of the writer and if an error is returned the
// caller must decide what to do with the transaction, which almost always
// should be to rollback. Delete returns the number of rows deleted or an error.
DeleteItems(ctx context.Context, deleteItems any, opt ...Option) (int, error)
// Exec will execute the sql with the values as parameters. The int returned
// is the number of rows affected by the sql. No options are currently
// supported.
Exec(ctx context.Context, sql string, values []any, opt ...Option) (int, error)
// GetTicket returns an oplog ticket for the aggregate root of "i" which can
// be used to WriteOplogEntryWith for that aggregate root.
GetTicket(ctx context.Context, i any) (*store.Ticket, error)
// WriteOplogEntryWith will write an oplog entry with the msgs provided for
// the ticket's aggregateName. No options are currently supported.
WriteOplogEntryWith(
ctx context.Context,
wrapper wrapping.Wrapper,
ticket *store.Ticket,
metadata oplog.Metadata,
msgs []*oplog.Message,
opt ...Option,
) error
}
type transactionManager struct {
underlying *DB
}
// NewTransactionManager creates a new transaction manager.
func NewTransactionManager(db *DB) TransactionManager {
return &transactionManager{
underlying: db,
}
}
func (t *transactionManager) DoRoTx(ctx context.Context, handler RoTxHandler, opts ...TxOption) (RetryInfo, error) {
const op = "db.(*transactionManager).DoRoTx"
txOpts := getTxOptions(opts)
switch {
case util.IsNil(t.underlying):
return RetryInfo{}, errors.New(ctx, errors.InvalidParameter, op, "missing db")
case util.IsNil(t.underlying):
return RetryInfo{}, errors.New(ctx, errors.InvalidParameter, op, "missing underlying db")
case util.IsNil(txOpts.backOff):
return RetryInfo{}, errors.New(ctx, errors.InvalidParameter, op, "missing backoff")
case util.IsNil(handler):
return RetryInfo{}, errors.New(ctx, errors.InvalidParameter, op, "missing handler")
}
info := RetryInfo{}
for attempts := uint(1); ; attempts++ {
if attempts > txOpts.retries+1 {
return info, errors.New(ctx, errors.MaxRetries, op, fmt.Sprintf("Too many retries: %d of %d", attempts-1, txOpts.retries+1), errors.WithoutEvent())
}
beginTx, err := dbw.New(t.underlying.wrapped.Load()).Begin(ctx)
if err != nil {
return info, wrapError(ctx, err, op)
}
// TODO: In the future, if we want to support read-only replicas,
// this would create a new transaction to (one of?) the read-only
// replica(s).
newTxDb := &DB{wrapped: new(atomic.Pointer[dbw.DB])}
newTxDb.wrapped.Store(beginTx.DB())
newRW := New(newTxDb)
if err := handler(newRW); err != nil {
if err := beginTx.Rollback(ctx); err != nil {
return info, wrapError(ctx, err, op)
}
if errors.Match(errors.T(errors.TicketAlreadyRedeemed), err) {
d := txOpts.backOff.Duration(attempts)
info.Retries++
info.Backoff = info.Backoff + d
time.Sleep(d)
continue
}
return info, errors.Wrap(ctx, err, op, errors.WithoutEvent())
}
var txnErr error
if commitErr := beginTx.Commit(ctx); commitErr != nil {
txnErr = stderrors.Join(txnErr, errors.Wrap(ctx, commitErr, op, errors.WithMsg("commit error")))
if err := beginTx.Rollback(ctx); err != nil {
return info, stderrors.Join(txnErr, errors.Wrap(ctx, err, op, errors.WithMsg("rollback error")))
}
return info, txnErr
}
return info, nil
}
}
func (t *transactionManager) DoRwTx(ctx context.Context, handler RwTxHandler, opts ...TxOption) (RetryInfo, error) {
const op = "db.(*transactionManager).DoRwTx"
txOpts := getTxOptions(opts)
switch {
case util.IsNil(t.underlying):
return RetryInfo{}, errors.New(ctx, errors.InvalidParameter, op, "missing db")
case util.IsNil(t.underlying):
return RetryInfo{}, errors.New(ctx, errors.InvalidParameter, op, "missing underlying db")
case util.IsNil(txOpts.backOff):
return RetryInfo{}, errors.New(ctx, errors.InvalidParameter, op, "missing backoff")
case util.IsNil(handler):
return RetryInfo{}, errors.New(ctx, errors.InvalidParameter, op, "missing handler")
}
info := RetryInfo{}
for attempts := uint(1); ; attempts++ {
if attempts > txOpts.retries+1 {
return info, errors.New(ctx, errors.MaxRetries, op, fmt.Sprintf("Too many retries: %d of %d", attempts-1, txOpts.retries+1), errors.WithoutEvent())
}
beginTx, err := dbw.New(t.underlying.wrapped.Load()).Begin(ctx)
if err != nil {
return info, wrapError(ctx, err, op)
}
// TODO: In the future, if we want to support read-only replicas,
// this would create a new transaction to the primary DB (not a replica)
newTxDb := &DB{wrapped: new(atomic.Pointer[dbw.DB])}
newTxDb.wrapped.Store(beginTx.DB())
newRW := New(newTxDb)
if err := handler(newRW); err != nil {
if err := beginTx.Rollback(ctx); err != nil {
return info, wrapError(ctx, err, op)
}
if errors.Match(errors.T(errors.TicketAlreadyRedeemed), err) {
d := txOpts.backOff.Duration(attempts)
info.Retries++
info.Backoff = info.Backoff + d
time.Sleep(d)
continue
}
return info, errors.Wrap(ctx, err, op, errors.WithoutEvent())
}
var txnErr error
if commitErr := beginTx.Commit(ctx); commitErr != nil {
txnErr = stderrors.Join(txnErr, errors.Wrap(ctx, commitErr, op, errors.WithMsg("commit error")))
if err := beginTx.Rollback(ctx); err != nil {
return info, stderrors.Join(txnErr, errors.Wrap(ctx, err, op, errors.WithMsg("rollback error")))
}
return info, txnErr
}
return info, nil
}
}
func (t *transactionManager) Writer() NewWriter {
// TODO: In the future, if we want to support read-only replicas,
// this would use the primary DB (not a replica)
return New(t.underlying)
}
func (t *transactionManager) Reader() Reader {
// TODO: In the future, if we want to support read-only replicas,
// this would use (one of?) the read-only replica(s).
return New(t.underlying)
}
Loading…
Cancel
Save