From 92ec548dce4fca2c174827ae93ba323834f07432 Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Tue, 10 Dec 2013 11:43:02 -0800 Subject: [PATCH] packer/rpc: implement Communicator --- packer/rpc/client.go | 7 ++ packer/rpc/communicator.go | 133 +++++++++++++------------------- packer/rpc/communicator_test.go | 20 ++--- packer/rpc/muxconn.go | 2 +- packer/rpc/server.go | 2 +- packer/rpc/server_new.go | 8 ++ 6 files changed, 77 insertions(+), 95 deletions(-) diff --git a/packer/rpc/client.go b/packer/rpc/client.go index 4dd9deb47..cc657741e 100644 --- a/packer/rpc/client.go +++ b/packer/rpc/client.go @@ -51,6 +51,13 @@ func (c *Client) Cache() packer.Cache { } } +func (c *Client) Communicator() packer.Communicator { + return &communicator{ + client: c.client, + mux: c.mux, + } +} + func (c *Client) PostProcessor() packer.PostProcessor { return &postProcessor{ client: c.client, diff --git a/packer/rpc/communicator.go b/packer/rpc/communicator.go index 77e321153..de624a7af 100644 --- a/packer/rpc/communicator.go +++ b/packer/rpc/communicator.go @@ -2,11 +2,9 @@ package rpc import ( "encoding/gob" - "errors" "github.com/mitchellh/packer/packer" "io" "log" - "net" "net/rpc" ) @@ -14,12 +12,14 @@ import ( // executed over an RPC connection. type communicator struct { client *rpc.Client + mux *MuxConn } // CommunicatorServer wraps a packer.Communicator implementation and makes // it exportable as part of a Golang RPC server. type CommunicatorServer struct { - c packer.Communicator + c packer.Communicator + mux *MuxConn } type CommandFinished struct { @@ -27,21 +27,21 @@ type CommandFinished struct { } type CommunicatorStartArgs struct { - Command string - StdinAddress string - StdoutAddress string - StderrAddress string - ResponseAddress string + Command string + StdinStreamId uint32 + StdoutStreamId uint32 + StderrStreamId uint32 + ResponseStreamId uint32 } type CommunicatorDownloadArgs struct { - Path string - WriterAddress string + Path string + WriterStreamId uint32 } type CommunicatorUploadArgs struct { - Path string - ReaderAddress string + Path string + ReaderStreamId uint32 } type CommunicatorUploadDirArgs struct { @@ -51,7 +51,7 @@ type CommunicatorUploadDirArgs struct { } func Communicator(client *rpc.Client) *communicator { - return &communicator{client} + return &communicator{client: client} } func (c *communicator) Start(cmd *packer.RemoteCmd) (err error) { @@ -59,41 +59,38 @@ func (c *communicator) Start(cmd *packer.RemoteCmd) (err error) { args.Command = cmd.Command if cmd.Stdin != nil { - stdinL := netListenerInRange(portRangeMin, portRangeMax) - args.StdinAddress = stdinL.Addr().String() - go serveSingleCopy("stdin", stdinL, nil, cmd.Stdin) + args.StdinStreamId = c.mux.NextId() + go serveSingleCopy("stdin", c.mux, args.StdinStreamId, nil, cmd.Stdin) } if cmd.Stdout != nil { - stdoutL := netListenerInRange(portRangeMin, portRangeMax) - args.StdoutAddress = stdoutL.Addr().String() - go serveSingleCopy("stdout", stdoutL, cmd.Stdout, nil) + args.StdoutStreamId = c.mux.NextId() + go serveSingleCopy("stdout", c.mux, args.StdoutStreamId, cmd.Stdout, nil) } if cmd.Stderr != nil { - stderrL := netListenerInRange(portRangeMin, portRangeMax) - args.StderrAddress = stderrL.Addr().String() - go serveSingleCopy("stderr", stderrL, cmd.Stderr, nil) + args.StderrStreamId = c.mux.NextId() + go serveSingleCopy("stderr", c.mux, args.StderrStreamId, cmd.Stderr, nil) } - responseL := netListenerInRange(portRangeMin, portRangeMax) - args.ResponseAddress = responseL.Addr().String() + responseStreamId := c.mux.NextId() + args.ResponseStreamId = responseStreamId go func() { - defer responseL.Close() - - conn, err := responseL.Accept() + conn, err := c.mux.Accept(responseStreamId) if err != nil { + log.Printf("[ERR] Error accepting response stream %d: %s", + responseStreamId, err) cmd.SetExited(123) return } - defer conn.Close() - decoder := gob.NewDecoder(conn) - var finished CommandFinished + decoder := gob.NewDecoder(conn) if err := decoder.Decode(&finished); err != nil { + log.Printf("[ERR] Error decoding response stream %d: %s", + responseStreamId, err) cmd.SetExited(123) return } @@ -106,23 +103,13 @@ func (c *communicator) Start(cmd *packer.RemoteCmd) (err error) { } func (c *communicator) Upload(path string, r io.Reader) (err error) { - // We need to create a server that can proxy the reader data - // over because we can't simply gob encode an io.Reader - readerL := netListenerInRange(portRangeMin, portRangeMax) - if readerL == nil { - err = errors.New("couldn't allocate listener for upload reader") - return - } - - // Make sure at the end of this call, we close the listener - defer readerL.Close() - // Pipe the reader through to the connection - go serveSingleCopy("uploadReader", readerL, nil, r) + streamId := c.mux.NextId() + go serveSingleCopy("uploadReader", c.mux, streamId, nil, r) args := CommunicatorUploadArgs{ - path, - readerL.Addr().String(), + Path: path, + ReaderStreamId: streamId, } err = c.client.Call("Communicator.Upload", &args, new(interface{})) @@ -146,23 +133,13 @@ func (c *communicator) UploadDir(dst string, src string, exclude []string) error } func (c *communicator) Download(path string, w io.Writer) (err error) { - // We need to create a server that can proxy that data downloaded - // into the writer because we can't gob encode a writer directly. - writerL := netListenerInRange(portRangeMin, portRangeMax) - if writerL == nil { - err = errors.New("couldn't allocate listener for download writer") - return - } - - // Make sure we close the listener once we're done because we'll be done - defer writerL.Close() - // Serve a single connection and a single copy - go serveSingleCopy("downloadWriter", writerL, w, nil) + streamId := c.mux.NextId() + go serveSingleCopy("downloadWriter", c.mux, streamId, w, nil) args := CommunicatorDownloadArgs{ - path, - writerL.Addr().String(), + Path: path, + WriterStreamId: streamId, } err = c.client.Call("Communicator.Download", &args, new(interface{})) @@ -175,40 +152,40 @@ func (c *CommunicatorServer) Start(args *CommunicatorStartArgs, reply *interface var cmd packer.RemoteCmd cmd.Command = args.Command - toClose := make([]net.Conn, 0) - if args.StdinAddress != "" { - stdinC, err := tcpDial(args.StdinAddress) + toClose := make([]io.Closer, 0) + if args.StdinStreamId > 0 { + conn, err := c.mux.Dial(args.StdinStreamId) if err != nil { return err } - toClose = append(toClose, stdinC) - cmd.Stdin = stdinC + toClose = append(toClose, conn) + cmd.Stdin = conn } - if args.StdoutAddress != "" { - stdoutC, err := tcpDial(args.StdoutAddress) + if args.StdoutStreamId > 0 { + conn, err := c.mux.Dial(args.StdoutStreamId) if err != nil { return err } - toClose = append(toClose, stdoutC) - cmd.Stdout = stdoutC + toClose = append(toClose, conn) + cmd.Stdout = conn } - if args.StderrAddress != "" { - stderrC, err := tcpDial(args.StderrAddress) + if args.StderrStreamId > 0 { + conn, err := c.mux.Dial(args.StderrStreamId) if err != nil { return err } - toClose = append(toClose, stderrC) - cmd.Stderr = stderrC + toClose = append(toClose, conn) + cmd.Stderr = conn } // Connect to the response address so we can write our result to it // when ready. - responseC, err := tcpDial(args.ResponseAddress) + responseC, err := c.mux.Dial(args.ResponseStreamId) if err != nil { return err } @@ -234,11 +211,10 @@ func (c *CommunicatorServer) Start(args *CommunicatorStartArgs, reply *interface } func (c *CommunicatorServer) Upload(args *CommunicatorUploadArgs, reply *interface{}) (err error) { - readerC, err := tcpDial(args.ReaderAddress) + readerC, err := c.mux.Dial(args.ReaderStreamId) if err != nil { return } - defer readerC.Close() err = c.c.Upload(args.Path, readerC) @@ -250,21 +226,18 @@ func (c *CommunicatorServer) UploadDir(args *CommunicatorUploadDirArgs, reply *e } func (c *CommunicatorServer) Download(args *CommunicatorDownloadArgs, reply *interface{}) (err error) { - writerC, err := tcpDial(args.WriterAddress) + writerC, err := c.mux.Dial(args.WriterStreamId) if err != nil { return } - defer writerC.Close() err = c.c.Download(args.Path, writerC) return } -func serveSingleCopy(name string, l net.Listener, dst io.Writer, src io.Reader) { - defer l.Close() - - conn, err := l.Accept() +func serveSingleCopy(name string, mux *MuxConn, id uint32, dst io.Writer, src io.Reader) { + conn, err := mux.Accept(id) if err != nil { log.Printf("'%s' accept error: %s", name, err) return diff --git a/packer/rpc/communicator_test.go b/packer/rpc/communicator_test.go index f6013a061..ca8239514 100644 --- a/packer/rpc/communicator_test.go +++ b/packer/rpc/communicator_test.go @@ -4,7 +4,6 @@ import ( "bufio" "github.com/mitchellh/packer/packer" "io" - "net/rpc" "reflect" "testing" ) @@ -14,16 +13,11 @@ func TestCommunicatorRPC(t *testing.T) { c := new(packer.MockCommunicator) // Start the server - server := rpc.NewServer() - RegisterCommunicator(server, c) - address := serveSingleConn(server) - - // Create the client over RPC and run some methods to verify it works - client, err := rpc.Dial("tcp", address) - if err != nil { - t.Fatalf("err: %s", err) - } - remote := Communicator(client) + client, server := testClientServer(t) + defer client.Close() + defer server.Close() + server.RegisterCommunicator(c) + remote := client.Communicator() // The remote command we'll use stdin_r, stdin_w := io.Pipe() @@ -42,7 +36,7 @@ func TestCommunicatorRPC(t *testing.T) { c.StartExitStatus = 42 // Test Start - err = remote.Start(&cmd) + err := remote.Start(&cmd) if err != nil { t.Fatalf("err: %s", err) } @@ -74,7 +68,7 @@ func TestCommunicatorRPC(t *testing.T) { stdin_w.Close() cmd.Wait() if c.StartStdin != "info\n" { - t.Fatalf("bad data: %s", data) + t.Fatalf("bad data: %s", c.StartStdin) } // Test that we can get the exit status properly diff --git a/packer/rpc/muxconn.go b/packer/rpc/muxconn.go index f5f0ce804..87941b826 100644 --- a/packer/rpc/muxconn.go +++ b/packer/rpc/muxconn.go @@ -266,7 +266,7 @@ func (m *MuxConn) loop() { return } - log.Printf("[DEBUG] Stream %d received packet %d", id, packetType) + //log.Printf("[DEBUG] Stream %d received packet %d", id, packetType) switch packetType { case muxPacketAck: stream.mu.Lock() diff --git a/packer/rpc/server.go b/packer/rpc/server.go index 68116885b..0d6cacf62 100644 --- a/packer/rpc/server.go +++ b/packer/rpc/server.go @@ -38,7 +38,7 @@ func RegisterCommand(s *rpc.Server, c packer.Command) { // Registers the appropriate endpoint on an RPC server to serve a // Packer Communicator. func RegisterCommunicator(s *rpc.Server, c packer.Communicator) { - registerComponent(s, "Communicator", &CommunicatorServer{c}, false) + registerComponent(s, "Communicator", &CommunicatorServer{c: c}, false) } // Registers the appropriate endpoint on an RPC server to serve a diff --git a/packer/rpc/server_new.go b/packer/rpc/server_new.go index 4f531f187..8a1d0a128 100644 --- a/packer/rpc/server_new.go +++ b/packer/rpc/server_new.go @@ -14,6 +14,7 @@ var endpointId uint64 const ( DefaultArtifactEndpoint string = "Artifact" DefaultCacheEndpoint = "Cache" + DefaultCommunicatorEndpoint = "Communicator" DefaultPostProcessorEndpoint = "PostProcessor" DefaultUiEndpoint = "Ui" ) @@ -55,6 +56,13 @@ func (s *Server) RegisterCache(c packer.Cache) { }) } +func (s *Server) RegisterCommunicator(c packer.Communicator) { + s.server.RegisterName(DefaultCommunicatorEndpoint, &CommunicatorServer{ + c: c, + mux: s.mux, + }) +} + func (s *Server) RegisterPostProcessor(p packer.PostProcessor) { s.server.RegisterName(DefaultPostProcessorEndpoint, &PostProcessorServer{ mux: s.mux,