From cf412b95a26ddcaf4f09f177094b0860ac25f88b Mon Sep 17 00:00:00 2001 From: Johan Brandhorst-Satzkorn Date: Mon, 13 Feb 2023 11:56:53 -0800 Subject: [PATCH] fix: Support streaming HTTP responses (#2956) * all: Upgrade listenerutil dependency The new listenerutil implements http.Flusher, which is necessary for streaming responses from the grpc-gateway to work. * daemon/controller: Remove newline delimiter in streaming responses The default gRPC-Gateway HTTPBody marshaler uses newlines as a delimiter. This introduces newlines in our streamed responses. Wrapping the default marshaler lets us define a nil delimiter, which ensures our streaming responses are unaffected by the server side chunking. --- go.mod | 2 +- go.sum | 4 +- internal/daemon/controller/gateway.go | 14 +++- internal/daemon/controller/handler.go | 2 +- internal/daemon/controller/handler_test.go | 65 +++++++++++++++++++ .../handlers/{marshler.go => marshaler.go} | 0 6 files changed, 81 insertions(+), 6 deletions(-) rename internal/daemon/controller/handlers/{marshler.go => marshaler.go} (100%) diff --git a/go.mod b/go.mod index 85ac18a0cc..29e87a4d1b 100644 --- a/go.mod +++ b/go.mod @@ -38,7 +38,7 @@ require ( github.com/hashicorp/go-secure-stdlib/configutil/v2 v2.0.7 github.com/hashicorp/go-secure-stdlib/gatedwriter v0.1.1 github.com/hashicorp/go-secure-stdlib/kv-builder v0.1.1 - github.com/hashicorp/go-secure-stdlib/listenerutil v0.1.5-0.20221130175209-f7789ac19a1f + github.com/hashicorp/go-secure-stdlib/listenerutil v0.1.5 github.com/hashicorp/go-secure-stdlib/mlock v0.1.1 github.com/hashicorp/go-secure-stdlib/parseutil v0.1.7 github.com/hashicorp/go-secure-stdlib/password v0.1.1 diff --git a/go.sum b/go.sum index 202734ace6..b6d273ceaf 100644 --- a/go.sum +++ b/go.sum @@ -709,8 +709,8 @@ github.com/hashicorp/go-secure-stdlib/gatedwriter v0.1.1 h1:9um9R8i0+HbRHS9d64kd github.com/hashicorp/go-secure-stdlib/gatedwriter v0.1.1/go.mod h1:6RoRTSMDK2H/rKh3P/JIsk1tK8aatKTt3JyvIopi3GQ= github.com/hashicorp/go-secure-stdlib/kv-builder v0.1.1 h1:IJgULbAXuvWxzKFfu+Au1FUmHIJulS6N4F7Hkn+Kck0= github.com/hashicorp/go-secure-stdlib/kv-builder v0.1.1/go.mod h1:rf5JPE13wi+NwjgsmGkbg4b2CgHq8v7Htn/F0nDe/hg= -github.com/hashicorp/go-secure-stdlib/listenerutil v0.1.5-0.20221130175209-f7789ac19a1f h1:FemjU7MTEVt5HVI59M1AQjzEaR8mvlXr5I985BUbLSg= -github.com/hashicorp/go-secure-stdlib/listenerutil v0.1.5-0.20221130175209-f7789ac19a1f/go.mod h1:MSXg3Md+eg1hOJFSKuTELy6YnFhfMJBVYu7t07BdPc4= +github.com/hashicorp/go-secure-stdlib/listenerutil v0.1.5 h1:+vfyObZczNFbpGxHJWTZ+ZdTEF9boRal5rAkwKTc2yg= +github.com/hashicorp/go-secure-stdlib/listenerutil v0.1.5/go.mod h1:MSXg3Md+eg1hOJFSKuTELy6YnFhfMJBVYu7t07BdPc4= github.com/hashicorp/go-secure-stdlib/mlock v0.1.1 h1:cCRo8gK7oq6A2L6LICkUZ+/a5rLiRXFMf1Qd4xSwxTc= github.com/hashicorp/go-secure-stdlib/mlock v0.1.1/go.mod h1:zq93CJChV6L9QTfGKtfBxKqD7BqqXx5O04A/ns2p5+I= github.com/hashicorp/go-secure-stdlib/parseutil v0.1.1/go.mod h1:QmrqtbKuxxSWTN3ETMPuB+VtEiBJ/A9XhoYGv8E1uD8= diff --git a/internal/daemon/controller/gateway.go b/internal/daemon/controller/gateway.go index e8fc02d6cd..97b1493948 100644 --- a/internal/daemon/controller/gateway.go +++ b/internal/daemon/controller/gateway.go @@ -39,10 +39,20 @@ func gatewayDialOptions(lis grpcServerListener) []grpc.DialOption { } } +type noDelimiterStreamingMarshaler struct { + runtime.Marshaler +} + +func (noDelimiterStreamingMarshaler) Delimiter() []byte { + return nil +} + func newGrpcGatewayMux() *runtime.ServeMux { return runtime.NewServeMux( - runtime.WithMarshalerOption(runtime.MIMEWildcard, &runtime.HTTPBodyMarshaler{ - Marshaler: handlers.JSONMarshaler(), + runtime.WithMarshalerOption(runtime.MIMEWildcard, &noDelimiterStreamingMarshaler{ + &runtime.HTTPBodyMarshaler{ + Marshaler: handlers.JSONMarshaler(), + }, }), runtime.WithErrorHandler(handlers.ErrorHandler()), runtime.WithForwardResponseOption(handlers.OutgoingResponseFilter), diff --git a/internal/daemon/controller/handler.go b/internal/daemon/controller/handler.go index 60529c976d..dbeb059d29 100644 --- a/internal/daemon/controller/handler.go +++ b/internal/daemon/controller/handler.go @@ -80,7 +80,7 @@ func createMuxWithEndpoints(c *Controller, props HandlerProperties) (http.Handle return p == uiPath } - return http.HandlerFunc(mux.ServeHTTP), isUiRequest, nil + return mux, isUiRequest, nil } // apiHandler returns an http.Handler for the services. This can be used on diff --git a/internal/daemon/controller/handler_test.go b/internal/daemon/controller/handler_test.go index 0aa2e8a593..c9b73b6c5c 100644 --- a/internal/daemon/controller/handler_test.go +++ b/internal/daemon/controller/handler_test.go @@ -6,8 +6,10 @@ package controller import ( "bytes" "context" + "crypto/rand" "encoding/json" "fmt" + "io" "io/ioutil" "net" "net/http" @@ -15,9 +17,13 @@ import ( "strings" "testing" + "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" "github.com/hashicorp/boundary/internal/daemon/controller/handlers" + "github.com/hashicorp/boundary/internal/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "google.golang.org/genproto/googleapis/api/httpbody" + "google.golang.org/protobuf/proto" ) func TestAuthenticationHandler(t *testing.T) { @@ -347,3 +353,62 @@ func TestCallbackInterceptor(t *testing.T) { require.NoError(t, server.Shutdown(context.Background())) } + +func TestStreamingResponse(t *testing.T) { + listener, err := net.Listen("tcp", ":0") + require.NoError(t, err) + t.Cleanup(func() { + // Ignore errors as a normal shutdown will also close the listener when + // the server Shutdown is called. This is just in case. + _ = listener.Close() + }) + mux := newGrpcGatewayMux() + marshaler := &noDelimiterStreamingMarshaler{ + &runtime.HTTPBodyMarshaler{ + Marshaler: handlers.JSONMarshaler(), + }, + } + size := 500 + blob := make([]byte, size) + _, err = io.ReadFull(rand.Reader, blob) + require.NoError(t, err) + var i int + n := 5 + recv := func() (proto.Message, error) { + t.Log("Sending chunk", i) + if i < n { + buf := make([]byte, size/n) + copy(buf, blob[i*len(buf):]) + i++ + return &httpbody.HttpBody{ + ContentType: "application/octet-stream", + Data: buf, + }, nil + } + return nil, io.EOF + } + + mux.HandlePath("GET", "/", runtime.HandlerFunc(func(w http.ResponseWriter, r *http.Request, _ map[string]string) { + ctx := r.Context() + ctx = runtime.NewServerMetadataContext(ctx, runtime.ServerMetadata{}) + runtime.ForwardResponseStream(ctx, mux, marshaler, w, r, recv) + })) + + server := &http.Server{ + Handler: mux, + } + go func() { + if err := server.Serve(listener); !errors.Is(err, http.ErrServerClosed) { + assert.NoError(t, err) + } + }() + t.Cleanup(func() { + require.NoError(t, server.Shutdown(context.Background())) + }) + resp, err := http.Get("http://" + listener.Addr().String()) + require.NoError(t, err) + read, err := io.ReadAll(resp.Body) + require.NoError(t, err) + require.True(t, string(read) == string(blob), "Got: %q", string(read)) + require.Equal(t, i, n) +} diff --git a/internal/daemon/controller/handlers/marshler.go b/internal/daemon/controller/handlers/marshaler.go similarity index 100% rename from internal/daemon/controller/handlers/marshler.go rename to internal/daemon/controller/handlers/marshaler.go