diff --git a/internal/plugin/loopback/options.go b/internal/plugin/loopback/options.go index f2da84e585..aed9ed7e92 100644 --- a/internal/plugin/loopback/options.go +++ b/internal/plugin/loopback/options.go @@ -38,6 +38,10 @@ const ( PutObject ) +const ( + defaultStreamChunckSize = 8 +) + // PluginMockError is used to mock an error when interacting with an external object store. type PluginMockError struct { BucketName string diff --git a/internal/plugin/loopback/storage.go b/internal/plugin/loopback/storage.go index 19a1ff1582..ec48b21d2b 100644 --- a/internal/plugin/loopback/storage.go +++ b/internal/plugin/loopback/storage.go @@ -284,9 +284,21 @@ func (l *LoopbackStorage) getObject(req *plgpb.GetObjectRequest, stream plgpb.St } } go func() { + chunkSize := req.GetChunkSize() + if chunkSize == 0 { + chunkSize = defaultStreamChunckSize + } + data := []byte{} for _, chunk := range object.DataChunks { + data = append(data, chunk...) + } + for i := 0; i < len(data); i += int(chunkSize) { + end := i + int(chunkSize) + if end > len(data) { + end = len(data) + } if err := stream.Send(&plgpb.GetObjectResponse{ - FileChunk: chunk, + FileChunk: append([]byte{}, data[i:end]...), }); err != nil { stream.SendMsg(status.Errorf(codes.Internal, "%s: failed to send object data: %v", op, err)) return diff --git a/internal/plugin/loopback/storage_test.go b/internal/plugin/loopback/storage_test.go index 3b465cd0f8..9366d1795a 100644 --- a/internal/plugin/loopback/storage_test.go +++ b/internal/plugin/loopback/storage_test.go @@ -629,6 +629,18 @@ func TestLoopbackGetObject(t *testing.T) { }, expectedErr: codes.PermissionDenied, }, + { + name: "with chunk size", + request: &plgpb.GetObjectRequest{ + Key: "mock_object", + Bucket: &storagebuckets.StorageBucket{ + BucketName: "aws_s3_mock", + Secrets: secrets, + }, + ChunkSize: 3, + }, + expectedObj: mockStorageMapData["aws_s3_mock"]["mock_object"], + }, { name: "object retrieved", request: &plgpb.GetObjectRequest{ diff --git a/internal/proto/plugin/v1/storage_plugin_service.proto b/internal/proto/plugin/v1/storage_plugin_service.proto index 585aa84666..632b189d9e 100644 --- a/internal/proto/plugin/v1/storage_plugin_service.proto +++ b/internal/proto/plugin/v1/storage_plugin_service.proto @@ -108,6 +108,9 @@ message GetObjectRequest { // Required. The path of the object. string key = 20; + + // Optional. The maximum size of the stream response message. Defaults to 64KiB. + uint32 chunk_size = 30; } message GetObjectResponse { diff --git a/internal/storage/plugin/options.go b/internal/storage/plugin/options.go index 80a791de20..46eaeaa89f 100644 --- a/internal/storage/plugin/options.go +++ b/internal/storage/plugin/options.go @@ -24,7 +24,7 @@ type Option func(*options) // options = how options are represented type options struct { - withChunkSize int + withChunkSize uint32 withName string withDescription string withAttributes *structpb.Struct @@ -47,7 +47,7 @@ func getDefaultOptions() options { // send to the plugin in a single request. If not provided, // the default is 64KiB. The recommended chunk size for // streamed messages is 16KiB to 64KiB. -func WithChunkSize(chunkSize int) Option { +func WithChunkSize(chunkSize uint32) Option { return func(o *options) { o.withChunkSize = chunkSize } diff --git a/sdk/pbs/plugin/storage_plugin_service.pb.go b/sdk/pbs/plugin/storage_plugin_service.pb.go index a22496d0c7..374a16a4b2 100644 --- a/sdk/pbs/plugin/storage_plugin_service.pb.go +++ b/sdk/pbs/plugin/storage_plugin_service.pb.go @@ -543,6 +543,8 @@ type GetObjectRequest struct { Bucket *storagebuckets.StorageBucket `protobuf:"bytes,10,opt,name=bucket,proto3" json:"bucket,omitempty"` // Required. The path of the object. Key string `protobuf:"bytes,20,opt,name=key,proto3" json:"key,omitempty"` + // Optional. The maximum size of the stream response message. Defaults to 64KiB. + ChunkSize uint32 `protobuf:"varint,30,opt,name=chunk_size,json=chunkSize,proto3" json:"chunk_size,omitempty"` } func (x *GetObjectRequest) Reset() { @@ -591,6 +593,13 @@ func (x *GetObjectRequest) GetKey() string { return "" } +func (x *GetObjectRequest) GetChunkSize() uint32 { + if x != nil { + return x.ChunkSize + } + return 0 +} + type GetObjectResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -848,15 +857,17 @@ var file_plugin_v1_storage_plugin_service_proto_rawDesc = []byte{ 0x0d, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x6d, 0x6f, 0x64, 0x69, 0x66, 0x69, 0x65, 0x64, 0x18, 0x14, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, - 0x52, 0x0c, 0x6c, 0x61, 0x73, 0x74, 0x4d, 0x6f, 0x64, 0x69, 0x66, 0x69, 0x65, 0x64, 0x22, 0x77, - 0x0a, 0x10, 0x47, 0x65, 0x74, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x12, 0x51, 0x0a, 0x06, 0x62, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x18, 0x0a, 0x20, 0x01, - 0x28, 0x0b, 0x32, 0x39, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72, 0x2e, - 0x61, 0x70, 0x69, 0x2e, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x2e, 0x73, 0x74, - 0x6f, 0x72, 0x61, 0x67, 0x65, 0x62, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x73, 0x2e, 0x76, 0x31, 0x2e, - 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x52, 0x06, 0x62, - 0x75, 0x63, 0x6b, 0x65, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x14, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x22, 0x32, 0x0a, 0x11, 0x47, 0x65, 0x74, 0x4f, 0x62, + 0x52, 0x0c, 0x6c, 0x61, 0x73, 0x74, 0x4d, 0x6f, 0x64, 0x69, 0x66, 0x69, 0x65, 0x64, 0x22, 0x96, + 0x01, 0x0a, 0x10, 0x47, 0x65, 0x74, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x12, 0x51, 0x0a, 0x06, 0x62, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x18, 0x0a, 0x20, + 0x01, 0x28, 0x0b, 0x32, 0x39, 0x2e, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72, + 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x2e, 0x73, + 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x62, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x73, 0x2e, 0x76, 0x31, + 0x2e, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x52, 0x06, + 0x62, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x14, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x1d, 0x0a, 0x0a, 0x63, 0x68, 0x75, 0x6e, + 0x6b, 0x5f, 0x73, 0x69, 0x7a, 0x65, 0x18, 0x1e, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x09, 0x63, 0x68, + 0x75, 0x6e, 0x6b, 0x53, 0x69, 0x7a, 0x65, 0x22, 0x32, 0x0a, 0x11, 0x47, 0x65, 0x74, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x66, 0x69, 0x6c, 0x65, 0x5f, 0x63, 0x68, 0x75, 0x6e, 0x6b, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x09, 0x66, 0x69, 0x6c, 0x65, 0x43, 0x68, 0x75, 0x6e, 0x6b, 0x22, 0x8b, 0x01, 0x0a, 0x10,