From 0e7e94518211f7a024c47c2cc81cc846c07873f9 Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Mon, 9 Dec 2013 15:44:00 -0800 Subject: [PATCH] packer/rpc: post-processors work on a single connection --- packer/rpc/artifact.go | 15 +++--- packer/rpc/client.go | 23 +++++++-- packer/rpc/post_processor.go | 78 +++++++++++++++++++------------ packer/rpc/post_processor_test.go | 10 ++-- packer/rpc/server.go | 13 +----- packer/rpc/server_new.go | 61 +++++++++++++++++++----- packer/rpc/ui.go | 3 +- 7 files changed, 136 insertions(+), 67 deletions(-) diff --git a/packer/rpc/artifact.go b/packer/rpc/artifact.go index d71f8831a..8ca3ef869 100644 --- a/packer/rpc/artifact.go +++ b/packer/rpc/artifact.go @@ -8,7 +8,8 @@ import ( // An implementation of packer.Artifact where the artifact is actually // available over an RPC connection. type artifact struct { - client *rpc.Client + client *rpc.Client + endpoint string } // ArtifactServer wraps a packer.Artifact implementation and makes it @@ -18,32 +19,32 @@ type ArtifactServer struct { } func Artifact(client *rpc.Client) *artifact { - return &artifact{client} + return &artifact{client: client} } func (a *artifact) BuilderId() (result string) { - a.client.Call("Artifact.BuilderId", new(interface{}), &result) + a.client.Call(a.endpoint+".BuilderId", new(interface{}), &result) return } func (a *artifact) Files() (result []string) { - a.client.Call("Artifact.Files", new(interface{}), &result) + a.client.Call(a.endpoint+".Files", new(interface{}), &result) return } func (a *artifact) Id() (result string) { - a.client.Call("Artifact.Id", new(interface{}), &result) + a.client.Call(a.endpoint+".Id", new(interface{}), &result) return } func (a *artifact) String() (result string) { - a.client.Call("Artifact.String", new(interface{}), &result) + a.client.Call(a.endpoint+".String", new(interface{}), &result) return } func (a *artifact) Destroy() error { var result error - if err := a.client.Call("Artifact.Destroy", new(interface{}), &result); err != nil { + if err := a.client.Call(a.endpoint+".Destroy", new(interface{}), &result); err != nil { return err } diff --git a/packer/rpc/client.go b/packer/rpc/client.go index d80ce2899..08ddeee12 100644 --- a/packer/rpc/client.go +++ b/packer/rpc/client.go @@ -12,6 +12,7 @@ import ( type Client struct { mux *MuxConn client *rpc.Client + server *rpc.Server } func NewClient(rwc io.ReadWriteCloser) (*Client, error) { @@ -20,14 +21,28 @@ func NewClient(rwc io.ReadWriteCloser) (*Client, error) { // remote RPC server. On the remote side Server.ServeConn also listens // on this stream ID. mux := NewMuxConn(rwc) - stream, err := mux.Dial(0) + clientConn, err := mux.Dial(0) if err != nil { + mux.Close() return nil, err } + // Accept connection ID 1 which is what the remote end uses to + // be an RPC client back to us so we can even serve some objects. + serverConn, err := mux.Accept(1) + if err != nil { + mux.Close() + return nil, err + } + + // Start our RPC server on this end + server := rpc.NewServer() + go server.ServeConn(serverConn) + return &Client{ mux: mux, - client: rpc.NewClient(stream), + client: rpc.NewClient(clientConn), + server: server, }, nil } @@ -41,7 +56,8 @@ func (c *Client) Close() error { func (c *Client) Artifact() packer.Artifact { return &artifact{ - client: c.client, + client: c.client, + endpoint: DefaultArtifactEndpoint, } } @@ -54,5 +70,6 @@ func (c *Client) Cache() packer.Cache { func (c *Client) PostProcessor() packer.PostProcessor { return &postProcessor{ client: c.client, + server: c.server, } } diff --git a/packer/rpc/post_processor.go b/packer/rpc/post_processor.go index 2f743eb65..20e4f9b70 100644 --- a/packer/rpc/post_processor.go +++ b/packer/rpc/post_processor.go @@ -9,27 +9,36 @@ import ( // executed over an RPC connection. type postProcessor struct { client *rpc.Client + server *rpc.Server } // PostProcessorServer wraps a packer.PostProcessor implementation and makes it // exportable as part of a Golang RPC server. type PostProcessorServer struct { - p packer.PostProcessor + client *rpc.Client + server *rpc.Server + p packer.PostProcessor } type PostProcessorConfigureArgs struct { Configs []interface{} } +type PostProcessorPostProcessArgs struct { + ArtifactEndpoint string + UiEndpoint string +} + type PostProcessorProcessResponse struct { - Err error - Keep bool - RPCAddress string + Err error + Keep bool + ArtifactEndpoint string } func PostProcessor(client *rpc.Client) *postProcessor { - return &postProcessor{client} + return &postProcessor{client: client} } + func (p *postProcessor) Configure(raw ...interface{}) (err error) { args := &PostProcessorConfigureArgs{Configs: raw} if cerr := p.client.Call("PostProcessor.Configure", args, &err); cerr != nil { @@ -40,12 +49,21 @@ func (p *postProcessor) Configure(raw ...interface{}) (err error) { } func (p *postProcessor) PostProcess(ui packer.Ui, a packer.Artifact) (packer.Artifact, bool, error) { - server := rpc.NewServer() - RegisterArtifact(server, a) - RegisterUi(server, ui) + artifactEndpoint := registerComponent(p.server, "Artifact", &ArtifactServer{ + artifact: a, + }, true) + + uiEndpoint := registerComponent(p.server, "Ui", &UiServer{ + ui: ui, + }, true) + + args := PostProcessorPostProcessArgs{ + ArtifactEndpoint: artifactEndpoint, + UiEndpoint: uiEndpoint, + } var response PostProcessorProcessResponse - if err := p.client.Call("PostProcessor.PostProcess", serveSingleConn(server), &response); err != nil { + if err := p.client.Call("PostProcessor.PostProcess", &args, &response); err != nil { return nil, false, err } @@ -53,16 +71,14 @@ func (p *postProcessor) PostProcess(ui packer.Ui, a packer.Artifact) (packer.Art return nil, false, response.Err } - if response.RPCAddress == "" { + if response.ArtifactEndpoint == "" { return nil, false, nil } - client, err := rpcDial(response.RPCAddress) - if err != nil { - return nil, false, err - } - - return Artifact(client), response.Keep, nil + return &artifact{ + client: p.client, + endpoint: response.ArtifactEndpoint, + }, response.Keep, nil } func (p *PostProcessorServer) Configure(args *PostProcessorConfigureArgs, reply *error) error { @@ -74,19 +90,23 @@ func (p *PostProcessorServer) Configure(args *PostProcessorConfigureArgs, reply return nil } -func (p *PostProcessorServer) PostProcess(address string, reply *PostProcessorProcessResponse) error { - client, err := rpcDial(address) - if err != nil { - return err +func (p *PostProcessorServer) PostProcess(args *PostProcessorPostProcessArgs, reply *PostProcessorProcessResponse) error { + artifact := &artifact{ + client: p.client, + endpoint: args.ArtifactEndpoint, } - responseAddress := "" + ui := &Ui{ + client: p.client, + endpoint: args.UiEndpoint, + } - artifact, keep, err := p.p.PostProcess(&Ui{client: client}, Artifact(client)) - if err == nil && artifact != nil { - server := rpc.NewServer() - RegisterArtifact(server, artifact) - responseAddress = serveSingleConn(server) + var artifactEndpoint string + artifactResult, keep, err := p.p.PostProcess(ui, artifact) + if err == nil && artifactResult != nil { + artifactEndpoint = registerComponent(p.server, "Artifact", &ArtifactServer{ + artifact: artifactResult, + }, true) } if err != nil { @@ -94,9 +114,9 @@ func (p *PostProcessorServer) PostProcess(address string, reply *PostProcessorPr } *reply = PostProcessorProcessResponse{ - Err: err, - Keep: keep, - RPCAddress: responseAddress, + Err: err, + Keep: keep, + ArtifactEndpoint: artifactEndpoint, } return nil diff --git a/packer/rpc/post_processor_test.go b/packer/rpc/post_processor_test.go index 990e9a8ce..e22136ed6 100644 --- a/packer/rpc/post_processor_test.go +++ b/packer/rpc/post_processor_test.go @@ -56,7 +56,9 @@ func TestPostProcessorRPC(t *testing.T) { } // Test PostProcess - a := new(packer.MockArtifact) + a := &packer.MockArtifact{ + IdValue: "ppTestId", + } ui := new(testUi) artifact, _, err := ppClient.PostProcess(ui, a) if err != nil { @@ -67,12 +69,12 @@ func TestPostProcessorRPC(t *testing.T) { t.Fatal("postprocess should be called") } - if p.ppArtifact.BuilderId() != "bid" { + if p.ppArtifact.Id() != "ppTestId" { t.Fatal("unknown artifact") } - if artifact.BuilderId() != "bid" { - t.Fatal("unknown result artifact") + if artifact.Id() != "id" { + t.Fatalf("unknown artifact: %s", artifact.Id()) } } diff --git a/packer/rpc/server.go b/packer/rpc/server.go index e1ab19663..68116885b 100644 --- a/packer/rpc/server.go +++ b/packer/rpc/server.go @@ -56,7 +56,7 @@ func RegisterHook(s *rpc.Server, h packer.Hook) { // Registers the appropriate endpoing on an RPC server to serve a // PostProcessor. func RegisterPostProcessor(s *rpc.Server, p packer.PostProcessor) { - registerComponent(s, "PostProcessor", &PostProcessorServer{p}, false) + registerComponent(s, "PostProcessor", &PostProcessorServer{p: p}, false) } // Registers the appropriate endpoint on an RPC server to serve a packer.Provisioner @@ -70,17 +70,6 @@ func RegisterUi(s *rpc.Server, ui packer.Ui) { registerComponent(s, "Ui", &UiServer{ui}, false) } -// registerComponent registers a single Packer RPC component onto -// the RPC server. If id is true, then a unique ID number will be appended -// onto the end of the endpoint. -// -// The endpoint name is returned. -func registerComponent(s *rpc.Server, name string, rcvr interface{}, id bool) string { - endpoint := name - s.RegisterName(endpoint, rcvr) - return endpoint -} - func serveSingleConn(s *rpc.Server) string { l := netListenerInRange(portRangeMin, portRangeMax) diff --git a/packer/rpc/server_new.go b/packer/rpc/server_new.go index 6b596bc11..a4782a0d1 100644 --- a/packer/rpc/server_new.go +++ b/packer/rpc/server_new.go @@ -9,31 +9,35 @@ import ( "sync/atomic" ) +var endpointId uint64 + +const ( + DefaultArtifactEndpoint string = "Artifact" +) + // Server represents an RPC server for Packer. This must be paired on // the other side with a Client. type Server struct { - endpointId uint64 - rpcServer *rpc.Server + components map[string]interface{} } // NewServer returns a new Packer RPC server. func NewServer() *Server { return &Server{ - endpointId: 0, - rpcServer: rpc.NewServer(), + components: make(map[string]interface{}), } } func (s *Server) RegisterArtifact(a packer.Artifact) { - s.registerComponent("Artifact", &ArtifactServer{a}, false) + s.components[DefaultArtifactEndpoint] = a } func (s *Server) RegisterCache(c packer.Cache) { - s.registerComponent("Cache", &CacheServer{c}, false) + s.components["Cache"] = c } func (s *Server) RegisterPostProcessor(p packer.PostProcessor) { - s.registerComponent("PostProcessor", &PostProcessorServer{p}, false) + s.components["PostProcessor"] = p } // ServeConn serves a single connection over the RPC server. It is up @@ -50,7 +54,42 @@ func (s *Server) ServeConn(conn io.ReadWriteCloser) { return } - s.rpcServer.ServeConn(stream) + clientConn, err := mux.Dial(1) + if err != nil { + log.Printf("[ERR] Error connecting to client stream: %s", err) + return + } + client := rpc.NewClient(clientConn) + + // Create the RPC server + server := rpc.NewServer() + for endpoint, iface := range s.components { + var endpointVal interface{} + + switch v := iface.(type) { + case packer.Artifact: + endpointVal = &ArtifactServer{ + artifact: v, + } + case packer.Cache: + endpointVal = &CacheServer{ + cache: v, + } + case packer.PostProcessor: + endpointVal = &PostProcessorServer{ + client: client, + server: server, + p: v, + } + default: + log.Printf("[ERR] Unknown component for endpoint: %s", endpoint) + return + } + + registerComponent(server, endpoint, endpointVal, false) + } + + server.ServeConn(stream) } // registerComponent registers a single Packer RPC component onto @@ -58,12 +97,12 @@ func (s *Server) ServeConn(conn io.ReadWriteCloser) { // onto the end of the endpoint. // // The endpoint name is returned. -func (s *Server) registerComponent(name string, rcvr interface{}, id bool) string { +func registerComponent(server *rpc.Server, name string, rcvr interface{}, id bool) string { endpoint := name if id { - fmt.Sprintf("%s.%d", endpoint, atomic.AddUint64(&s.endpointId, 1)) + fmt.Sprintf("%s.%d", endpoint, atomic.AddUint64(&endpointId, 1)) } - s.rpcServer.RegisterName(endpoint, rcvr) + server.RegisterName(endpoint, rcvr) return endpoint } diff --git a/packer/rpc/ui.go b/packer/rpc/ui.go index 4d7ccc57f..da032160a 100644 --- a/packer/rpc/ui.go +++ b/packer/rpc/ui.go @@ -9,7 +9,8 @@ import ( // An implementation of packer.Ui where the Ui is actually executed // over an RPC connection. type Ui struct { - client *rpc.Client + client *rpc.Client + endpoint string } // UiServer wraps a packer.Ui implementation and makes it exportable