@ -7,6 +7,7 @@ import (
"encoding/json"
"errors"
"fmt"
"log"
"sync"
"time"
@ -28,6 +29,8 @@ type RemoteClient struct {
GZip bool
mu sync . Mutex
// lockState is true if we're using locks
lockState bool
// The index of the last state we wrote.
// If this is > 0, Put will perform a CAS to ensure that the state wasn't
@ -40,6 +43,10 @@ type RemoteClient struct {
lockCh <- chan struct { }
info * state . LockInfo
// cancel the goroutine which is monitoring the lock.
monitorCancel chan struct { }
monitorDone chan struct { }
}
func ( c * RemoteClient ) Get ( ) ( * remote . Payload , error ) {
@ -88,6 +95,7 @@ func (c *RemoteClient) Put(data []byte) error {
kv := c . Client . KV ( )
// default to doing a CAS
verb := consulapi . KVCAS
// Assume a 0 index doesn't need a CAS for now, since we are either
@ -123,7 +131,6 @@ func (c *RemoteClient) Put(data []byte) error {
}
c . modifyIndex = resp . Results [ 0 ] . ModifyIndex
return nil
}
@ -172,11 +179,19 @@ func (c *RemoteClient) Lock(info *state.LockInfo) (string, error) {
c . mu . Lock ( )
defer c . mu . Unlock ( )
if ! c . lockState {
return "" , nil
}
c . info = info
// These checks only are to ensure we strictly follow the specification.
// Terraform shouldn't ever re-lock, so provide errors for the 2 possible
// states if this is called.
select {
case <- c . lockCh :
// We had a lock, but lost it.
// Since we typically only call lock once, we shouldn't ever see this.
return "" , errors . New ( "lost consul lock" )
return "" , errors . New ( "lost consul lock, cannot re-lock" )
default :
if c . lockCh != nil {
// we have an active lock already
@ -184,10 +199,13 @@ func (c *RemoteClient) Lock(info *state.LockInfo) (string, error) {
}
}
return c . lock ( info )
return c . lock ( )
}
func ( c * RemoteClient ) lock ( info * state . LockInfo ) ( string , error ) {
// called after a lock is acquired
var testLockHook func ( )
func ( c * RemoteClient ) lock ( ) ( string , error ) {
if c . consulLock == nil {
opts := & consulapi . LockOptions {
Key : c . Path + lockSuffix ,
@ -226,22 +244,83 @@ func (c *RemoteClient) lock(info *state.LockInfo) (string, error) {
c . lockCh = lockCh
err = c . putLockInfo ( info)
err = c . putLockInfo ( c. info)
if err != nil {
if unlockErr := c . Unlock( info . ID ) ; unlockErr != nil {
if unlockErr := c . unlock( c . info . ID ) ; unlockErr != nil {
err = multierror . Append ( err , unlockErr )
}
return "" , err
}
return info . ID , nil
// Start a goroutine to monitor the lock state.
// If we lose the lock to due communication issues with the consul agent,
// attempt to immediately reacquire the lock. Put will verify the integrity
// of the state by using a CAS operation.
c . monitorCancel = make ( chan struct { } )
c . monitorDone = make ( chan struct { } )
go func ( cancel , done chan struct { } ) {
defer func ( ) {
close ( done )
} ( )
select {
case <- c . lockCh :
for {
c . mu . Lock ( )
c . consulLock = nil
_ , err := c . lock ( )
c . mu . Unlock ( )
if err != nil {
// We failed to get the lock, keep trying as long as
// terraform is running. There may be changes in progress,
// so there's no use in aborting. Either we eventually
// reacquire the lock, or a Put will fail on a CAS.
log . Printf ( "[ERROR] attempting to reacquire lock: %s" , err )
time . Sleep ( time . Second )
select {
case <- cancel :
return
default :
}
continue
}
// if the error was nil, the new lock started a new copy of
// this goroutine.
return
}
case <- cancel :
return
}
} ( c . monitorCancel , c . monitorDone )
if testLockHook != nil {
testLockHook ( )
}
return c . info . ID , nil
}
func ( c * RemoteClient ) Unlock ( id string ) error {
c . mu . Lock ( )
defer c . mu . Unlock ( )
if ! c . lockState {
return nil
}
return c . unlock ( id )
}
func ( c * RemoteClient ) unlock ( id string ) error {
// cancel our monitoring goroutine
if c . monitorCancel != nil {
close ( c . monitorCancel )
}
// this doesn't use the lock id, because the lock is tied to the consul client.
if c . consulLock == nil || c . lockCh == nil {
return nil
@ -253,20 +332,28 @@ func (c *RemoteClient) Unlock(id string) error {
default :
}
err := c . consulLock . Unlock ( )
kv := c . Client . KV ( )
var errs error
if _ , err := kv . Delete ( c . Path + lockInfoSuffix , nil ) ; err != nil {
errs = multierror . Append ( errs , err )
}
if err := c . consulLock . Unlock ( ) ; err != nil {
errs = multierror . Append ( errs , err )
}
// the monitoring goroutine may be in a select on this chan, so we need to
// wait for it to return before changing the value.
<- c . monitorDone
c . lockCh = nil
// This is only cleanup, and will fail if the lock was immediately taken by
// another client, so we don't report an error to the user here.
c . consulLock . Destroy ( )
kv := c . Client . KV ( )
_ , delErr := kv . Delete ( c . Path + lockInfoSuffix , nil )
if delErr != nil {
err = multierror . Append ( err , delErr )
}
return err
return errs
}
func compressState ( data [ ] byte ) ( [ ] byte , error ) {