You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
terraform/internal/rpcapi/grpc_helpers.go

58 lines
1.9 KiB

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package rpcapi
import (
"sync"
"google.golang.org/protobuf/proto"
)
// grpcServerStreamingSender is the minimal contract this package needs from
// the server side of a streaming RPC: the ability to send one message of a
// particular type.
//
// It is intended to match the shape of the interfaces that the gRPC code
// generator for Go tends to produce for the server side of an RPC function
// that returns streaming results of a given message type.
type grpcServerStreamingSender[Message proto.Message] interface {
	// Send passes a single message to the stream and reports any error
	// the stream implementation returns.
	Send(msg Message) error
}
// syncStreamingRPCSender wraps a generated grpc.ServerStream wrapper and
// serializes calls to Send: a mutex is held for the full duration of each
// call to the underlying Send, so Send may be called from multiple
// goroutines.
//
// Values of this type contain a mutex and must not be copied; always use
// them through a pointer.
//
// Construct one with [newSyncStreamingRPCSender], which lets type inference
// supply the type parameters so they need not be written out. If a
// particular instantiation has to be named, consider declaring a type alias
// with the specific Server and Message types, so the long-winded
// instantiation expression appears only once and a friendlier name can be
// used everywhere else.
type syncStreamingRPCSender[Server grpcServerStreamingSender[Message], Message proto.Message] struct {
	// mu is held for the whole of each Send call made on wrapped.
	mu sync.Mutex
	// wrapped is the underlying sender that Send delegates to.
	wrapped Server
}
// newSyncStreamingRPCSender returns a sender that delegates to the given
// value, which implements an interface generated for the server side of a
// streaming RPC response. The returned sender's Send method is
// concurrency-safe because it holds a mutex for the duration of each call
// to the underlying Send.
//
// Both type parameters can normally be inferred from the argument.
func newSyncStreamingRPCSender[
	Server grpcServerStreamingSender[Message],
	Message proto.Message,
](wrapped Server) *syncStreamingRPCSender[Server, Message] {
	// The zero-value mutex is ready to use, so only the wrapped sender
	// needs to be filled in.
	sender := new(syncStreamingRPCSender[Server, Message])
	sender.wrapped = wrapped
	return sender
}
// Send forwards msg to the wrapped sender while holding the mutex, and
// returns whatever error the wrapped Send returns.
//
// Concurrent callers are serialized: each waits until the previous call to
// the wrapped Send has returned.
func (s *syncStreamingRPCSender[Server, Message]) Send(msg Message) error {
	s.mu.Lock()
	// Release the lock via defer so that it is also released if the wrapped
	// Send panics; otherwise a panic recovered further up the stack would
	// leave the mutex locked and block every later Send forever.
	defer s.mu.Unlock()
	return s.wrapped.Send(msg)
}