From d5bf5d7f391efc00b34b4978c6c637400e5935e4 Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Wed, 1 Jan 2014 21:53:36 -0800 Subject: [PATCH 1/8] packer/rpc: make sure we read all the data --- packer/rpc/muxconn.go | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/packer/rpc/muxconn.go b/packer/rpc/muxconn.go index d03c68c6b..9b4d2f6fd 100644 --- a/packer/rpc/muxconn.go +++ b/packer/rpc/muxconn.go @@ -284,10 +284,13 @@ func (m *MuxConn) loop() { // TODO(mitchellh): probably would be better to re-use a buffer... data := make([]byte, length) - if length > 0 { - if _, err := m.rwc.Read(data); err != nil { + n := 0 + for n < int(length) { + if n2, err := m.rwc.Read(data); err != nil { log.Printf("[ERR] Error reading data: %s", err) return + } else { + n += n2 } } @@ -434,10 +437,20 @@ func (m *MuxConn) write(from muxPacketFrom, id uint32, dataType muxPacketType, p if err := binary.Write(m.rwc, binary.BigEndian, int32(len(p))); err != nil { return 0, err } - if len(p) == 0 { - return 0, nil + + // Write all the bytes. If we don't write all the bytes, report an error + var err error = nil + n := 0 + for n < len(p) { + var n2 int + n2, err = m.rwc.Write(p) + n += n2 + if err != nil { + break + } } - return m.rwc.Write(p) + + return n, err } // Stream is a single stream of data and implements io.ReadWriteCloser. From a7144d10258068d4c375e80484297ca8f075bbd2 Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Wed, 1 Jan 2014 21:53:57 -0800 Subject: [PATCH 2/8] packer/rpc: a little more logging --- packer/rpc/muxconn.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packer/rpc/muxconn.go b/packer/rpc/muxconn.go index 9b4d2f6fd..d4ce526e7 100644 --- a/packer/rpc/muxconn.go +++ b/packer/rpc/muxconn.go @@ -146,6 +146,8 @@ func (m *MuxConn) Accept(id uint32) (io.ReadWriteCloser, error) { // Dial opens a connection to the remote end using the given stream ID. // An Accept on the remote end will only work with if the IDs match. func (m *MuxConn) Dial(id uint32) (io.ReadWriteCloser, error) { + log.Printf("[TRACE] %p: Dial on stream ID: %d", m, id) + m.muDial.Lock() // If we have any streams with this ID, then it is a failure. The From 1dd5a13139c1639b1b937fd7a52d3e308e48af17 Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Wed, 1 Jan 2014 21:59:00 -0800 Subject: [PATCH 3/8] packer/rpc: more logging in muxconn --- packer/rpc/muxconn.go | 1 + 1 file changed, 1 insertion(+) diff --git a/packer/rpc/muxconn.go b/packer/rpc/muxconn.go index d4ce526e7..d6df9f9e3 100644 --- a/packer/rpc/muxconn.go +++ b/packer/rpc/muxconn.go @@ -448,6 +448,7 @@ func (m *MuxConn) write(from muxPacketFrom, id uint32, dataType muxPacketType, p n2, err = m.rwc.Write(p) n += n2 if err != nil { + log.Printf("[ERR] %p: Stream %d (%s) write error: %s", m, id, from, err) break } } From bf8715b7044c996433a1625f9eec30509700e393 Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Wed, 1 Jan 2014 22:03:12 -0800 Subject: [PATCH 4/8] packer/rpc: more logs --- packer/rpc/muxconn.go | 1 + 1 file changed, 1 insertion(+) diff --git a/packer/rpc/muxconn.go b/packer/rpc/muxconn.go index d6df9f9e3..ea7b8487e 100644 --- a/packer/rpc/muxconn.go +++ b/packer/rpc/muxconn.go @@ -447,6 +447,7 @@ func (m *MuxConn) write(from muxPacketFrom, id uint32, dataType muxPacketType, p var n2 int n2, err = m.rwc.Write(p) n += n2 + log.Printf("[TRACE] %p: Stream %d (%s) write %d/%d bytes", m, id, from, n, len(p)) if err != nil { log.Printf("[ERR] %p: Stream %d (%s) write error: %s", m, id, from, err) break From e37e690e99946476c42285f9fdbd265dd4ca1cc1 Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Wed, 1 Jan 2014 22:13:06 -0800 Subject: [PATCH 5/8] packer/rpc: more logging and fix a bug with read buffer start point --- packer/rpc/muxconn.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/packer/rpc/muxconn.go b/packer/rpc/muxconn.go index ea7b8487e..e7cde415e 100644 --- a/packer/rpc/muxconn.go +++ b/packer/rpc/muxconn.go @@ -288,11 +288,14 @@ func (m *MuxConn) loop() { data := make([]byte, length) n := 0 for n < int(length) { - if n2, err := m.rwc.Read(data); err != nil { + if n2, err := m.rwc.Read(data[n:]); err != nil { log.Printf("[ERR] Error reading data: %s", err) return } else { n += n2 + if n < int(length) { + log.Printf("[TRACE] %p: Stream %d read %d/%d bytes", m, id, n, length) + } } } @@ -447,7 +450,9 @@ func (m *MuxConn) write(from muxPacketFrom, id uint32, dataType muxPacketType, p var n2 int n2, err = m.rwc.Write(p) n += n2 - log.Printf("[TRACE] %p: Stream %d (%s) write %d/%d bytes", m, id, from, n, len(p)) + if n < len(p) { + log.Printf("[TRACE] %p: Stream %d (%s) write %d/%d bytes", m, id, from, n, len(p)) + } if err != nil { log.Printf("[ERR] %p: Stream %d (%s) write error: %s", m, id, from, err) break From 84541c670ba34d5ab0f26a017c19ede2916278b0 Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Wed, 1 Jan 2014 22:19:43 -0800 Subject: [PATCH 6/8] packer/rpc: get rid of trace level --- packer/rpc/muxconn.go | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/packer/rpc/muxconn.go b/packer/rpc/muxconn.go index e7cde415e..376613447 100644 --- a/packer/rpc/muxconn.go +++ b/packer/rpc/muxconn.go @@ -100,7 +100,7 @@ func (m *MuxConn) Close() error { // Accept accepts a multiplexed connection with the given ID. This // will block until a request is made to connect. func (m *MuxConn) Accept(id uint32) (io.ReadWriteCloser, error) { - log.Printf("[TRACE] %p: Accept on stream ID: %d", m, id) + //log.Printf("[TRACE] %p: Accept on stream ID: %d", m, id) // Get the stream. It is okay if it is already in the list of streams // because we may have prematurely received a syn for it. @@ -146,7 +146,7 @@ func (m *MuxConn) Accept(id uint32) (io.ReadWriteCloser, error) { // Dial opens a connection to the remote end using the given stream ID. // An Accept on the remote end will only work with if the IDs match. func (m *MuxConn) Dial(id uint32) (io.ReadWriteCloser, error) { - log.Printf("[TRACE] %p: Dial on stream ID: %d", m, id) + //log.Printf("[TRACE] %p: Dial on stream ID: %d", m, id) m.muDial.Lock() @@ -293,9 +293,6 @@ func (m *MuxConn) loop() { return } else { n += n2 - if n < int(length) { - log.Printf("[TRACE] %p: Stream %d read %d/%d bytes", m, id, n, length) - } } } @@ -330,7 +327,7 @@ func (m *MuxConn) loop() { continue } - log.Printf("[TRACE] %p: Stream %d (%s) received packet %d", m, id, from, packetType) + //log.Printf("[TRACE] %p: Stream %d (%s) received packet %d", m, id, from, packetType) switch packetType { case muxPacketSyn: // If the stream is nil, this is the only case where we'll @@ -450,9 +447,6 @@ func (m *MuxConn) write(from muxPacketFrom, id uint32, dataType muxPacketType, p var n2 int n2, err = m.rwc.Write(p) n += n2 - if n < len(p) { - log.Printf("[TRACE] %p: Stream %d (%s) write %d/%d bytes", m, id, from, n, len(p)) - } if err != nil { log.Printf("[ERR] %p: Stream %d (%s) write error: %s", m, id, from, err) break @@ -568,7 +562,7 @@ func (s *Stream) closeWriter() { } func (s *Stream) setState(state streamState) { - log.Printf("[TRACE] %p: Stream %d (%s) went to state %d", s.mux, s.id, s.from, state) + //log.Printf("[TRACE] %p: Stream %d (%s) went to state %d", s.mux, s.id, s.from, state) s.state = state s.stateUpdated = time.Now().UTC() for ch, _ := range s.stateChange { From e6fb71d14f3e7904d2a6c5dc42629fb56069b733 Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Wed, 1 Jan 2014 22:20:03 -0800 Subject: [PATCH 7/8] packer/rpc: write only the remaining data on muxconn retry --- packer/rpc/muxconn.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packer/rpc/muxconn.go b/packer/rpc/muxconn.go index 376613447..17312f367 100644 --- a/packer/rpc/muxconn.go +++ b/packer/rpc/muxconn.go @@ -445,7 +445,7 @@ func (m *MuxConn) write(from muxPacketFrom, id uint32, dataType muxPacketType, p n := 0 for n < len(p) { var n2 int - n2, err = m.rwc.Write(p) + n2, err = m.rwc.Write(p[n:]) n += n2 if err != nil { log.Printf("[ERR] %p: Stream %d (%s) write error: %s", m, id, from, err) From a41678eb3bf533e09abc485bb23e49c1c8e5c962 Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Wed, 1 Jan 2014 22:24:29 -0800 Subject: [PATCH 8/8] update CHANGELOG --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index dde55505b..ca310bac2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,8 @@ BUG FIXES: * core: If a stream ID loops around, don't let it use stream ID 0 [GH-767] +* core: Fix issue where large writes to plugins would result in stream + corruption. [GH-727] * builders/virtualbox-ovf: `shutdown_timeout` config works. [GH-772] * builders/vmware-iso: Remote driver works properly again. [GH-773]