test(storage): Avoid panic in test when writing to closed channel

pull/3251/head
Louis Ruch 3 years ago committed by Timothy Messier
parent af8e4d63be
commit 042c46e00e
No known key found for this signature in database
GPG Key ID: EFD2F184F7600572

@ -65,6 +65,7 @@ const (
WorkerNotFoundForRequest = 133 // WorkerNotFoundForRequest represents an error when no appropriate worker is found which meets the conditions required to handle a request
Closed = 134 // Closed represents an error when an operation cannot be completed because the thing being operated on is closed
ChecksumMismatch = 135 // ChecksumMismatch represents an error when a checksum is mismatched
AuthAttemptExpired Code = 198 // AuthAttemptExpired represents an expired authentication attempt
AuthMethodInactive Code = 199 // AuthMethodInactive represents an error that means the auth method is not active.
@ -130,7 +131,8 @@ const (
MigrationLock Code = 2001 // MigrationLock represents an error related to locking of the DB
// External system errors are reserved codes 3000-3999
Unavailable Code = 3000 // Unavailable represents that an external system is unavailable
Unavailable Code = 3000 // Unavailable represents that an external system is unavailable
ExternalPlugin Code = 3001 // ExternalPlugin represent an error that occurred on a plugin external to Boundary
// Vault specific errors
VaultTokenNotOrphan Code = 3010 // VaultTokenNotOrphan represents an error for a Vault token that is not an orphan token

@ -400,6 +400,16 @@ func TestCode_Both_String_Info(t *testing.T) {
c: Closed,
want: Closed,
},
{
name: "ExternalPlugin",
c: ExternalPlugin,
want: ExternalPlugin,
},
{
name: "ChecksumMismatch",
c: ChecksumMismatch,
want: ChecksumMismatch,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {

@ -335,4 +335,12 @@ var errorCodeInfo = map[Code]Info{
Message: "closed",
Kind: State,
},
ExternalPlugin: {
Message: "plugin error",
Kind: External,
},
ChecksumMismatch: {
Message: "checksum mismatch",
Kind: Integrity,
},
}

@ -30,6 +30,9 @@ type getObjectStream struct {
m *sync.Mutex
streamClosed bool
ctx context.Context
cancelCtx context.CancelFunc
}
// IsStreamClosed returns true if the stream is closed.
@ -47,6 +50,8 @@ func (s *getObjectStream) Close() {
if s.streamClosed {
return
}
// Cancel ctx to notify writers chan is closing
s.cancelCtx()
close(s.messages)
s.streamClosed = true
}
@ -125,6 +130,8 @@ func (c *getObjectClient) RecvMsg(m interface{}) error {
// getObjectServer is used to mock the server stream
// interactions for the GetObject method.
type getObjectServer struct {
ctx context.Context
// sendToClient is used to mock the server sending messages to the client.
sendToClient chan *getObjectStreamResponse
@ -148,7 +155,12 @@ func (s *getObjectServer) Send(resp *plgpb.GetObjectResponse) error {
if s.isStreamClosed() {
return fmt.Errorf("stream is closed")
}
s.sendToClient <- &getObjectStreamResponse{msg: resp}
select {
case s.sendToClient <- &getObjectStreamResponse{msg: resp}:
case <-s.ctx.Done():
return fmt.Errorf("stream is closed")
}
return nil
}
@ -189,13 +201,21 @@ func (s *getObjectServer) SendMsg(m interface{}) error {
if s.isStreamClosed() {
return fmt.Errorf("stream is closed")
}
s.sendToClient <- &getObjectStreamResponse{msg: msg}
select {
case s.sendToClient <- &getObjectStreamResponse{msg: msg}:
case <-s.ctx.Done():
return fmt.Errorf("stream is closed")
}
case error:
if s.isStreamClosed() {
return fmt.Errorf("stream is closed")
}
defer s.closeStream()
s.sendToClient <- &getObjectStreamResponse{err: msg}
select {
case s.sendToClient <- &getObjectStreamResponse{err: msg}:
case <-s.ctx.Done():
return fmt.Errorf("stream is closed")
}
default:
return fmt.Errorf("invalid argument %v", m)
}
@ -213,9 +233,12 @@ func (s *getObjectServer) RecvMsg(m interface{}) error {
// The client and server stream is mocked by creating a GetObjectResponse
// channel and an error channel that is shared between the client and server.
func newGetObjectStream() *getObjectStream {
ctx, cnl := context.WithCancel(context.Background())
stream := &getObjectStream{
m: new(sync.Mutex),
messages: make(chan *getObjectStreamResponse),
ctx: ctx,
cancelCtx: cnl,
m: new(sync.Mutex),
messages: make(chan *getObjectStreamResponse),
}
stream.client = &getObjectClient{
sentFromServer: stream.messages,
@ -223,6 +246,7 @@ func newGetObjectStream() *getObjectStream {
isStreamClosed: stream.IsStreamClosed,
}
stream.server = &getObjectServer{
ctx: ctx,
sendToClient: stream.messages,
closeStream: stream.Close,
isStreamClosed: stream.IsStreamClosed,

Loading…
Cancel
Save