From 0676ea51f1dfc3b68bf874f1c549e842bc45bfa6 Mon Sep 17 00:00:00 2001 From: Jim Date: Mon, 11 May 2020 10:30:44 -0400 Subject: [PATCH] refactor oplog to use db/internal migrations, remove InitTicket() for more reliability and simplify test step up and tear down (#37) --- internal/db/read_writer.go | 4 - .../oplog/migrations/postgres/01_db.down.sql | 5 - .../oplog/migrations/postgres/01_db.up.sql | 24 --- internal/oplog/oplog_test.go | 144 ++++-------------- internal/oplog/ticketer.go | 41 +---- internal/oplog/ticketer_gorm_test.go | 74 +-------- internal/oplog/writer_test.go | 53 ++----- 7 files changed, 56 insertions(+), 289 deletions(-) delete mode 100644 internal/oplog/migrations/postgres/01_db.down.sql delete mode 100644 internal/oplog/migrations/postgres/01_db.up.sql diff --git a/internal/db/read_writer.go b/internal/db/read_writer.go index ac4f326d2b..ed6baacc3e 100644 --- a/internal/db/read_writer.go +++ b/internal/db/read_writer.go @@ -302,10 +302,6 @@ func (rw *GormReadWriter) addOplog(ctx context.Context, opType OpType, opts Opti if err != nil { return fmt.Errorf("error getting Ticketer %w for WithOplog", err) } - err = ticketer.InitTicket(replayable.TableName()) - if err != nil { - return fmt.Errorf("error getting initializing ticket %w for WithOplog", err) - } ticket, err := ticketer.GetTicket(replayable.TableName()) if err != nil { return fmt.Errorf("error getting ticket %w for WithOplog", err) diff --git a/internal/oplog/migrations/postgres/01_db.down.sql b/internal/oplog/migrations/postgres/01_db.down.sql deleted file mode 100644 index 6420e36ac3..0000000000 --- a/internal/oplog/migrations/postgres/01_db.down.sql +++ /dev/null @@ -1,5 +0,0 @@ -drop table if exists oplog_entry; -drop table if exists oplog_metadata; -drop table if exists oplog_ticket; -drop index if exists idx_oplog_metatadata_key; -drop index if exists idx_oplog_metatadata_value \ No newline at end of file diff --git a/internal/oplog/migrations/postgres/01_db.up.sql b/internal/oplog/migrations/postgres/01_db.up.sql deleted file mode 100644 index b9d478142a..0000000000 --- a/internal/oplog/migrations/postgres/01_db.up.sql +++ /dev/null @@ -1,24 +0,0 @@ -CREATE TABLE if not exists oplog_entry ( - id bigint generated always as identity primary key, - create_time timestamp with time zone default current_timestamp, - update_time timestamp with time zone default current_timestamp, - version text NOT NULL, - aggregate_name text NOT NULL, - "data" bytea NOT NULL -); -CREATE TABLE if not exists oplog_ticket ( - id bigint generated always as identity primary key, - create_time timestamp with time zone default current_timestamp, - update_time timestamp with time zone default current_timestamp, - "name" text NOT NULL UNIQUE, - "version" bigint NOT NULL -); -CREATE TABLE if not exists oplog_metadata ( - id bigint generated always as identity primary key, - create_time timestamp with time zone default current_timestamp, - entry_id bigint NOT NULL REFERENCES oplog_entry(id) ON DELETE CASCADE ON UPDATE CASCADE, - "key" text NOT NULL, - value text NULL -); -create index if not exists idx_oplog_metatadata_key on oplog_metadata(key); -create index if not exists idx_oplog_metatadata_value on oplog_metadata(value); \ No newline at end of file diff --git a/internal/oplog/oplog_test.go b/internal/oplog/oplog_test.go index cf562ed408..15ba7c40b5 100644 --- a/internal/oplog/oplog_test.go +++ b/internal/oplog/oplog_test.go @@ -7,7 +7,6 @@ import ( "fmt" "os" "strings" - "sync" "testing" wrapping "github.com/hashicorp/go-kms-wrapping" @@ -23,62 +22,29 @@ import ( "github.com/ory/dockertest/v3" ) -// Need a way to manage the shared database resource for these oplog -// tests, so runningTests gives us a waitgroup to do this and clean up -// the shared database resource when we're done. -var runningTests sync.WaitGroup - -// startTest signals that we're starting a test that uses the shared test resources -func startTest() { - runningTests.Add(1) -} - -// completeTest signals that we've finished a test that uses the shared test resources -func completeTest() { - runningTests.Done() -} - -// waitForTests will wait for all the tests that are sharing resources like the database -func waitForTests() { - runningTests.Wait() -} - -// testDatabaseURL is initialized once using sync.Once and set to the database URL for testing -var testDatabaseURL string - -// testInitDatabase ensures that the database is only initialized once during the tests. -var testInitDatabase sync.Once - // setup the tests (initialize the database one-time and intialized testDatabaseURL) -func setup(t *testing.T) (func(), string) { +func setup(t *testing.T) (func(), *gorm.DB) { cleanup := func() {} var url string var err error - testInitDatabase.Do(func() { - cleanup, url, err = initDbInDocker(t) - if err != nil { - panic(err) - } - testDatabaseURL = url - db, err := test_dbconn(url) - if err != nil { - panic(err) - } - defer db.Close() - oplog_test.Init(db) - }) - return cleanup, testDatabaseURL + + cleanup, url, err = initDbInDocker(t) + if err != nil { + t.Fatal(err) + } + db, err := gorm.Open("postgres", url) + if err != nil { + t.Fatal(err) + } + oplog_test.Init(db) + + return cleanup, db } // Test_BasicOplog provides some basic unit tests for oplogs func Test_BasicOplog(t *testing.T) { - t.Parallel() - startTest() - cleanup, url := setup(t) + cleanup, db := setup(t) defer cleanup() - defer completeTest() // must come after the "defer cleanup()" - db, err := test_dbconn(url) - assert.NilError(t, err) defer db.Close() t.Run("EncryptData/DecryptData/UnmarshalData", func(t *testing.T) { @@ -163,11 +129,8 @@ func Test_BasicOplog(t *testing.T) { resp := db.Create(&user) assert.NilError(t, resp.Error) - ticketName, err := uuid.GenerateUUID() assert.NilError(t, err) - err = ticketer.InitTicket(ticketName) - assert.NilError(t, err) - ticket, err := ticketer.GetTicket(ticketName) + ticket, err := ticketer.GetTicket("default") assert.NilError(t, err) queue = Queue{} @@ -195,13 +158,8 @@ func Test_BasicOplog(t *testing.T) { // Test_NewEntry provides some basic unit tests for NewEntry func Test_NewEntry(t *testing.T) { - t.Parallel() - startTest() - cleanup, url := setup(t) + cleanup, db := setup(t) defer cleanup() - defer completeTest() // must come after the "defer cleanup()" - db, err := test_dbconn(url) - assert.NilError(t, err) defer db.Close() t.Run("valid", func(t *testing.T) { cipherer := initWrapper(t) @@ -283,13 +241,8 @@ func Test_NewEntry(t *testing.T) { } func Test_UnmarshalData(t *testing.T) { - t.Parallel() - startTest() - cleanup, url := setup(t) + cleanup, db := setup(t) defer cleanup() - defer completeTest() // must come after the "defer cleanup()" - db, err := test_dbconn(url) - assert.NilError(t, err) defer db.Close() cipherer := initWrapper(t) @@ -402,13 +355,8 @@ func Test_UnmarshalData(t *testing.T) { // Test_Replay provides some basic unit tests for replaying entries func Test_Replay(t *testing.T) { - startTest() - t.Parallel() - cleanup, url := setup(t) + cleanup, db := setup(t) defer cleanup() - defer completeTest() // must come after the "defer cleanup()" - db, err := test_dbconn(url) - assert.NilError(t, err) defer db.Close() cipherer := initWrapper(t) @@ -423,17 +371,12 @@ func Test_Replay(t *testing.T) { replayUserTable := fmt.Sprintf("%s%s", testUser.TableName(), tableSuffix) defer func() { assert.NilError(t, writer.dropTableIfExists(replayUserTable)) }() - ticketName, err := uuid.GenerateUUID() - assert.NilError(t, err) ticketer, err := NewGormTicketer(db, WithAggregateNames(true)) assert.NilError(t, err) - err = ticketer.InitTicket(ticketName) - assert.NilError(t, err) - t.Run("replay:create/update", func(t *testing.T) { - ticket, err := ticketer.GetTicket(ticketName) + ticket, err := ticketer.GetTicket("default") assert.NilError(t, err) newLogEntry, err := NewEntry( @@ -515,7 +458,7 @@ func Test_Replay(t *testing.T) { // we need to test delete replays now... tx2 := db.Begin() - ticket2, err := ticketer.GetTicket(ticketName) + ticket2, err := ticketer.GetTicket("default") assert.NilError(t, err) id4, err := uuid.GenerateUUID() @@ -578,13 +521,8 @@ func Test_Replay(t *testing.T) { // Test_WriteEntryWith provides unit tests for oplog.WriteEntryWith func Test_WriteEntryWith(t *testing.T) { - t.Parallel() - startTest() - cleanup, url := setup(t) + cleanup, db := setup(t) defer cleanup() - defer completeTest() // must come after the "defer cleanup()" - db, err := test_dbconn(url) - assert.NilError(t, err) defer db.Close() cipherer := initWrapper(t) @@ -603,14 +541,10 @@ func Test_WriteEntryWith(t *testing.T) { } t.Log(&u2) - ticketName, err := uuid.GenerateUUID() - assert.NilError(t, err) ticketer, err := NewGormTicketer(db, WithAggregateNames(true)) assert.NilError(t, err) - err = ticketer.InitTicket(ticketName) - assert.NilError(t, err) - ticket, err := ticketer.GetTicket(ticketName) + ticket, err := ticketer.GetTicket("default") assert.NilError(t, err) t.Run("successful", func(t *testing.T) { @@ -695,23 +629,13 @@ func Test_WriteEntryWith(t *testing.T) { // Test_TicketSerialization provides unit tests for making sure oplog.Tickets properly serialize writes to oplog entries func Test_TicketSerialization(t *testing.T) { - startTest() - t.Parallel() - cleanup, url := setup(t) + cleanup, db := setup(t) defer cleanup() - defer completeTest() // must come after the "defer cleanup()" - db, err := test_dbconn(url) - assert.NilError(t, err) defer db.Close() - ticketName, err := uuid.GenerateUUID() - assert.NilError(t, err) ticketer, err := NewGormTicketer(db, WithAggregateNames(true)) assert.NilError(t, err) - // in it's own transaction, init the ticket - _ = ticketer.InitTicket(ticketName) - cipherer := initWrapper(t) id, err := uuid.GenerateUUID() @@ -722,7 +646,7 @@ func Test_TicketSerialization(t *testing.T) { } err = firstTx.Create(&firstUser).Error assert.NilError(t, err) - firstTicket, err := ticketer.GetTicket(ticketName) + firstTicket, err := ticketer.GetTicket("default") assert.NilError(t, err) firstQueue := Queue{} @@ -750,7 +674,7 @@ func Test_TicketSerialization(t *testing.T) { } err = secondTx.Create(&secondUser).Error assert.NilError(t, err) - secondTicket, err := ticketer.GetTicket(ticketName) + secondTicket, err := ticketer.GetTicket("default") assert.NilError(t, err) secondQueue := Queue{} @@ -805,7 +729,7 @@ func initDbInDocker(t *testing.T) (cleanup func(), retURL string, err error) { } c := func() { - cleanupResource(t, pool, resource) + cleanupResource(pool, resource) } url := fmt.Sprintf("postgres://postgres:secret@localhost:%s?sslmode=disable", resource.GetPort("5432/tcp")) @@ -848,7 +772,7 @@ func initWrapper(t *testing.T) wrapping.Wrapper { // initTestStore will execute the migrations needed to initialize the store for tests func initTestStore(t *testing.T, cleanup func(), url string) { // run migrations - m, err := migrate.New("file://migrations/postgres", url) + m, err := migrate.New("file://../db/migrations/postgres", url) if err != nil { cleanup() t.Fatalf("Error creating migrations: %s", err) @@ -860,22 +784,16 @@ func initTestStore(t *testing.T, cleanup func(), url string) { } // cleanupResource will clean up the dockertest resources (postgres) -func cleanupResource(t *testing.T, pool *dockertest.Pool, resource *dockertest.Resource) { - waitForTests() +func cleanupResource(pool *dockertest.Pool, resource *dockertest.Resource) error { var err error for i := 0; i < 10; i++ { err = pool.Purge(resource) if err == nil { - return + return nil } } - if strings.Contains(err.Error(), "No such container") { - return + return nil } - t.Fatalf("Failed to cleanup local container: %s", err) -} - -func test_dbconn(url string) (*gorm.DB, error) { - return gorm.Open("postgres", url) + return fmt.Errorf("Failed to cleanup local container: %s", err) } diff --git a/internal/oplog/ticketer.go b/internal/oplog/ticketer.go index c5f0253ddf..53042964cd 100644 --- a/internal/oplog/ticketer.go +++ b/internal/oplog/ticketer.go @@ -18,15 +18,16 @@ const DefaultAggregateName = "global" // Ticketer provides an interface to storage for Tickets, so you can easily substitute your own ticketer type Ticketer interface { - // InitTicket initializes a ticket. You MUST initialize a ticket in its own transaction (and commit it), - // before you GetTicket in a later transaction to write to the oplog. InitTicket will check to see if - // the ticket has already been initialized before creating a new one. - InitTicket(aggregateName string) error - // GetTicket returns a ticket for the specified name. You MUST GetTicket in the same transaction - // that you're using to write to the database tables. Names allow us to shard tickets around domain root names + // that you're using to write to the database tables. Names allow us to shard tickets around domain root names. + // Before getting a ticket you must insert it with it's name into the oplog_ticket table. This is done via a + // db migrations script. Requiring this insert as part of migrations ensures that the tickets are initialized in + // a separate transaction from when a client calls GetTicket(aggregateName) which is critical for the optimized locking + // pattern to work properly GetTicket(aggregateName string) (*store.Ticket, error) + // Redeem ticket will attempt to redeem the ticket and ensure it's serialized with other tickets using the same + // aggregate name Redeem(ticket *store.Ticket) error } @@ -66,34 +67,6 @@ func (ticketer *GormTicketer) GetTicket(aggregateName string) (*store.Ticket, er return &ticket, nil } -// InitTicket initializes a ticket. You MUST initialize a ticket in its own transaction (and commit it), -// before you GetTicket in a later transaction to write to the oplog. InitTicket will check to see if -// the ticket has already been initialized before creating a new one. -func (ticketer *GormTicketer) InitTicket(aggregateName string) error { - // no existing ticket found, so let's initialize a new one - if aggregateName == "" { - return fmt.Errorf("bad ticket name") - } - // check to see if a ticket has already been initialized - existingTicket, err := ticketer.GetTicket(aggregateName) - if err == nil && existingTicket != nil { - return nil // found an existing intialized ticket without errors - } - if err != ErrTicketNotFound { - return fmt.Errorf("error retreiving ticket from storage: %w", err) - } - - name := DefaultAggregateName - if ticketer.withAggregateNames { - name = aggregateName - } - newTicket := store.Ticket{Name: name, Version: 1} - if err := ticketer.tx.Create(&newTicket).Error; err != nil { - return fmt.Errorf("error creating ticket in storage: %w", err) - } - return nil -} - // Redeem will attempt to redeem the ticket. If the ticket version has already been used, then an error is returned func (ticketer *GormTicketer) Redeem(t *store.Ticket) error { if t == nil { diff --git a/internal/oplog/ticketer_gorm_test.go b/internal/oplog/ticketer_gorm_test.go index 0a1d5b2bcc..68fa3d5cc3 100644 --- a/internal/oplog/ticketer_gorm_test.go +++ b/internal/oplog/ticketer_gorm_test.go @@ -3,19 +3,13 @@ package oplog import ( "testing" - "github.com/hashicorp/go-uuid" "gotest.tools/assert" ) // Test_NewGormTicketer provides unit tests for creating a Gorm ticketer func Test_NewGormTicketer(t *testing.T) { - t.Parallel() - startTest() - cleanup, url := setup(t) + cleanup, db := setup(t) defer cleanup() - defer completeTest() // must come after the "defer cleanup()" - db, err := test_dbconn(url) - assert.NilError(t, err) defer db.Close() t.Run("valid", func(t *testing.T) { ticketer, err := NewGormTicketer(db, WithAggregateNames(true)) @@ -30,27 +24,16 @@ func Test_NewGormTicketer(t *testing.T) { // Test_GetTicket provides unit tests for getting oplog.Tickets func Test_GetTicket(t *testing.T) { - startTest() - t.Parallel() - cleanup, url := setup(t) + cleanup, db := setup(t) defer cleanup() - defer completeTest() // must come after the "defer cleanup()" - db, err := test_dbconn(url) - assert.NilError(t, err) defer db.Close() - - ticketName, err := uuid.GenerateUUID() - assert.NilError(t, err) ticketer, err := NewGormTicketer(db, WithAggregateNames(true)) assert.NilError(t, err) - err = ticketer.InitTicket(ticketName) - assert.NilError(t, err) - t.Run("valid", func(t *testing.T) { - ticket, err := ticketer.GetTicket(ticketName) + ticket, err := ticketer.GetTicket("default") assert.NilError(t, err) - assert.Equal(t, ticket.Name, ticketName) + assert.Equal(t, ticket.Name, "default") assert.Check(t, ticket.Version != 0) }) @@ -61,58 +44,17 @@ func Test_GetTicket(t *testing.T) { }) } -// Test_InitTicket provides unit tests for initializing tickets -func Test_InitTicket(t *testing.T) { - startTest() - t.Parallel() - cleanup, url := setup(t) - defer cleanup() - defer completeTest() // must come after the "defer cleanup()" - db, err := test_dbconn(url) - assert.NilError(t, err) - defer db.Close() - - ticketName, err := uuid.GenerateUUID() - assert.NilError(t, err) - ticketer, err := NewGormTicketer(db, WithAggregateNames(true)) - assert.NilError(t, err) - - t.Run("valid", func(t *testing.T) { - err = ticketer.InitTicket(ticketName) - assert.NilError(t, err) - }) - - t.Run("no name", func(t *testing.T) { - err = ticketer.InitTicket("") - assert.Equal(t, err.Error(), "bad ticket name") - }) -} - // Test_Redeem provides unit tests for redeeming tickets func Test_Redeem(t *testing.T) { - startTest() - t.Parallel() - cleanup, url := setup(t) + cleanup, db := setup(t) defer cleanup() - defer completeTest() // must come after the "defer cleanup()" - db, err := test_dbconn(url) - assert.NilError(t, err) defer db.Close() - - ticketName, err := uuid.GenerateUUID() - assert.NilError(t, err) - ticketer, err := NewGormTicketer(db, WithAggregateNames(true)) - assert.NilError(t, err) - - // in it's own transaction, init the ticket - _ = ticketer.InitTicket(ticketName) - t.Run("valid", func(t *testing.T) { tx := db.Begin() ticketer, err := NewGormTicketer(tx, WithAggregateNames(true)) assert.NilError(t, err) - ticket, err := ticketer.GetTicket(ticketName) + ticket, err := ticketer.GetTicket("default") assert.NilError(t, err) err = ticketer.Redeem(ticket) @@ -134,13 +76,13 @@ func Test_Redeem(t *testing.T) { ticketer, err := NewGormTicketer(tx, WithAggregateNames(true)) assert.NilError(t, err) - ticket, err := ticketer.GetTicket(ticketName) + ticket, err := ticketer.GetTicket("default") assert.NilError(t, err) secondTx := db.Begin() secondTicketer, err := NewGormTicketer(secondTx, WithAggregateNames(true)) assert.NilError(t, err) - secondTicket, err := secondTicketer.GetTicket(ticketName) + secondTicket, err := secondTicketer.GetTicket("default") assert.NilError(t, err) err = ticketer.Redeem(ticket) diff --git a/internal/oplog/writer_test.go b/internal/oplog/writer_test.go index e4b7dec217..cbcc5bea67 100644 --- a/internal/oplog/writer_test.go +++ b/internal/oplog/writer_test.go @@ -11,13 +11,8 @@ import ( // Test_GormWriterCreate provides unit tests for GormWriter Create func Test_GormWriterCreate(t *testing.T) { - t.Parallel() - startTest() - cleanup, url := setup(t) + cleanup, db := setup(t) defer cleanup() - defer completeTest() // must come after the "defer cleanup()" - db, err := test_dbconn(url) - assert.NilError(t, err) defer db.Close() t.Run("valid", func(t *testing.T) { tx := db.Begin() @@ -54,7 +49,7 @@ func Test_GormWriterCreate(t *testing.T) { tx := db.Begin() defer tx.Rollback() w := GormWriter{tx} - err = w.Create(nil) + err := w.Create(nil) assert.Check(t, err != nil) assert.Equal(t, err.Error(), "create interface is nil") }) @@ -62,13 +57,8 @@ func Test_GormWriterCreate(t *testing.T) { // Test_GormWriterDelete provides unit tests for GormWriter Delete func Test_GormWriterDelete(t *testing.T) { - t.Parallel() - startTest() - cleanup, url := setup(t) + cleanup, db := setup(t) defer cleanup() - defer completeTest() // must come after the "defer cleanup()" - db, err := test_dbconn(url) - assert.NilError(t, err) defer db.Close() t.Run("valid", func(t *testing.T) { tx := db.Begin() @@ -108,7 +98,7 @@ func Test_GormWriterDelete(t *testing.T) { tx := db.Begin() defer tx.Rollback() w := GormWriter{tx} - err = w.Delete(nil) + err := w.Delete(nil) assert.Check(t, err != nil) assert.Equal(t, err.Error(), "delete interface is nil") }) @@ -116,13 +106,8 @@ func Test_GormWriterDelete(t *testing.T) { // Test_GormWriterUpdate provides unit tests for GormWriter Update func Test_GormWriterUpdate(t *testing.T) { - t.Parallel() - startTest() - cleanup, url := setup(t) + cleanup, db := setup(t) defer cleanup() - defer completeTest() // must come after the "defer cleanup()" - db, err := test_dbconn(url) - assert.NilError(t, err) defer db.Close() t.Run("valid no fieldmask", func(t *testing.T) { tx := db.Begin() @@ -213,7 +198,7 @@ func Test_GormWriterUpdate(t *testing.T) { tx := db.Begin() defer tx.Rollback() w := GormWriter{tx} - err = w.Update(nil, nil) + err := w.Update(nil, nil) assert.Check(t, err != nil) assert.Equal(t, err.Error(), "update interface is nil") }) @@ -221,15 +206,9 @@ func Test_GormWriterUpdate(t *testing.T) { // Test_GormWriterHasTable provides unit tests for GormWriter HasTable func Test_GormWriterHasTable(t *testing.T) { - t.Parallel() - startTest() - cleanup, url := setup(t) + cleanup, db := setup(t) defer cleanup() - defer completeTest() // must come after the "defer cleanup()" - db, err := test_dbconn(url) - assert.NilError(t, err) defer db.Close() - w := GormWriter{Tx: db} t.Run("success", func(t *testing.T) { @@ -250,15 +229,9 @@ func Test_GormWriterHasTable(t *testing.T) { // Test_GormWriterCreateTable provides unit tests for GormWriter CreateTable func Test_GormWriterCreateTable(t *testing.T) { - t.Parallel() - startTest() - cleanup, url := setup(t) + cleanup, db := setup(t) defer cleanup() - defer completeTest() // must come after the "defer cleanup()" - db, err := test_dbconn(url) - assert.NilError(t, err) defer db.Close() - t.Run("success", func(t *testing.T) { w := GormWriter{Tx: db} suffix, err := uuid.GenerateUUID() @@ -299,7 +272,7 @@ func Test_GormWriterCreateTable(t *testing.T) { t.Run("blank name", func(t *testing.T) { w := GormWriter{Tx: db} u := &oplog_test.TestUser{} - err = w.createTableLike(u.TableName(), "") + err := w.createTableLike(u.TableName(), "") assert.Check(t, err != nil) assert.Error(t, err, err.Error(), nil) assert.Equal(t, err.Error(), "error newTableName is empty string") @@ -308,15 +281,9 @@ func Test_GormWriterCreateTable(t *testing.T) { // Test_GormWriterDropTableIfExists provides unit tests for GormWriter DropTableIfExists func Test_GormWriterDropTableIfExists(t *testing.T) { - t.Parallel() - startTest() - cleanup, url := setup(t) + cleanup, db := setup(t) defer cleanup() - defer completeTest() // must come after the "defer cleanup()" - db, err := test_dbconn(url) - assert.NilError(t, err) defer db.Close() - t.Run("success", func(t *testing.T) { w := GormWriter{Tx: db} suffix, err := uuid.GenerateUUID()