From 532faec4579cff2bd942507d68ffa7aabd48712c Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Sun, 2 Jun 2013 21:20:27 -0700 Subject: [PATCH] packer: New Communicator interface is much simplified --- packer/communicator.go | 180 ++++++------------------------------ packer/communicator_test.go | 120 ------------------------ 2 files changed, 28 insertions(+), 272 deletions(-) diff --git a/packer/communicator.go b/packer/communicator.go index dd322ff23..d1b6f3a66 100644 --- a/packer/communicator.go +++ b/packer/communicator.go @@ -1,13 +1,36 @@ package packer import ( - "bufio" "io" - "log" - "sync" - "time" ) +// RemoteCmd represents a remote command being prepared or run. +type RemoteCmd struct { + // Command is the command to run remotely. This is executed as if + // it were a shell command, so you are expected to do any shell escaping + // necessary. + Command string + + // Stdin specifies the process's standard input. If Stdin is + // nil, the process reads from an empty bytes.Buffer. + Stdin io.Reader + + // Stdout and Stderr represent the process's standard output and + // error. + // + // If either is nil, it will be set to ioutil.Discard. + Stdout io.Writer + Stderr io.Writer + + // This will be set to true when the remote command has exited. It + // shouldn't be set manually by the user, but there is no harm in + // doing so. + Exited bool + + // Once Exited is true, this will contain the exit code of the process. + ExitStatus int +} + // A Communicator is the interface used to communicate with the machine // that exists that will eventually be packaged into an image. Communicators // allow you to execute remote commands, upload files, etc. @@ -15,154 +38,7 @@ import ( // Communicators must be safe for concurrency, meaning multiple calls to // Start or any other method may be called at the same time. type Communicator interface { - Start(string) (*RemoteCommand, error) + Start(*RemoteCmd) error Upload(string, io.Reader) error Download(string, io.Writer) error } - -// This struct contains some information about the remote command being -// executed and can be used to wait for it to complete. -// -// Stdin, Stdout, Stderr are readers and writers to varios IO streams for -// the remote command. -// -// Exited is false until Wait is called. It can be used to check if Wait -// has already been called. -// -// ExitStatus is the exit code of the remote process. It is only available -// once Wait is called. -type RemoteCommand struct { - Stdin io.Writer - Stdout io.Reader - Stderr io.Reader - Exited bool - ExitStatus int - - exitChans []chan<- int - exitChanLock sync.Mutex - errChans []chan<- string - errChanLock sync.Mutex - outChans []chan<- string - outChanLock sync.Mutex -} - -// StderrStream returns a channel that will be sent all the -func (r *RemoteCommand) StderrChan() <-chan string { - r.errChanLock.Lock() - defer r.errChanLock.Unlock() - - // If no channels have been made, make that slice and start - // the goroutine to read and send to them - if r.errChans == nil { - r.errChans = make([]chan<- string, 0, 5) - go r.channelReader(r.Stderr, &r.errChans, &r.errChanLock) - } - - // Create the channel, append it to the channels we care about - errChan := make(chan string, 10) - r.errChans = append(r.errChans, errChan) - return errChan -} - -// StdoutStream returns a channel that will be sent all the output -// of stdout as it comes. The output isn't guaranteed to be a full line. -// When the channel is closed, the process is exited. -func (r *RemoteCommand) StdoutChan() <-chan string { - r.outChanLock.Lock() - defer r.outChanLock.Unlock() - - // If no output channels have been made yet, then make that slice - // and start the goroutine to read and send to them. - if r.outChans == nil { - r.outChans = make([]chan<- string, 0, 5) - go r.channelReader(r.Stdout, &r.outChans, &r.outChanLock) - } - - // Create the channel, append it to the channels we care about - outChan := make(chan string, 10) - r.outChans = append(r.outChans, outChan) - return outChan -} - -// ExitChan returns a channel that will be sent the exit status once -// the process exits. This can be used in cases such a select statement -// waiting on the process to end. -func (r *RemoteCommand) ExitChan() <-chan int { - r.exitChanLock.Lock() - defer r.exitChanLock.Unlock() - - // If we haven't made any channels yet, make that slice - if r.exitChans == nil { - r.exitChans = make([]chan<- int, 0, 5) - - go func() { - // Wait for the command to finish - r.Wait() - - // Grab the exit chan lock so we can iterate over it and - // message to each channel. - r.exitChanLock.Lock() - defer r.exitChanLock.Unlock() - - for _, ch := range r.exitChans { - // Use a select so the send never blocks - select { - case ch <- r.ExitStatus: - default: - log.Println("remote command exit channel wouldn't blocked. Weird.") - } - - close(ch) - } - - r.exitChans = nil - }() - } - - // Append our new channel onto it and return it - exitChan := make(chan int, 1) - r.exitChans = append(r.exitChans, exitChan) - return exitChan -} - -// Wait waits for the command to exit. -func (r *RemoteCommand) Wait() { - // Busy wait on being exited. We put a sleep to be kind to the - // Go scheduler, and because we don't really need smaller granularity. - for !r.Exited { - time.Sleep(10 * time.Millisecond) - } -} - -// Takes an io.Reader and then writes its data to a slice of channels. -// The channel slice is expected to be protected by the given mutex, and -// after the io.Reader is over, all channels will be closed and the slice -// set to nil. -func (*RemoteCommand) channelReader(r io.Reader, chans *[]chan<- string, chanLock *sync.Mutex) { - buf := bufio.NewReader(r) - - var err error - for err != io.EOF { - var data []byte - data, err = buf.ReadSlice('\n') - - if len(data) > 0 { - for _, ch := range *chans { - // Note: this blocks if the channel is full (they - // are buffered by default). What to do? - ch <- string(data) - } - } - } - - // Clean up the channels by closing them and setting the - // list to nil. - chanLock.Lock() - defer chanLock.Unlock() - - for _, ch := range *chans { - close(ch) - } - - *chans = nil -} diff --git a/packer/communicator_test.go b/packer/communicator_test.go index f5865430d..a90b5e79a 100644 --- a/packer/communicator_test.go +++ b/packer/communicator_test.go @@ -1,122 +1,2 @@ package packer -import ( - "bytes" - "testing" - "time" -) - -func TestRemoteCommand_ExitChan(t *testing.T) { - t.Parallel() - - rc := &RemoteCommand{} - exitChan := rc.ExitChan() - - // Set the exit data so that it is sent - rc.ExitStatus = 42 - rc.Exited = true - - select { - case exitCode := <-exitChan: - if exitCode != 42 { - t.Fatal("invalid exit code") - } - - _, ok := <-exitChan - if ok { - t.Fatal("exit channel should be closed") - } - case <-time.After(500 * time.Millisecond): - t.Fatal("exit channel never sent") - } -} - -func TestRemoteCommand_StderrChan(t *testing.T) { - expected := "DATA!!!" - - stderrBuf := new(bytes.Buffer) - stderrBuf.WriteString(expected) - - rc := &RemoteCommand{} - rc.Stderr = stderrBuf - - errChan := rc.StderrChan() - - results := new(bytes.Buffer) - for data := range errChan { - results.WriteString(data) - } - - if results.String() != expected { - t.Fatalf( - "outputs didn't match:\ngot:\n%s\nexpected:\n%s", - results.String(), stderrBuf.String()) - } -} - -func TestRemoteCommand_StdoutChan(t *testing.T) { - expected := "DATA!!!" - - stdoutBuf := new(bytes.Buffer) - stdoutBuf.WriteString(expected) - - rc := &RemoteCommand{} - rc.Stdout = stdoutBuf - - outChan := rc.StdoutChan() - - results := new(bytes.Buffer) - for data := range outChan { - results.WriteString(data) - } - - if results.String() != expected { - t.Fatalf( - "outputs didn't match:\ngot:\n%s\nexpected:\n%s", - results.String(), stdoutBuf.String()) - } -} - -func TestRemoteCommand_WaitBlocks(t *testing.T) { - t.Parallel() - - rc := &RemoteCommand{} - - complete := make(chan bool) - - // Make a goroutine that never exits. Since this is just in a test, - // this should be okay. - go func() { - rc.Wait() - complete <- true - }() - - select { - case <-complete: - t.Fatal("It never should've completed") - case <-time.After(500 * time.Millisecond): - // All is well - } -} - -func TestRemoteCommand_WaitCompletes(t *testing.T) { - t.Parallel() - - rc := &RemoteCommand{} - - complete := make(chan bool) - go func() { - rc.Wait() - complete <- true - }() - - // Flag that it completed - rc.Exited = true - - select { - case <-complete: - // All is well - case <-time.After(1 * time.Second): - t.Fatal("Timeout waiting for command completion.") - } -}