fix: Support streaming HTTP responses (#2956)

* all: Upgrade listenerutil dependency

The new listenerutil implements http.Flusher, which is necessary
for streaming responses from the grpc-gateway to work.

* daemon/controller: Remove newline delimiter in streaming responses

The default gRPC-Gateway HTTPBody marshaler uses newlines
as a delimiter. This introduces newlines in our streamed responses.
Wrapping the default marshaler lets us define a nil delimiter, which
ensures our streaming responses are unaffected by the server side
chunking.
pull/2968/head
Johan Brandhorst-Satzkorn 3 years ago committed by GitHub
parent bb6d1cad58
commit cf412b95a2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -38,7 +38,7 @@ require (
github.com/hashicorp/go-secure-stdlib/configutil/v2 v2.0.7
github.com/hashicorp/go-secure-stdlib/gatedwriter v0.1.1
github.com/hashicorp/go-secure-stdlib/kv-builder v0.1.1
github.com/hashicorp/go-secure-stdlib/listenerutil v0.1.5-0.20221130175209-f7789ac19a1f
github.com/hashicorp/go-secure-stdlib/listenerutil v0.1.5
github.com/hashicorp/go-secure-stdlib/mlock v0.1.1
github.com/hashicorp/go-secure-stdlib/parseutil v0.1.7
github.com/hashicorp/go-secure-stdlib/password v0.1.1

@ -709,8 +709,8 @@ github.com/hashicorp/go-secure-stdlib/gatedwriter v0.1.1 h1:9um9R8i0+HbRHS9d64kd
github.com/hashicorp/go-secure-stdlib/gatedwriter v0.1.1/go.mod h1:6RoRTSMDK2H/rKh3P/JIsk1tK8aatKTt3JyvIopi3GQ=
github.com/hashicorp/go-secure-stdlib/kv-builder v0.1.1 h1:IJgULbAXuvWxzKFfu+Au1FUmHIJulS6N4F7Hkn+Kck0=
github.com/hashicorp/go-secure-stdlib/kv-builder v0.1.1/go.mod h1:rf5JPE13wi+NwjgsmGkbg4b2CgHq8v7Htn/F0nDe/hg=
github.com/hashicorp/go-secure-stdlib/listenerutil v0.1.5-0.20221130175209-f7789ac19a1f h1:FemjU7MTEVt5HVI59M1AQjzEaR8mvlXr5I985BUbLSg=
github.com/hashicorp/go-secure-stdlib/listenerutil v0.1.5-0.20221130175209-f7789ac19a1f/go.mod h1:MSXg3Md+eg1hOJFSKuTELy6YnFhfMJBVYu7t07BdPc4=
github.com/hashicorp/go-secure-stdlib/listenerutil v0.1.5 h1:+vfyObZczNFbpGxHJWTZ+ZdTEF9boRal5rAkwKTc2yg=
github.com/hashicorp/go-secure-stdlib/listenerutil v0.1.5/go.mod h1:MSXg3Md+eg1hOJFSKuTELy6YnFhfMJBVYu7t07BdPc4=
github.com/hashicorp/go-secure-stdlib/mlock v0.1.1 h1:cCRo8gK7oq6A2L6LICkUZ+/a5rLiRXFMf1Qd4xSwxTc=
github.com/hashicorp/go-secure-stdlib/mlock v0.1.1/go.mod h1:zq93CJChV6L9QTfGKtfBxKqD7BqqXx5O04A/ns2p5+I=
github.com/hashicorp/go-secure-stdlib/parseutil v0.1.1/go.mod h1:QmrqtbKuxxSWTN3ETMPuB+VtEiBJ/A9XhoYGv8E1uD8=

@ -39,10 +39,20 @@ func gatewayDialOptions(lis grpcServerListener) []grpc.DialOption {
}
}
type noDelimiterStreamingMarshaler struct {
runtime.Marshaler
}
func (noDelimiterStreamingMarshaler) Delimiter() []byte {
return nil
}
func newGrpcGatewayMux() *runtime.ServeMux {
return runtime.NewServeMux(
runtime.WithMarshalerOption(runtime.MIMEWildcard, &runtime.HTTPBodyMarshaler{
Marshaler: handlers.JSONMarshaler(),
runtime.WithMarshalerOption(runtime.MIMEWildcard, &noDelimiterStreamingMarshaler{
&runtime.HTTPBodyMarshaler{
Marshaler: handlers.JSONMarshaler(),
},
}),
runtime.WithErrorHandler(handlers.ErrorHandler()),
runtime.WithForwardResponseOption(handlers.OutgoingResponseFilter),

@ -80,7 +80,7 @@ func createMuxWithEndpoints(c *Controller, props HandlerProperties) (http.Handle
return p == uiPath
}
return http.HandlerFunc(mux.ServeHTTP), isUiRequest, nil
return mux, isUiRequest, nil
}
// apiHandler returns an http.Handler for the services. This can be used on

@ -6,8 +6,10 @@ package controller
import (
"bytes"
"context"
"crypto/rand"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
@ -15,9 +17,13 @@ import (
"strings"
"testing"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"github.com/hashicorp/boundary/internal/daemon/controller/handlers"
"github.com/hashicorp/boundary/internal/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/genproto/googleapis/api/httpbody"
"google.golang.org/protobuf/proto"
)
func TestAuthenticationHandler(t *testing.T) {
@ -347,3 +353,62 @@ func TestCallbackInterceptor(t *testing.T) {
require.NoError(t, server.Shutdown(context.Background()))
}
func TestStreamingResponse(t *testing.T) {
listener, err := net.Listen("tcp", ":0")
require.NoError(t, err)
t.Cleanup(func() {
// Ignore errors as a normal shutdown will also close the listener when
// the server Shutdown is called. This is just in case.
_ = listener.Close()
})
mux := newGrpcGatewayMux()
marshaler := &noDelimiterStreamingMarshaler{
&runtime.HTTPBodyMarshaler{
Marshaler: handlers.JSONMarshaler(),
},
}
size := 500
blob := make([]byte, size)
_, err = io.ReadFull(rand.Reader, blob)
require.NoError(t, err)
var i int
n := 5
recv := func() (proto.Message, error) {
t.Log("Sending chunk", i)
if i < n {
buf := make([]byte, size/n)
copy(buf, blob[i*len(buf):])
i++
return &httpbody.HttpBody{
ContentType: "application/octet-stream",
Data: buf,
}, nil
}
return nil, io.EOF
}
mux.HandlePath("GET", "/", runtime.HandlerFunc(func(w http.ResponseWriter, r *http.Request, _ map[string]string) {
ctx := r.Context()
ctx = runtime.NewServerMetadataContext(ctx, runtime.ServerMetadata{})
runtime.ForwardResponseStream(ctx, mux, marshaler, w, r, recv)
}))
server := &http.Server{
Handler: mux,
}
go func() {
if err := server.Serve(listener); !errors.Is(err, http.ErrServerClosed) {
assert.NoError(t, err)
}
}()
t.Cleanup(func() {
require.NoError(t, server.Shutdown(context.Background()))
})
resp, err := http.Get("http://" + listener.Addr().String())
require.NoError(t, err)
read, err := io.ReadAll(resp.Body)
require.NoError(t, err)
require.True(t, string(read) == string(blob), "Got: %q", string(read))
require.Equal(t, i, n)
}

Loading…
Cancel
Save