@ -2,7 +2,6 @@ package resource
import (
"log"
"sync/atomic"
"time"
)
@ -62,33 +61,45 @@ func (conf *StateChangeConf) WaitForState() (interface{}, error) {
conf . ContinuousTargetOccurence = 1
}
// We can't safely read the result values if we timeout, so store them in
// an atomic.Value
type Result struct {
Result interface { }
State string
Error error
Done bool
}
var lastResult atomic . Value
lastResult . Store ( Result { } )
doneCh := make ( chan struct { } )
// read ever result from the refresh loop, waiting for a positive result.Done
resCh := make ( chan Result , 1 )
// cancellation channel for the refresh loop
cancelCh := make ( chan struct { } )
go func ( ) {
defer close ( doneCh )
defer close ( res Ch)
// Wait for the delay
time . Sleep ( conf . Delay )
wait := 100 * time . Millisecond
// start with 0 delay for the first loop
var wait time . Duration
for {
// wait and watch for cancellation
select {
case <- cancelCh :
return
case <- time . After ( wait ) :
// first round had no wait
if wait == 0 {
wait = 100 * time . Millisecond
}
}
res , currentState , err := conf . Refresh ( )
result := Result {
Result : res ,
State : currentState ,
Error : err ,
}
lastResult . Store ( result )
resCh <- result
if err != nil {
return
@ -98,6 +109,8 @@ func (conf *StateChangeConf) WaitForState() (interface{}, error) {
if res == nil && len ( conf . Target ) == 0 {
targetOccurence += 1
if conf . ContinuousTargetOccurence == targetOccurence {
result . Done = true
resCh <- result
return
} else {
continue
@ -113,7 +126,7 @@ func (conf *StateChangeConf) WaitForState() (interface{}, error) {
LastError : err ,
Retries : notfoundTick ,
}
lastResult. Store ( result )
resCh <- result
return
}
} else {
@ -126,6 +139,8 @@ func (conf *StateChangeConf) WaitForState() (interface{}, error) {
found = true
targetOccurence += 1
if conf . ContinuousTargetOccurence == targetOccurence {
result . Done = true
resCh <- result
return
} else {
continue
@ -147,7 +162,7 @@ func (conf *StateChangeConf) WaitForState() (interface{}, error) {
State : result . State ,
ExpectedState : conf . Target ,
}
lastResult. Store ( result )
resCh <- result
return
}
}
@ -162,30 +177,46 @@ func (conf *StateChangeConf) WaitForState() (interface{}, error) {
} else if wait > 10 * time . Second {
wait = 10 * time . Second
}
// Wait between refreshes using exponential backoff, except when
// waiting for the target state to reoccur.
if targetOccurence == 0 {
wait *= 2
}
}
log . Printf ( "[TRACE] Waiting %s before next try" , wait )
time . Sleep ( wait )
// Wait between refreshes using exponential backoff, except when
// waiting for the target state to reoccur.
if targetOccurence == 0 {
wait *= 2
}
}
} ( )
select {
case <- doneCh :
r := lastResult . Load ( ) . ( Result )
return r . Result , r . Error
case <- time . After ( conf . Timeout ) :
r := lastResult . Load ( ) . ( Result )
return nil , & TimeoutError {
LastError : r . Error ,
LastState : r . State ,
Timeout : conf . Timeout ,
ExpectedState : conf . Target ,
// store the last value result from the refresh loop
lastResult := Result { }
timeout := time . After ( conf . Timeout )
for {
select {
case r , ok := <- resCh :
// channel closed, so return the last result
if ! ok {
return lastResult . Result , lastResult . Error
}
// we reached the intended state
if r . Done {
return r . Result , r . Error
}
// still waiting, store the last result
lastResult = r
case <- timeout :
close ( cancelCh )
return nil , & TimeoutError {
LastError : lastResult . Error ,
LastState : lastResult . State ,
Timeout : conf . Timeout ,
ExpectedState : conf . Target ,
}
}
}
}