From b84b665ba33ced6a1407508897bec067aaae8f8f Mon Sep 17 00:00:00 2001 From: Megan Marsh Date: Fri, 26 Apr 2019 15:09:06 -0700 Subject: [PATCH 1/3] fix race --- packer/communicator.go | 78 ++++++++++++++++++++++++++++---------- packer/rpc/communicator.go | 8 ++++ 2 files changed, 66 insertions(+), 20 deletions(-) diff --git a/packer/communicator.go b/packer/communicator.go index a86c115ad..aef1775a0 100644 --- a/packer/communicator.go +++ b/packer/communicator.go @@ -4,11 +4,11 @@ import ( "context" "io" "os" + "strings" "sync" + "unicode" - "golang.org/x/sync/errgroup" - - "github.com/hashicorp/packer/common/iochan" + "github.com/mitchellh/iochan" ) // CmdDisconnect is a sentinel value to indicate a RemoteCmd @@ -115,30 +115,52 @@ func (r *RemoteCmd) RunWithUi(ctx context.Context, c Communicator, ui Ui) error r.Stderr = io.MultiWriter(r.Stderr, stderr_w) } - // Loop and get all our output until done. - printFn := func(in io.Reader, out func(string)) error { - for output := range iochan.LineReader(in) { + // Start the command + if err := c.Start(ctx, r); err != nil { + return err + } + + // Create the channels we'll use for data + stdoutCh := iochan.DelimReader(stdout_r, '\n') + stderrCh := iochan.DelimReader(stderr_r, '\n') + + // Start the goroutine to watch for the exit + go func() { + defer stdout_w.Close() + defer stderr_w.Close() + r.Wait() + }() + + // Loop and get all our output +OutputLoop: + for { + select { + case output := <-stderrCh: + if output != "" { + ui.Error(r.cleanOutputLine(output)) + } + case output := <-stdoutCh: if output != "" { - out(output) + ui.Message(r.cleanOutputLine(output)) } + case <-r.exitCh: + break OutputLoop + case <-ctx.Done(): + return ctx.Err() } - return nil } - wg, ctx := errgroup.WithContext(ctx) - - wg.Go(func() error { return printFn(stdout_r, ui.Message) }) - wg.Go(func() error { return printFn(stderr_r, ui.Error) }) - - if err := c.Start(ctx, r); err != nil { - return err + // Make sure we finish off stdout/stderr because we may have gotten + // a message from the exit channel before finishing these first. + for output := range stdoutCh { + ui.Message(r.cleanOutputLine(output)) } - select { - case <-ctx.Done(): - return ctx.Err() - case <-r.exitCh: - return nil + + for output := range stderrCh { + ui.Error(r.cleanOutputLine(output)) } + + return nil } // SetExited is a helper for setting that this process is exited. This @@ -174,3 +196,19 @@ func (r *RemoteCmd) initchan() { } }) } + +// cleanOutputLine cleans up a line so that '\r' don't muck up the +// UI output when we're reading from a remote command. +func (r *RemoteCmd) cleanOutputLine(line string) string { + // Trim surrounding whitespace + line = strings.TrimRightFunc(line, unicode.IsSpace) + + // Trim up to the first carriage return, since that text would be + // lost anyways. + idx := strings.LastIndex(line, "\r") + if idx > -1 { + line = line[idx+1:] + } + + return line +} diff --git a/packer/rpc/communicator.go b/packer/rpc/communicator.go index 814dc05ca..2f2e8ee5f 100644 --- a/packer/rpc/communicator.go +++ b/packer/rpc/communicator.go @@ -101,10 +101,13 @@ func (c *communicator) Start(ctx context.Context, cmd *packer.RemoteCmd) (err er go func() { conn, err := c.mux.Accept(responseStreamId) + log.Printf("Megan waiting...") wg.Wait() + log.Printf("Megan done waiting...") if err != nil { log.Printf("[ERR] Error accepting response stream %d: %s", responseStreamId, err) + log.Printf("Megan SetExited 1...") cmd.SetExited(123) return } @@ -115,15 +118,19 @@ func (c *communicator) Start(ctx context.Context, cmd *packer.RemoteCmd) (err er if err := decoder.Decode(&finished); err != nil { log.Printf("[ERR] Error decoding response stream %d: %s", responseStreamId, err) + log.Printf("Megan SetExited 2...") cmd.SetExited(123) return } log.Printf("[INFO] RPC client: Communicator ended with: %d", finished.ExitStatus) + log.Printf("Megan SetExited 3...") cmd.SetExited(finished.ExitStatus) }() + log.Printf("Megan, calling communicator using c.client.call") err = c.client.Call("Communicator.Start", &args, new(interface{})) + log.Printf("Megan, returned from communicator.Start c.client.call") return } @@ -202,6 +209,7 @@ func (c *communicator) Download(path string, w io.Writer) (err error) { } func (c *CommunicatorServer) Start(args *CommunicatorStartArgs, reply *interface{}) error { + log.Println("Megan inside communicatorServer") ctx := context.TODO() // Build the RemoteCmd on this side so that it all pipes over From 1287fcfa275f712ec774fcab4c328e0f19e39143 Mon Sep 17 00:00:00 2001 From: Megan Marsh Date: Fri, 26 Apr 2019 16:22:53 -0700 Subject: [PATCH 2/3] add iochan by mitchell to modules --- go.mod | 1 + go.sum | 1 + vendor/github.com/mitchellh/iochan/LICENSE.md | 21 ++++++++++ vendor/github.com/mitchellh/iochan/README.md | 13 ++++++ vendor/github.com/mitchellh/iochan/go.mod | 1 + vendor/github.com/mitchellh/iochan/iochan.go | 41 +++++++++++++++++++ vendor/modules.txt | 2 + 7 files changed, 80 insertions(+) create mode 100644 vendor/github.com/mitchellh/iochan/LICENSE.md create mode 100644 vendor/github.com/mitchellh/iochan/README.md create mode 100644 vendor/github.com/mitchellh/iochan/go.mod create mode 100644 vendor/github.com/mitchellh/iochan/iochan.go diff --git a/go.mod b/go.mod index e3ea6a5f8..397a9d58e 100644 --- a/go.mod +++ b/go.mod @@ -74,6 +74,7 @@ require ( github.com/mitchellh/go-fs v0.0.0-20180402234041-7b48fa161ea7 github.com/mitchellh/go-homedir v1.0.0 github.com/mitchellh/go-vnc v0.0.0-20150629162542-723ed9867aed + github.com/mitchellh/iochan v1.0.0 github.com/mitchellh/mapstructure v0.0.0-20180111000720-b4575eea38cc github.com/mitchellh/panicwrap v0.0.0-20170106182340-fce601fe5557 github.com/mitchellh/prefixedio v0.0.0-20151214002211-6e6954073784 diff --git a/go.sum b/go.sum index 8f16863fa..ad68c6980 100644 --- a/go.sum +++ b/go.sum @@ -278,6 +278,7 @@ github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eI github.com/mitchellh/go-vnc v0.0.0-20150629162542-723ed9867aed h1:FI2NIv6fpef6BQl2u3IZX/Cj20tfypRF4yd+uaHOMtI= github.com/mitchellh/go-vnc v0.0.0-20150629162542-723ed9867aed/go.mod h1:3rdaFaCv4AyBgu5ALFM0+tSuHrBh6v692nyQe3ikrq0= github.com/mitchellh/gox v0.4.0/go.mod h1:Sd9lOJ0+aimLBi73mGofS1ycjY8lL3uZM3JPS42BGNg= +github.com/mitchellh/iochan v1.0.0 h1:C+X3KsSTLFVBr/tK1eYN/vs4rJcvsiLU338UhYPJWeY= github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0QubkSMEySY= github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mitchellh/mapstructure v0.0.0-20180111000720-b4575eea38cc h1:5T6hzGUO5OrL6MdYXYoLQtRWJDDgjdlOVBn9mIqGY1g= diff --git a/vendor/github.com/mitchellh/iochan/LICENSE.md b/vendor/github.com/mitchellh/iochan/LICENSE.md new file mode 100644 index 000000000..762008c22 --- /dev/null +++ b/vendor/github.com/mitchellh/iochan/LICENSE.md @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2015 Mitchell Hashimoto + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/vendor/github.com/mitchellh/iochan/README.md b/vendor/github.com/mitchellh/iochan/README.md new file mode 100644 index 000000000..23771b1b9 --- /dev/null +++ b/vendor/github.com/mitchellh/iochan/README.md @@ -0,0 +1,13 @@ +# iochan + +iochan is a Go library for treating `io` readers and writers like channels. +This is useful when sometimes you wish to use `io.Reader` and such in `select` +statements. + +## Installation + +Standard `go get`: + +``` +$ go get github.com/mitchellh/iochan +``` diff --git a/vendor/github.com/mitchellh/iochan/go.mod b/vendor/github.com/mitchellh/iochan/go.mod new file mode 100644 index 000000000..50ab2f14e --- /dev/null +++ b/vendor/github.com/mitchellh/iochan/go.mod @@ -0,0 +1 @@ +module github.com/mitchellh/iochan diff --git a/vendor/github.com/mitchellh/iochan/iochan.go b/vendor/github.com/mitchellh/iochan/iochan.go new file mode 100644 index 000000000..c10cef02d --- /dev/null +++ b/vendor/github.com/mitchellh/iochan/iochan.go @@ -0,0 +1,41 @@ +package iochan + +import ( + "bufio" + "io" +) + +// DelimReader takes an io.Reader and produces the contents of the reader +// on the returned channel. The contents on the channel will be returned +// on boundaries specified by the delim parameter, and will include this +// delimiter. +// +// If an error occurs while reading from the reader, the reading will end. +// +// In the case of an EOF or error, the channel will be closed. +// +// This must only be called once for any individual reader. The behavior is +// unknown and will be unexpected if this is called multiple times with the +// same reader. +func DelimReader(r io.Reader, delim byte) <-chan string { + ch := make(chan string) + + go func() { + buf := bufio.NewReader(r) + + for { + line, err := buf.ReadString(delim) + if line != "" { + ch <- line + } + + if err != nil { + break + } + } + + close(ch) + }() + + return ch +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 109e0b17c..18842f689 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -330,6 +330,8 @@ github.com/mitchellh/go-homedir github.com/mitchellh/go-testing-interface # github.com/mitchellh/go-vnc v0.0.0-20150629162542-723ed9867aed github.com/mitchellh/go-vnc +# github.com/mitchellh/iochan v1.0.0 +github.com/mitchellh/iochan # github.com/mitchellh/mapstructure v0.0.0-20180111000720-b4575eea38cc github.com/mitchellh/mapstructure # github.com/mitchellh/panicwrap v0.0.0-20170106182340-fce601fe5557 From 2e0f9223c850c78f3def85119085d577f7949ef6 Mon Sep 17 00:00:00 2001 From: Megan Marsh Date: Mon, 29 Apr 2019 10:33:41 -0700 Subject: [PATCH 3/3] remove all the extra debug lines --- packer/rpc/communicator.go | 8 -------- 1 file changed, 8 deletions(-) diff --git a/packer/rpc/communicator.go b/packer/rpc/communicator.go index 2f2e8ee5f..814dc05ca 100644 --- a/packer/rpc/communicator.go +++ b/packer/rpc/communicator.go @@ -101,13 +101,10 @@ func (c *communicator) Start(ctx context.Context, cmd *packer.RemoteCmd) (err er go func() { conn, err := c.mux.Accept(responseStreamId) - log.Printf("Megan waiting...") wg.Wait() - log.Printf("Megan done waiting...") if err != nil { log.Printf("[ERR] Error accepting response stream %d: %s", responseStreamId, err) - log.Printf("Megan SetExited 1...") cmd.SetExited(123) return } @@ -118,19 +115,15 @@ func (c *communicator) Start(ctx context.Context, cmd *packer.RemoteCmd) (err er if err := decoder.Decode(&finished); err != nil { log.Printf("[ERR] Error decoding response stream %d: %s", responseStreamId, err) - log.Printf("Megan SetExited 2...") cmd.SetExited(123) return } log.Printf("[INFO] RPC client: Communicator ended with: %d", finished.ExitStatus) - log.Printf("Megan SetExited 3...") cmd.SetExited(finished.ExitStatus) }() - log.Printf("Megan, calling communicator using c.client.call") err = c.client.Call("Communicator.Start", &args, new(interface{})) - log.Printf("Megan, returned from communicator.Start c.client.call") return } @@ -209,7 +202,6 @@ func (c *communicator) Download(path string, w io.Writer) (err error) { } func (c *CommunicatorServer) Start(args *CommunicatorStartArgs, reply *interface{}) error { - log.Println("Megan inside communicatorServer") ctx := context.TODO() // Build the RemoteCmd on this side so that it all pipes over