From 8ab9ffbcac2a1d5cff6eb7bddda942d9b006da6e Mon Sep 17 00:00:00 2001 From: Hugo Vieira Date: Tue, 6 Sep 2022 16:01:52 +0100 Subject: [PATCH] refact(cmd): Encapsulate some functionality on Command and Server This commit extracts 3 separate pieces of functionality into their own function/method: - Extracts the database opening with the intention to have a function that performs all that functionality but returns the created database object rather than setting it to the Server structure. - Extracts database validation logic, to provide a common code path for upcoming functionality. - Expose schema manager on the Command struct as well as providing common functions to acquire shared lock/close. --- internal/cmd/base/dev.go | 2 +- internal/cmd/base/servers.go | 28 +++- internal/cmd/commands/database/funcs_test.go | 2 +- internal/cmd/commands/database/init.go | 2 +- internal/cmd/commands/server/server.go | 132 +++++++++++++------ internal/daemon/controller/testing.go | 2 +- 6 files changed, 117 insertions(+), 51 deletions(-) diff --git a/internal/cmd/base/dev.go b/internal/cmd/base/dev.go index 3e83670dde..4df533da25 100644 --- a/internal/cmd/base/dev.go +++ b/internal/cmd/base/dev.go @@ -93,7 +93,7 @@ func (b *Server) CreateDevDatabase(ctx context.Context, opt ...Option) error { b.Info["dev database container"] = strings.TrimPrefix(container, "/") } - if err := b.ConnectToDatabase(ctx, dialect); err != nil { + if err := b.OpenAndSetServerDatabase(ctx, dialect); err != nil { if c != nil { err = multierror.Append(err, c()) } diff --git a/internal/cmd/base/servers.go b/internal/cmd/base/servers.go index c99b7659b3..433d3ceb36 100644 --- a/internal/cmd/base/servers.go +++ b/internal/cmd/base/servers.go @@ -678,11 +678,26 @@ func (b *Server) RunShutdownFuncs() error { return mErr.ErrorOrNil() } -func (b *Server) ConnectToDatabase(ctx context.Context, dialect string) error { +// OpenAndSetServerDatabase calls OpenDatabase and sets its result *db.DB to the Server's +// `Database` field. +func (b *Server) OpenAndSetServerDatabase(ctx context.Context, dialect string) error { + dbase, err := b.OpenDatabase(ctx, dialect, b.DatabaseUrl) + if err != nil { + return err + } + b.Database = dbase + return nil +} + +// OpenDatabase creates a database connection with the given URL and returns it to the caller. +// It supports various configuration options - The values must be set on the Server object +// beforehand. +func (b *Server) OpenDatabase(ctx context.Context, dialect, url string) (*db.DB, error) { dbType, err := db.StringToDbType(dialect) if err != nil { - return fmt.Errorf("unable to create db object with dialect %s: %w", dialect, err) + return nil, fmt.Errorf("unable to create db object with dialect %s: %w", dialect, err) } + opts := []db.Option{ db.WithMaxOpenConnections(b.DatabaseMaxOpenConnections), db.WithMaxIdleConnections(b.DatabaseMaxIdleConnections), @@ -691,12 +706,13 @@ func (b *Server) ConnectToDatabase(ctx context.Context, dialect string) error { if os.Getenv("BOUNDARY_DISABLE_GORM_FORMATTER") == "" { opts = append(opts, db.WithGormFormatter(b.Logger)) } - dbase, err := db.Open(ctx, dbType, b.DatabaseUrl, opts...) + + dbase, err := db.Open(ctx, dbType, url, opts...) if err != nil { - return fmt.Errorf("unable to create db object with dialect %s: %w", dialect, err) + return nil, fmt.Errorf("unable to create db object with dialect %s: %w", dialect, err) } - b.Database = dbase - return nil + + return dbase, nil } func (b *Server) CreateGlobalKmsKeys(ctx context.Context) error { diff --git a/internal/cmd/commands/database/funcs_test.go b/internal/cmd/commands/database/funcs_test.go index ba99537aa0..5fc795fd85 100644 --- a/internal/cmd/commands/database/funcs_test.go +++ b/internal/cmd/commands/database/funcs_test.go @@ -226,7 +226,7 @@ func TestVerifyOplogIsEmpty(t *testing.T) { cmd := InitCommand{Server: base.NewServer(base.NewCommand(cli.NewMockUi()))} cmd.DatabaseUrl = u - require.NoError(t, cmd.ConnectToDatabase(ctx, dialect)) + require.NoError(t, cmd.OpenAndSetServerDatabase(ctx, dialect)) assert.NoError(t, cmd.verifyOplogIsEmpty(ctx)) } diff --git a/internal/cmd/commands/database/init.go b/internal/cmd/commands/database/init.go index 0dfa35a530..8664b55923 100644 --- a/internal/cmd/commands/database/init.go +++ b/internal/cmd/commands/database/init.go @@ -287,7 +287,7 @@ func (c *InitCommand) Run(args []string) (retCode int) { return base.CommandUserError } // Everything after is done with normal database URL and is affecting actual data - if err := c.ConnectToDatabase(c.Context, dialect); err != nil { + if err := c.OpenAndSetServerDatabase(c.Context, dialect); err != nil { c.UI.Error(fmt.Errorf("Error connecting to database after migrations: %w", err).Error()) return base.CommandCliError } diff --git a/internal/cmd/commands/server/server.go b/internal/cmd/commands/server/server.go index ce3c08c481..8973331494 100644 --- a/internal/cmd/commands/server/server.go +++ b/internal/cmd/commands/server/server.go @@ -50,9 +50,11 @@ type Command struct { SighupCh chan struct{} SigUSR2Ch chan struct{} - Config *config.Config - controller *controller.Controller - worker *worker.Worker + Config *config.Config + + schemaManager *schema.Manager + controller *controller.Controller + worker *worker.Worker flagConfig string flagConfigKms string @@ -395,60 +397,40 @@ func (c *Command) Run(args []string) int { c.DatabaseMaxIdleConnections = c.Config.Controller.Database.MaxIdleConnections c.DatabaseConnMaxIdleTimeDuration = c.Config.Controller.Database.ConnMaxIdleTimeDuration - if err := c.ConnectToDatabase(c.Context, "postgres"); err != nil { + if err := c.OpenAndSetServerDatabase(c.Context, "postgres"); err != nil { c.UI.Error(fmt.Errorf("Error connecting to database: %w", err).Error()) return base.CommandCliError } - underlyingDB, err := c.Database.SqlDB(c.Context) - if err != nil { - c.UI.Error(fmt.Errorf("Can't get db: %w.", err).Error()) - return base.CommandCliError - } - sMan, err := schema.NewManager(c.Context, "postgres", underlyingDB) + sm, err := acquireDatabaseSharedLock(c.Context, c.Server.Database) if err != nil { - c.UI.Error(fmt.Errorf("Can't get schema manager: %w.", err).Error()) - return base.CommandCliError - } - // This is an advisory locks on the DB which is released when the db session ends. - if err := sMan.SharedLock(c.Context); err != nil { - c.UI.Error(fmt.Errorf("Unable to gain shared access to the database: %w", err).Error()) + c.UI.Error(fmt.Errorf("Failed to acquire database shared lock: %w", err).Error()) return base.CommandCliError } + c.schemaManager = sm + defer func() { + if c.schemaManager == nil { + c.UI.Error("no schema manager to unlock database with") + return + } + // The base context has already been canceled so we shouldn't use it here. // 1 second is chosen so the shutdown is still responsive and this is a mostly // non critical step since the lock should be released when the session with the // database is closed. ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() - if err := sMan.Close(ctx); err != nil { + + err := c.schemaManager.Close(ctx) + if err != nil { c.UI.Error(fmt.Errorf("Unable to release shared lock to the database: %w", err).Error()) } }() - ckState, err := sMan.CurrentState(c.Context) + + err = verifyDatabaseState(c.Context, c.Server.Database, c.schemaManager) if err != nil { - c.UI.Error(fmt.Errorf("Error checking schema state: %w", err).Error()) - return base.CommandCliError - } - if !ckState.Initialized { - c.UI.Error(base.WrapAtLength("The database has not been initialized. Please run 'boundary database init'.")) - return base.CommandCliError - } - if !ckState.MigrationsApplied() { - for _, e := range ckState.Editions { - if e.DatabaseSchemaState == schema.Ahead { - c.UI.Error(base.WrapAtLength(fmt.Sprintf("Newer schema version (%s %d) "+ - "than this binary expects. Please use a newer version of the boundary "+ - "binary.", e.Name, e.DatabaseSchemaVersion))) - return base.CommandCliError - } - } - c.UI.Error(base.WrapAtLength("Database schema must be updated to use this version. Run 'boundary database migrate' to update the database. NOTE: Boundary does not currently support live migration; ensure all controllers are shut down before running the migration command.")) - return base.CommandCliError - } - if err := c.verifyKmsSetup(); err != nil { - c.UI.Error(base.WrapAtLength("Database is in a bad state. Please revert the database into the last known good state.")) + c.UI.Error(err.Error()) return base.CommandCliError } } @@ -846,9 +828,9 @@ func (c *Command) Reload(newConf *config.Config) error { return reloadErrors.ErrorOrNil() } -func (c *Command) verifyKmsSetup() error { +func verifyKmsSetup(dbase *db.DB) error { const op = "server.(Command).verifyKmsExists" - rw := db.New(c.Database) + rw := db.New(dbase) ctx := context.Background() kmsCache, err := kms.New(ctx, rw, rw) @@ -861,6 +843,74 @@ func (c *Command) verifyKmsSetup() error { return nil } +// acquireDatabaseSharedLock uses the schema manager to acquire a shared lock on +// the database. This is done as a mechanism to disallow running migration commands +// while the database is in use. +func acquireDatabaseSharedLock(ctx context.Context, db *db.DB) (*schema.Manager, error) { + if db == nil { + return nil, fmt.Errorf("nil database") + } + + underlyingDb, err := db.SqlDB(ctx) + if err != nil { + return nil, fmt.Errorf("failed to obtain sql db: %w", err) + } + + manager, err := schema.NewManager(ctx, "postgres", underlyingDb) + if err != nil { + return nil, fmt.Errorf("failed to create new schema manager: %w", err) + } + + // This is an advisory locks on the DB which is released when the db session ends. + err = manager.SharedLock(ctx) + if err != nil { + return nil, fmt.Errorf("failed to gain shared access to the database: %w", err) + } + + return manager, nil +} + +// verifyDatabaseState checks that the migrations and kms setup for the given database are correctly setup. +func verifyDatabaseState(ctx context.Context, db *db.DB, schemaManager *schema.Manager) error { + if db == nil { + return fmt.Errorf("nil database") + } + if schemaManager == nil { + return fmt.Errorf("nil schema manager") + } + + s, err := schemaManager.CurrentState(ctx) + if err != nil { + return fmt.Errorf("failed to get current schema state: %w", err) + } + if !s.Initialized { + return fmt.Errorf("The database has not been initialized. Please ensure your database user " + + "has access to all Boundary tables, or run 'boundary database init' if you haven't initialized " + + "your database for Boundary.") + } + if !s.MigrationsApplied() { + for _, e := range s.Editions { + if e.DatabaseSchemaState == schema.Ahead { + return fmt.Errorf("Newer schema version (%s %d) "+ + "than this binary expects. Please use a newer version of the boundary "+ + "binary.", e.Name, e.DatabaseSchemaVersion) + } + } + return fmt.Errorf("Database schema must be updated to use this version. " + + "Run 'boundary database migrate' to update the database. " + + "NOTE: Boundary does not currently support live migration; " + + "Ensure all controllers are shut down before running the migration command.") + } + + err = verifyKmsSetup(db) + if err != nil { + return fmt.Errorf("Database is in a bad state. Please revert the database "+ + "into the last known good state. (Failed to verify kms setup: %w)", err) + } + + return nil +} + var extraConfigValidationFunc = func(cfg *config.Config) error { if cfg.Controller == nil && cfg.Worker == nil { return stderrors.New("Neither worker nor controller specified in configuration file.") diff --git a/internal/daemon/controller/testing.go b/internal/daemon/controller/testing.go index d83a6ec483..7bd35266a2 100644 --- a/internal/daemon/controller/testing.go +++ b/internal/daemon/controller/testing.go @@ -664,7 +664,7 @@ func TestControllerConfig(t testing.TB, ctx context.Context, tc *TestController, if _, err := schema.MigrateStore(ctx, "postgres", tc.b.DatabaseUrl); err != nil { t.Fatal(err) } - if err := tc.b.ConnectToDatabase(tc.ctx, "postgres"); err != nil { + if err := tc.b.OpenAndSetServerDatabase(tc.ctx, "postgres"); err != nil { t.Fatal(err) } if !opts.DisableKmsKeyCreation {