refactor oplog to use db/internal migrations, remove InitTicket() for more reliability and simplify test step up and tear down (#37)

pull/35/head
Jim 6 years ago committed by GitHub
parent c43e05be2e
commit 0676ea51f1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -302,10 +302,6 @@ func (rw *GormReadWriter) addOplog(ctx context.Context, opType OpType, opts Opti
if err != nil {
return fmt.Errorf("error getting Ticketer %w for WithOplog", err)
}
err = ticketer.InitTicket(replayable.TableName())
if err != nil {
return fmt.Errorf("error getting initializing ticket %w for WithOplog", err)
}
ticket, err := ticketer.GetTicket(replayable.TableName())
if err != nil {
return fmt.Errorf("error getting ticket %w for WithOplog", err)

@ -1,5 +0,0 @@
drop table if exists oplog_entry;
drop table if exists oplog_metadata;
drop table if exists oplog_ticket;
drop index if exists idx_oplog_metatadata_key;
drop index if exists idx_oplog_metatadata_value

@ -1,24 +0,0 @@
CREATE TABLE if not exists oplog_entry (
id bigint generated always as identity primary key,
create_time timestamp with time zone default current_timestamp,
update_time timestamp with time zone default current_timestamp,
version text NOT NULL,
aggregate_name text NOT NULL,
"data" bytea NOT NULL
);
CREATE TABLE if not exists oplog_ticket (
id bigint generated always as identity primary key,
create_time timestamp with time zone default current_timestamp,
update_time timestamp with time zone default current_timestamp,
"name" text NOT NULL UNIQUE,
"version" bigint NOT NULL
);
CREATE TABLE if not exists oplog_metadata (
id bigint generated always as identity primary key,
create_time timestamp with time zone default current_timestamp,
entry_id bigint NOT NULL REFERENCES oplog_entry(id) ON DELETE CASCADE ON UPDATE CASCADE,
"key" text NOT NULL,
value text NULL
);
create index if not exists idx_oplog_metatadata_key on oplog_metadata(key);
create index if not exists idx_oplog_metatadata_value on oplog_metadata(value);

@ -7,7 +7,6 @@ import (
"fmt"
"os"
"strings"
"sync"
"testing"
wrapping "github.com/hashicorp/go-kms-wrapping"
@ -23,62 +22,29 @@ import (
"github.com/ory/dockertest/v3"
)
// Need a way to manage the shared database resource for these oplog
// tests, so runningTests gives us a waitgroup to do this and clean up
// the shared database resource when we're done.
var runningTests sync.WaitGroup
// startTest signals that we're starting a test that uses the shared test resources
func startTest() {
runningTests.Add(1)
}
// completeTest signals that we've finished a test that uses the shared test resources
func completeTest() {
runningTests.Done()
}
// waitForTests will wait for all the tests that are sharing resources like the database
func waitForTests() {
runningTests.Wait()
}
// testDatabaseURL is initialized once using sync.Once and set to the database URL for testing
var testDatabaseURL string
// testInitDatabase ensures that the database is only initialized once during the tests.
var testInitDatabase sync.Once
// setup the tests (initialize the database one-time and intialized testDatabaseURL)
func setup(t *testing.T) (func(), string) {
func setup(t *testing.T) (func(), *gorm.DB) {
cleanup := func() {}
var url string
var err error
testInitDatabase.Do(func() {
cleanup, url, err = initDbInDocker(t)
if err != nil {
panic(err)
}
testDatabaseURL = url
db, err := test_dbconn(url)
if err != nil {
panic(err)
}
defer db.Close()
oplog_test.Init(db)
})
return cleanup, testDatabaseURL
cleanup, url, err = initDbInDocker(t)
if err != nil {
t.Fatal(err)
}
db, err := gorm.Open("postgres", url)
if err != nil {
t.Fatal(err)
}
oplog_test.Init(db)
return cleanup, db
}
// Test_BasicOplog provides some basic unit tests for oplogs
func Test_BasicOplog(t *testing.T) {
t.Parallel()
startTest()
cleanup, url := setup(t)
cleanup, db := setup(t)
defer cleanup()
defer completeTest() // must come after the "defer cleanup()"
db, err := test_dbconn(url)
assert.NilError(t, err)
defer db.Close()
t.Run("EncryptData/DecryptData/UnmarshalData", func(t *testing.T) {
@ -163,11 +129,8 @@ func Test_BasicOplog(t *testing.T) {
resp := db.Create(&user)
assert.NilError(t, resp.Error)
ticketName, err := uuid.GenerateUUID()
assert.NilError(t, err)
err = ticketer.InitTicket(ticketName)
assert.NilError(t, err)
ticket, err := ticketer.GetTicket(ticketName)
ticket, err := ticketer.GetTicket("default")
assert.NilError(t, err)
queue = Queue{}
@ -195,13 +158,8 @@ func Test_BasicOplog(t *testing.T) {
// Test_NewEntry provides some basic unit tests for NewEntry
func Test_NewEntry(t *testing.T) {
t.Parallel()
startTest()
cleanup, url := setup(t)
cleanup, db := setup(t)
defer cleanup()
defer completeTest() // must come after the "defer cleanup()"
db, err := test_dbconn(url)
assert.NilError(t, err)
defer db.Close()
t.Run("valid", func(t *testing.T) {
cipherer := initWrapper(t)
@ -283,13 +241,8 @@ func Test_NewEntry(t *testing.T) {
}
func Test_UnmarshalData(t *testing.T) {
t.Parallel()
startTest()
cleanup, url := setup(t)
cleanup, db := setup(t)
defer cleanup()
defer completeTest() // must come after the "defer cleanup()"
db, err := test_dbconn(url)
assert.NilError(t, err)
defer db.Close()
cipherer := initWrapper(t)
@ -402,13 +355,8 @@ func Test_UnmarshalData(t *testing.T) {
// Test_Replay provides some basic unit tests for replaying entries
func Test_Replay(t *testing.T) {
startTest()
t.Parallel()
cleanup, url := setup(t)
cleanup, db := setup(t)
defer cleanup()
defer completeTest() // must come after the "defer cleanup()"
db, err := test_dbconn(url)
assert.NilError(t, err)
defer db.Close()
cipherer := initWrapper(t)
@ -423,17 +371,12 @@ func Test_Replay(t *testing.T) {
replayUserTable := fmt.Sprintf("%s%s", testUser.TableName(), tableSuffix)
defer func() { assert.NilError(t, writer.dropTableIfExists(replayUserTable)) }()
ticketName, err := uuid.GenerateUUID()
assert.NilError(t, err)
ticketer, err := NewGormTicketer(db, WithAggregateNames(true))
assert.NilError(t, err)
err = ticketer.InitTicket(ticketName)
assert.NilError(t, err)
t.Run("replay:create/update", func(t *testing.T) {
ticket, err := ticketer.GetTicket(ticketName)
ticket, err := ticketer.GetTicket("default")
assert.NilError(t, err)
newLogEntry, err := NewEntry(
@ -515,7 +458,7 @@ func Test_Replay(t *testing.T) {
// we need to test delete replays now...
tx2 := db.Begin()
ticket2, err := ticketer.GetTicket(ticketName)
ticket2, err := ticketer.GetTicket("default")
assert.NilError(t, err)
id4, err := uuid.GenerateUUID()
@ -578,13 +521,8 @@ func Test_Replay(t *testing.T) {
// Test_WriteEntryWith provides unit tests for oplog.WriteEntryWith
func Test_WriteEntryWith(t *testing.T) {
t.Parallel()
startTest()
cleanup, url := setup(t)
cleanup, db := setup(t)
defer cleanup()
defer completeTest() // must come after the "defer cleanup()"
db, err := test_dbconn(url)
assert.NilError(t, err)
defer db.Close()
cipherer := initWrapper(t)
@ -603,14 +541,10 @@ func Test_WriteEntryWith(t *testing.T) {
}
t.Log(&u2)
ticketName, err := uuid.GenerateUUID()
assert.NilError(t, err)
ticketer, err := NewGormTicketer(db, WithAggregateNames(true))
assert.NilError(t, err)
err = ticketer.InitTicket(ticketName)
assert.NilError(t, err)
ticket, err := ticketer.GetTicket(ticketName)
ticket, err := ticketer.GetTicket("default")
assert.NilError(t, err)
t.Run("successful", func(t *testing.T) {
@ -695,23 +629,13 @@ func Test_WriteEntryWith(t *testing.T) {
// Test_TicketSerialization provides unit tests for making sure oplog.Tickets properly serialize writes to oplog entries
func Test_TicketSerialization(t *testing.T) {
startTest()
t.Parallel()
cleanup, url := setup(t)
cleanup, db := setup(t)
defer cleanup()
defer completeTest() // must come after the "defer cleanup()"
db, err := test_dbconn(url)
assert.NilError(t, err)
defer db.Close()
ticketName, err := uuid.GenerateUUID()
assert.NilError(t, err)
ticketer, err := NewGormTicketer(db, WithAggregateNames(true))
assert.NilError(t, err)
// in it's own transaction, init the ticket
_ = ticketer.InitTicket(ticketName)
cipherer := initWrapper(t)
id, err := uuid.GenerateUUID()
@ -722,7 +646,7 @@ func Test_TicketSerialization(t *testing.T) {
}
err = firstTx.Create(&firstUser).Error
assert.NilError(t, err)
firstTicket, err := ticketer.GetTicket(ticketName)
firstTicket, err := ticketer.GetTicket("default")
assert.NilError(t, err)
firstQueue := Queue{}
@ -750,7 +674,7 @@ func Test_TicketSerialization(t *testing.T) {
}
err = secondTx.Create(&secondUser).Error
assert.NilError(t, err)
secondTicket, err := ticketer.GetTicket(ticketName)
secondTicket, err := ticketer.GetTicket("default")
assert.NilError(t, err)
secondQueue := Queue{}
@ -805,7 +729,7 @@ func initDbInDocker(t *testing.T) (cleanup func(), retURL string, err error) {
}
c := func() {
cleanupResource(t, pool, resource)
cleanupResource(pool, resource)
}
url := fmt.Sprintf("postgres://postgres:secret@localhost:%s?sslmode=disable", resource.GetPort("5432/tcp"))
@ -848,7 +772,7 @@ func initWrapper(t *testing.T) wrapping.Wrapper {
// initTestStore will execute the migrations needed to initialize the store for tests
func initTestStore(t *testing.T, cleanup func(), url string) {
// run migrations
m, err := migrate.New("file://migrations/postgres", url)
m, err := migrate.New("file://../db/migrations/postgres", url)
if err != nil {
cleanup()
t.Fatalf("Error creating migrations: %s", err)
@ -860,22 +784,16 @@ func initTestStore(t *testing.T, cleanup func(), url string) {
}
// cleanupResource will clean up the dockertest resources (postgres)
func cleanupResource(t *testing.T, pool *dockertest.Pool, resource *dockertest.Resource) {
waitForTests()
func cleanupResource(pool *dockertest.Pool, resource *dockertest.Resource) error {
var err error
for i := 0; i < 10; i++ {
err = pool.Purge(resource)
if err == nil {
return
return nil
}
}
if strings.Contains(err.Error(), "No such container") {
return
return nil
}
t.Fatalf("Failed to cleanup local container: %s", err)
}
func test_dbconn(url string) (*gorm.DB, error) {
return gorm.Open("postgres", url)
return fmt.Errorf("Failed to cleanup local container: %s", err)
}

@ -18,15 +18,16 @@ const DefaultAggregateName = "global"
// Ticketer provides an interface to storage for Tickets, so you can easily substitute your own ticketer
type Ticketer interface {
// InitTicket initializes a ticket. You MUST initialize a ticket in its own transaction (and commit it),
// before you GetTicket in a later transaction to write to the oplog. InitTicket will check to see if
// the ticket has already been initialized before creating a new one.
InitTicket(aggregateName string) error
// GetTicket returns a ticket for the specified name. You MUST GetTicket in the same transaction
// that you're using to write to the database tables. Names allow us to shard tickets around domain root names
// that you're using to write to the database tables. Names allow us to shard tickets around domain root names.
// Before getting a ticket you must insert it with it's name into the oplog_ticket table. This is done via a
// db migrations script. Requiring this insert as part of migrations ensures that the tickets are initialized in
// a separate transaction from when a client calls GetTicket(aggregateName) which is critical for the optimized locking
// pattern to work properly
GetTicket(aggregateName string) (*store.Ticket, error)
// Redeem ticket will attempt to redeem the ticket and ensure it's serialized with other tickets using the same
// aggregate name
Redeem(ticket *store.Ticket) error
}
@ -66,34 +67,6 @@ func (ticketer *GormTicketer) GetTicket(aggregateName string) (*store.Ticket, er
return &ticket, nil
}
// InitTicket initializes a ticket. You MUST initialize a ticket in its own transaction (and commit it),
// before you GetTicket in a later transaction to write to the oplog. InitTicket will check to see if
// the ticket has already been initialized before creating a new one.
func (ticketer *GormTicketer) InitTicket(aggregateName string) error {
// no existing ticket found, so let's initialize a new one
if aggregateName == "" {
return fmt.Errorf("bad ticket name")
}
// check to see if a ticket has already been initialized
existingTicket, err := ticketer.GetTicket(aggregateName)
if err == nil && existingTicket != nil {
return nil // found an existing intialized ticket without errors
}
if err != ErrTicketNotFound {
return fmt.Errorf("error retreiving ticket from storage: %w", err)
}
name := DefaultAggregateName
if ticketer.withAggregateNames {
name = aggregateName
}
newTicket := store.Ticket{Name: name, Version: 1}
if err := ticketer.tx.Create(&newTicket).Error; err != nil {
return fmt.Errorf("error creating ticket in storage: %w", err)
}
return nil
}
// Redeem will attempt to redeem the ticket. If the ticket version has already been used, then an error is returned
func (ticketer *GormTicketer) Redeem(t *store.Ticket) error {
if t == nil {

@ -3,19 +3,13 @@ package oplog
import (
"testing"
"github.com/hashicorp/go-uuid"
"gotest.tools/assert"
)
// Test_NewGormTicketer provides unit tests for creating a Gorm ticketer
func Test_NewGormTicketer(t *testing.T) {
t.Parallel()
startTest()
cleanup, url := setup(t)
cleanup, db := setup(t)
defer cleanup()
defer completeTest() // must come after the "defer cleanup()"
db, err := test_dbconn(url)
assert.NilError(t, err)
defer db.Close()
t.Run("valid", func(t *testing.T) {
ticketer, err := NewGormTicketer(db, WithAggregateNames(true))
@ -30,27 +24,16 @@ func Test_NewGormTicketer(t *testing.T) {
// Test_GetTicket provides unit tests for getting oplog.Tickets
func Test_GetTicket(t *testing.T) {
startTest()
t.Parallel()
cleanup, url := setup(t)
cleanup, db := setup(t)
defer cleanup()
defer completeTest() // must come after the "defer cleanup()"
db, err := test_dbconn(url)
assert.NilError(t, err)
defer db.Close()
ticketName, err := uuid.GenerateUUID()
assert.NilError(t, err)
ticketer, err := NewGormTicketer(db, WithAggregateNames(true))
assert.NilError(t, err)
err = ticketer.InitTicket(ticketName)
assert.NilError(t, err)
t.Run("valid", func(t *testing.T) {
ticket, err := ticketer.GetTicket(ticketName)
ticket, err := ticketer.GetTicket("default")
assert.NilError(t, err)
assert.Equal(t, ticket.Name, ticketName)
assert.Equal(t, ticket.Name, "default")
assert.Check(t, ticket.Version != 0)
})
@ -61,58 +44,17 @@ func Test_GetTicket(t *testing.T) {
})
}
// Test_InitTicket provides unit tests for initializing tickets
func Test_InitTicket(t *testing.T) {
startTest()
t.Parallel()
cleanup, url := setup(t)
defer cleanup()
defer completeTest() // must come after the "defer cleanup()"
db, err := test_dbconn(url)
assert.NilError(t, err)
defer db.Close()
ticketName, err := uuid.GenerateUUID()
assert.NilError(t, err)
ticketer, err := NewGormTicketer(db, WithAggregateNames(true))
assert.NilError(t, err)
t.Run("valid", func(t *testing.T) {
err = ticketer.InitTicket(ticketName)
assert.NilError(t, err)
})
t.Run("no name", func(t *testing.T) {
err = ticketer.InitTicket("")
assert.Equal(t, err.Error(), "bad ticket name")
})
}
// Test_Redeem provides unit tests for redeeming tickets
func Test_Redeem(t *testing.T) {
startTest()
t.Parallel()
cleanup, url := setup(t)
cleanup, db := setup(t)
defer cleanup()
defer completeTest() // must come after the "defer cleanup()"
db, err := test_dbconn(url)
assert.NilError(t, err)
defer db.Close()
ticketName, err := uuid.GenerateUUID()
assert.NilError(t, err)
ticketer, err := NewGormTicketer(db, WithAggregateNames(true))
assert.NilError(t, err)
// in it's own transaction, init the ticket
_ = ticketer.InitTicket(ticketName)
t.Run("valid", func(t *testing.T) {
tx := db.Begin()
ticketer, err := NewGormTicketer(tx, WithAggregateNames(true))
assert.NilError(t, err)
ticket, err := ticketer.GetTicket(ticketName)
ticket, err := ticketer.GetTicket("default")
assert.NilError(t, err)
err = ticketer.Redeem(ticket)
@ -134,13 +76,13 @@ func Test_Redeem(t *testing.T) {
ticketer, err := NewGormTicketer(tx, WithAggregateNames(true))
assert.NilError(t, err)
ticket, err := ticketer.GetTicket(ticketName)
ticket, err := ticketer.GetTicket("default")
assert.NilError(t, err)
secondTx := db.Begin()
secondTicketer, err := NewGormTicketer(secondTx, WithAggregateNames(true))
assert.NilError(t, err)
secondTicket, err := secondTicketer.GetTicket(ticketName)
secondTicket, err := secondTicketer.GetTicket("default")
assert.NilError(t, err)
err = ticketer.Redeem(ticket)

@ -11,13 +11,8 @@ import (
// Test_GormWriterCreate provides unit tests for GormWriter Create
func Test_GormWriterCreate(t *testing.T) {
t.Parallel()
startTest()
cleanup, url := setup(t)
cleanup, db := setup(t)
defer cleanup()
defer completeTest() // must come after the "defer cleanup()"
db, err := test_dbconn(url)
assert.NilError(t, err)
defer db.Close()
t.Run("valid", func(t *testing.T) {
tx := db.Begin()
@ -54,7 +49,7 @@ func Test_GormWriterCreate(t *testing.T) {
tx := db.Begin()
defer tx.Rollback()
w := GormWriter{tx}
err = w.Create(nil)
err := w.Create(nil)
assert.Check(t, err != nil)
assert.Equal(t, err.Error(), "create interface is nil")
})
@ -62,13 +57,8 @@ func Test_GormWriterCreate(t *testing.T) {
// Test_GormWriterDelete provides unit tests for GormWriter Delete
func Test_GormWriterDelete(t *testing.T) {
t.Parallel()
startTest()
cleanup, url := setup(t)
cleanup, db := setup(t)
defer cleanup()
defer completeTest() // must come after the "defer cleanup()"
db, err := test_dbconn(url)
assert.NilError(t, err)
defer db.Close()
t.Run("valid", func(t *testing.T) {
tx := db.Begin()
@ -108,7 +98,7 @@ func Test_GormWriterDelete(t *testing.T) {
tx := db.Begin()
defer tx.Rollback()
w := GormWriter{tx}
err = w.Delete(nil)
err := w.Delete(nil)
assert.Check(t, err != nil)
assert.Equal(t, err.Error(), "delete interface is nil")
})
@ -116,13 +106,8 @@ func Test_GormWriterDelete(t *testing.T) {
// Test_GormWriterUpdate provides unit tests for GormWriter Update
func Test_GormWriterUpdate(t *testing.T) {
t.Parallel()
startTest()
cleanup, url := setup(t)
cleanup, db := setup(t)
defer cleanup()
defer completeTest() // must come after the "defer cleanup()"
db, err := test_dbconn(url)
assert.NilError(t, err)
defer db.Close()
t.Run("valid no fieldmask", func(t *testing.T) {
tx := db.Begin()
@ -213,7 +198,7 @@ func Test_GormWriterUpdate(t *testing.T) {
tx := db.Begin()
defer tx.Rollback()
w := GormWriter{tx}
err = w.Update(nil, nil)
err := w.Update(nil, nil)
assert.Check(t, err != nil)
assert.Equal(t, err.Error(), "update interface is nil")
})
@ -221,15 +206,9 @@ func Test_GormWriterUpdate(t *testing.T) {
// Test_GormWriterHasTable provides unit tests for GormWriter HasTable
func Test_GormWriterHasTable(t *testing.T) {
t.Parallel()
startTest()
cleanup, url := setup(t)
cleanup, db := setup(t)
defer cleanup()
defer completeTest() // must come after the "defer cleanup()"
db, err := test_dbconn(url)
assert.NilError(t, err)
defer db.Close()
w := GormWriter{Tx: db}
t.Run("success", func(t *testing.T) {
@ -250,15 +229,9 @@ func Test_GormWriterHasTable(t *testing.T) {
// Test_GormWriterCreateTable provides unit tests for GormWriter CreateTable
func Test_GormWriterCreateTable(t *testing.T) {
t.Parallel()
startTest()
cleanup, url := setup(t)
cleanup, db := setup(t)
defer cleanup()
defer completeTest() // must come after the "defer cleanup()"
db, err := test_dbconn(url)
assert.NilError(t, err)
defer db.Close()
t.Run("success", func(t *testing.T) {
w := GormWriter{Tx: db}
suffix, err := uuid.GenerateUUID()
@ -299,7 +272,7 @@ func Test_GormWriterCreateTable(t *testing.T) {
t.Run("blank name", func(t *testing.T) {
w := GormWriter{Tx: db}
u := &oplog_test.TestUser{}
err = w.createTableLike(u.TableName(), "")
err := w.createTableLike(u.TableName(), "")
assert.Check(t, err != nil)
assert.Error(t, err, err.Error(), nil)
assert.Equal(t, err.Error(), "error newTableName is empty string")
@ -308,15 +281,9 @@ func Test_GormWriterCreateTable(t *testing.T) {
// Test_GormWriterDropTableIfExists provides unit tests for GormWriter DropTableIfExists
func Test_GormWriterDropTableIfExists(t *testing.T) {
t.Parallel()
startTest()
cleanup, url := setup(t)
cleanup, db := setup(t)
defer cleanup()
defer completeTest() // must come after the "defer cleanup()"
db, err := test_dbconn(url)
assert.NilError(t, err)
defer db.Close()
t.Run("success", func(t *testing.T) {
w := GormWriter{Tx: db}
suffix, err := uuid.GenerateUUID()

Loading…
Cancel
Save