refactor replay, so that it creates the required tables during replay (#7)

Co-authored-by: Jim Lambert <jimlambrt@Jims-MBP-3.home>
Co-authored-by: Todd Knight <T.Alan.Knight@gmail.com>
pull/9/head
Jim 6 years ago committed by GitHub
parent 0fffb54501
commit fca36970bf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -215,7 +215,24 @@ func (e *Entry) Replay(ctx context.Context, tx Writer, types *TypeCatalog, table
}
origTableName := em.TableName()
defer em.SetTableName(origTableName)
em.SetTableName(origTableName + tableSuffix)
/*
how replay will be implemented for snapshots is still very much under discussion.
when we go to implement snapshots we may very well need to refactor this create table
choice... there are many issues with doing the "create" in this manner:
* the perms needed to create a table and possible security issues
* the fk references would be to the original tables, not the new replay tables.
It may be a better choice to just create separate schemas for replay named blue and green
since we need at min of two replay tables definitions. if we went with separate schemas they
could be create with a watchtower cli cmd that had appropriate privs (reducing security issues)
and the separate schemas wouldn't have the fk reference issues mentioned above.
*/
replayTable := origTableName + tableSuffix
if !tx.hasTable(replayTable) {
tx.createTableLike(origTableName, replayTable)
}
em.SetTableName(replayTable)
switch m.OpType {
case OpType_CREATE_OP:
if err := tx.Create(m.Message); err != nil {

@ -415,10 +415,11 @@ func Test_Replay(t *testing.T) {
assert.NilError(t, err)
// setup new tables for replay
tableSuffix := "_" + id
tmpUserModel := &oplog_test.ReplayableTestUser{}
tmpUserModel.SetTableName(fmt.Sprintf("%s%s", tmpUserModel.TableName(), tableSuffix))
db.AutoMigrate(tmpUserModel)
defer db.DropTableIfExists(tmpUserModel)
writer := GormWriter{Tx: db}
testUser := &oplog_test.TestUser{}
replayUserTable := fmt.Sprintf("%s%s", testUser.TableName(), tableSuffix)
defer func() { assert.NilError(t, writer.dropTableIfExists(replayUserTable)) }()
ticketName, err := uuid.GenerateUUID()
assert.NilError(t, err)
@ -451,28 +452,22 @@ func Test_Replay(t *testing.T) {
assert.NilError(t, err)
userName := "foo-" + id3
// create a user that's replayable
userCreate := oplog_test.ReplayableTestUser{
TestUser: oplog_test.TestUser{
Name: userName,
},
userCreate := oplog_test.TestUser{
Name: userName,
}
err = tx.Create(&userCreate).Error
assert.NilError(t, err)
userSave := oplog_test.ReplayableTestUser{
TestUser: oplog_test.TestUser{
Id: userCreate.Id,
Name: userCreate.Name,
Email: userName + "@hashicorp.com",
},
userSave := oplog_test.TestUser{
Id: userCreate.Id,
Name: userCreate.Name,
Email: userName + "@hashicorp.com",
}
err = tx.Save(&userSave).Error
assert.NilError(t, err)
userUpdate := oplog_test.ReplayableTestUser{
TestUser: oplog_test.TestUser{
Id: userCreate.Id,
PhoneNumber: "867-5309",
},
userUpdate := oplog_test.TestUser{
Id: userCreate.Id,
PhoneNumber: "867-5309",
}
err = tx.Model(&userUpdate).Updates(map[string]interface{}{"PhoneNumber": "867-5309"}).Error
assert.NilError(t, err)
@ -484,7 +479,7 @@ func Test_Replay(t *testing.T) {
)
assert.NilError(t, err)
types, err := NewTypeCatalog(Type{new(oplog_test.ReplayableTestUser), "user"})
types, err := NewTypeCatalog(Type{new(oplog_test.TestUser), "user"})
assert.NilError(t, err)
var foundEntry Entry
@ -525,18 +520,14 @@ func Test_Replay(t *testing.T) {
assert.NilError(t, err)
userName2 := "foo-" + id4
// create a user that's replayable
userCreate2 := oplog_test.ReplayableTestUser{
TestUser: oplog_test.TestUser{
Name: userName2,
},
userCreate2 := oplog_test.TestUser{
Name: userName2,
}
err = tx2.Create(&userCreate2).Error
assert.NilError(t, err)
deleteUser2 := oplog_test.ReplayableTestUser{
TestUser: oplog_test.TestUser{
Id: userCreate2.Id,
},
deleteUser2 := oplog_test.TestUser{
Id: userCreate2.Id,
}
err = tx2.Delete(&deleteUser2).Error
assert.NilError(t, err)
@ -565,7 +556,7 @@ func Test_Replay(t *testing.T) {
err = foundEntry2.DecryptData(context.Background())
assert.NilError(t, err)
types, err := NewTypeCatalog(Type{new(oplog_test.ReplayableTestUser), "user"})
types, err := NewTypeCatalog(Type{new(oplog_test.TestUser), "user"})
assert.NilError(t, err)
err = foundEntry2.Replay(context.Background(), &GormWriter{tx2}, types, tableSuffix)

@ -18,31 +18,47 @@ func Reinit(db *gorm.DB) {
Init(db)
}
// ReplayableTestUser is simply that: a user we can replay for tests
// the big diff is that it supports overriding the table name
type ReplayableTestUser struct {
TestUser
Table string `gorm:"-"`
}
// TableName overrides the table name for the test user model
func (u *ReplayableTestUser) TableName() string {
// TableName overrides the table name for TestUser
func (u *TestUser) TableName() string {
if u.Table != "" {
return u.Table
}
return "oplog_test_user"
}
func (u *ReplayableTestUser) SetTableName(name string) {
if name != "" {
u.Table = name
// SetTableName allows the table name to be overridden and makes a TestUser a ReplayableMessage
func (u *TestUser) SetTableName(n string) {
if n != "" {
u.Table = n
}
}
func (*TestUser) TableName() string { return "oplog_test_user" }
// TableName overrides the table name for TestCar
func (c *TestCar) TableName() string {
if c.Table != "" {
return c.Table
}
return "oplog_test_car"
}
// TableName overrides the table name for the test car model
func (*TestCar) TableName() string { return "oplog_test_car" }
// SetTableName allows the table name to be overridden and makes a TestCar a ReplayableMessage
func (c *TestCar) SetTableName(n string) {
if n != "" {
c.Table = n
}
}
// TableName overrids the table name for the test rental model
func (*TestRental) TableName() string { return "oplog_test_rental" }
// TableName overrids the table name for TestRental
func (r *TestRental) TableName() string {
if r.Table != "" {
return r.Table
}
return "oplog_test_rental"
}
// SetTableName allows the table name to be overridden and makes a TestRental a ReplayableMessage
func (r *TestRental) SetTableName(n string) {
if n != "" {
r.Table = n
}
}

@ -38,6 +38,8 @@ type TestUser struct {
Name string `protobuf:"bytes,4,opt,name=name,proto3" json:"name,omitempty"`
PhoneNumber string `protobuf:"bytes,5,opt,name=phone_number,json=phoneNumber,proto3" json:"phone_number,omitempty"`
Email string `protobuf:"bytes,6,opt,name=email,proto3" json:"email,omitempty"`
// @inject_tag: gorm:"-" json:"-"
Table string `protobuf:"bytes,7,opt,name=table,proto3" json:"-" gorm:"-"`
}
func (x *TestUser) Reset() {
@ -100,6 +102,13 @@ func (x *TestUser) GetEmail() string {
return ""
}
func (x *TestUser) GetTable() string {
if x != nil {
return x.Table
}
return ""
}
// TestCar for gorm test car model
type TestCar struct {
state protoimpl.MessageState
@ -110,6 +119,8 @@ type TestCar struct {
Id uint32 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty" gorm:"primary_key"`
Model string `protobuf:"bytes,4,opt,name=model,proto3" json:"model,omitempty"`
Mpg int32 `protobuf:"varint,5,opt,name=mpg,proto3" json:"mpg,omitempty"`
// @inject_tag: gorm:"-" json:"-"
Table string `protobuf:"bytes,6,opt,name=table,proto3" json:"-" gorm:"-"`
}
func (x *TestCar) Reset() {
@ -165,6 +176,13 @@ func (x *TestCar) GetMpg() int32 {
return 0
}
func (x *TestCar) GetTable() string {
if x != nil {
return x.Table
}
return ""
}
// TestRental for gorm test rental model
type TestRental struct {
state protoimpl.MessageState
@ -173,6 +191,8 @@ type TestRental struct {
UserId uint32 `protobuf:"varint,1,opt,name=user_id,json=userId,proto3" json:"user_id,omitempty"`
CarId uint32 `protobuf:"varint,2,opt,name=car_id,json=carId,proto3" json:"car_id,omitempty"`
// @inject_tag: gorm:"-" json:"-"
Table string `protobuf:"bytes,3,opt,name=table,proto3" json:"-" gorm:"-"`
}
func (x *TestRental) Reset() {
@ -221,6 +241,86 @@ func (x *TestRental) GetCarId() uint32 {
return 0
}
func (x *TestRental) GetTable() string {
if x != nil {
return x.Table
}
return ""
}
// TestNonReplayableUser for negative test
type TestNonReplayableUser struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
// @inject_tag: gorm:"primary_key"
Id uint32 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty" gorm:"primary_key"`
Name string `protobuf:"bytes,4,opt,name=name,proto3" json:"name,omitempty"`
PhoneNumber string `protobuf:"bytes,5,opt,name=phone_number,json=phoneNumber,proto3" json:"phone_number,omitempty"`
Email string `protobuf:"bytes,6,opt,name=email,proto3" json:"email,omitempty"`
}
func (x *TestNonReplayableUser) Reset() {
*x = TestNonReplayableUser{}
if protoimpl.UnsafeEnabled {
mi := &file_internal_oplog_oplog_test_oplog_test_proto_msgTypes[3]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *TestNonReplayableUser) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*TestNonReplayableUser) ProtoMessage() {}
func (x *TestNonReplayableUser) ProtoReflect() protoreflect.Message {
mi := &file_internal_oplog_oplog_test_oplog_test_proto_msgTypes[3]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use TestNonReplayableUser.ProtoReflect.Descriptor instead.
func (*TestNonReplayableUser) Descriptor() ([]byte, []int) {
return file_internal_oplog_oplog_test_oplog_test_proto_rawDescGZIP(), []int{3}
}
func (x *TestNonReplayableUser) GetId() uint32 {
if x != nil {
return x.Id
}
return 0
}
func (x *TestNonReplayableUser) GetName() string {
if x != nil {
return x.Name
}
return ""
}
func (x *TestNonReplayableUser) GetPhoneNumber() string {
if x != nil {
return x.PhoneNumber
}
return ""
}
func (x *TestNonReplayableUser) GetEmail() string {
if x != nil {
return x.Email
}
return ""
}
var File_internal_oplog_oplog_test_oplog_test_proto protoreflect.FileDescriptor
var file_internal_oplog_oplog_test_oplog_test_proto_rawDesc = []byte{
@ -229,27 +329,38 @@ var file_internal_oplog_oplog_test_oplog_test_proto_rawDesc = []byte{
0x67, 0x5f, 0x74, 0x65, 0x73, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x2d, 0x68, 0x61,
0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2e, 0x77, 0x61, 0x74, 0x63, 0x68, 0x74, 0x6f, 0x77,
0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72, 0x2e, 0x6f, 0x70,
0x6c, 0x6f, 0x67, 0x2e, 0x74, 0x65, 0x73, 0x74, 0x2e, 0x76, 0x31, 0x22, 0x67, 0x0a, 0x08, 0x54,
0x6c, 0x6f, 0x67, 0x2e, 0x74, 0x65, 0x73, 0x74, 0x2e, 0x76, 0x31, 0x22, 0x7d, 0x0a, 0x08, 0x54,
0x65, 0x73, 0x74, 0x55, 0x73, 0x65, 0x72, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20,
0x01, 0x28, 0x0d, 0x52, 0x02, 0x69, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18,
0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x70,
0x68, 0x6f, 0x6e, 0x65, 0x5f, 0x6e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x18, 0x05, 0x20, 0x01, 0x28,
0x09, 0x52, 0x0b, 0x70, 0x68, 0x6f, 0x6e, 0x65, 0x4e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x12, 0x14,
0x0a, 0x05, 0x65, 0x6d, 0x61, 0x69, 0x6c, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65,
0x6d, 0x61, 0x69, 0x6c, 0x22, 0x41, 0x0a, 0x07, 0x54, 0x65, 0x73, 0x74, 0x43, 0x61, 0x72, 0x12,
0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x02, 0x69, 0x64, 0x12,
0x14, 0x0a, 0x05, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05,
0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x12, 0x10, 0x0a, 0x03, 0x6d, 0x70, 0x67, 0x18, 0x05, 0x20, 0x01,
0x28, 0x05, 0x52, 0x03, 0x6d, 0x70, 0x67, 0x22, 0x3c, 0x0a, 0x0a, 0x54, 0x65, 0x73, 0x74, 0x52,
0x65, 0x6e, 0x74, 0x61, 0x6c, 0x12, 0x17, 0x0a, 0x07, 0x75, 0x73, 0x65, 0x72, 0x5f, 0x69, 0x64,
0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x06, 0x75, 0x73, 0x65, 0x72, 0x49, 0x64, 0x12, 0x15,
0x0a, 0x06, 0x63, 0x61, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x05,
0x63, 0x61, 0x72, 0x49, 0x64, 0x42, 0x46, 0x5a, 0x44, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e,
0x63, 0x6f, 0x6d, 0x2f, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2f, 0x77, 0x61,
0x74, 0x63, 0x68, 0x74, 0x6f, 0x77, 0x65, 0x72, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61,
0x6c, 0x2f, 0x6f, 0x70, 0x6c, 0x6f, 0x67, 0x2f, 0x6f, 0x70, 0x6c, 0x6f, 0x67, 0x5f, 0x74, 0x65,
0x73, 0x74, 0x3b, 0x6f, 0x70, 0x6c, 0x6f, 0x67, 0x5f, 0x74, 0x65, 0x73, 0x74, 0x62, 0x06, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x33,
0x6d, 0x61, 0x69, 0x6c, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x18, 0x07, 0x20,
0x01, 0x28, 0x09, 0x52, 0x05, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x22, 0x57, 0x0a, 0x07, 0x54, 0x65,
0x73, 0x74, 0x43, 0x61, 0x72, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28,
0x0d, 0x52, 0x02, 0x69, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x18, 0x04,
0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x12, 0x10, 0x0a, 0x03, 0x6d,
0x70, 0x67, 0x18, 0x05, 0x20, 0x01, 0x28, 0x05, 0x52, 0x03, 0x6d, 0x70, 0x67, 0x12, 0x14, 0x0a,
0x05, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, 0x61,
0x62, 0x6c, 0x65, 0x22, 0x52, 0x0a, 0x0a, 0x54, 0x65, 0x73, 0x74, 0x52, 0x65, 0x6e, 0x74, 0x61,
0x6c, 0x12, 0x17, 0x0a, 0x07, 0x75, 0x73, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01,
0x28, 0x0d, 0x52, 0x06, 0x75, 0x73, 0x65, 0x72, 0x49, 0x64, 0x12, 0x15, 0x0a, 0x06, 0x63, 0x61,
0x72, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x05, 0x63, 0x61, 0x72, 0x49,
0x64, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09,
0x52, 0x05, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x22, 0x74, 0x0a, 0x15, 0x54, 0x65, 0x73, 0x74, 0x4e,
0x6f, 0x6e, 0x52, 0x65, 0x70, 0x6c, 0x61, 0x79, 0x61, 0x62, 0x6c, 0x65, 0x55, 0x73, 0x65, 0x72,
0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x02, 0x69, 0x64,
0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04,
0x6e, 0x61, 0x6d, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x70, 0x68, 0x6f, 0x6e, 0x65, 0x5f, 0x6e, 0x75,
0x6d, 0x62, 0x65, 0x72, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x70, 0x68, 0x6f, 0x6e,
0x65, 0x4e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x6d, 0x61, 0x69, 0x6c,
0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x6d, 0x61, 0x69, 0x6c, 0x42, 0x46, 0x5a,
0x44, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x68, 0x61, 0x73, 0x68,
0x69, 0x63, 0x6f, 0x72, 0x70, 0x2f, 0x77, 0x61, 0x74, 0x63, 0x68, 0x74, 0x6f, 0x77, 0x65, 0x72,
0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x6f, 0x70, 0x6c, 0x6f, 0x67, 0x2f,
0x6f, 0x70, 0x6c, 0x6f, 0x67, 0x5f, 0x74, 0x65, 0x73, 0x74, 0x3b, 0x6f, 0x70, 0x6c, 0x6f, 0x67,
0x5f, 0x74, 0x65, 0x73, 0x74, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
@ -264,11 +375,12 @@ func file_internal_oplog_oplog_test_oplog_test_proto_rawDescGZIP() []byte {
return file_internal_oplog_oplog_test_oplog_test_proto_rawDescData
}
var file_internal_oplog_oplog_test_oplog_test_proto_msgTypes = make([]protoimpl.MessageInfo, 3)
var file_internal_oplog_oplog_test_oplog_test_proto_msgTypes = make([]protoimpl.MessageInfo, 4)
var file_internal_oplog_oplog_test_oplog_test_proto_goTypes = []interface{}{
(*TestUser)(nil), // 0: hashicorp.watchtower.controller.oplog.test.v1.TestUser
(*TestCar)(nil), // 1: hashicorp.watchtower.controller.oplog.test.v1.TestCar
(*TestRental)(nil), // 2: hashicorp.watchtower.controller.oplog.test.v1.TestRental
(*TestUser)(nil), // 0: hashicorp.watchtower.controller.oplog.test.v1.TestUser
(*TestCar)(nil), // 1: hashicorp.watchtower.controller.oplog.test.v1.TestCar
(*TestRental)(nil), // 2: hashicorp.watchtower.controller.oplog.test.v1.TestRental
(*TestNonReplayableUser)(nil), // 3: hashicorp.watchtower.controller.oplog.test.v1.TestNonReplayableUser
}
var file_internal_oplog_oplog_test_oplog_test_proto_depIdxs = []int32{
0, // [0:0] is the sub-list for method output_type
@ -320,6 +432,18 @@ func file_internal_oplog_oplog_test_oplog_test_proto_init() {
return nil
}
}
file_internal_oplog_oplog_test_oplog_test_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*TestNonReplayableUser); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
@ -327,7 +451,7 @@ func file_internal_oplog_oplog_test_oplog_test_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_internal_oplog_oplog_test_oplog_test_proto_rawDesc,
NumEnums: 0,
NumMessages: 3,
NumMessages: 4,
NumExtensions: 0,
NumServices: 0,
},

@ -11,6 +11,8 @@ message TestUser {
string name = 4;
string phone_number = 5;
string email = 6;
// @inject_tag: gorm:"-" json:"-"
string table = 7;
}
// TestCar for gorm test car model
message TestCar {
@ -18,9 +20,22 @@ message TestCar {
uint32 id = 1;
string model = 4;
int32 mpg = 5;
// @inject_tag: gorm:"-" json:"-"
string table = 6;
}
// TestRental for gorm test rental model
message TestRental {
uint32 user_id = 1;
uint32 car_id = 2;
// @inject_tag: gorm:"-" json:"-"
string table = 3;
}
// TestNonReplayableUser for negative test
message TestNonReplayableUser {
// @inject_tag: gorm:"primary_key"
uint32 id = 1;
string name = 4;
string phone_number = 5;
string email = 6;
}

@ -28,6 +28,9 @@ func (r *Queue) Add(m proto.Message, typeName string, t OpType, opt ...Option) e
opts := GetOpts(opt...)
withPaths := opts[optionWithFieldMaskPaths].([]string)
if _, ok := m.(ReplayableMessage); !ok {
return fmt.Errorf("error %T is not a ReplayableMessage", m)
}
value, err := proto.Marshal(m)
if err != nil {
return fmt.Errorf("error marshaling add parameter: %w", err)

@ -85,5 +85,20 @@ func Test_Queue(t *testing.T) {
assert.Check(t, err != nil)
assert.Equal(t, err.Error(), "remove Catalog is nil")
})
t.Run("not replayable", func(t *testing.T) {
u := &oplog_test.TestNonReplayableUser{
Name: "Alice",
PhoneNumber: "867-5309",
Email: "alice@bob.com",
}
err = queue.Add(u, "user", OpType_CREATE_OP)
assert.Check(t, err != nil)
assert.Equal(t, err.Error(), "error *oplog_test.TestNonReplayableUser is not a ReplayableMessage")
})
t.Run("nil message", func(t *testing.T) {
err = queue.Add(nil, "user", OpType_CREATE_OP)
assert.Check(t, err != nil)
assert.Equal(t, err.Error(), "error <nil> is not a ReplayableMessage")
})
}

@ -19,6 +19,15 @@ type Writer interface {
// Delete the entry
Delete(interface{}) error
// HasTable checks if tableName exists
hasTable(tableName string) bool
// CreateTableLike will create a newTableName using the existing table as a starting point
createTableLike(existingTableName string, newTableName string) error
// DropTableIfExists will drop the table if it exists
dropTableIfExists(tableName string) error
}
// GormWriter uses a gorm DB connection for writing
@ -99,3 +108,49 @@ func (w *GormWriter) Delete(i interface{}) error {
}
return nil
}
// HasTable checks if tableName exists
func (w *GormWriter) hasTable(tableName string) bool {
if tableName == "" {
return false
}
return w.Tx.Dialect().HasTable(tableName)
}
// CreateTableLike will create a newTableName like the model's table
// the new table should have all things like the existing model's table (defaults, constraints, indexes, etc)
func (w *GormWriter) createTableLike(existingTableName string, newTableName string) error {
if existingTableName == "" {
return errors.New("error existingTableName is empty string")
}
if newTableName == "" {
return errors.New("error newTableName is empty string")
}
existingTableName = w.Tx.Dialect().Quote(existingTableName)
newTableName = w.Tx.Dialect().Quote(newTableName)
var sql string
switch w.Tx.Dialect().GetName() {
case "postgres":
sql = fmt.Sprintf(
`CREATE TABLE %s ( LIKE %s INCLUDING DEFAULTS INCLUDING CONSTRAINTS INCLUDING INDEXES );`,
newTableName,
existingTableName,
)
case "mysql":
sql = fmt.Sprintf("CREATE TABLE %s LIKE %s",
newTableName,
existingTableName,
)
default:
return errors.New("error unsupported RDBMS")
}
return w.Tx.Exec(sql).Error
}
// DropTableIfExists will drop the table if it exists
func (w *GormWriter) dropTableIfExists(tableName string) error {
if tableName == "" {
return errors.New("cannot drop table whose name is an empty string")
}
return w.Tx.DropTableIfExists(tableName).Error
}

@ -218,3 +218,120 @@ func Test_GormWriterUpdate(t *testing.T) {
assert.Equal(t, err.Error(), "update interface is nil")
})
}
// Test_GormWriterHasTable provides unit tests for GormWriter HasTable
func Test_GormWriterHasTable(t *testing.T) {
t.Parallel()
startTest()
cleanup, url := setup(t)
defer cleanup()
defer completeTest() // must come after the "defer cleanup()"
db, err := test_dbconn(url)
assert.NilError(t, err)
defer db.Close()
w := GormWriter{Tx: db}
t.Run("success", func(t *testing.T) {
ok := w.hasTable("oplog_test_user")
assert.Equal(t, ok, true)
})
t.Run("no table", func(t *testing.T) {
badTableName, err := uuid.GenerateUUID()
assert.NilError(t, err)
ok := w.hasTable(badTableName)
assert.Equal(t, ok, false)
})
t.Run("blank table name", func(t *testing.T) {
ok := w.hasTable("")
assert.Equal(t, ok, false)
})
}
// Test_GormWriterCreateTable provides unit tests for GormWriter CreateTable
func Test_GormWriterCreateTable(t *testing.T) {
t.Parallel()
startTest()
cleanup, url := setup(t)
defer cleanup()
defer completeTest() // must come after the "defer cleanup()"
db, err := test_dbconn(url)
assert.NilError(t, err)
defer db.Close()
t.Run("success", func(t *testing.T) {
w := GormWriter{Tx: db}
suffix, err := uuid.GenerateUUID()
assert.NilError(t, err)
u := &oplog_test.TestUser{}
newTableName := u.TableName() + "_" + suffix
defer func() { assert.NilError(t, w.dropTableIfExists(newTableName)) }()
err = w.createTableLike(u.TableName(), newTableName)
assert.NilError(t, err)
})
t.Run("call twice", func(t *testing.T) {
w := GormWriter{Tx: db}
suffix, err := uuid.GenerateUUID()
assert.NilError(t, err)
u := &oplog_test.TestUser{}
newTableName := u.TableName() + "_" + suffix
defer func() { assert.NilError(t, w.dropTableIfExists(newTableName)) }()
err = w.createTableLike(u.TableName(), newTableName)
assert.NilError(t, err)
// should be an error to create the same table twice
err = w.createTableLike(u.TableName(), newTableName)
assert.Check(t, err != nil)
assert.Error(t, err, err.Error(), nil)
})
t.Run("empty existing", func(t *testing.T) {
w := GormWriter{Tx: db}
suffix, err := uuid.GenerateUUID()
assert.NilError(t, err)
u := &oplog_test.TestUser{}
newTableName := u.TableName() + "_" + suffix
defer func() { assert.NilError(t, w.dropTableIfExists(newTableName)) }()
err = w.createTableLike("", newTableName)
assert.Check(t, err != nil)
assert.Error(t, err, err.Error(), nil)
assert.Equal(t, err.Error(), "error existingTableName is empty string")
})
t.Run("blank name", func(t *testing.T) {
w := GormWriter{Tx: db}
u := &oplog_test.TestUser{}
err = w.createTableLike(u.TableName(), "")
assert.Check(t, err != nil)
assert.Error(t, err, err.Error(), nil)
assert.Equal(t, err.Error(), "error newTableName is empty string")
})
}
// Test_GormWriterDropTableIfExists provides unit tests for GormWriter DropTableIfExists
func Test_GormWriterDropTableIfExists(t *testing.T) {
t.Parallel()
startTest()
cleanup, url := setup(t)
defer cleanup()
defer completeTest() // must come after the "defer cleanup()"
db, err := test_dbconn(url)
assert.NilError(t, err)
defer db.Close()
t.Run("success", func(t *testing.T) {
w := GormWriter{Tx: db}
suffix, err := uuid.GenerateUUID()
assert.NilError(t, err)
u := &oplog_test.TestUser{}
newTableName := u.TableName() + "_" + suffix
err = w.createTableLike(u.TableName(), newTableName)
assert.NilError(t, err)
defer func() { assert.NilError(t, w.dropTableIfExists(newTableName)) }()
})
t.Run("success with blank", func(t *testing.T) {
w := GormWriter{Tx: db}
err := w.dropTableIfExists("")
assert.Check(t, err != nil)
assert.Equal(t, err.Error(), "cannot drop table whose name is an empty string")
})
}

Loading…
Cancel
Save