mirror of https://github.com/hashicorp/boundary
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
829 lines
29 KiB
829 lines
29 KiB
// Copyright IBM Corp. 2020, 2026
|
|
// SPDX-License-Identifier: BUSL-1.1
|
|
|
|
package loopback
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"crypto/sha256"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"path"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/hashicorp/boundary/sdk/pbs/controller/api/resources/storagebuckets"
|
|
plgpb "github.com/hashicorp/boundary/sdk/pbs/plugin"
|
|
"github.com/hashicorp/go-secure-stdlib/parseutil"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/status"
|
|
"google.golang.org/protobuf/types/known/structpb"
|
|
"google.golang.org/protobuf/types/known/timestamppb"
|
|
)
|
|
|
|
var _ plgpb.StoragePluginServiceServer = (*TestPluginStorageServer)(nil)
|
|
|
|
// TestPluginStorageServer provides a storage plugin service server where each method can be overwritten for tests.
|
|
type TestPluginStorageServer struct {
|
|
NormalizeStorageBucketDataFn func(context.Context, *plgpb.NormalizeStorageBucketDataRequest) (*plgpb.NormalizeStorageBucketDataResponse, error)
|
|
OnCreateStorageBucketFn func(context.Context, *plgpb.OnCreateStorageBucketRequest) (*plgpb.OnCreateStorageBucketResponse, error)
|
|
OnUpdateStorageBucketFn func(context.Context, *plgpb.OnUpdateStorageBucketRequest) (*plgpb.OnUpdateStorageBucketResponse, error)
|
|
OnDeleteStorageBucketFn func(context.Context, *plgpb.OnDeleteStorageBucketRequest) (*plgpb.OnDeleteStorageBucketResponse, error)
|
|
ValidatePermissionsFn func(context.Context, *plgpb.ValidatePermissionsRequest) (*plgpb.ValidatePermissionsResponse, error)
|
|
HeadObjectFn func(context.Context, *plgpb.HeadObjectRequest) (*plgpb.HeadObjectResponse, error)
|
|
GetObjectFn func(*plgpb.GetObjectRequest, plgpb.StoragePluginService_GetObjectServer) error
|
|
PutObjectFn func(context.Context, *plgpb.PutObjectRequest) (*plgpb.PutObjectResponse, error)
|
|
DeleteObjectsFn func(context.Context, *plgpb.DeleteObjectsRequest) (*plgpb.DeleteObjectsResponse, error)
|
|
ListObjectsFn func(context.Context, *plgpb.ListObjectsRequest) (*plgpb.ListObjectsResponse, error)
|
|
plgpb.UnimplementedStoragePluginServiceServer
|
|
}
|
|
|
|
func (t TestPluginStorageServer) NormalizeStorageBucketData(ctx context.Context, req *plgpb.NormalizeStorageBucketDataRequest) (*plgpb.NormalizeStorageBucketDataResponse, error) {
|
|
if t.NormalizeStorageBucketDataFn == nil {
|
|
return t.UnimplementedStoragePluginServiceServer.NormalizeStorageBucketData(ctx, req)
|
|
}
|
|
return t.NormalizeStorageBucketDataFn(ctx, req)
|
|
}
|
|
|
|
func (t TestPluginStorageServer) OnCreateStorageBucket(ctx context.Context, req *plgpb.OnCreateStorageBucketRequest) (*plgpb.OnCreateStorageBucketResponse, error) {
|
|
if t.OnCreateStorageBucketFn == nil {
|
|
return t.UnimplementedStoragePluginServiceServer.OnCreateStorageBucket(ctx, req)
|
|
}
|
|
return t.OnCreateStorageBucketFn(ctx, req)
|
|
}
|
|
|
|
func (t TestPluginStorageServer) OnUpdateStorageBucket(ctx context.Context, req *plgpb.OnUpdateStorageBucketRequest) (*plgpb.OnUpdateStorageBucketResponse, error) {
|
|
if t.OnUpdateStorageBucketFn == nil {
|
|
return t.UnimplementedStoragePluginServiceServer.OnUpdateStorageBucket(ctx, req)
|
|
}
|
|
return t.OnUpdateStorageBucketFn(ctx, req)
|
|
}
|
|
|
|
func (t TestPluginStorageServer) OnDeleteStorageBucket(ctx context.Context, req *plgpb.OnDeleteStorageBucketRequest) (*plgpb.OnDeleteStorageBucketResponse, error) {
|
|
if t.OnDeleteStorageBucketFn == nil {
|
|
return t.UnimplementedStoragePluginServiceServer.OnDeleteStorageBucket(ctx, req)
|
|
}
|
|
return t.OnDeleteStorageBucketFn(ctx, req)
|
|
}
|
|
|
|
func (t TestPluginStorageServer) ValidatePermissions(ctx context.Context, req *plgpb.ValidatePermissionsRequest) (*plgpb.ValidatePermissionsResponse, error) {
|
|
if t.ValidatePermissionsFn == nil {
|
|
return t.UnimplementedStoragePluginServiceServer.ValidatePermissions(ctx, req)
|
|
}
|
|
return t.ValidatePermissionsFn(ctx, req)
|
|
}
|
|
|
|
func (t TestPluginStorageServer) HeadObject(ctx context.Context, req *plgpb.HeadObjectRequest) (*plgpb.HeadObjectResponse, error) {
|
|
if t.HeadObjectFn == nil {
|
|
return t.UnimplementedStoragePluginServiceServer.HeadObject(ctx, req)
|
|
}
|
|
return t.HeadObjectFn(ctx, req)
|
|
}
|
|
|
|
func (t TestPluginStorageServer) GetObject(req *plgpb.GetObjectRequest, stream plgpb.StoragePluginService_GetObjectServer) error {
|
|
if t.GetObjectFn == nil {
|
|
return t.UnimplementedStoragePluginServiceServer.GetObject(req, stream)
|
|
}
|
|
return t.GetObjectFn(req, stream)
|
|
}
|
|
|
|
func (t TestPluginStorageServer) PutObject(ctx context.Context, req *plgpb.PutObjectRequest) (*plgpb.PutObjectResponse, error) {
|
|
if t.PutObjectFn == nil {
|
|
return t.UnimplementedStoragePluginServiceServer.PutObject(ctx, req)
|
|
}
|
|
return t.PutObjectFn(ctx, req)
|
|
}
|
|
|
|
func (t TestPluginStorageServer) DeleteObjects(ctx context.Context, req *plgpb.DeleteObjectsRequest) (*plgpb.DeleteObjectsResponse, error) {
|
|
if t.DeleteObjectsFn == nil {
|
|
return t.UnimplementedStoragePluginServiceServer.DeleteObjects(ctx, req)
|
|
}
|
|
return t.DeleteObjectsFn(ctx, req)
|
|
}
|
|
|
|
func (t TestPluginStorageServer) ListObjects(ctx context.Context, req *plgpb.ListObjectsRequest) (*plgpb.ListObjectsResponse, error) {
|
|
if t.ListObjectsFn == nil {
|
|
return t.UnimplementedStoragePluginServiceServer.ListObjects(ctx, req)
|
|
}
|
|
return t.ListObjectsFn(ctx, req)
|
|
}
|
|
|
|
type (
|
|
BucketName string
|
|
ObjectName string
|
|
Chunk []byte
|
|
)
|
|
|
|
type Bucket map[ObjectName]*storagePluginStorageInfo
|
|
|
|
// storagePluginStorageInfo is an in-memory representation of an object
|
|
// that would be normally stored in an external object store.
|
|
type storagePluginStorageInfo struct {
|
|
DataChunks []Chunk `mapstructure:"dataChunks"`
|
|
|
|
lastModified *time.Time `mapstructure:"lastModified"`
|
|
contentLength *int64 `mapstructure:"contentLength"`
|
|
}
|
|
|
|
// LoopbackStorage provides a storage plugin with functionality useful for certain
|
|
// kinds of testing.
|
|
//
|
|
// It is thread-safe.
|
|
type LoopbackStorage struct {
|
|
m sync.Mutex
|
|
|
|
chunksSize int
|
|
buckets map[BucketName]Bucket
|
|
errs []PluginMockError
|
|
putObjectResponse []PluginMockPutObjectResponse
|
|
normalizations int
|
|
}
|
|
|
|
func (l *LoopbackStorage) normalizeStorageBucketData(ctx context.Context, req *plgpb.NormalizeStorageBucketDataRequest) (*plgpb.NormalizeStorageBucketDataResponse, error) {
|
|
const op = "loopback.(LoopbackStorage).normalizeStorageBucketData"
|
|
if req == nil {
|
|
return nil, status.Errorf(codes.InvalidArgument, "%s: request is nil", op)
|
|
}
|
|
if req.GetAttributes() == nil {
|
|
return nil, status.Errorf(codes.InvalidArgument, "%s: missing attributes", op)
|
|
}
|
|
attrs := req.GetAttributes()
|
|
if endpoint, ok := attrs.GetFields()["endpoint_url"]; ok {
|
|
if endpoint, err := parseutil.NormalizeAddr(endpoint.GetStringValue()); err == nil {
|
|
attrs.Fields["endpoint_url"] = structpb.NewStringValue(endpoint)
|
|
}
|
|
}
|
|
l.normalizations++
|
|
return &plgpb.NormalizeStorageBucketDataResponse{
|
|
Attributes: attrs,
|
|
}, nil
|
|
}
|
|
|
|
// ResetNormalizations sets the number of times that NormalizeStorageBucketData
|
|
// has been called to 0. Useful for unit tests.
|
|
func (l *LoopbackStorage) ResetNormalizations() {
|
|
l.m.Lock()
|
|
defer l.m.Unlock()
|
|
l.normalizations = 0
|
|
}
|
|
|
|
// ResetMockErrors removes all mock errors.
|
|
func (l *LoopbackStorage) ResetMockErrors() {
|
|
l.m.Lock()
|
|
defer l.m.Unlock()
|
|
l.errs = []PluginMockError{}
|
|
}
|
|
|
|
// SetMockErrors overrides the existing mock errors.
|
|
func (l *LoopbackStorage) SetMockErrors(mockErrs ...PluginMockError) {
|
|
l.m.Lock()
|
|
defer l.m.Unlock()
|
|
l.errs = mockErrs
|
|
}
|
|
|
|
// GetNormalizations returns the number of times that NormalizeStorageBucketData
|
|
// has been called via the loopback plugin. Useful for unit tests.
|
|
func (l *LoopbackStorage) GetNormalizations() int {
|
|
return l.normalizations
|
|
}
|
|
|
|
func (l *LoopbackStorage) onCreateStorageBucket(ctx context.Context, req *plgpb.OnCreateStorageBucketRequest) (*plgpb.OnCreateStorageBucketResponse, error) {
|
|
const op = "loopback.(LoopbackStorage).onCreateStorageBucket"
|
|
if req == nil {
|
|
return nil, status.Errorf(codes.InvalidArgument, "%s: request is nil", op)
|
|
}
|
|
if req.GetBucket() == nil {
|
|
return nil, status.Errorf(codes.InvalidArgument, "%s: missing storage bucket", op)
|
|
}
|
|
if req.GetBucket().GetAttributes() == nil {
|
|
return nil, status.Errorf(codes.InvalidArgument, "%s: missing attributes", op)
|
|
}
|
|
l.m.Lock()
|
|
defer l.m.Unlock()
|
|
if _, ok := l.buckets[BucketName(req.GetBucket().GetBucketName())]; !ok {
|
|
return nil, status.Errorf(codes.NotFound, "%s: bucket not found", op)
|
|
}
|
|
for _, err := range l.errs {
|
|
if err.match(req.GetBucket(), "", OnCreateStorageBucket) {
|
|
return nil, status.Error(err.ErrCode, err.ErrMsg)
|
|
}
|
|
}
|
|
secrets := &structpb.Struct{
|
|
Fields: make(map[string]*structpb.Value),
|
|
}
|
|
var hasDynamicCreds bool
|
|
attrs := req.GetBucket().GetAttributes()
|
|
if attrs != nil {
|
|
_, hasDynamicCreds = attrs.Fields[ConstDynamicCredentials]
|
|
}
|
|
var hasStaticCreds bool
|
|
if req.GetBucket().GetSecrets() != nil && len(req.GetBucket().GetSecrets().AsMap()) > 0 {
|
|
hasStaticCreds = true
|
|
secrets = req.GetBucket().GetSecrets()
|
|
}
|
|
if hasDynamicCreds && hasStaticCreds {
|
|
return nil, status.Errorf(codes.InvalidArgument, "%s: cannot use both dynamic and static credentials", op)
|
|
}
|
|
return &plgpb.OnCreateStorageBucketResponse{
|
|
Persisted: &storagebuckets.StorageBucketPersisted{
|
|
Data: secrets,
|
|
},
|
|
}, nil
|
|
}
|
|
|
|
func (l *LoopbackStorage) onUpdateStorageBucket(ctx context.Context, req *plgpb.OnUpdateStorageBucketRequest) (*plgpb.OnUpdateStorageBucketResponse, error) {
|
|
const op = "loopback.(LoopbackStorage).onUpdateStorageBucket"
|
|
if req == nil {
|
|
return nil, status.Errorf(codes.InvalidArgument, "%s: request is nil", op)
|
|
}
|
|
if req.GetNewBucket() == nil {
|
|
return nil, status.Errorf(codes.InvalidArgument, "%s: missing storage bucket", op)
|
|
}
|
|
if req.GetNewBucket().GetAttributes() == nil {
|
|
return nil, status.Errorf(codes.InvalidArgument, "%s: missing attributes", op)
|
|
}
|
|
l.m.Lock()
|
|
defer l.m.Unlock()
|
|
if _, ok := l.buckets[BucketName(req.GetNewBucket().GetBucketName())]; !ok {
|
|
return nil, status.Errorf(codes.NotFound, "%s: bucket not found", op)
|
|
}
|
|
for _, err := range l.errs {
|
|
if err.match(req.GetNewBucket(), "", OnUpdateStorageBucket) {
|
|
return nil, status.Errorf(err.ErrCode, "%s: %s", op, err.ErrMsg)
|
|
}
|
|
}
|
|
secrets := &structpb.Struct{
|
|
Fields: make(map[string]*structpb.Value),
|
|
}
|
|
var hasDynamicCreds bool
|
|
attrs := req.GetNewBucket().GetAttributes()
|
|
if attrs != nil {
|
|
_, hasDynamicCreds = attrs.Fields[ConstDynamicCredentials]
|
|
}
|
|
var hasStaticCreds bool
|
|
if req.GetNewBucket().GetSecrets() != nil && len(req.GetNewBucket().GetSecrets().AsMap()) > 0 {
|
|
hasStaticCreds = true
|
|
secrets = req.GetNewBucket().GetSecrets()
|
|
}
|
|
if hasDynamicCreds && hasStaticCreds {
|
|
return nil, status.Errorf(codes.InvalidArgument, "%s: cannot use both dynamic and static credentials", op)
|
|
}
|
|
if !hasDynamicCreds && len(secrets.AsMap()) == 0 {
|
|
if req.GetPersisted() != nil && req.GetPersisted().GetData() != nil && len(req.GetPersisted().GetData().AsMap()) > 0 {
|
|
secrets = req.GetPersisted().GetData()
|
|
}
|
|
}
|
|
return &plgpb.OnUpdateStorageBucketResponse{
|
|
Persisted: &storagebuckets.StorageBucketPersisted{
|
|
Data: secrets,
|
|
},
|
|
}, nil
|
|
}
|
|
|
|
func (l *LoopbackStorage) onDeleteStorageBucket(ctx context.Context, req *plgpb.OnDeleteStorageBucketRequest) (*plgpb.OnDeleteStorageBucketResponse, error) {
|
|
const op = "loopback.(LoopbackStorage).onDeleteStorageBucket"
|
|
if req == nil {
|
|
return nil, status.Errorf(codes.InvalidArgument, "%s: request is nil", op)
|
|
}
|
|
if req.GetBucket() == nil {
|
|
return nil, status.Errorf(codes.InvalidArgument, "%s: missing storage bucket", op)
|
|
}
|
|
if req.GetBucket().GetAttributes() == nil {
|
|
return nil, status.Errorf(codes.InvalidArgument, "%s: missing attributes", op)
|
|
}
|
|
return &plgpb.OnDeleteStorageBucketResponse{}, nil
|
|
}
|
|
|
|
func (l *LoopbackStorage) validatePermissions(ctx context.Context, req *plgpb.ValidatePermissionsRequest) (*plgpb.ValidatePermissionsResponse, error) {
|
|
const op = "loopback.(LoopbackStorage).validatePermissions"
|
|
if req == nil {
|
|
return nil, status.Errorf(codes.InvalidArgument, "%s: request is nil", op)
|
|
}
|
|
if req.GetBucket() == nil {
|
|
return nil, status.Errorf(codes.InvalidArgument, "%s: missing storage bucket", op)
|
|
}
|
|
if req.GetBucket().GetAttributes() == nil {
|
|
return nil, status.Errorf(codes.InvalidArgument, "%s: missing attributes", op)
|
|
}
|
|
l.m.Lock()
|
|
defer l.m.Unlock()
|
|
if _, ok := l.buckets[BucketName(req.GetBucket().GetBucketName())]; !ok {
|
|
return nil, status.Errorf(codes.NotFound, "%s: bucket not found", op)
|
|
}
|
|
|
|
for _, err := range l.errs {
|
|
if err.match(req.GetBucket(), "", ValidatePermissions) {
|
|
if err.StorageBucketCredentialState != nil {
|
|
return nil, createErrorWithBucketCredentialState(err.ErrCode, fmt.Sprintf("%s: %s", op, err.ErrMsg), err.StorageBucketCredentialState)
|
|
}
|
|
return nil, status.Errorf(err.ErrCode, "%s: %s", op, err.ErrMsg)
|
|
}
|
|
}
|
|
return &plgpb.ValidatePermissionsResponse{}, nil
|
|
}
|
|
|
|
func (l *LoopbackStorage) headObject(ctx context.Context, req *plgpb.HeadObjectRequest) (*plgpb.HeadObjectResponse, error) {
|
|
const op = "loopback.(LoopbackStorage).headObject"
|
|
if req == nil {
|
|
return nil, status.Errorf(codes.InvalidArgument, "%s: request is nil", op)
|
|
}
|
|
if req.GetBucket() == nil {
|
|
return nil, status.Errorf(codes.InvalidArgument, "$%s: missing storage bucket", op)
|
|
}
|
|
if req.GetBucket().GetAttributes() == nil {
|
|
return nil, status.Errorf(codes.InvalidArgument, "%s: missing attributes", op)
|
|
}
|
|
if req.GetKey() == "" {
|
|
return nil, status.Errorf(codes.InvalidArgument, "%s; missing object key", op)
|
|
}
|
|
l.m.Lock()
|
|
defer l.m.Unlock()
|
|
bucket, ok := l.buckets[BucketName(req.GetBucket().GetBucketName())]
|
|
if !ok {
|
|
return nil, status.Errorf(codes.NotFound, "%s: bucket not found", op)
|
|
}
|
|
objectPath := ObjectName(req.GetBucket().GetBucketPrefix() + req.GetKey())
|
|
object, ok := bucket[objectPath]
|
|
if !ok {
|
|
return nil, status.Errorf(codes.NotFound, "%s: object %s not found", op, objectPath)
|
|
}
|
|
for _, err := range l.errs {
|
|
if err.match(req.GetBucket(), req.GetKey(), HeadObject) {
|
|
if err.StorageBucketCredentialState != nil {
|
|
return nil, createErrorWithBucketCredentialState(err.ErrCode, fmt.Sprintf("%s: %s", op, err.ErrMsg), err.StorageBucketCredentialState)
|
|
}
|
|
return nil, status.Errorf(err.ErrCode, "%s: %s", op, err.ErrMsg)
|
|
}
|
|
}
|
|
var contentLength int64
|
|
if object.contentLength != nil {
|
|
contentLength = *object.contentLength
|
|
}
|
|
return &plgpb.HeadObjectResponse{
|
|
ContentLength: contentLength,
|
|
LastModified: timestamppb.New(*object.lastModified),
|
|
}, nil
|
|
}
|
|
|
|
func (l *LoopbackStorage) getObject(req *plgpb.GetObjectRequest, stream plgpb.StoragePluginService_GetObjectServer) error {
|
|
const op = "loopback.(LoopbackStorage).getObject"
|
|
if req == nil {
|
|
return status.Errorf(codes.InvalidArgument, "%s: request is nil", op)
|
|
}
|
|
if req.GetBucket() == nil {
|
|
return status.Errorf(codes.InvalidArgument, "%s: missing storage bucket", op)
|
|
}
|
|
if req.GetBucket().GetAttributes() == nil {
|
|
return status.Errorf(codes.InvalidArgument, "%s: missing attributes", op)
|
|
}
|
|
if req.GetKey() == "" {
|
|
return status.Errorf(codes.InvalidArgument, "%s; missing object key", op)
|
|
}
|
|
l.m.Lock()
|
|
defer l.m.Unlock()
|
|
bucket, ok := l.buckets[BucketName(req.GetBucket().GetBucketName())]
|
|
if !ok {
|
|
return status.Errorf(codes.NotFound, "%s: bucket not found", op)
|
|
}
|
|
objectPath := ObjectName(req.GetBucket().GetBucketPrefix() + req.GetKey())
|
|
object, ok := bucket[objectPath]
|
|
if !ok {
|
|
return status.Errorf(codes.NotFound, "%s: object %s not found", op, objectPath)
|
|
}
|
|
for _, err := range l.errs {
|
|
if err.match(req.GetBucket(), req.GetKey(), GetObject) {
|
|
if err.StorageBucketCredentialState != nil {
|
|
return createErrorWithBucketCredentialState(err.ErrCode, fmt.Sprintf("%s: %s", op, err.ErrMsg), err.StorageBucketCredentialState)
|
|
}
|
|
return status.Errorf(err.ErrCode, "%s: %s", op, err.ErrMsg)
|
|
}
|
|
}
|
|
go func() {
|
|
chunkSize := req.GetChunkSize()
|
|
if chunkSize == 0 {
|
|
chunkSize = defaultStreamChunkSize
|
|
}
|
|
data := []byte{}
|
|
for _, chunk := range object.DataChunks {
|
|
data = append(data, chunk...)
|
|
}
|
|
for i := 0; i < len(data); i += int(chunkSize) {
|
|
end := i + int(chunkSize)
|
|
if end > len(data) {
|
|
end = len(data)
|
|
}
|
|
if err := stream.Send(&plgpb.GetObjectResponse{
|
|
FileChunk: append([]byte{}, data[i:end]...),
|
|
}); err != nil {
|
|
stream.SendMsg(status.Errorf(codes.Internal, "%s: failed to send object data: %v", op, err))
|
|
return
|
|
}
|
|
}
|
|
stream.SendMsg(io.EOF)
|
|
}()
|
|
return nil
|
|
}
|
|
|
|
func (l *LoopbackStorage) putObject(ctx context.Context, req *plgpb.PutObjectRequest) (*plgpb.PutObjectResponse, error) {
|
|
const op = "loopback.(LoopbackStorage).putObject"
|
|
if req == nil {
|
|
return nil, status.Errorf(codes.InvalidArgument, "%s: request is nil", op)
|
|
}
|
|
if req.GetBucket() == nil {
|
|
return nil, status.Errorf(codes.InvalidArgument, "%s: missing storage bucket", op)
|
|
}
|
|
if req.GetBucket().GetAttributes() == nil {
|
|
return nil, status.Errorf(codes.InvalidArgument, "%s: missing attributes", op)
|
|
}
|
|
if req.GetBucket().GetBucketName() == "" {
|
|
return nil, status.Errorf(codes.InvalidArgument, "%s: missing bucket name", op)
|
|
}
|
|
if req.GetKey() == "" {
|
|
return nil, status.Errorf(codes.InvalidArgument, "%s; missing object key", op)
|
|
}
|
|
if req.GetPath() == "" {
|
|
return nil, status.Errorf(codes.InvalidArgument, "%s; missing path", op)
|
|
}
|
|
info, err := os.Stat(req.Path)
|
|
if err != nil {
|
|
return nil, status.Errorf(codes.InvalidArgument, "%s: file check failed: %v", op, err)
|
|
}
|
|
if info == nil {
|
|
return nil, status.Errorf(codes.Internal, "%s: failed to get file info", op)
|
|
}
|
|
if info.IsDir() {
|
|
return nil, status.Errorf(codes.InvalidArgument, "%s: path is a directory", op)
|
|
}
|
|
l.m.Lock()
|
|
defer l.m.Unlock()
|
|
bucket, ok := l.buckets[BucketName(req.GetBucket().GetBucketName())]
|
|
if !ok {
|
|
return nil, status.Errorf(codes.InvalidArgument, "%s: bucket not found", op)
|
|
}
|
|
// return an expected mock error if one was provided
|
|
for _, err := range l.errs {
|
|
if err.match(req.GetBucket(), req.GetKey(), PutObject) {
|
|
if err.StorageBucketCredentialState != nil {
|
|
return nil, createErrorWithBucketCredentialState(err.ErrCode, fmt.Sprintf("%s: %s", op, err.ErrMsg), err.StorageBucketCredentialState)
|
|
}
|
|
return nil, status.Errorf(err.ErrCode, "%s: %s", op, err.ErrMsg)
|
|
}
|
|
}
|
|
// return an expected mock response if one was provided
|
|
for _, mock := range l.putObjectResponse {
|
|
if mock.match(req.GetBucket(), req.GetKey()) {
|
|
return mock.Response, nil
|
|
}
|
|
}
|
|
|
|
lastModified := time.Now()
|
|
objectData, err := os.ReadFile(req.Path)
|
|
if err != nil {
|
|
return nil, status.Errorf(codes.Internal, "%s: failed to read file", op)
|
|
}
|
|
if len(objectData) == 0 {
|
|
return nil, status.Errorf(codes.InvalidArgument, "%s: missing object data", op)
|
|
}
|
|
contentLength := int64(len(objectData))
|
|
|
|
objectChunks := []Chunk{}
|
|
for i := 0; i < len(objectData); i = i + l.chunksSize {
|
|
j := i + l.chunksSize
|
|
if j > len(objectData) {
|
|
j = len(objectData)
|
|
}
|
|
objectChunks = append(objectChunks, copyBytes(objectData[i:j]))
|
|
}
|
|
|
|
// Now insert the object
|
|
objectPath := ObjectName(path.Join(req.GetBucket().GetBucketPrefix(), req.GetKey()))
|
|
bucket[objectPath] = &storagePluginStorageInfo{
|
|
DataChunks: objectChunks,
|
|
contentLength: &contentLength,
|
|
lastModified: &lastModified,
|
|
}
|
|
|
|
hash := sha256.New()
|
|
if _, err := io.Copy(hash, bytes.NewReader(objectData)); err != nil {
|
|
return nil, status.Errorf(codes.Internal, "%s: failed to hash object: %v", op, err)
|
|
}
|
|
|
|
return &plgpb.PutObjectResponse{
|
|
ChecksumSha_256: hash.Sum(nil),
|
|
}, nil
|
|
}
|
|
|
|
func (l *LoopbackStorage) deleteObjects(ctx context.Context, req *plgpb.DeleteObjectsRequest) (*plgpb.DeleteObjectsResponse, error) {
|
|
const op = "loopback.(LoopbackStorage).deleteObjects"
|
|
if req == nil {
|
|
return nil, status.Errorf(codes.InvalidArgument, "%s: request is nil", op)
|
|
}
|
|
if req.GetBucket() == nil {
|
|
return nil, status.Errorf(codes.InvalidArgument, "%s: missing storage bucket", op)
|
|
}
|
|
if req.GetBucket().GetAttributes() == nil {
|
|
return nil, status.Errorf(codes.InvalidArgument, "%s: missing attributes", op)
|
|
}
|
|
if req.GetKeyPrefix() == "" {
|
|
return nil, status.Errorf(codes.InvalidArgument, "%s; missing key prefix", op)
|
|
}
|
|
l.m.Lock()
|
|
defer l.m.Unlock()
|
|
bucket, ok := l.buckets[BucketName(req.GetBucket().GetBucketName())]
|
|
if !ok {
|
|
return nil, status.Errorf(codes.NotFound, "%s: bucket not found", op)
|
|
}
|
|
for _, err := range l.errs {
|
|
if err.match(req.GetBucket(), req.GetKeyPrefix(), DeleteObjects) {
|
|
return nil, status.Errorf(err.ErrCode, "%s: %s", op, err.ErrMsg)
|
|
}
|
|
}
|
|
prefix := ObjectName(req.GetBucket().GetBucketPrefix() + req.GetKeyPrefix())
|
|
var deleted uint32 = 0
|
|
if req.GetRecursive() {
|
|
for key := range bucket {
|
|
if strings.HasPrefix(string(key), string(prefix)) {
|
|
delete(bucket, key)
|
|
deleted++
|
|
}
|
|
}
|
|
} else {
|
|
_, ok := bucket[prefix]
|
|
if ok {
|
|
delete(bucket, prefix)
|
|
}
|
|
deleted++ // this is outside the if statement because aws always returns a success
|
|
}
|
|
return &plgpb.DeleteObjectsResponse{
|
|
ObjectsDeleted: deleted,
|
|
}, nil
|
|
}
|
|
|
|
func (l *LoopbackStorage) listObjects(ctx context.Context, req *plgpb.ListObjectsRequest) (*plgpb.ListObjectsResponse, error) {
|
|
const op = "loopback.(LoopbackStorage).listObjects"
|
|
|
|
if req == nil {
|
|
return nil, status.Errorf(codes.InvalidArgument, "%s: request is nil", op)
|
|
}
|
|
if req.GetBucket() == nil {
|
|
return nil, status.Errorf(codes.InvalidArgument, "%s: missing storage bucket", op)
|
|
}
|
|
if req.GetBucket().GetAttributes() == nil {
|
|
return nil, status.Errorf(codes.InvalidArgument, "%s: missing bucket attributes", op)
|
|
}
|
|
l.m.Lock()
|
|
defer l.m.Unlock()
|
|
bucket, ok := l.buckets[BucketName(req.GetBucket().GetBucketName())]
|
|
if !ok {
|
|
return nil, status.Errorf(codes.NotFound, "%s: bucket not found", op)
|
|
}
|
|
|
|
// return an expected mock error if one was provided
|
|
for _, err := range l.errs {
|
|
if err.match(req.GetBucket(), req.GetKeyPrefix(), ListObjects) {
|
|
if err.StorageBucketCredentialState != nil {
|
|
return nil, createErrorWithBucketCredentialState(err.ErrCode, fmt.Sprintf("%s: %s", op, err.ErrMsg), err.StorageBucketCredentialState)
|
|
}
|
|
return nil, status.Errorf(err.ErrCode, "%s: %s", op, err.ErrMsg)
|
|
}
|
|
}
|
|
|
|
searchPrefix := path.Join(req.GetBucket().GetBucketPrefix(), req.GetKeyPrefix())
|
|
// path.Join strips trailing slashes, but we need to preserve them for directory prefixes
|
|
// to ensure proper filtering behavior in listObjects functions
|
|
if strings.HasSuffix(req.GetKeyPrefix(), "/") || (req.GetKeyPrefix() == "" && strings.HasSuffix(req.GetBucket().GetBucketPrefix(), "/")) {
|
|
searchPrefix = searchPrefix + "/"
|
|
}
|
|
|
|
var objects []*plgpb.Object
|
|
if req.GetRecursive() {
|
|
objects = listObjectsRecursive(bucket, searchPrefix)
|
|
} else {
|
|
objects = listObjectsNonRecursive(bucket, searchPrefix)
|
|
}
|
|
return &plgpb.ListObjectsResponse{
|
|
Objects: objects,
|
|
}, nil
|
|
}
|
|
|
|
// CloneBucket returns a clone of the bucket.
|
|
// returns nil when the bucket is not found.
|
|
func (l *LoopbackStorage) CloneBucket(name string) Bucket {
|
|
l.m.Lock()
|
|
defer l.m.Unlock()
|
|
if _, ok := l.buckets[BucketName(name)]; !ok {
|
|
return nil
|
|
}
|
|
bucket := Bucket{}
|
|
for objName, obj := range l.buckets[BucketName(name)] {
|
|
if obj != nil {
|
|
bucket[objName] = copyStorageInfo(obj)
|
|
}
|
|
}
|
|
return bucket
|
|
}
|
|
|
|
// CloneStorageInfo returns a clone of the object stored in memory.
|
|
// Returns nil when the bucket or object is not found.
|
|
func (l *LoopbackStorage) CloneStorageInfo(bucketName, objectName string) *storagePluginStorageInfo {
|
|
l.m.Lock()
|
|
defer l.m.Unlock()
|
|
bucket, ok := l.buckets[BucketName(bucketName)]
|
|
if !ok {
|
|
return nil
|
|
}
|
|
obj, ok := bucket[ObjectName(objectName)]
|
|
if !ok {
|
|
return nil
|
|
}
|
|
return copyStorageInfo(obj)
|
|
}
|
|
|
|
func copyStorageInfo(obj *storagePluginStorageInfo) *storagePluginStorageInfo {
|
|
chunks := make([]Chunk, len(obj.DataChunks))
|
|
for i, c := range obj.DataChunks {
|
|
chunks[i] = copyBytes(c)
|
|
}
|
|
contentLength := *obj.contentLength
|
|
lastModified := *obj.lastModified
|
|
return &storagePluginStorageInfo{
|
|
DataChunks: chunks,
|
|
lastModified: &lastModified,
|
|
contentLength: &contentLength,
|
|
}
|
|
}
|
|
|
|
func MockObject(data []Chunk) *storagePluginStorageInfo {
|
|
lastModified := time.Now()
|
|
dataChunks := make([]Chunk, len(data))
|
|
var contentLength int64
|
|
for i, chunk := range data {
|
|
dataChunks[i] = copyBytes(chunk)
|
|
contentLength += int64(len(chunk))
|
|
}
|
|
return &storagePluginStorageInfo{
|
|
DataChunks: dataChunks,
|
|
contentLength: &contentLength,
|
|
lastModified: &lastModified,
|
|
}
|
|
}
|
|
|
|
func copyBytes(in []byte) []byte {
|
|
return append([]byte{}, in...)
|
|
}
|
|
|
|
func createErrorWithBucketCredentialState(errCode codes.Code, msg string, sbcState *plgpb.StorageBucketCredentialState) error {
|
|
st := status.New(errCode, msg)
|
|
stWithDetails, stErr := st.WithDetails(sbcState)
|
|
if stErr == nil {
|
|
st = stWithDetails
|
|
}
|
|
return st.Err()
|
|
}
|
|
|
|
// listObjectsRecursive returns recursive list of all files and directories found under a prefix from a provided bucket
|
|
func listObjectsRecursive(bucket Bucket, searchPrefix string) []*plgpb.Object {
|
|
var objects []*plgpb.Object
|
|
dirs := make(map[string]struct{})
|
|
seenKeys := make(map[string]struct{})
|
|
|
|
for objectName := range bucket {
|
|
key := string(objectName)
|
|
// Skip any objects that do not match the search prefix
|
|
if !strings.HasPrefix(key, searchPrefix) {
|
|
continue
|
|
}
|
|
|
|
// Skip any objects that have already been seen
|
|
if _, exists := seenKeys[key]; exists {
|
|
continue
|
|
}
|
|
|
|
isDirFile := strings.HasSuffix(key, "/")
|
|
// If the object is a directory and matches the search prefix, skip it
|
|
// This can happen if the prefix itself is a directory
|
|
// e.g. prefix = "foo/", object key = "foo/"
|
|
// In this case we don't want to return the directory itself as an object
|
|
if isDirFile && key == searchPrefix {
|
|
continue
|
|
}
|
|
|
|
dirPaths := buildRecursiveDirectoryPaths(key, searchPrefix)
|
|
for _, dirPath := range dirPaths {
|
|
dirs[dirPath] = struct{}{}
|
|
seenKeys[dirPath] = struct{}{}
|
|
}
|
|
|
|
// Only add the object if it hasn't already been added as a directory
|
|
// This can happen if the object key ends with a '/' and is added as a directory
|
|
// in the previous step
|
|
if _, exists := seenKeys[key]; !exists {
|
|
objects = append(objects, &plgpb.Object{
|
|
Key: key,
|
|
IsDir: isDirFile,
|
|
})
|
|
}
|
|
seenKeys[key] = struct{}{}
|
|
}
|
|
|
|
for dirKey := range dirs {
|
|
objects = append(objects, &plgpb.Object{
|
|
Key: dirKey,
|
|
IsDir: true,
|
|
})
|
|
}
|
|
|
|
return objects
|
|
}
|
|
|
|
// listObjectsNonRecursive returns only top-level files and directories within the prefix
|
|
func listObjectsNonRecursive(bucket Bucket, searchPrefix string) []*plgpb.Object {
|
|
var objects []*plgpb.Object
|
|
seenKeys := make(map[string]struct{})
|
|
|
|
for objectName := range bucket {
|
|
key := string(objectName)
|
|
// Skip any objects that do not match the search prefix
|
|
if !strings.HasPrefix(key, searchPrefix) {
|
|
continue
|
|
}
|
|
|
|
// Skip any objects that have already been seen
|
|
if _, exists := seenKeys[key]; exists {
|
|
continue
|
|
}
|
|
|
|
isDirFile := strings.HasSuffix(key, "/")
|
|
// If the object is a directory and matches the search prefix, skip it
|
|
// This can happen if the prefix itself is a directory
|
|
// e.g. prefix = "foo/", object key = "foo/"
|
|
// In this case we don't want to return the directory itself as an object
|
|
if isDirFile && key == searchPrefix {
|
|
continue
|
|
}
|
|
|
|
// Trim the prefix and and trailing '/' after which we split the path
|
|
// to determine if this object is in a subdirectory or a file at the root prefix
|
|
relativeFullPath := strings.TrimSuffix(strings.TrimPrefix(key, searchPrefix), "/")
|
|
split := strings.Split(relativeFullPath, "/")
|
|
// Adding this to guardrail against the case that the stdlib updates the behavior
|
|
// of strings.Split() causing it to not always return at least a 1 length slice
|
|
if len(split) == 0 {
|
|
continue
|
|
}
|
|
relativeParentPath := searchPrefix + split[0]
|
|
|
|
// If the original object key ends with a '/' we treat it as a directory
|
|
// even if it is at the root of the prefix.
|
|
// If the split contains more than one entry, it is also a directory as it has sub-paths
|
|
if len(split) > 1 || isDirFile {
|
|
relativeParentPath += "/"
|
|
}
|
|
|
|
// Skip any constructed parent paths that have already been seen
|
|
if _, exists := seenKeys[relativeParentPath]; exists {
|
|
continue
|
|
}
|
|
|
|
// If the split contains more than one entry, it is a directory as it has sub-paths
|
|
// If the original object key ends with a '/' we treat it as a directory
|
|
// even if it is at the root of the prefix.
|
|
if len(split) > 1 {
|
|
objects = append(objects, &plgpb.Object{
|
|
Key: relativeParentPath,
|
|
IsDir: true,
|
|
})
|
|
} else {
|
|
objects = append(objects, &plgpb.Object{
|
|
Key: relativeParentPath,
|
|
IsDir: strings.HasSuffix(relativeParentPath, "/"),
|
|
})
|
|
}
|
|
seenKeys[relativeParentPath] = struct{}{}
|
|
}
|
|
|
|
return objects
|
|
}
|
|
|
|
// buildDirectoryPaths collects all parent directory paths from an object key
|
|
// that are within the given prefix. This is used to simulate directory
|
|
// structures in object storage.
|
|
func buildRecursiveDirectoryPaths(objectKey, prefix string) []string {
|
|
var dirPaths []string
|
|
prefixDir := strings.TrimSuffix(prefix, "/")
|
|
|
|
pathParts := strings.Split(objectKey, "/")
|
|
// Build all possible parent directory paths from the split parts
|
|
for i := 1; i < len(pathParts); i++ {
|
|
dirPath := strings.Join(pathParts[:i], "/")
|
|
// Only include directories that are deeper than the search prefix and add a trailing slash to denote a directory
|
|
if len(dirPath) > len(prefixDir) && strings.HasPrefix(dirPath+"/", prefix) {
|
|
dirPaths = append(dirPaths, fmt.Sprintf("%s/", dirPath))
|
|
}
|
|
}
|
|
return dirPaths
|
|
}
|