From 042c46e00e41a8fe2ed50e697a7bb175de8f2067 Mon Sep 17 00:00:00 2001 From: Louis Ruch Date: Thu, 11 May 2023 21:38:26 -0700 Subject: [PATCH] test(storage): Avoid panic in test when writing to closed channel --- internal/errors/code.go | 4 ++- internal/errors/code_test.go | 10 ++++++ internal/errors/info.go | 8 +++++ .../plugin/loopback/testing_grpc_stream.go | 34 ++++++++++++++++--- 4 files changed, 50 insertions(+), 6 deletions(-) diff --git a/internal/errors/code.go b/internal/errors/code.go index 7e2185f268..e9d30b7e3f 100644 --- a/internal/errors/code.go +++ b/internal/errors/code.go @@ -65,6 +65,7 @@ const ( WorkerNotFoundForRequest = 133 // WorkerNotFoundForRequest represents an error when no appropriate worker is found which meets the conditions required to handle a request Closed = 134 // Closed represents an error when an operation cannot be completed because the thing being operated on is closed + ChecksumMismatch = 135 // ChecksumMismatch represents an error when a checksum is mismatched AuthAttemptExpired Code = 198 // AuthAttemptExpired represents an expired authentication attempt AuthMethodInactive Code = 199 // AuthMethodInactive represents an error that means the auth method is not active. @@ -130,7 +131,8 @@ const ( MigrationLock Code = 2001 // MigrationLock represents an error related to locking of the DB // External system errors are reserved codes 3000-3999 - Unavailable Code = 3000 // Unavailable represents that an external system is unavailable + Unavailable Code = 3000 // Unavailable represents that an external system is unavailable + ExternalPlugin Code = 3001 // ExternalPlugin represent an error that occurred on a plugin external to Boundary // Vault specific errors VaultTokenNotOrphan Code = 3010 // VaultTokenNotOrphan represents an error for a Vault token that is not an orphan token diff --git a/internal/errors/code_test.go b/internal/errors/code_test.go index ab026223f5..614799625d 100644 --- a/internal/errors/code_test.go +++ b/internal/errors/code_test.go @@ -400,6 +400,16 @@ func TestCode_Both_String_Info(t *testing.T) { c: Closed, want: Closed, }, + { + name: "ExternalPlugin", + c: ExternalPlugin, + want: ExternalPlugin, + }, + { + name: "ChecksumMismatch", + c: ChecksumMismatch, + want: ChecksumMismatch, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/internal/errors/info.go b/internal/errors/info.go index 652f05af33..1e9784bd9d 100644 --- a/internal/errors/info.go +++ b/internal/errors/info.go @@ -335,4 +335,12 @@ var errorCodeInfo = map[Code]Info{ Message: "closed", Kind: State, }, + ExternalPlugin: { + Message: "plugin error", + Kind: External, + }, + ChecksumMismatch: { + Message: "checksum mismatch", + Kind: Integrity, + }, } diff --git a/internal/plugin/loopback/testing_grpc_stream.go b/internal/plugin/loopback/testing_grpc_stream.go index 21d37b539a..d7c8351932 100644 --- a/internal/plugin/loopback/testing_grpc_stream.go +++ b/internal/plugin/loopback/testing_grpc_stream.go @@ -30,6 +30,9 @@ type getObjectStream struct { m *sync.Mutex streamClosed bool + + ctx context.Context + cancelCtx context.CancelFunc } // IsStreamClosed returns true if the stream is closed. @@ -47,6 +50,8 @@ func (s *getObjectStream) Close() { if s.streamClosed { return } + // Cancel ctx to notify writers chan is closing + s.cancelCtx() close(s.messages) s.streamClosed = true } @@ -125,6 +130,8 @@ func (c *getObjectClient) RecvMsg(m interface{}) error { // getObjectServer is used to mock the server stream // interactions for the GetObject method. type getObjectServer struct { + ctx context.Context + // sendToClient is used to mock the server sending messages to the client. sendToClient chan *getObjectStreamResponse @@ -148,7 +155,12 @@ func (s *getObjectServer) Send(resp *plgpb.GetObjectResponse) error { if s.isStreamClosed() { return fmt.Errorf("stream is closed") } - s.sendToClient <- &getObjectStreamResponse{msg: resp} + + select { + case s.sendToClient <- &getObjectStreamResponse{msg: resp}: + case <-s.ctx.Done(): + return fmt.Errorf("stream is closed") + } return nil } @@ -189,13 +201,21 @@ func (s *getObjectServer) SendMsg(m interface{}) error { if s.isStreamClosed() { return fmt.Errorf("stream is closed") } - s.sendToClient <- &getObjectStreamResponse{msg: msg} + select { + case s.sendToClient <- &getObjectStreamResponse{msg: msg}: + case <-s.ctx.Done(): + return fmt.Errorf("stream is closed") + } case error: if s.isStreamClosed() { return fmt.Errorf("stream is closed") } defer s.closeStream() - s.sendToClient <- &getObjectStreamResponse{err: msg} + select { + case s.sendToClient <- &getObjectStreamResponse{err: msg}: + case <-s.ctx.Done(): + return fmt.Errorf("stream is closed") + } default: return fmt.Errorf("invalid argument %v", m) } @@ -213,9 +233,12 @@ func (s *getObjectServer) RecvMsg(m interface{}) error { // The client and server stream is mocked by creating a GetObjectResponse // channel and an error channel that is shared between the client and server. func newGetObjectStream() *getObjectStream { + ctx, cnl := context.WithCancel(context.Background()) stream := &getObjectStream{ - m: new(sync.Mutex), - messages: make(chan *getObjectStreamResponse), + ctx: ctx, + cancelCtx: cnl, + m: new(sync.Mutex), + messages: make(chan *getObjectStreamResponse), } stream.client = &getObjectClient{ sentFromServer: stream.messages, @@ -223,6 +246,7 @@ func newGetObjectStream() *getObjectStream { isStreamClosed: stream.IsStreamClosed, } stream.server = &getObjectServer{ + ctx: ctx, sendToClient: stream.messages, closeStream: stream.Close, isStreamClosed: stream.IsStreamClosed,