From ea4171f163d3bfbc399e98910532459b0373cd26 Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Sun, 2 Jun 2013 23:08:40 -0700 Subject: [PATCH] packer/rpc: Hook up the new communicator interface --- packer/communicator_test.go | 1 - packer/rpc/communicator.go | 135 ++++++++++++-------------------- packer/rpc/communicator_test.go | 67 ++++++---------- 3 files changed, 75 insertions(+), 128 deletions(-) diff --git a/packer/communicator_test.go b/packer/communicator_test.go index a90b5e79a..d7c610c15 100644 --- a/packer/communicator_test.go +++ b/packer/communicator_test.go @@ -1,2 +1 @@ package packer - diff --git a/packer/rpc/communicator.go b/packer/rpc/communicator.go index 05010fb72..5d43406d3 100644 --- a/packer/rpc/communicator.go +++ b/packer/rpc/communicator.go @@ -21,17 +21,11 @@ type CommunicatorServer struct { c packer.Communicator } -// RemoteCommandServer wraps a packer.RemoteCommand struct and makes it -// exportable as part of a Golang RPC server. -type RemoteCommandServer struct { - rc *packer.RemoteCommand -} - -type CommunicatorStartResponse struct { - StdinAddress string - StdoutAddress string - StderrAddress string - RemoteCommandAddress string +type CommunicatorStartArgs struct { + Command string + StdinAddress string + StdoutAddress string + StderrAddress string } type CommunicatorDownloadArgs struct { @@ -48,52 +42,29 @@ func Communicator(client *rpc.Client) *communicator { return &communicator{client} } -func (c *communicator) Start(cmd string) (rc *packer.RemoteCommand, err error) { - var response CommunicatorStartResponse - err = c.client.Call("Communicator.Start", &cmd, &response) - if err != nil { - return - } - - // Connect to the three streams that will handle stdin, stdout, - // and stderr and get net.Conns for them. - stdinC, err := net.Dial("tcp", response.StdinAddress) - if err != nil { - return - } - - stdoutC, err := net.Dial("tcp", response.StdoutAddress) - if err != nil { - return - } +func (c *communicator) Start(cmd *packer.RemoteCmd) (err error) { + var args CommunicatorStartArgs + args.Command = cmd.Command - stderrC, err := net.Dial("tcp", response.StderrAddress) - if err != nil { - return + if cmd.Stdin != nil { + stdinL := netListenerInRange(portRangeMin, portRangeMax) + args.StdinAddress = stdinL.Addr().String() + go serveSingleCopy("stdin", stdinL, nil, cmd.Stdin) } - // Connect to the RPC server for the remote command - client, err := rpc.Dial("tcp", response.RemoteCommandAddress) - if err != nil { - return + if cmd.Stdout != nil { + stdoutL := netListenerInRange(portRangeMin, portRangeMax) + args.StdoutAddress = stdoutL.Addr().String() + go serveSingleCopy("stdout", stdoutL, cmd.Stdout, nil) } - // Build the response object using the streams we created - rc = &packer.RemoteCommand{ - Stdin: stdinC, - Stdout: stdoutC, - Stderr: stderrC, - Exited: false, - ExitStatus: -1, + if cmd.Stderr != nil { + stderrL := netListenerInRange(portRangeMin, portRangeMax) + args.StderrAddress = stderrL.Addr().String() + go serveSingleCopy("stderr", stderrL, cmd.Stderr, nil) } - // In a goroutine, we wait for the process to exit, then we set - // that it has exited. - go func() { - client.Call("RemoteCommand.Wait", new(interface{}), &rc.ExitStatus) - rc.Exited = true - }() - + err = c.client.Call("Communicator.Start", &args, new(interface{})) return } @@ -145,41 +116,41 @@ func (c *communicator) Download(path string, w io.Writer) (err error) { return } -func (c *CommunicatorServer) Start(cmd *string, reply *CommunicatorStartResponse) (err error) { - // Start executing the command. - command, err := c.c.Start(*cmd) - if err != nil { - return +func (c *CommunicatorServer) Start(args *CommunicatorStartArgs, reply *interface{}) (err error) { + // Build the RemoteCmd on this side so that it all pipes over + // to the remote side. + var cmd packer.RemoteCmd + cmd.Command = args.Command + + if args.StdinAddress != "" { + stdinC, err := net.Dial("tcp", args.StdinAddress) + if err != nil { + return err + } + + cmd.Stdin = stdinC } - // If we didn't get a proper command... that's not right. - if command == nil { - return errors.New("communicator returned nil remote command") + if args.StdoutAddress != "" { + stdoutC, err := net.Dial("tcp", args.StdoutAddress) + if err != nil { + return err + } + + cmd.Stdout = stdoutC } - // Next, we need to take the stdin/stdout and start a listener - // for each because the client will connect to us via TCP and use - // that connection as the io.Reader or io.Writer. These exist for - // only a single connection that is persistent. - stdinL := netListenerInRange(portRangeMin, portRangeMax) - stdoutL := netListenerInRange(portRangeMin, portRangeMax) - stderrL := netListenerInRange(portRangeMin, portRangeMax) - go serveSingleCopy("stdin", stdinL, command.Stdin, nil) - go serveSingleCopy("stdout", stdoutL, nil, command.Stdout) - go serveSingleCopy("stderr", stderrL, nil, command.Stderr) - - // For the exit status, we use a simple RPC Server that serves - // some of the RemoteComand methods. - server := rpc.NewServer() - server.RegisterName("RemoteCommand", &RemoteCommandServer{command}) - - *reply = CommunicatorStartResponse{ - stdinL.Addr().String(), - stdoutL.Addr().String(), - stderrL.Addr().String(), - serveSingleConn(server), + if args.StderrAddress != "" { + stderrC, err := net.Dial("tcp", args.StderrAddress) + if err != nil { + return err + } + + cmd.Stderr = stderrC } + // Start the actual command + err = c.c.Start(&cmd) return } @@ -207,12 +178,6 @@ func (c *CommunicatorServer) Download(args *CommunicatorDownloadArgs, reply *int return } -func (rc *RemoteCommandServer) Wait(args *interface{}, reply *int) error { - rc.rc.Wait() - *reply = rc.rc.ExitStatus - return nil -} - func serveSingleCopy(name string, l net.Listener, dst io.Writer, src io.Reader) { defer l.Close() diff --git a/packer/rpc/communicator_test.go b/packer/rpc/communicator_test.go index 092b9285a..7b7867d86 100644 --- a/packer/rpc/communicator_test.go +++ b/packer/rpc/communicator_test.go @@ -11,13 +11,7 @@ import ( type testCommunicator struct { startCalled bool - startCmd string - - startIn *io.PipeReader - startOut *io.PipeWriter - startErr *io.PipeWriter - startExited *bool - startExitStatus *int + startCmd *packer.RemoteCmd uploadCalled bool uploadPath string @@ -27,29 +21,10 @@ type testCommunicator struct { downloadPath string } -func (t *testCommunicator) Start(cmd string) (*packer.RemoteCommand, error) { +func (t *testCommunicator) Start(cmd *packer.RemoteCmd) error { t.startCalled = true t.startCmd = cmd - - var stdin *io.PipeWriter - var stdout, stderr *io.PipeReader - - t.startIn, stdin = io.Pipe() - stdout, t.startOut = io.Pipe() - stderr, t.startErr = io.Pipe() - - rc := &packer.RemoteCommand{ - Stdin: stdin, - Stdout: stdout, - Stderr: stderr, - Exited: false, - ExitStatus: 0, - } - - t.startExited = &rc.Exited - t.startExitStatus = &rc.ExitStatus - - return rc, nil + return nil } func (t *testCommunicator) Upload(path string, reader io.Reader) (err error) { @@ -81,38 +56,46 @@ func TestCommunicatorRPC(t *testing.T) { // Create the client over RPC and run some methods to verify it works client, err := rpc.Dial("tcp", address) assert.Nil(err, "should be able to connect") + remote := Communicator(client) + + // The remote command we'll use + stdin_r, stdin_w := io.Pipe() + stdout_r, stdout_w := io.Pipe() + stderr_r, stderr_w := io.Pipe() + + var cmd packer.RemoteCmd + cmd.Command = "foo" + cmd.Stdin = stdin_r + cmd.Stdout = stdout_w + cmd.Stderr = stderr_w // Test Start - remote := Communicator(client) - rc, err := remote.Start("foo") + err = remote.Start(&cmd) assert.Nil(err, "should not have an error") // Test that we can read from stdout - bufOut := bufio.NewReader(rc.Stdout) - c.startOut.Write([]byte("outfoo\n")) + c.startCmd.Stdout.Write([]byte("outfoo\n")) + bufOut := bufio.NewReader(stdout_r) data, err := bufOut.ReadString('\n') assert.Nil(err, "should have no problem reading stdout") assert.Equal(data, "outfoo\n", "should be correct stdout") // Test that we can read from stderr - bufErr := bufio.NewReader(rc.Stderr) - c.startErr.Write([]byte("errfoo\n")) + c.startCmd.Stderr.Write([]byte("errfoo\n")) + bufErr := bufio.NewReader(stderr_r) data, err = bufErr.ReadString('\n') - assert.Nil(err, "should have no problem reading stdout") - assert.Equal(data, "errfoo\n", "should be correct stdout") + assert.Nil(err, "should have no problem reading stderr") + assert.Equal(data, "errfoo\n", "should be correct stderr") // Test that we can write to stdin - bufIn := bufio.NewReader(c.startIn) - rc.Stdin.Write([]byte("infoo\n")) + stdin_w.Write([]byte("infoo\n")) + bufIn := bufio.NewReader(c.startCmd.Stdin) data, err = bufIn.ReadString('\n') assert.Nil(err, "should have no problem reading stdin") assert.Equal(data, "infoo\n", "should be correct stdin") // Test that we can get the exit status properly - *c.startExitStatus = 42 - *c.startExited = true - rc.Wait() - assert.Equal(rc.ExitStatus, 42, "should have proper exit status") + // TODO // Test that we can upload things uploadR, uploadW := io.Pipe()