diff --git a/internal/bsr/encode.go b/internal/bsr/encode.go index df23bd1d80..34282cd236 100644 --- a/internal/bsr/encode.go +++ b/internal/bsr/encode.go @@ -9,10 +9,37 @@ import ( "context" "encoding/binary" "fmt" + "hash" "hash/crc32" "io" + "sync" ) +type encodeCache struct { + crced [crcDataSize]byte + compress *bytes.Buffer + crc hash.Hash32 +} + +// Reset clears all existing values in the cache item, preventing dirty reads. +// This function should be called when retrieving items from the encodeCachePool. +func (e *encodeCache) Reset() { + e.compress.Reset() + e.crc.Reset() +} + +// encodeCachePool is to cache allocated but unused items for later reuse, relieving pressure on the garbage collector. +// encodeCachePool is safe for use by multiple goroutines simultaneously. +// encodeCachePool must not be copied after first use. +var encodeCachePool = &sync.Pool{ + New: func() interface{} { + return &encodeCache{ + compress: bytes.NewBuffer(make([]byte, 0, 1024)), + crc: crc32.NewIEEE(), + } + }, +} + // ChunkEncoder will encode a chunk and write it to the writer. // It will compress the chunk data based on the compression. type ChunkEncoder struct { @@ -46,12 +73,15 @@ func NewChunkEncoder(ctx context.Context, w io.Writer, c Compression, e Encrypti // Encode serializes a Chunk and writes it with the encoder's writer. func (e ChunkEncoder) Encode(ctx context.Context, c Chunk) (int, error) { + encode := encodeCachePool.Get().(*encodeCache) + encode.Reset() + defer encodeCachePool.Put(encode) + data, err := c.MarshalData(ctx) if err != nil { return 0, err } - var buf bytes.Buffer var compressor io.WriteCloser switch c.GetType() { // Header should not be compressed since we need to read it prior to knowing @@ -59,13 +89,13 @@ func (e ChunkEncoder) Encode(ctx context.Context, c Chunk) (int, error) { // End should not be compressed since it has no data and compressing an empty // byte slice just adds data in the form of the compression magic strings. 
case ChunkHeader, ChunkEnd: - compressor = newNullCompressionWriter(&buf) + compressor = newNullCompressionWriter(encode.compress) default: switch e.compression { case GzipCompression: - compressor = gzip.NewWriter(&buf) + compressor = gzip.NewWriter(encode.compress) default: - compressor = newNullCompressionWriter(&buf) + compressor = newNullCompressionWriter(encode.compress) } } @@ -76,30 +106,28 @@ func (e ChunkEncoder) Encode(ctx context.Context, c Chunk) (int, error) { if err != nil { return 0, err } - length := buf.Len() - - t := c.GetTimestamp().marshal() + length := encode.compress.Len() - // calculate CRC for protocol+type+dir+timestamp+data - crced := make([]byte, 0, chunkBaseSize+length) - crced = append(crced, c.GetProtocol()...) - crced = append(crced, c.GetType()...) - crced = append(crced, byte(c.GetDirection())) - crced = append(crced, t...) - crced = append(crced, buf.Bytes()...) + copy(encode.crced[0:], c.GetProtocol()) + copy(encode.crced[protocolSize:], c.GetType()) + encode.crced[protocolSize+chunkTypeSize] = byte(c.GetDirection()) + copy(encode.crced[protocolSize+chunkTypeSize+directionSize:], c.GetTimestamp().marshal()) - crc := crc32.NewIEEE() - _, err = crc.Write(crced) - if err != nil { + if _, err := encode.crc.Write(encode.crced[0:]); err != nil { + return 0, err + } + if _, err := encode.crc.Write(encode.compress.Bytes()); err != nil { return 0, err } + sum := encode.crc.Sum32() - d := make([]byte, 0, chunkBaseSize+length+crcSize) - d = binary.BigEndian.AppendUint32(d, uint32(length)) - d = append(d, crced...) 
- d = binary.BigEndian.AppendUint32(d, crc.Sum32()) + encodedChunk := make([]byte, chunkBaseSize+length+crcSize) + binary.BigEndian.PutUint32(encodedChunk[0:], uint32(length)) + copy(encodedChunk[lengthSize:], encode.crced[0:]) + copy(encodedChunk[chunkBaseSize:], encode.compress.Bytes()) + binary.BigEndian.PutUint32(encodedChunk[chunkBaseSize+length:], sum) - return e.w.Write(d) + return e.w.Write(encodedChunk) } // Close closes the encoder. diff --git a/internal/bsr/encode_bench_test.go b/internal/bsr/encode_bench_test.go new file mode 100644 index 0000000000..00149341ff --- /dev/null +++ b/internal/bsr/encode_bench_test.go @@ -0,0 +1,89 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + +package bsr + +import ( + "bytes" + "context" + "fmt" + "testing" + "time" +) + +type testChunk struct { + *BaseChunk + Data []byte +} + +// MarshalData serializes the data portion of a chunk. +func (t *testChunk) MarshalData(_ context.Context) ([]byte, error) { + return t.Data, nil +} + +func newTestChunk(d int) *testChunk { + data := make([]byte, d) + ts := time.Date(2023, time.March, 16, 10, 47, 3, 14, time.UTC) + return &testChunk{ + BaseChunk: &BaseChunk{ + Protocol: "TEST", + Direction: Inbound, + Timestamp: NewTimestamp(ts), + Type: "TEST", + }, + Data: data, + } +} + +func BenchmarkEncodeParallel(b *testing.B) { + b.ReportAllocs() + cases := []int{16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768, 65536} + for _, chunkSize := range cases { + b.StopTimer() + ctx := context.Background() + chunks := make([]Chunk, 250) + for i := range chunks { + chunks[i] = newTestChunk(chunkSize) + } + b.StartTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + var buf bytes.Buffer + enc, _ := NewChunkEncoder(ctx, &buf, NoCompression, NoEncryption) + for _, c := range chunks { + if _, err := enc.Encode(ctx, c); err != nil { + b.Error("Encode:", err) + } + } + // NOTE(review): b.SetBytes must not be called from RunParallel goroutines (unsynchronized write; data race under -race), so it is omitted here. + } + }) + } +} + +func
BenchmarkEncodeSequential(b *testing.B) { + cases := []int{16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768, 65536} + for _, chunkSize := range cases { + b.Run(fmt.Sprintf("%d", chunkSize), func(b *testing.B) { + b.ReportAllocs() + b.StopTimer() + ctx := context.Background() + chunks := make([]Chunk, 250) + for i := range chunks { + chunks[i] = newTestChunk(chunkSize) + } + b.StartTimer() + + for i := 0; i < b.N; i++ { + var buf bytes.Buffer + enc, _ := NewChunkEncoder(ctx, &buf, NoCompression, NoEncryption) + for _, c := range chunks { + if _, err := enc.Encode(ctx, c); err != nil { + b.Fatal("Encode:", err) + } + } + b.SetBytes(int64(len(buf.Bytes()))) + } + }) + } +}