You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
boundary/internal/plugin/loopback/testing_grpc_stream_test.go

275 lines
6.7 KiB

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package loopback
import (
"fmt"
"io"
"sync"
"testing"
plgpb "github.com/hashicorp/boundary/sdk/pbs/plugin"
tr "github.com/stretchr/testify/require"
)
func Test_GetObjectStream_Client(t *testing.T) {
require := tr.New(t)
stream := newGetObjectStream()
require.NotNil(stream)
require.NotNil(stream.client)
// Validate Header is not nil and does not return an error
header, err := stream.client.Header()
require.NotNil(header)
require.NoError(err)
// Validate Trailer is not nil
trailer := stream.client.Trailer()
require.NotNil(trailer)
// Validate Context is not nil
ctx := stream.client.Context()
require.NotNil(ctx)
// validate SendMsg does not write to the channel messages
err = stream.client.SendMsg(nil)
require.NoError(err)
require.Empty(stream.messages)
// Validate RecvMsg does not read from the channel messages
streamSize := len(stream.messages)
var msg *getObjectStreamResponse
err = stream.client.RecvMsg(msg)
require.NoError(err)
require.Nil(msg)
require.Equal(streamSize, len(stream.messages))
// Validate Recv reads a message from the channel messages
var wg sync.WaitGroup
wg.Add(1)
go func() {
stream.messages <- &getObjectStreamResponse{
msg: &plgpb.GetObjectResponse{},
}
}()
go func() {
defer wg.Done()
resp, err := stream.client.Recv()
require.NoError(err)
require.NotNil(resp)
}()
wg.Wait()
// Validates Recv reads a error from the channel messages
wg.Add(1)
go func() {
stream.messages <- &getObjectStreamResponse{
err: fmt.Errorf("mock error"),
}
}()
go func() {
defer wg.Done()
resp, err := stream.client.Recv()
require.Error(err)
require.Nil(resp)
}()
wg.Wait()
// Validate CloseSend does not return an error
// Validate streamClosed is set to true
// Validate the channel messages is closed
var ok bool
err = stream.client.CloseSend()
require.NoError(err)
require.True(stream.streamClosed)
_, ok = <-stream.messages
require.False(ok)
// Validate CloseSend does return an error when the channel is closed
// Validate streamClosed is set to true
// Validate the channel messages is closed
err = stream.client.CloseSend()
require.Error(err)
require.Contains(err.Error(), "stream is closed")
require.True(stream.streamClosed)
_, ok = <-stream.messages
require.False(ok)
// Validate Recv returns EOF error when the channel is closed
resp, err := stream.client.Recv()
require.Equal(io.EOF, err)
require.Nil(resp)
}
func Test_GetObjectStream_Server(t *testing.T) {
require := tr.New(t)
stream := newGetObjectStream()
require.NotNil(stream)
require.NotNil(stream.server)
// Validate SetHeader does not return an error
err := stream.server.SetHeader(nil)
require.NoError(err)
// Validate SendHeader does not return an error
err = stream.server.SendHeader(nil)
require.NoError(err)
// Validate Context returns a context
ctx := stream.server.Context()
require.NotNil(ctx)
// Validate RecvMsg does not read from the channel messages
streamSize := len(stream.messages)
var msg *getObjectStreamResponse
err = stream.server.RecvMsg(msg)
require.NoError(err)
require.Nil(msg)
require.Equal(streamSize, len(stream.messages))
// Validate Send writes to the channel messages
var wg sync.WaitGroup
go func() {
err = stream.server.Send(&plgpb.GetObjectResponse{})
require.NoError(err)
}()
wg.Add(1)
go func() {
defer wg.Done()
resp, ok := <-stream.messages
require.True(ok)
require.NotNil(resp)
require.NotNil(resp.msg)
require.Nil(resp.err)
}()
wg.Wait()
// Validate Send returns an error when the channel is closed
stream.Close()
require.True(stream.streamClosed)
err = stream.server.Send(&plgpb.GetObjectResponse{})
require.Error(err)
require.Contains(err.Error(), "stream is closed")
// Validate SendMsg returns an error when the channel is closed
err = stream.server.SendMsg(&plgpb.GetObjectResponse{})
require.Error(err)
require.Contains(err.Error(), "stream is closed")
err = stream.server.SendMsg(fmt.Errorf("mock error"))
require.Error(err)
require.Contains(err.Error(), "stream is closed")
// Reset stream
stream = newGetObjectStream()
// Validate SendMsg returns an error for unknown message type
err = stream.server.SendMsg(map[string]string{})
require.False(stream.streamClosed)
require.Error(err)
require.Contains(err.Error(), "invalid argument")
// Validate SendMsg writes to the channel messages for GetObjectResponse type
go func() {
err = stream.server.SendMsg(&plgpb.GetObjectResponse{})
require.NoError(err)
}()
wg.Add(1)
go func() {
defer wg.Done()
resp, ok := <-stream.messages
require.True(ok)
require.NotNil(resp)
require.NotNil(resp.msg)
require.Nil(resp.err)
}()
wg.Wait()
// Validate SendMsg closes the channel messages for error type
go func() {
err = stream.server.SendMsg(fmt.Errorf("mock error"))
require.NoError(err)
}()
wg.Add(1)
go func() {
defer wg.Done()
resp, ok := <-stream.messages
require.True(ok)
require.NotNil(resp)
require.NotNil(resp.err)
require.Nil(resp.msg)
}()
wg.Wait()
require.True(stream.streamClosed)
_, ok := <-stream.messages
require.False(ok)
}
func TestNewGetObjectStream(t *testing.T) {
require := tr.New(t)
stream := newGetObjectStream()
require.NotNil(stream)
require.NotNil(stream.m)
require.NotNil(stream.client)
require.NotNil(stream.server)
require.NotNil(stream.messages)
require.False(stream.streamClosed)
require.False(stream.IsStreamClosed())
// Validate Close() closes the channel messages
stream.Close()
require.True(stream.streamClosed)
require.True(stream.IsStreamClosed())
_, ok := <-stream.messages
require.False(ok)
}
func Test_GetObjectStream(t *testing.T) {
require := tr.New(t)
stream := newGetObjectStream()
// Validate the server receives an error when the client closes the stream
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for {
err := stream.server.Send(&plgpb.GetObjectResponse{})
if err == nil {
continue
}
require.Error(err)
require.Contains(err.Error(), "stream is closed")
break
}
}()
stream.client.CloseSend()
wg.Wait()
require.True(stream.streamClosed)
require.True(stream.IsStreamClosed())
_, ok := <-stream.messages
require.False(ok)
// Reset Stream
stream = newGetObjectStream()
// Validate the client recieves an error when the server sends an error
// Validate the channel messages is closed
go func() {
err := stream.server.SendMsg(fmt.Errorf("mock error"))
require.NoError(err)
}()
resp, err := stream.client.Recv()
require.Nil(resp)
require.Error(err)
require.Contains(err.Error(), "mock error")
require.True(stream.streamClosed)
require.True(stream.IsStreamClosed())
_, ok = <-stream.messages
require.False(ok)
}