@ -1,11 +1,9 @@
package pg
import (
"context"
"crypto/md5"
"database/sql"
"fmt"
"sync"
uuid "github.com/hashicorp/go-uuid"
"github.com/hashicorp/terraform/state"
@ -19,24 +17,12 @@ type RemoteClient struct {
Name string
SchemaName string
// In-flight database transaction. Empty unless Locked.
txn * sql . Tx
txnMux sync . Mutex
info * state . LockInfo
info * state . LockInfo
}
func ( c * RemoteClient ) Get ( ) ( * remote . Payload , error ) {
query := ` SELECT data FROM %s.%s WHERE name = $1 `
var row * sql . Row
// Take exclusive access to the database transaction
c . txnMux . Lock ( )
defer c . txnMux . Unlock ( )
// Use the open transaction when present
if c . txn != nil {
row = c . txn . QueryRow ( fmt . Sprintf ( query , c . SchemaName , statesTableName ) , c . Name )
} else {
row = c . Client . QueryRow ( fmt . Sprintf ( query , c . SchemaName , statesTableName ) , c . Name )
}
row := c . Client . QueryRow ( fmt . Sprintf ( query , c . SchemaName , statesTableName ) , c . Name )
var data [ ] byte
err := row . Scan ( & data )
switch {
@ -58,16 +44,7 @@ func (c *RemoteClient) Put(data []byte) error {
query := ` INSERT INTO % s . % s ( name , data ) VALUES ( $ 1 , $ 2 )
ON CONFLICT ( name ) DO UPDATE
SET data = $ 2 WHERE % s . name = $ 1 `
var err error
// Take exclusive access to the database transaction
c . txnMux . Lock ( )
defer c . txnMux . Unlock ( )
// Use the open transaction when present
if c . txn != nil {
_ , err = c . txn . Exec ( fmt . Sprintf ( query , c . SchemaName , statesTableName , statesTableName ) , c . Name , data )
} else {
_ , err = c . Client . Exec ( fmt . Sprintf ( query , c . SchemaName , statesTableName , statesTableName ) , c . Name , data )
}
_ , err := c . Client . Exec ( fmt . Sprintf ( query , c . SchemaName , statesTableName , statesTableName ) , c . Name , data )
if err != nil {
return err
}
@ -76,16 +53,7 @@ func (c *RemoteClient) Put(data []byte) error {
func ( c * RemoteClient ) Delete ( ) error {
query := ` DELETE FROM %s.%s WHERE name = $1 `
var err error
// Take exclusive access to the database transaction
c . txnMux . Lock ( )
defer c . txnMux . Unlock ( )
// Use the open transaction when present
if c . txn != nil {
_ , err = c . txn . Exec ( fmt . Sprintf ( query , c . SchemaName , statesTableName ) , c . Name )
} else {
_ , err = c . Client . Exec ( fmt . Sprintf ( query , c . SchemaName , statesTableName ) , c . Name )
}
_ , err := c . Client . Exec ( fmt . Sprintf ( query , c . SchemaName , statesTableName ) , c . Name )
if err != nil {
return err
}
@ -95,70 +63,62 @@ func (c *RemoteClient) Delete() error {
func ( c * RemoteClient ) Lock ( info * state . LockInfo ) ( string , error ) {
var err error
var lockID string
var txn * sql . Tx
if info . ID == "" {
lockID , err = uuid . GenerateUUID ( )
if err != nil {
return "" , err
}
info . Operation = "client"
info . ID = lockID
}
// Take exclusive access to the database transaction
c . txnMux . Lock ( )
defer c . txnMux . Unlock ( )
if c . txn == nil {
// Most strict transaction isolation to prevent cross-talk
// between incomplete state transactions.
txn , err = c . Client . BeginTx ( context . Background ( ) , & sql . TxOptions {
Isolation : sql . LevelSerializable ,
} )
// Local helper function so we can call it multiple places
//
lockUnlock := func ( pgLockId string ) error {
query := ` SELECT pg_advisory_unlock(%s) `
row := c . Client . QueryRow ( fmt . Sprintf ( query , pgLockId ) )
var didUnlock [ ] byte
err := row . Scan ( & didUnlock )
if err != nil {
return "" , & state . LockError { Info : info , Err : err }
return & state . LockError { Info : info , Err : err }
}
c . txn = txn
} else {
return "" , & state . LockError { Info : info , Err : fmt . Errorf ( "Client is already in a locking transaction" ) }
}
// Do not wait before giving up on a contended lock.
_ , err = c . Client . Exec ( ` SET LOCAL lock_timeout = 0 ` )
if err != nil {
c . rollback ( info )
return "" , err
return nil
}
// Try to acquire lock for the existing row.
query := ` SELECT pg_try_advisory_xact_ lock(%s.id) FROM %s.%s WHERE %s.name = $1`
row := c . txn . QueryRow ( fmt . Sprintf ( query , statesTableName , c . SchemaName , statesTableName , statesTableName ) , c . Name )
var didLock [ ] byte
err = row . Scan ( & didLock)
// Try to acquire locks for the existing row `id` and the creation lock `-1`.
query := ` SELECT %s.id, pg_try_advisory_lock(%s.id), pg_try_advisory_lock(-1) FROM %s.%s WHERE %s.name = $1 `
row := c . Client . QueryRow ( fmt . Sprintf ( query , statesTableName , statesTableName , c . SchemaName , statesTableName , statesTableName ) , c . Name )
var pgLockId , didLock , didLockForCreate [ ] byte
err = row . Scan ( & pgLockId , & didLock , & didLockForCreate )
switch {
case err == sql . ErrNoRows :
// When the row does not yet exist in state, take
// the `-1` lock to create the new row.
innerRow := c . txn . QueryRow ( ` SELECT pg_try_advisory_xact_lock(-1) ` )
// No rows means we're creating the workspace. Take the creation lock.
innerRow := c . Client . QueryRow ( ` SELECT pg_try_advisory_lock(-1) ` )
var innerDidLock [ ] byte
err := innerRow . Scan ( & innerDidLock )
if err != nil {
c . rollback ( info )
return "" , & state . LockError { Info : info , Err : err }
}
if string ( innerDidLock ) == "false" {
c . rollback ( info )
return "" , & state . LockError { Info : info , Err : fmt . Errorf ( "Workspace is already locked: %s" , c . Name ) }
return "" , & state . LockError { Info : info , Err : fmt . Errorf ( "Already locked for workspace creation: %s" , c . Name ) }
}
info . Path = "-1"
case err != nil :
c . rollback ( info )
return "" , & state . LockError { Info : info , Err : err }
case string ( didLock ) == "false" :
c . rollback ( info )
// Existing workspace is already locked. Release the attempted creation lock.
lockUnlock ( "-1" )
return "" , & state . LockError { Info : info , Err : fmt . Errorf ( "Workspace is already locked: %s" , c . Name ) }
case string ( didLockForCreate ) == "false" :
// Someone has the creation lock already. Release the existing workspace because it might not be safe to touch.
lockUnlock ( string ( pgLockId ) )
return "" , & state . LockError { Info : info , Err : fmt . Errorf ( "Cannot lock workspace; already locked for workspace creation: %s" , c . Name ) }
default :
// Existing workspace is now locked. Release the attempted creation lock.
lockUnlock ( "-1" )
info . Path = string ( pgLockId )
}
c . info = info
return info . ID , nil
}
@ -168,35 +128,15 @@ func (c *RemoteClient) getLockInfo() (*state.LockInfo, error) {
}
func ( c * RemoteClient ) Unlock ( id string ) error {
// Take exclusive access to the database transaction
c . txnMux . Lock ( )
defer c . txnMux . Unlock ( )
if c . txn != nil {
err := c . txn . Commit ( )
if err != nil {
return err
}
c . txn = nil
}
c . info = nil
return nil
}
// This must be called from any code path where the
// transaction would not be committed (unlocked),
// otherwise the transactions will leak and prevent
// the process from exiting cleanly.
//
// Does not use mutex because this will implicitly be
// called from within an already mutex'd scope.
func ( c * RemoteClient ) rollback ( info * state . LockInfo ) error {
if c . txn != nil {
err := c . txn . Rollback ( )
if c . info != nil && c . info . Path != "" {
query := ` SELECT pg_advisory_unlock(%s) `
row := c . Client . QueryRow ( fmt . Sprintf ( query , c . info . Path ) )
var didUnlock [ ] byte
err := row . Scan ( & didUnlock )
if err != nil {
return err
return & state . LockError { Info : c . info , Err : err }
}
c . txn = nil
c . info = nil
}
c . info = nil
return nil
}