refactor(bsr): optimize ChunkEncoder.Encode() (#3476)

pull/3512/head
Damian Debkowski 3 years ago committed by GitHub
parent 1f6e000357
commit a67e42b00f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -9,10 +9,37 @@ import (
"context"
"encoding/binary"
"fmt"
"hash"
"hash/crc32"
"io"
"sync"
)
// encodeCache bundles the per-Encode scratch state that is recycled through
// encodeCachePool so Encode avoids re-allocating on every call.
type encodeCache struct {
	// crced is a fixed-size scratch array for the chunk header fields that
	// are covered by the CRC (protocol, type, direction, timestamp — see the
	// copy calls in Encode). Presumably crcDataSize equals the summed field
	// sizes; confirm against the constant's declaration.
	crced [crcDataSize]byte
	// compress accumulates the (possibly gzip-compressed) chunk data.
	// Its capacity is retained across pool round-trips.
	compress *bytes.Buffer
	// crc is a reusable IEEE CRC-32 hasher; it must be Reset between uses.
	crc hash.Hash32
}
// Reset returns the cache item to a clean state so that a value obtained
// from encodeCachePool cannot leak data from a previous Encode call.
// Callers must invoke it immediately after encodeCachePool.Get.
func (e *encodeCache) Reset() {
	e.crc.Reset()
	e.compress.Reset()
	// NOTE(review): e.crced is deliberately not cleared; Encode appears to
	// overwrite the full array before reading it — confirm crcDataSize never
	// exceeds what Encode writes.
}
// encodeCachePool is to cache allocated but unused items for later reuse, relieving pressure on the garbage collector.
// encodeCachePool is safe for use by multiple goroutines simultaneously.
// encodeCachePool must not be copied after first use.
var encodeCachePool = &sync.Pool{
	New: func() interface{} {
		return &encodeCache{
			// Start with a 1 KiB buffer; bytes.Buffer grows as needed and the
			// grown capacity is kept when the item returns to the pool.
			compress: bytes.NewBuffer(make([]byte, 0, 1024)),
			crc:      crc32.NewIEEE(),
		}
	},
}
// ChunkEncoder will encode a chunk and write it to the writer.
// It will compress the chunk data based on the compression.
type ChunkEncoder struct {
@ -46,12 +73,15 @@ func NewChunkEncoder(ctx context.Context, w io.Writer, c Compression, e Encrypti
// Encode serializes a Chunk and writes it with the encoder's writer.
//
// NOTE(review): the span below is unified-diff residue from a scraped commit
// page — pre-change (removed) and post-change (added) lines both appear with
// their +/- markers stripped, and the "@ -NN,NN +NN,NN @" lines are hunk
// headers marking omitted unchanged context. To restore valid Go, keep only
// the post-change variants (the ones referencing encode.*) and recover the
// elided context lines from the repository.
func (e ChunkEncoder) Encode(ctx context.Context, c Chunk) (int, error) {
// Take a pooled scratch item and clear any state left by a prior use.
encode := encodeCachePool.Get().(*encodeCache)
encode.Reset()
defer encodeCachePool.Put(encode)
data, err := c.MarshalData(ctx)
if err != nil {
return 0, err
}
// NOTE(review): pre-change line — replaced by encode.compress below.
var buf bytes.Buffer
var compressor io.WriteCloser
switch c.GetType() {
// Header should not be compressed since we need to read it prior to knowing
@ -59,13 +89,13 @@ func (e ChunkEncoder) Encode(ctx context.Context, c Chunk) (int, error) {
// End should not be compressed since it has no data and compressing an empty
// byte slice just adds data in the form of the compression magic strings.
case ChunkHeader, ChunkEnd:
// NOTE(review): paired old/new variants follow; keep the encode.compress one.
compressor = newNullCompressionWriter(&buf)
compressor = newNullCompressionWriter(encode.compress)
default:
switch e.compression {
case GzipCompression:
compressor = gzip.NewWriter(&buf)
compressor = gzip.NewWriter(encode.compress)
default:
compressor = newNullCompressionWriter(&buf)
compressor = newNullCompressionWriter(encode.compress)
}
}
// NOTE(review): hunk header — the lines writing `data` to the compressor and
// closing it are elided here; recover them from the repository.
@ -76,30 +106,28 @@ func (e ChunkEncoder) Encode(ctx context.Context, c Chunk) (int, error) {
if err != nil {
return 0, err
}
length := buf.Len()
t := c.GetTimestamp().marshal()
length := encode.compress.Len()
// calculate CRC for protocol+type+dir+timestamp+data
// NOTE(review): the next six lines are the pre-change, allocate-per-call path.
crced := make([]byte, 0, chunkBaseSize+length)
crced = append(crced, c.GetProtocol()...)
crced = append(crced, c.GetType()...)
crced = append(crced, byte(c.GetDirection()))
crced = append(crced, t...)
crced = append(crced, buf.Bytes()...)
// Post-change path: write the fixed-size header fields directly into the
// pooled scratch array instead of appending to a fresh slice.
copy(encode.crced[0:], []byte(c.GetProtocol()))
copy(encode.crced[protocolSize:], []byte(c.GetType()))
encode.crced[protocolSize+chunkTypeSize] = byte(c.GetDirection())
copy(encode.crced[protocolSize+chunkTypeSize+directionSize:], c.GetTimestamp().marshal())
// NOTE(review): pre-change CRC construction — replaced by the pooled encode.crc.
crc := crc32.NewIEEE()
_, err = crc.Write(crced)
if err != nil {
if _, err := encode.crc.Write(encode.crced[0:]); err != nil {
return 0, err
}
if _, err := encode.crc.Write(encode.compress.Bytes()); err != nil {
return 0, err
}
sum := encode.crc.Sum32()
// NOTE(review): pre-change output assembly (the `d` slice) interleaved with
// the post-change exact-size encodedChunk buffer; keep the latter.
d := make([]byte, 0, chunkBaseSize+length+crcSize)
d = binary.BigEndian.AppendUint32(d, uint32(length))
d = append(d, crced...)
d = binary.BigEndian.AppendUint32(d, crc.Sum32())
encodedChunk := make([]byte, chunkBaseSize+length+crcSize)
binary.BigEndian.PutUint32(encodedChunk[0:], uint32(length))
copy(encodedChunk[lengthSize:], encode.crced[0:])
copy(encodedChunk[chunkBaseSize:], encode.compress.Bytes())
binary.BigEndian.PutUint32(encodedChunk[chunkBaseSize+length:], sum)
return e.w.Write(d)
return e.w.Write(encodedChunk)
}
// Close closes the encoder.

@ -0,0 +1,89 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package bsr
import (
"bytes"
"context"
"fmt"
"testing"
"time"
)
// testChunk is a minimal Chunk implementation used by the encoder benchmarks.
// It embeds BaseChunk for the header fields and carries an arbitrary payload.
type testChunk struct {
	*BaseChunk
	// Data is the raw payload returned verbatim by MarshalData.
	Data []byte
}
// MarshalData serializes the data portion of a chunk.
// It returns the underlying Data slice directly, without copying, and never
// fails — sufficient for benchmark fixtures.
func (t *testChunk) MarshalData(_ context.Context) ([]byte, error) {
	return t.Data, nil
}
// newTestChunk builds a testChunk carrying d bytes of zeroed payload.
// The timestamp is pinned to a fixed instant so encoded output is
// deterministic across benchmark runs.
func newTestChunk(d int) *testChunk {
	fixedTime := time.Date(2023, time.March, 16, 10, 47, 3, 14, time.UTC)
	base := &BaseChunk{
		Protocol:  "TEST",
		Direction: Inbound,
		Timestamp: NewTimestamp(fixedTime),
		Type:      "TEST",
	}
	return &testChunk{
		BaseChunk: base,
		Data:      make([]byte, d),
	}
}
// BenchmarkEncodeParallel measures ChunkEncoder.Encode throughput when many
// goroutines encode concurrently, across a range of chunk data sizes.
//
// Fixes over the previous version:
//   - each size runs in its own b.Run sub-benchmark (mirroring
//     BenchmarkEncodeSequential) so sizes do not share one b.N across
//     repeated RunParallel calls;
//   - b.Fatal is never called from a RunParallel worker goroutine —
//     testing's FailNow/Fatal must run on the benchmark goroutine, so
//     workers report via b.Error and return;
//   - the NewChunkEncoder error is checked instead of discarded;
//   - the concurrent b.SetBytes calls (a data race — SetBytes is not safe
//     for use from multiple goroutines) are removed.
func BenchmarkEncodeParallel(b *testing.B) {
	cases := []int{16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768, 65536}
	for _, chunkSize := range cases {
		chunkSize := chunkSize // capture for the closure (pre-Go 1.22 loop semantics)
		b.Run(fmt.Sprintf("%d", chunkSize), func(b *testing.B) {
			b.ReportAllocs()
			ctx := context.Background()
			// Build the fixture batch outside the timed region.
			chunks := make([]Chunk, 250)
			for i := range chunks {
				chunks[i] = newTestChunk(chunkSize)
			}
			b.ResetTimer()
			b.RunParallel(func(pb *testing.PB) {
				for pb.Next() {
					var buf bytes.Buffer
					enc, err := NewChunkEncoder(ctx, &buf, NoCompression, NoEncryption)
					if err != nil {
						b.Error("NewChunkEncoder:", err)
						return
					}
					for _, c := range chunks {
						if _, err := enc.Encode(ctx, c); err != nil {
							b.Error("Encode:", err)
							return
						}
					}
				}
			})
		})
	}
}
// BenchmarkEncodeSequential measures single-goroutine ChunkEncoder.Encode
// throughput across a range of chunk data sizes, one sub-benchmark per size.
func BenchmarkEncodeSequential(b *testing.B) {
	sizes := []int{16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768, 65536}
	for _, size := range sizes {
		b.Run(fmt.Sprintf("%d", size), func(b *testing.B) {
			b.ReportAllocs()
			// Keep fixture construction out of the timed region.
			b.StopTimer()
			ctx := context.Background()
			batch := make([]Chunk, 250)
			for i := range batch {
				batch[i] = newTestChunk(size)
			}
			b.StartTimer()
			for n := 0; n < b.N; n++ {
				var sink bytes.Buffer
				enc, _ := NewChunkEncoder(ctx, &sink, NoCompression, NoEncryption)
				for _, chunk := range batch {
					if _, err := enc.Encode(ctx, chunk); err != nil {
						b.Fatal("Encode:", err)
					}
				}
				// Report throughput as bytes of encoded output per op.
				b.SetBytes(int64(sink.Len()))
			}
		})
	}
}
Loading…
Cancel
Save