diff --git a/docs/plugin-protocol/tfplugin6.proto b/docs/plugin-protocol/tfplugin6.proto index 66d7563b2e..30980ea515 100644 --- a/docs/plugin-protocol/tfplugin6.proto +++ b/docs/plugin-protocol/tfplugin6.proto @@ -943,6 +943,7 @@ message ReadStateBytes { } message Response { bytes bytes = 1; + // total_length is the overall size of all of the state byte chunks that will be received int64 total_length = 2; StateRange range = 3; repeated Diagnostic diagnostics = 4; @@ -955,6 +956,7 @@ message WriteStateBytes { optional RequestChunkMeta meta = 1; bytes bytes = 2; + // total_length is the overall size of all of the state byte chunks that will be sent. int64 total_length = 3; StateRange range = 4; } @@ -969,7 +971,11 @@ message RequestChunkMeta { } message StateRange { + // start is the starting byte index for a chunk of state byte data. + // This index is in relation to the entire byte array that will be sent or received. int64 start = 1; + // end is the ending byte index for a chunk of state byte data. + // This index is in relation to the entire byte array that will be sent or received. int64 end = 2; } diff --git a/internal/grpcwrap/provider6.go b/internal/grpcwrap/provider6.go index 35a6679860..63197428c8 100644 --- a/internal/grpcwrap/provider6.go +++ b/internal/grpcwrap/provider6.go @@ -1045,7 +1045,7 @@ func (p *provider6) ReadStateBytes(req *tfplugin6.ReadStateBytes_Request, srv tf TotalLength: int64(totalLength), Range: &proto6.StateRange{ Start: int64(rangeStart), - End: int64(rangeStart + byteCount), + End: int64(rangeStart+byteCount) - 1, }, Diagnostics: diags, }) diff --git a/internal/plugin6/grpc_provider.go b/internal/plugin6/grpc_provider.go index b460f711a4..986e651058 100644 --- a/internal/plugin6/grpc_provider.go +++ b/internal/plugin6/grpc_provider.go @@ -1601,7 +1601,7 @@ func (p *GRPCProvider) ReadStateBytes(r providers.ReadStateBytesRequest) (resp p resp.Diagnostics = resp.Diagnostics.Append(fmt.Errorf("Unable to determine chunk size for provider %s; this is a bug in Terraform - please report it", r.TypeName)) return resp } - if chunk.Range.End < chunk.TotalLength { + if chunk.Range.End < chunk.TotalLength-1 { // all but last chunk must match exactly if len(chunk.Bytes) != chunkSize { resp.Diagnostics = resp.Diagnostics.Append(&hcl.Diagnostic{ @@ -1733,7 +1733,7 @@ func (p *GRPCProvider) WriteStateBytes(r providers.WriteStateBytesRequest) (resp TotalLength: totalLength, Range: &proto6.StateRange{ Start: int64(totalBytesProcessed), - End: int64(totalBytesProcessed + len(chunk)), + End: int64(totalBytesProcessed+len(chunk)) - 1, }, } err = client.Send(protoReq) diff --git a/internal/plugin6/grpc_provider_test.go b/internal/plugin6/grpc_provider_test.go index 946472dc4f..07b05c968b 100644 --- a/internal/plugin6/grpc_provider_test.go +++ b/internal/plugin6/grpc_provider_test.go @@ -2622,7 +2622,7 @@ func TestGRPCProvider_ReadStateBytes(t *testing.T) { TotalLength: int64(totalLength), Range: &proto.StateRange{ Start: 0, - End: int64(len(chunks[0])), + End: int64(len(chunks[0])) - 1, }, }, err: nil, @@ -2633,7 +2633,7 @@ func TestGRPCProvider_ReadStateBytes(t *testing.T) { TotalLength: int64(totalLength), Range: &proto.StateRange{ Start: int64(len(chunks[0])), - End: int64(len(chunks[1])), + End: int64(len(chunks[1])) - 1, }, }, err: nil, @@ -2667,6 +2667,107 @@ func TestGRPCProvider_ReadStateBytes(t *testing.T) { } }) + t.Run("can process multiple chunks when last chunk size is one byte", func(t *testing.T) { + client := mockProviderClient(t) + p := &GRPCProvider{ + client: client, + ctx: context.Background(), + } + p.SetStateStoreChunkSize("mock_store", 5) + + // Call to ReadStateBytes + // > Assert the arguments received + // > Define the returned mock client + mockReadBytesClient := mockReadStateBytesClient(t) + + expectedReq := &proto.ReadStateBytes_Request{ + TypeName: "mock_store", + StateId: backend.DefaultStateName, + } + client.EXPECT().ReadStateBytes( + gomock.Any(), + gomock.Eq(expectedReq), + gomock.Any(), + ).Return(mockReadBytesClient, nil) + + // Define what will be returned by each call to Recv + chunk := "helloworld!" + totalLength := len(chunk) + mockResp := map[int]struct { + resp *proto.ReadStateBytes_Response + err error + }{ + 0: { + resp: &proto.ReadStateBytes_Response{ + Bytes: []byte(chunk[:5]), + TotalLength: int64(totalLength), + Range: &proto.StateRange{ + Start: 0, + End: 4, + }, + Diagnostics: []*proto.Diagnostic{}, + }, + err: nil, + }, + 1: { + resp: &proto.ReadStateBytes_Response{ + Bytes: []byte(chunk[5:10]), + TotalLength: int64(totalLength), + Range: &proto.StateRange{ + Start: 5, + End: 9, + }, + Diagnostics: []*proto.Diagnostic{}, + }, + err: nil, + }, + 2: { + resp: &proto.ReadStateBytes_Response{ + Bytes: []byte(chunk[10:]), + TotalLength: int64(totalLength), + Range: &proto.StateRange{ + Start: 10, + End: 10, + }, + Diagnostics: []*proto.Diagnostic{}, + }, + err: nil, + }, + 3: { + resp: &proto.ReadStateBytes_Response{}, + err: io.EOF, + }, + } + var count int + mockReadBytesClient.EXPECT().Recv().DoAndReturn(func() (*proto.ReadStateBytes_Response, error) { + ret := mockResp[count] + count++ + return ret.resp, ret.err + }).Times(4) + + // There will be a call to CloseSend to close the stream + mockReadBytesClient.EXPECT().CloseSend().Return(nil).Times(1) + + // Act + request := providers.ReadStateBytesRequest{ + TypeName: expectedReq.TypeName, + StateId: expectedReq.StateId, + } + resp := p.ReadStateBytes(request) + + // Assert returned values + checkDiags(t, resp.Diagnostics) + + // Chunk size mismatches are warnings so ensure there aren't any + if resp.Diagnostics.HasWarnings() { + t.Fatal(resp.Diagnostics.ErrWithWarnings()) + } + + if string(resp.Bytes) != "helloworld!" { + t.Fatalf("expected data to be %q, got: %q", "helloworld!", string(resp.Bytes)) + } + }) + t.Run("an error diagnostic is returned when final length does not match expectations", func(t *testing.T) { client := mockProviderClient(t) p := &GRPCProvider{ @@ -2703,7 +2804,7 @@ func TestGRPCProvider_ReadStateBytes(t *testing.T) { TotalLength: incorrectLength, Range: &proto.StateRange{ Start: 0, - End: int64(len(chunks[0])), + End: int64(len(chunks[0])) - 1, }, }, err: nil, @@ -2714,7 +2815,7 @@ func TestGRPCProvider_ReadStateBytes(t *testing.T) { TotalLength: incorrectLength, Range: &proto.StateRange{ Start: int64(len(chunks[0])), - End: int64(len(chunks[1])), + End: int64(len(chunks[1])) - 1, }, }, err: nil, @@ -2868,7 +2969,7 @@ func TestGRPCProvider_ReadStateBytes(t *testing.T) { TotalLength: int64(totalLength), Range: &proto.StateRange{ Start: 0, - End: int64(len(chunk)), + End: int64(len(chunk)) - 1, }, Diagnostics: []*proto.Diagnostic{ { @@ -3039,7 +3140,7 @@ func TestGRPCProvider_ReadStateBytes(t *testing.T) { TotalLength: int64(totalLength), Range: &proto.StateRange{ Start: 0, - End: 4, + End: 3, }, Diagnostics: []*proto.Diagnostic{}, }, @@ -3051,7 +3152,7 @@ func TestGRPCProvider_ReadStateBytes(t *testing.T) { TotalLength: int64(totalLength), Range: &proto.StateRange{ Start: 4, - End: 10, + End: 9, }, Diagnostics: []*proto.Diagnostic{}, }, @@ -3133,7 +3234,7 @@ func TestGRPCProvider_WriteStateBytes(t *testing.T) { TotalLength: int64(len(data)), Range: &proto.StateRange{ Start: 0, - End: int64(len(data)), + End: int64(len(data)) - 1, }, } mockWriteClient.EXPECT().Send(gomock.Eq(expectedReq)).Times(1).Return(nil) @@ -3193,7 +3294,7 @@ func TestGRPCProvider_WriteStateBytes(t *testing.T) { TotalLength: int64(len(data)), Range: &proto.StateRange{ Start: 0, - End: int64(chunkSize), + End: int64(chunkSize) - 1, }, } req2 := &proto.WriteStateBytes_RequestChunk{ @@ -3202,7 +3303,7 @@ func TestGRPCProvider_WriteStateBytes(t *testing.T) { TotalLength: int64(len(data)), Range: &proto.StateRange{ Start: int64(chunkSize), - End: int64(chunkSize + 10), + End: int64(chunkSize+10) - 1, }, } mockWriteClient.EXPECT().Send(gomock.AnyOf(req1, req2)).Times(2).Return(nil) @@ -3281,7 +3382,7 @@ func TestGRPCProvider_WriteStateBytes(t *testing.T) { TotalLength: int64(len(data)), Range: &proto.StateRange{ Start: 0, - End: int64(len(data)), + End: int64(len(data)) - 1, }, } mockResp := &proto.WriteStateBytes_Response{ @@ -3337,7 +3438,7 @@ func TestGRPCProvider_WriteStateBytes(t *testing.T) { TotalLength: int64(len(data)), Range: &proto.StateRange{ Start: 0, - End: int64(len(data)), + End: int64(len(data)) - 1, }, } mockResp := &proto.WriteStateBytes_Response{ diff --git a/internal/tfplugin6/tfplugin6.pb.go b/internal/tfplugin6/tfplugin6.pb.go index bb76eff6f1..ab54f3405a 100644 --- a/internal/tfplugin6/tfplugin6.pb.go +++ b/internal/tfplugin6/tfplugin6.pb.go @@ -2283,9 +2283,13 @@ func (x *RequestChunkMeta) GetStateId() string { } type StateRange struct { - state protoimpl.MessageState `protogen:"open.v1"` - Start int64 `protobuf:"varint,1,opt,name=start,proto3" json:"start,omitempty"` - End int64 `protobuf:"varint,2,opt,name=end,proto3" json:"end,omitempty"` + state protoimpl.MessageState `protogen:"open.v1"` + // start is the starting byte index for a chunk of state byte data. + // This index is in relation to the entire byte array that will be sent or received. + Start int64 `protobuf:"varint,1,opt,name=start,proto3" json:"start,omitempty"` + // end is the ending byte index for a chunk of state byte data. + // This index is in relation to the entire byte array that will be sent or received. + End int64 `protobuf:"varint,2,opt,name=end,proto3" json:"end,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -7027,11 +7031,12 @@ func (x *ReadStateBytes_Request) GetStateId() string { } type ReadStateBytes_Response struct { - state protoimpl.MessageState `protogen:"open.v1"` - Bytes []byte `protobuf:"bytes,1,opt,name=bytes,proto3" json:"bytes,omitempty"` - TotalLength int64 `protobuf:"varint,2,opt,name=total_length,json=totalLength,proto3" json:"total_length,omitempty"` - Range *StateRange `protobuf:"bytes,3,opt,name=range,proto3" json:"range,omitempty"` - Diagnostics []*Diagnostic `protobuf:"bytes,4,rep,name=diagnostics,proto3" json:"diagnostics,omitempty"` + state protoimpl.MessageState `protogen:"open.v1"` + Bytes []byte `protobuf:"bytes,1,opt,name=bytes,proto3" json:"bytes,omitempty"` + // total_length is the overall size of all of the state byte chunks that will be received + TotalLength int64 `protobuf:"varint,2,opt,name=total_length,json=totalLength,proto3" json:"total_length,omitempty"` + Range *StateRange `protobuf:"bytes,3,opt,name=range,proto3" json:"range,omitempty"` + Diagnostics []*Diagnostic `protobuf:"bytes,4,rep,name=diagnostics,proto3" json:"diagnostics,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -7097,10 +7102,11 @@ func (x *ReadStateBytes_Response) GetDiagnostics() []*Diagnostic { type WriteStateBytes_RequestChunk struct { state protoimpl.MessageState `protogen:"open.v1"` // meta is sent with the first chunk only - Meta *RequestChunkMeta `protobuf:"bytes,1,opt,name=meta,proto3,oneof" json:"meta,omitempty"` - Bytes []byte `protobuf:"bytes,2,opt,name=bytes,proto3" json:"bytes,omitempty"` - TotalLength int64 `protobuf:"varint,3,opt,name=total_length,json=totalLength,proto3" json:"total_length,omitempty"` - Range *StateRange `protobuf:"bytes,4,opt,name=range,proto3" json:"range,omitempty"` + Meta *RequestChunkMeta `protobuf:"bytes,1,opt,name=meta,proto3,oneof" json:"meta,omitempty"` + Bytes []byte `protobuf:"bytes,2,opt,name=bytes,proto3" json:"bytes,omitempty"` + // total_length is the overall size of all of the state byte chunks that will be sent. + TotalLength int64 `protobuf:"varint,3,opt,name=total_length,json=totalLength,proto3" json:"total_length,omitempty"` + Range *StateRange `protobuf:"bytes,4,opt,name=range,proto3" json:"range,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache }