|
|
|
|
@ -175,16 +175,29 @@ func (m *MuxConn) openStream(id uint32) (*Stream, error) {
|
|
|
|
|
|
|
|
|
|
// Create the stream object and channel where data will be sent to
|
|
|
|
|
dataR, dataW := io.Pipe()
|
|
|
|
|
writeCh := make(chan []byte, 10)
|
|
|
|
|
|
|
|
|
|
// Set the data channel so we can write to it.
|
|
|
|
|
stream := &Stream{
|
|
|
|
|
id: id,
|
|
|
|
|
mux: m,
|
|
|
|
|
reader: dataR,
|
|
|
|
|
writer: dataW,
|
|
|
|
|
id: id,
|
|
|
|
|
mux: m,
|
|
|
|
|
reader: dataR,
|
|
|
|
|
writer: dataW,
|
|
|
|
|
writeCh: writeCh,
|
|
|
|
|
}
|
|
|
|
|
stream.setState(streamStateClosed)
|
|
|
|
|
|
|
|
|
|
// Start the goroutine that will read from the queue and write
|
|
|
|
|
// data out.
|
|
|
|
|
go func() {
|
|
|
|
|
for {
|
|
|
|
|
data := <-writeCh
|
|
|
|
|
if _, err := dataW.Write(data); err != nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
m.streams[id] = stream
|
|
|
|
|
return m.streams[id], nil
|
|
|
|
|
}
|
|
|
|
|
@ -256,7 +269,11 @@ func (m *MuxConn) loop() {
|
|
|
|
|
case muxPacketData:
|
|
|
|
|
stream.mu.Lock()
|
|
|
|
|
if stream.state == streamStateEstablished {
|
|
|
|
|
stream.writer.Write(data)
|
|
|
|
|
select {
|
|
|
|
|
case stream.writeCh <- data:
|
|
|
|
|
default:
|
|
|
|
|
log.Printf("[ERR] Failed to write data, buffer full: %d", id)
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
log.Printf("[ERR] Data received for stream in state: %d", stream.state)
|
|
|
|
|
}
|
|
|
|
|
@ -293,6 +310,7 @@ type Stream struct {
|
|
|
|
|
state streamState
|
|
|
|
|
stateUpdated time.Time
|
|
|
|
|
mu sync.Mutex
|
|
|
|
|
writeCh chan<- []byte
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type streamState byte
|
|
|
|
|
|