refact(cmd): Encapsulate some functionality on Command and Server

This commit extracts 3 separate pieces of functionality into their own
function/method:

- Extracts the database opening with the intention to have a function
  that performs all that functionality but returns the created database
  object rather than setting it to the Server structure.
- Extracts database validation logic, to provide a common code path for
  upcoming functionality.
- Expose schema manager on the Command struct as well as providing
  common functions to acquire shared lock/close.
pull/2422/head
Hugo Vieira 4 years ago
parent 621f8a9ff2
commit 8ab9ffbcac

@ -93,7 +93,7 @@ func (b *Server) CreateDevDatabase(ctx context.Context, opt ...Option) error {
b.Info["dev database container"] = strings.TrimPrefix(container, "/")
}
if err := b.ConnectToDatabase(ctx, dialect); err != nil {
if err := b.OpenAndSetServerDatabase(ctx, dialect); err != nil {
if c != nil {
err = multierror.Append(err, c())
}

@ -678,11 +678,26 @@ func (b *Server) RunShutdownFuncs() error {
return mErr.ErrorOrNil()
}
func (b *Server) ConnectToDatabase(ctx context.Context, dialect string) error {
// OpenAndSetServerDatabase calls OpenDatabase and sets its result *db.DB to the Server's
// `Database` field.
func (b *Server) OpenAndSetServerDatabase(ctx context.Context, dialect string) error {
dbase, err := b.OpenDatabase(ctx, dialect, b.DatabaseUrl)
if err != nil {
return err
}
b.Database = dbase
return nil
}
// OpenDatabase creates a database connection with the given URL and returns it to the caller.
// It supports various configuration options - The values must be set on the Server object
// beforehand.
func (b *Server) OpenDatabase(ctx context.Context, dialect, url string) (*db.DB, error) {
dbType, err := db.StringToDbType(dialect)
if err != nil {
return fmt.Errorf("unable to create db object with dialect %s: %w", dialect, err)
return nil, fmt.Errorf("unable to create db object with dialect %s: %w", dialect, err)
}
opts := []db.Option{
db.WithMaxOpenConnections(b.DatabaseMaxOpenConnections),
db.WithMaxIdleConnections(b.DatabaseMaxIdleConnections),
@ -691,12 +706,13 @@ func (b *Server) ConnectToDatabase(ctx context.Context, dialect string) error {
if os.Getenv("BOUNDARY_DISABLE_GORM_FORMATTER") == "" {
opts = append(opts, db.WithGormFormatter(b.Logger))
}
dbase, err := db.Open(ctx, dbType, b.DatabaseUrl, opts...)
dbase, err := db.Open(ctx, dbType, url, opts...)
if err != nil {
return fmt.Errorf("unable to create db object with dialect %s: %w", dialect, err)
return nil, fmt.Errorf("unable to create db object with dialect %s: %w", dialect, err)
}
b.Database = dbase
return nil
return dbase, nil
}
func (b *Server) CreateGlobalKmsKeys(ctx context.Context) error {

@ -226,7 +226,7 @@ func TestVerifyOplogIsEmpty(t *testing.T) {
cmd := InitCommand{Server: base.NewServer(base.NewCommand(cli.NewMockUi()))}
cmd.DatabaseUrl = u
require.NoError(t, cmd.ConnectToDatabase(ctx, dialect))
require.NoError(t, cmd.OpenAndSetServerDatabase(ctx, dialect))
assert.NoError(t, cmd.verifyOplogIsEmpty(ctx))
}

@ -287,7 +287,7 @@ func (c *InitCommand) Run(args []string) (retCode int) {
return base.CommandUserError
}
// Everything after is done with normal database URL and is affecting actual data
if err := c.ConnectToDatabase(c.Context, dialect); err != nil {
if err := c.OpenAndSetServerDatabase(c.Context, dialect); err != nil {
c.UI.Error(fmt.Errorf("Error connecting to database after migrations: %w", err).Error())
return base.CommandCliError
}

@ -50,9 +50,11 @@ type Command struct {
SighupCh chan struct{}
SigUSR2Ch chan struct{}
Config *config.Config
controller *controller.Controller
worker *worker.Worker
Config *config.Config
schemaManager *schema.Manager
controller *controller.Controller
worker *worker.Worker
flagConfig string
flagConfigKms string
@ -395,60 +397,40 @@ func (c *Command) Run(args []string) int {
c.DatabaseMaxIdleConnections = c.Config.Controller.Database.MaxIdleConnections
c.DatabaseConnMaxIdleTimeDuration = c.Config.Controller.Database.ConnMaxIdleTimeDuration
if err := c.ConnectToDatabase(c.Context, "postgres"); err != nil {
if err := c.OpenAndSetServerDatabase(c.Context, "postgres"); err != nil {
c.UI.Error(fmt.Errorf("Error connecting to database: %w", err).Error())
return base.CommandCliError
}
underlyingDB, err := c.Database.SqlDB(c.Context)
if err != nil {
c.UI.Error(fmt.Errorf("Can't get db: %w.", err).Error())
return base.CommandCliError
}
sMan, err := schema.NewManager(c.Context, "postgres", underlyingDB)
sm, err := acquireDatabaseSharedLock(c.Context, c.Server.Database)
if err != nil {
c.UI.Error(fmt.Errorf("Can't get schema manager: %w.", err).Error())
return base.CommandCliError
}
// This is an advisory locks on the DB which is released when the db session ends.
if err := sMan.SharedLock(c.Context); err != nil {
c.UI.Error(fmt.Errorf("Unable to gain shared access to the database: %w", err).Error())
c.UI.Error(fmt.Errorf("Failed to acquire database shared lock: %w", err).Error())
return base.CommandCliError
}
c.schemaManager = sm
defer func() {
if c.schemaManager == nil {
c.UI.Error("no schema manager to unlock database with")
return
}
// The base context has already been canceled so we shouldn't use it here.
// 1 second is chosen so the shutdown is still responsive and this is a mostly
// non critical step since the lock should be released when the session with the
// database is closed.
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
if err := sMan.Close(ctx); err != nil {
err := c.schemaManager.Close(ctx)
if err != nil {
c.UI.Error(fmt.Errorf("Unable to release shared lock to the database: %w", err).Error())
}
}()
ckState, err := sMan.CurrentState(c.Context)
err = verifyDatabaseState(c.Context, c.Server.Database, c.schemaManager)
if err != nil {
c.UI.Error(fmt.Errorf("Error checking schema state: %w", err).Error())
return base.CommandCliError
}
if !ckState.Initialized {
c.UI.Error(base.WrapAtLength("The database has not been initialized. Please run 'boundary database init'."))
return base.CommandCliError
}
if !ckState.MigrationsApplied() {
for _, e := range ckState.Editions {
if e.DatabaseSchemaState == schema.Ahead {
c.UI.Error(base.WrapAtLength(fmt.Sprintf("Newer schema version (%s %d) "+
"than this binary expects. Please use a newer version of the boundary "+
"binary.", e.Name, e.DatabaseSchemaVersion)))
return base.CommandCliError
}
}
c.UI.Error(base.WrapAtLength("Database schema must be updated to use this version. Run 'boundary database migrate' to update the database. NOTE: Boundary does not currently support live migration; ensure all controllers are shut down before running the migration command."))
return base.CommandCliError
}
if err := c.verifyKmsSetup(); err != nil {
c.UI.Error(base.WrapAtLength("Database is in a bad state. Please revert the database into the last known good state."))
c.UI.Error(err.Error())
return base.CommandCliError
}
}
@ -846,9 +828,9 @@ func (c *Command) Reload(newConf *config.Config) error {
return reloadErrors.ErrorOrNil()
}
func (c *Command) verifyKmsSetup() error {
func verifyKmsSetup(dbase *db.DB) error {
const op = "server.(Command).verifyKmsExists"
rw := db.New(c.Database)
rw := db.New(dbase)
ctx := context.Background()
kmsCache, err := kms.New(ctx, rw, rw)
@ -861,6 +843,74 @@ func (c *Command) verifyKmsSetup() error {
return nil
}
// acquireDatabaseSharedLock uses the schema manager to acquire a shared lock on
// the database. This is done as a mechanism to disallow running migration commands
// while the database is in use.
func acquireDatabaseSharedLock(ctx context.Context, db *db.DB) (*schema.Manager, error) {
if db == nil {
return nil, fmt.Errorf("nil database")
}
underlyingDb, err := db.SqlDB(ctx)
if err != nil {
return nil, fmt.Errorf("failed to obtain sql db: %w", err)
}
manager, err := schema.NewManager(ctx, "postgres", underlyingDb)
if err != nil {
return nil, fmt.Errorf("failed to create new schema manager: %w", err)
}
// This is an advisory locks on the DB which is released when the db session ends.
err = manager.SharedLock(ctx)
if err != nil {
return nil, fmt.Errorf("failed to gain shared access to the database: %w", err)
}
return manager, nil
}
// verifyDatabaseState checks that the migrations and kms setup for the given database are correctly setup.
func verifyDatabaseState(ctx context.Context, db *db.DB, schemaManager *schema.Manager) error {
if db == nil {
return fmt.Errorf("nil database")
}
if schemaManager == nil {
return fmt.Errorf("nil schema manager")
}
s, err := schemaManager.CurrentState(ctx)
if err != nil {
return fmt.Errorf("failed to get current schema state: %w", err)
}
if !s.Initialized {
return fmt.Errorf("The database has not been initialized. Please ensure your database user " +
"has access to all Boundary tables, or run 'boundary database init' if you haven't initialized " +
"your database for Boundary.")
}
if !s.MigrationsApplied() {
for _, e := range s.Editions {
if e.DatabaseSchemaState == schema.Ahead {
return fmt.Errorf("Newer schema version (%s %d) "+
"than this binary expects. Please use a newer version of the boundary "+
"binary.", e.Name, e.DatabaseSchemaVersion)
}
}
return fmt.Errorf("Database schema must be updated to use this version. " +
"Run 'boundary database migrate' to update the database. " +
"NOTE: Boundary does not currently support live migration; " +
"Ensure all controllers are shut down before running the migration command.")
}
err = verifyKmsSetup(db)
if err != nil {
return fmt.Errorf("Database is in a bad state. Please revert the database "+
"into the last known good state. (Failed to verify kms setup: %w)", err)
}
return nil
}
var extraConfigValidationFunc = func(cfg *config.Config) error {
if cfg.Controller == nil && cfg.Worker == nil {
return stderrors.New("Neither worker nor controller specified in configuration file.")

@ -664,7 +664,7 @@ func TestControllerConfig(t testing.TB, ctx context.Context, tc *TestController,
if _, err := schema.MigrateStore(ctx, "postgres", tc.b.DatabaseUrl); err != nil {
t.Fatal(err)
}
if err := tc.b.ConnectToDatabase(tc.ctx, "postgres"); err != nil {
if err := tc.b.OpenAndSetServerDatabase(tc.ctx, "postgres"); err != nil {
t.Fatal(err)
}
if !opts.DisableKmsKeyCreation {

Loading…
Cancel
Save