diff --git a/internal/rpcapi/internal_client.go b/internal/rpcapi/internal_client.go new file mode 100644 index 0000000000..c64fcc331a --- /dev/null +++ b/internal/rpcapi/internal_client.go @@ -0,0 +1,120 @@ +package rpcapi + +import ( + "context" + "fmt" + "net" + + "github.com/hashicorp/terraform/internal/rpcapi/terraform1" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/test/bufconn" +) + +// Client is a client for the RPC API. +// +// This just wraps a raw gRPC client connection and provides a more convenient +// API to access its services. +type Client struct { + // conn should be a connection to a server that has already completed + // the Setup.Handshake call. + conn *grpc.ClientConn + // serverCaps should be from the result of the Setup.Handshake call + // previously made to the server that conn is connected to. + serverCaps *terraform1.ServerCapabilities + + close func(context.Context) error +} + +// NewInternalClient returns a client for the RPC API that uses in-memory +// buffers to allow callers within the same Terraform CLI process to access +// the RPC API without any sockets or child processes. +// +// This is intended for exposing Terraform Core functionality through Terraform +// CLI, to establish an explicit interface between those two sides without +// the overhead of forking a child process containing exactly the same code. +// +// Callers should call the Close method of the returned client once they are +// done using it, or else they will leak goroutines. +func NewInternalClient(ctx context.Context, clientCaps *terraform1.ClientCapabilities) (*Client, error) { + fakeListener := bufconn.Listen(4 * 1024 * 1024 /* buffer size */) + srv := grpc.NewServer() + registerGRPCServices(srv) + + go func() { + if err := srv.Serve(fakeListener); err != nil { + // We can't actually return an error here, but this should + // not arise with our fake listener anyway so we'll just panic. + panic(err) + } + }() + + fakeDialer := func(ctx context.Context, fakeAddr string) (net.Conn, error) { + return fakeListener.DialContext(ctx) + } + clientConn, err := grpc.DialContext( + ctx, "testfake", + grpc.WithContextDialer(fakeDialer), + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + if err != nil { + return nil, fmt.Errorf("failed to connect to RPC API: %w", err) + } + + // We perform the setup step on the caller's behalf, so that they can + // immediately use the main services. (The caller would otherwise need + // to do this immediately on return anyway, or the result would be + // useless.) + setupClient := terraform1.NewSetupClient(clientConn) + setupResp, err := setupClient.Handshake(ctx, &terraform1.Handshake_Request{ + Capabilities: clientCaps, + }) + if err != nil { + return nil, fmt.Errorf("setup failed: %w", err) + } + + var client *Client + client = &Client{ + conn: clientConn, + serverCaps: setupResp.Capabilities, + close: func(ctx context.Context) error { + clientConn.Close() + srv.Stop() + fakeListener.Close() + client.conn = nil + client.serverCaps = nil + client.close = func(context.Context) error { + return nil + } + return nil + }, + } + + return client, nil +} + +// Close frees the internal buffers and terminates the goroutines that handle +// the internal RPC API connection. +// +// Any service clients previously returned by other methods become invalid +// as soon as this method is called, and must not be used any further. +func (c *Client) Close(ctx context.Context) error { + return c.close(ctx) +} + +// ServerCapabilities returns the server's response to capability negotiation. +// +// Callers must not modify anything reachable through the returned pointer. +func (c *Client) ServerCapabilities() *terraform1.ServerCapabilities { + return c.serverCaps +} + +// Dependencies returns a client for the Dependencies service of the RPC API. +func (c *Client) Dependencies() terraform1.DependenciesClient { + return terraform1.NewDependenciesClient(c.conn) +} + +// Stacks returns a client for the Stacks service of the RPC API. +func (c *Client) Stacks() terraform1.StacksClient { + return terraform1.NewStacksClient(c.conn) +} diff --git a/internal/rpcapi/internal_client_test.go b/internal/rpcapi/internal_client_test.go new file mode 100644 index 0000000000..fc5fa932ad --- /dev/null +++ b/internal/rpcapi/internal_client_test.go @@ -0,0 +1,25 @@ +package rpcapi_test + +import ( + "context" + "testing" + + "github.com/davecgh/go-spew/spew" + "github.com/hashicorp/terraform/internal/rpcapi" + "github.com/hashicorp/terraform/internal/rpcapi/terraform1" +) + +func TestInternalClientOpenClose(t *testing.T) { + ctx := context.Background() + client, err := rpcapi.NewInternalClient(ctx, &terraform1.ClientCapabilities{}) + if err != nil { + t.Error(err) + } + + t.Logf("server capabilities: %s", spew.Sdump(client.ServerCapabilities())) + + err = client.Close(ctx) + if err != nil { + t.Error(err) + } +} diff --git a/internal/rpcapi/plugin.go b/internal/rpcapi/plugin.go index 37c0b3c826..3423569938 100644 --- a/internal/rpcapi/plugin.go +++ b/internal/rpcapi/plugin.go @@ -24,15 +24,19 @@ func (p *corePlugin) GRPCClient(ctx context.Context, broker *plugin.GRPCBroker, } func (p *corePlugin) GRPCServer(broker *plugin.GRPCBroker, s *grpc.Server) error { + registerGRPCServices(s) + return nil +} + +func registerGRPCServices(s *grpc.Server) { // We initially only register the setup server, because the registration // of other services can vary depending on the capabilities negotiated // during handshake. - setup := newSetupServer(p.handshakeFunc(s)) + setup := newSetupServer(serverHandshake(s)) terraform1.RegisterSetupServer(s, setup) - return nil } -func (p *corePlugin) handshakeFunc(s *grpc.Server) func(context.Context, *terraform1.ClientCapabilities) (*terraform1.ServerCapabilities, error) { +func serverHandshake(s *grpc.Server) func(context.Context, *terraform1.ClientCapabilities) (*terraform1.ServerCapabilities, error) { dependencies := dynrpcserver.NewDependenciesStub() terraform1.RegisterDependenciesServer(s, dependencies) stacks := dynrpcserver.NewStacksStub()