diff --git a/backend/remote-state/consul/backend_state.go b/backend/remote-state/consul/backend_state.go index 9a8fd080f4..e77729446d 100644 --- a/backend/remote-state/consul/backend_state.go +++ b/backend/remote-state/consul/backend_state.go @@ -91,9 +91,10 @@ func (b *Backend) State(name string) (state.State, error) { // Build the state client var stateMgr state.State = &remote.State{ Client: &RemoteClient{ - Client: client, - Path: path, - GZip: gzip, + Client: client, + Path: path, + GZip: gzip, + lockState: b.lock, }, } diff --git a/backend/remote-state/consul/client.go b/backend/remote-state/consul/client.go index be9873417d..eb9f8254ce 100644 --- a/backend/remote-state/consul/client.go +++ b/backend/remote-state/consul/client.go @@ -7,6 +7,7 @@ import ( "encoding/json" "errors" "fmt" + "log" "sync" "time" @@ -28,6 +29,8 @@ type RemoteClient struct { GZip bool mu sync.Mutex + // lockState is true if we're using locks + lockState bool // The index of the last state we wrote. // If this is > 0, Put will perform a CAS to ensure that the state wasn't @@ -40,6 +43,10 @@ type RemoteClient struct { lockCh <-chan struct{} info *state.LockInfo + + // cancel the goroutine which is monitoring the lock. + monitorCancel chan struct{} + monitorDone chan struct{} } func (c *RemoteClient) Get() (*remote.Payload, error) { @@ -88,6 +95,7 @@ func (c *RemoteClient) Put(data []byte) error { kv := c.Client.KV() + // default to doing a CAS verb := consulapi.KVCAS // Assume a 0 index doesn't need a CAS for now, since we are either @@ -123,7 +131,6 @@ func (c *RemoteClient) Put(data []byte) error { } c.modifyIndex = resp.Results[0].ModifyIndex - return nil } @@ -172,11 +179,19 @@ func (c *RemoteClient) Lock(info *state.LockInfo) (string, error) { c.mu.Lock() defer c.mu.Unlock() + if !c.lockState { + return "", nil + } + + c.info = info + + // These checks only are to ensure we strictly follow the specification. + // Terraform shouldn't ever re-lock, so provide errors for the 2 possible + // states if this is called. select { case <-c.lockCh: // We had a lock, but lost it. - // Since we typically only call lock once, we shouldn't ever see this. - return "", errors.New("lost consul lock") + return "", errors.New("lost consul lock, cannot re-lock") default: if c.lockCh != nil { // we have an active lock already @@ -184,10 +199,13 @@ func (c *RemoteClient) Lock(info *state.LockInfo) (string, error) { } } - return c.lock(info) + return c.lock() } -func (c *RemoteClient) lock(info *state.LockInfo) (string, error) { +// called after a lock is acquired +var testLockHook func() + +func (c *RemoteClient) lock() (string, error) { if c.consulLock == nil { opts := &consulapi.LockOptions{ Key: c.Path + lockSuffix, @@ -226,22 +244,83 @@ func (c *RemoteClient) lock(info *state.LockInfo) (string, error) { c.lockCh = lockCh - err = c.putLockInfo(info) + err = c.putLockInfo(c.info) if err != nil { - if unlockErr := c.Unlock(info.ID); unlockErr != nil { + if unlockErr := c.unlock(c.info.ID); unlockErr != nil { err = multierror.Append(err, unlockErr) } return "", err } - return info.ID, nil + // Start a goroutine to monitor the lock state. + // If we lose the lock to due communication issues with the consul agent, + // attempt to immediately reacquire the lock. Put will verify the integrity + // of the state by using a CAS operation. + c.monitorCancel = make(chan struct{}) + c.monitorDone = make(chan struct{}) + go func(cancel, done chan struct{}) { + defer func() { + close(done) + }() + select { + case <-c.lockCh: + for { + c.mu.Lock() + c.consulLock = nil + _, err := c.lock() + c.mu.Unlock() + + if err != nil { + // We failed to get the lock, keep trying as long as + // terraform is running. There may be changes in progress, + // so there's no use in aborting. Either we eventually + // reacquire the lock, or a Put will fail on a CAS. + log.Printf("[ERROR] attempting to reacquire lock: %s", err) + time.Sleep(time.Second) + + select { + case <-cancel: + return + default: + } + continue + } + + // if the error was nil, the new lock started a new copy of + // this goroutine. + return + } + + case <-cancel: + return + } + }(c.monitorCancel, c.monitorDone) + + if testLockHook != nil { + testLockHook() + } + + return c.info.ID, nil } func (c *RemoteClient) Unlock(id string) error { c.mu.Lock() defer c.mu.Unlock() + if !c.lockState { + return nil + } + + return c.unlock(id) +} + +func (c *RemoteClient) unlock(id string) error { + // cancel our monitoring goroutine + if c.monitorCancel != nil { + close(c.monitorCancel) + } + // this doesn't use the lock id, because the lock is tied to the consul client. if c.consulLock == nil || c.lockCh == nil { return nil @@ -253,20 +332,28 @@ func (c *RemoteClient) Unlock(id string) error { default: } - err := c.consulLock.Unlock() + kv := c.Client.KV() + + var errs error + + if _, err := kv.Delete(c.Path+lockInfoSuffix, nil); err != nil { + errs = multierror.Append(errs, err) + } + + if err := c.consulLock.Unlock(); err != nil { + errs = multierror.Append(errs, err) + } + + // the monitoring goroutine may be in a select on this chan, so we need to + // wait for it to return before changing the value. + <-c.monitorDone c.lockCh = nil // This is only cleanup, and will fail if the lock was immediately taken by // another client, so we don't report an error to the user here. c.consulLock.Destroy() - kv := c.Client.KV() - _, delErr := kv.Delete(c.Path+lockInfoSuffix, nil) - if delErr != nil { - err = multierror.Append(err, delErr) - } - - return err + return errs } func compressState(data []byte) ([]byte, error) {