|
|
|
|
@ -418,14 +418,6 @@ func (s *Stream) remoteClose() {
|
|
|
|
|
s.writeCh <- nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *Stream) registerStateListener(ch chan<- streamState) {
|
|
|
|
|
s.stateChange[ch] = struct{}{}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *Stream) deregisterStateListener(ch chan<- streamState) {
|
|
|
|
|
delete(s.stateChange, ch)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *Stream) setState(state streamState) {
|
|
|
|
|
log.Printf("[TRACE] Stream %d went to state %d", s.id, state)
|
|
|
|
|
s.state = state
|
|
|
|
|
@ -441,12 +433,12 @@ func (s *Stream) setState(state streamState) {
|
|
|
|
|
func (s *Stream) waitState(target streamState) error {
|
|
|
|
|
// Register a state change listener to wait for changes
|
|
|
|
|
stateCh := make(chan streamState, 10)
|
|
|
|
|
s.registerStateListener(stateCh)
|
|
|
|
|
s.stateChange[stateCh] = struct{}{}
|
|
|
|
|
s.mu.Unlock()
|
|
|
|
|
|
|
|
|
|
defer func() {
|
|
|
|
|
s.mu.Lock()
|
|
|
|
|
s.deregisterStateListener(stateCh)
|
|
|
|
|
delete(s.stateChange, stateCh)
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
state := <-stateCh
|
|
|
|
|
|