From 6a5f6938efae15bc2541c2b9f006005c9775529c Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Sun, 8 May 2016 14:11:54 +0300 Subject: [PATCH] post-processor/compress: add bgzf format support * add bgzf compressor (it allows seek inside compressed file) * add optional format config variable to specify archive format * Update pgzip to get sse4 and avx optimizations. Signed-off-by: Vasiliy Tolstov --- post-processor/compress/artifact.go | 14 +- post-processor/compress/post-processor.go | 35 +- vendor/github.com/biogo/hts/LICENSE | 23 + vendor/github.com/biogo/hts/bgzf/bgzf.go | 104 +++ vendor/github.com/biogo/hts/bgzf/cache.go | 196 +++++ vendor/github.com/biogo/hts/bgzf/reader.go | 703 ++++++++++++++++++ vendor/github.com/biogo/hts/bgzf/writer.go | 282 +++++++ .../docs/post-processors/compress.html.md | 3 + 8 files changed, 1344 insertions(+), 16 deletions(-) create mode 100644 vendor/github.com/biogo/hts/LICENSE create mode 100644 vendor/github.com/biogo/hts/bgzf/bgzf.go create mode 100644 vendor/github.com/biogo/hts/bgzf/cache.go create mode 100644 vendor/github.com/biogo/hts/bgzf/reader.go create mode 100644 vendor/github.com/biogo/hts/bgzf/writer.go diff --git a/post-processor/compress/artifact.go b/post-processor/compress/artifact.go index 56a5ce402..4b19f779f 100644 --- a/post-processor/compress/artifact.go +++ b/post-processor/compress/artifact.go @@ -8,16 +8,8 @@ import ( const BuilderId = "packer.post-processor.compress" type Artifact struct { - Path string - Provider string - files []string -} - -func NewArtifact(provider, path string) *Artifact { - return &Artifact{ - Path: path, - Provider: provider, - } + Path string + files []string } func (a *Artifact) BuilderId() string { @@ -33,7 +25,7 @@ func (a *Artifact) Files() []string { } func (a *Artifact) String() string { - return fmt.Sprintf("'%s' compressing: %s", a.Provider, a.Path) + return fmt.Sprintf("compressed artifacts in: %s", a.Path) } func (*Artifact) State(name string) interface{} { diff --git a/post-processor/compress/post-processor.go b/post-processor/compress/post-processor.go index b95b27bde..2d99e0ea0 100644 --- a/post-processor/compress/post-processor.go +++ b/post-processor/compress/post-processor.go @@ -11,6 +11,7 @@ import ( "regexp" "runtime" + "github.com/biogo/hts/bgzf" "github.com/klauspost/pgzip" "github.com/mitchellh/packer/common" "github.com/mitchellh/packer/helper/config" @@ -36,6 +37,7 @@ type Config struct { // Fields from config file OutputPath string `mapstructure:"output"` + Format string `mapstructure:"format"` CompressionLevel int `mapstructure:"compression_level"` KeepInputArtifact bool `mapstructure:"keep_input_artifact"` @@ -115,6 +117,10 @@ func (p *PostProcessor) PostProcess(ui packer.Ui, artifact packer.Artifact) (pac keep := p.config.KeepInputArtifact newArtifact := &Artifact{Path: target} + if err = os.MkdirAll(filepath.Dir(target), os.FileMode(0755)); err != nil { + return nil, false, fmt.Errorf( + "Unable to create dir for archive %s: %s", target, err) + } outputFile, err := os.Create(target) if err != nil { return nil, false, fmt.Errorf( @@ -126,6 +132,11 @@ func (p *PostProcessor) PostProcess(ui packer.Ui, artifact packer.Artifact) (pac // compression writer. Otherwise it's just a file. var output io.WriteCloser switch p.config.Algorithm { + case "bgzf": + ui.Say(fmt.Sprintf("Using bgzf compression with %d cores for %s", + runtime.GOMAXPROCS(-1), target)) + output, err = makeBGZFWriter(outputFile, p.config.CompressionLevel) + defer output.Close() case "lz4": ui.Say(fmt.Sprintf("Using lz4 compression with %d cores for %s", runtime.GOMAXPROCS(-1), target)) @@ -190,15 +201,21 @@ func (p *PostProcessor) PostProcess(ui packer.Ui, artifact packer.Artifact) (pac } func (config *Config) detectFromFilename() { + var result [][]string extensions := map[string]string{ - "tar": "tar", - "zip": "zip", - "gz": "pgzip", - "lz4": "lz4", + "tar": "tar", + "zip": "zip", + "gz": "pgzip", + "lz4": "lz4", + "bgzf": "bgzf", } - result := filenamePattern.FindAllStringSubmatch(config.OutputPath, -1) + if config.Format == "" { + result = filenamePattern.FindAllStringSubmatch(config.OutputPath, -1) + } else { + result = filenamePattern.FindAllStringSubmatch(fmt.Sprintf("%s.%s", config.OutputPath, config.Format), -1) + } // No dots. Bail out with defaults. if len(result) == 0 { @@ -240,6 +257,14 @@ func (config *Config) detectFromFilename() { return } +func makeBGZFWriter(output io.WriteCloser, compressionLevel int) (io.WriteCloser, error) { + bgzfWriter, err := bgzf.NewWriterLevel(output, compressionLevel, runtime.GOMAXPROCS(-1)) + if err != nil { + return nil, ErrInvalidCompressionLevel + } + return bgzfWriter, nil +} + func makeLZ4Writer(output io.WriteCloser, compressionLevel int) (io.WriteCloser, error) { lzwriter := lz4.NewWriter(output) if compressionLevel > gzip.DefaultCompression { diff --git a/vendor/github.com/biogo/hts/LICENSE b/vendor/github.com/biogo/hts/LICENSE new file mode 100644 index 000000000..75cc58be3 --- /dev/null +++ b/vendor/github.com/biogo/hts/LICENSE @@ -0,0 +1,23 @@ +Copyright ©2012 The bíogo Authors. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + * Neither the name of the bíogo project nor the names of its authors and + contributors may be used to endorse or promote products derived from this + software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/vendor/github.com/biogo/hts/bgzf/bgzf.go b/vendor/github.com/biogo/hts/bgzf/bgzf.go new file mode 100644 index 000000000..a0956d64f --- /dev/null +++ b/vendor/github.com/biogo/hts/bgzf/bgzf.go @@ -0,0 +1,104 @@ +// Copyright ©2012 The bíogo Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package bgzf implements BGZF format reading and writing according to the +// SAM specification. +// +// The specification is available at https://github.com/samtools/hts-specs. +package bgzf + +import ( + "errors" + "io" + "os" + "time" +) + +const ( + BlockSize = 0x0ff00 // The maximum size of an uncompressed input data block. + MaxBlockSize = 0x10000 // The maximum size of a compressed output block. +) + +const ( + bgzfExtra = "BC\x02\x00\x00\x00" + minFrame = 20 + len(bgzfExtra) // Minimum bgzf header+footer length. + + // Magic EOF block. + magicBlock = "\x1f\x8b\x08\x04\x00\x00\x00\x00\x00\xff\x06\x00\x42\x43\x02\x00\x1b\x00\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00" +) + +var ( + bgzfExtraPrefix = []byte(bgzfExtra[:4]) + unixEpoch = time.Unix(0, 0) +) + +func compressBound(srcLen int) int { + return srcLen + srcLen>>12 + srcLen>>14 + srcLen>>25 + 13 + minFrame +} + +func init() { + if compressBound(BlockSize) > MaxBlockSize { + panic("bam: BlockSize too large") + } +} + +var ( + ErrClosed = errors.New("bgzf: use of closed writer") + ErrBlockOverflow = errors.New("bgzf: block overflow") + ErrWrongFileType = errors.New("bgzf: file is a directory") + ErrNoEnd = errors.New("bgzf: cannot determine offset from end") + ErrNotASeeker = errors.New("bgzf: not a seeker") + ErrContaminatedCache = errors.New("bgzf: cache owner mismatch") + ErrNoBlockSize = errors.New("bgzf: could not determine block size") + ErrBlockSizeMismatch = errors.New("bgzf: unexpected block size") +) + +// HasEOF checks for the presence of a BGZF magic EOF block. +// The magic block is defined in the SAM specification. A magic block +// is written by a Writer on calling Close. The ReaderAt must provide +// some method for determining valid ReadAt offsets. +func HasEOF(r io.ReaderAt) (bool, error) { + type sizer interface { + Size() int64 + } + type stater interface { + Stat() (os.FileInfo, error) + } + type lenSeeker interface { + io.Seeker + Len() int + } + var size int64 + switch r := r.(type) { + case sizer: + size = r.Size() + case stater: + fi, err := r.Stat() + if err != nil { + return false, err + } + size = fi.Size() + case lenSeeker: + var err error + size, err = r.Seek(0, 1) + if err != nil { + return false, err + } + size += int64(r.Len()) + default: + return false, ErrNoEnd + } + + b := make([]byte, len(magicBlock)) + _, err := r.ReadAt(b, size-int64(len(magicBlock))) + if err != nil { + return false, err + } + for i, c := range b { + if c != magicBlock[i] { + return false, nil + } + } + return true, nil +} diff --git a/vendor/github.com/biogo/hts/bgzf/cache.go b/vendor/github.com/biogo/hts/bgzf/cache.go new file mode 100644 index 000000000..596bc0a86 --- /dev/null +++ b/vendor/github.com/biogo/hts/bgzf/cache.go @@ -0,0 +1,196 @@ +// Copyright ©2012 The bíogo Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package bgzf + +import ( + "bytes" + "compress/gzip" + "io" +) + +// Cache is a Block caching type. Basic cache implementations are provided +// in the cache package. A Cache must be safe for concurrent use. +// +// If a Cache is a Wrapper, its Wrap method is called on newly created blocks. +type Cache interface { + // Get returns the Block in the Cache with the specified + // base or a nil Block if it does not exist. The returned + // Block must be removed from the Cache. + Get(base int64) Block + + // Put inserts a Block into the Cache, returning the Block + // that was evicted or nil if no eviction was necessary and + // a boolean indicating whether the put Block was retained + // by the Cache. + Put(Block) (evicted Block, retained bool) + + // Peek returns whether a Block exists in the cache for the + // given base. If a Block satisfies the request, then exists + // is returned as true with the offset for the next Block in + // the stream, otherwise false and -1. + Peek(base int64) (exists bool, next int64) +} + +// Wrapper defines Cache types that need to modify a Block at its creation. +type Wrapper interface { + Wrap(Block) Block +} + +// Block wraps interaction with decompressed BGZF data blocks. +type Block interface { + // Base returns the file offset of the start of + // the gzip member from which the Block data was + // decompressed. + Base() int64 + + io.Reader + + // Used returns whether one or more bytes have + // been read from the Block. + Used() bool + + // header returns the gzip.Header of the gzip member + // from which the Block data was decompressed. + header() gzip.Header + + // isMagicBlock returns whether the Block is a BGZF + // magic EOF marker block. + isMagicBlock() bool + + // ownedBy returns whether the Block is owned by + // the given Reader. + ownedBy(*Reader) bool + + // setOwner changes the owner to the given Reader, + // reseting other data to its zero state. + setOwner(*Reader) + + // hasData returns whether the Block has read data. + hasData() bool + + // The following are unexported equivalents + // of the io interfaces. seek is limited to + // the file origin offset case and does not + // return the new offset. + seek(offset int64) error + readFrom(io.ReadCloser) error + + // len returns the number of remaining + // bytes that can be read from the Block. + len() int + + // setBase sets the file offset of the start + // and of the gzip member that the Block data + // was decompressed from. + setBase(int64) + + // NextBase returns the expected position of the next + // BGZF block. It returns -1 if the Block is not valid. + NextBase() int64 + + // setHeader sets the file header of of the gzip + // member that the Block data was decompressed from. + setHeader(gzip.Header) + + // txOffset returns the current vitual offset. + txOffset() Offset +} + +type block struct { + owner *Reader + used bool + + base int64 + h gzip.Header + magic bool + + offset Offset + + buf *bytes.Reader + data [MaxBlockSize]byte +} + +func (b *block) Base() int64 { return b.base } + +func (b *block) Used() bool { return b.used } + +func (b *block) Read(p []byte) (int, error) { + n, err := b.buf.Read(p) + b.offset.Block += uint16(n) + if n > 0 { + b.used = true + } + return n, err +} + +func (b *block) readFrom(r io.ReadCloser) error { + o := b.owner + b.owner = nil + buf := bytes.NewBuffer(b.data[:0]) + _, err := io.Copy(buf, r) + if err != nil { + return err + } + b.buf = bytes.NewReader(buf.Bytes()) + b.owner = o + b.magic = b.magic && b.len() == 0 + return r.Close() +} + +func (b *block) seek(offset int64) error { + _, err := b.buf.Seek(offset, 0) + if err == nil { + b.offset.Block = uint16(offset) + } + return err +} + +func (b *block) len() int { + if b.buf == nil { + return 0 + } + return b.buf.Len() +} + +func (b *block) setBase(n int64) { + b.base = n + b.offset = Offset{File: n} +} + +func (b *block) NextBase() int64 { + size := int64(expectedMemberSize(b.h)) + if size == -1 { + return -1 + } + return b.base + size +} + +func (b *block) setHeader(h gzip.Header) { + b.h = h + b.magic = h.OS == 0xff && + h.ModTime.Equal(unixEpoch) && + h.Name == "" && + h.Comment == "" && + bytes.Equal(h.Extra, []byte("BC\x02\x00\x1b\x00")) +} + +func (b *block) header() gzip.Header { return b.h } + +func (b *block) isMagicBlock() bool { return b.magic } + +func (b *block) setOwner(r *Reader) { + b.owner = r + b.used = false + b.base = -1 + b.h = gzip.Header{} + b.offset = Offset{} + b.buf = nil +} + +func (b *block) ownedBy(r *Reader) bool { return b.owner == r } + +func (b *block) hasData() bool { return b.buf != nil } + +func (b *block) txOffset() Offset { return b.offset } diff --git a/vendor/github.com/biogo/hts/bgzf/reader.go b/vendor/github.com/biogo/hts/bgzf/reader.go new file mode 100644 index 000000000..af52b8ae3 --- /dev/null +++ b/vendor/github.com/biogo/hts/bgzf/reader.go @@ -0,0 +1,703 @@ +// Copyright ©2012 The bíogo Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package bgzf + +import ( + "bufio" + "bytes" + "compress/flate" + "compress/gzip" + "io" + "runtime" + "sync" +) + +// countReader wraps flate.Reader, adding support for querying current offset. +type countReader struct { + // Underlying Reader. + fr flate.Reader + + // Offset within the underlying reader. + off int64 +} + +// newCountReader returns a new countReader. +func newCountReader(r io.Reader) *countReader { + switch r := r.(type) { + case *countReader: + panic("bgzf: illegal use of internal type") + case flate.Reader: + return &countReader{fr: r} + default: + return &countReader{fr: bufio.NewReader(r)} + } +} + +// Read is required to satisfy flate.Reader. +func (r *countReader) Read(p []byte) (int, error) { + n, err := r.fr.Read(p) + r.off += int64(n) + return n, err +} + +// ReadByte is required to satisfy flate.Reader. +func (r *countReader) ReadByte() (byte, error) { + b, err := r.fr.ReadByte() + if err == nil { + r.off++ + } + return b, err +} + +// offset returns the current offset in the underlying reader. +func (r *countReader) offset() int64 { return r.off } + +// seek moves the countReader to the specified offset using rs as the +// underlying reader. +func (r *countReader) seek(rs io.ReadSeeker, off int64) error { + _, err := rs.Seek(off, 0) + if err != nil { + return err + } + + type reseter interface { + Reset(io.Reader) + } + switch cr := r.fr.(type) { + case reseter: + cr.Reset(rs) + default: + r.fr = newCountReader(rs) + } + r.off = off + + return nil +} + +// buffer is a flate.Reader used by a decompressor to store read-ahead data. +type buffer struct { + // Buffered compressed data from read ahead. + off int // Current position in buffered data. + size int // Total size of buffered data. + data [MaxBlockSize]byte +} + +// Read provides the flate.Decompressor Read method. +func (r *buffer) Read(b []byte) (int, error) { + if r.off >= r.size { + return 0, io.EOF + } + if n := r.size - r.off; len(b) > n { + b = b[:n] + } + n := copy(b, r.data[r.off:]) + r.off += n + return n, nil +} + +// ReadByte provides the flate.Decompressor ReadByte method. +func (r *buffer) ReadByte() (byte, error) { + if r.off == r.size { + return 0, io.EOF + } + b := r.data[r.off] + r.off++ + return b, nil +} + +// reset makes the buffer available to store data. +func (r *buffer) reset() { r.size = 0 } + +// hasData returns whether the buffer has any data buffered. +func (r *buffer) hasData() bool { return r.size != 0 } + +// readLimited reads n bytes into the buffer from the given source. +func (r *buffer) readLimited(n int, src *countReader) error { + if r.hasData() { + panic("bgzf: read into non-empty buffer") + } + r.off = 0 + var err error + r.size, err = io.ReadFull(src, r.data[:n]) + return err +} + +// equals returns a boolean indicating the equality between +// the buffered data and the given byte slice. +func (r *buffer) equals(b []byte) bool { return bytes.Equal(r.data[:r.size], b) } + +// decompressor is a gzip member decompressor worker. +type decompressor struct { + owner *Reader + + gz gzip.Reader + + cr *countReader + + // Current block size. + blockSize int + + // Buffered compressed data from read ahead. + buf buffer + + // Decompressed data. + wg sync.WaitGroup + blk Block + + err error +} + +// Read provides the Read method for the decompressor's gzip.Reader. +func (d *decompressor) Read(b []byte) (int, error) { + if d.buf.hasData() { + return d.buf.Read(b) + } + return d.cr.Read(b) +} + +// ReadByte provides the ReadByte method for the decompressor's gzip.Reader. +func (d *decompressor) ReadByte() (byte, error) { + if d.buf.hasData() { + return d.buf.ReadByte() + } + return d.cr.ReadByte() +} + +// lazyBlock conditionally creates a ready to use Block. +func (d *decompressor) lazyBlock() { + if d.blk == nil { + if w, ok := d.owner.cache.(Wrapper); ok { + d.blk = w.Wrap(&block{owner: d.owner}) + } else { + d.blk = &block{owner: d.owner} + } + return + } + if !d.blk.ownedBy(d.owner) { + d.blk.setOwner(d.owner) + } +} + +// acquireHead gains the read head from the decompressor's owner. +func (d *decompressor) acquireHead() { + d.wg.Add(1) + d.cr = <-d.owner.head +} + +// releaseHead releases the read head back to the decompressor's owner. +func (d *decompressor) releaseHead() { + d.owner.head <- d.cr + d.cr = nil // Defensively zero the reader. +} + +// wait waits for the current member to be decompressed or fail, and returns +// the resulting error state. +func (d *decompressor) wait() (Block, error) { + d.wg.Wait() + blk := d.blk + d.blk = nil + return blk, d.err +} + +// using sets the Block for the decompressor to work with. +func (d *decompressor) using(b Block) *decompressor { d.blk = b; return d } + +// nextBlockAt makes the decompressor ready for reading decompressed data +// from its Block. It checks if there is a cached Block for the nextBase, +// otherwise it seeks to the correct location if decompressor is not +// correctly positioned, and then reads the compressed data and fills +// the decompressed Block. +// After nextBlockAt returns without error, the decompressor's Block +// holds a valid gzip.Header and base offset. +func (d *decompressor) nextBlockAt(off int64, rs io.ReadSeeker) *decompressor { + d.err = nil + for { + exists, next := d.owner.cacheHasBlockFor(off) + if !exists { + break + } + off = next + } + + d.lazyBlock() + + d.acquireHead() + defer d.releaseHead() + + if d.cr.offset() != off { + if rs == nil { + // It should not be possible for the expected next block base + // to be out of register with the count reader unless Seek + // has been called, so we know the base reader must be an + // io.ReadSeeker. + var ok bool + rs, ok = d.owner.r.(io.ReadSeeker) + if !ok { + panic("bgzf: unexpected offset without seek") + } + } + d.err = d.cr.seek(rs, off) + if d.err != nil { + d.wg.Done() + return d + } + } + + d.blk.setBase(d.cr.offset()) + d.err = d.readMember() + if d.err != nil { + d.wg.Done() + return d + } + d.blk.setHeader(d.gz.Header) + d.gz.Header = gzip.Header{} // Prevent retention of header field in next use. + + // Decompress data into the decompressor's Block. + go func() { + d.err = d.blk.readFrom(&d.gz) + d.wg.Done() + }() + + return d +} + +// expectedMemberSize returns the size of the BGZF conformant gzip member. +// It returns -1 if no BGZF block size field is found. +func expectedMemberSize(h gzip.Header) int { + i := bytes.Index(h.Extra, bgzfExtraPrefix) + if i < 0 || i+5 >= len(h.Extra) { + return -1 + } + return (int(h.Extra[i+4]) | int(h.Extra[i+5])<<8) + 1 +} + +// readMember buffers the gzip member starting the current decompressor offset. +func (d *decompressor) readMember() error { + // Set the decompressor to Read from the underlying flate.Reader + // and mark the starting offset from which the underlying reader + // was used. + d.buf.reset() + mark := d.cr.offset() + + err := d.gz.Reset(d) + if err != nil { + d.blockSize = -1 + return err + } + + d.blockSize = expectedMemberSize(d.gz.Header) + if d.blockSize < 0 { + return ErrNoBlockSize + } + skipped := int(d.cr.offset() - mark) + + // Read compressed data into the decompressor buffer until the + // underlying flate.Reader is positioned at the end of the gzip + // member in which the readMember call was made. + return d.buf.readLimited(d.blockSize-skipped, d.cr) +} + +// Offset is a BGZF virtual offset. +type Offset struct { + File int64 + Block uint16 +} + +// Chunk is a region of a BGZF file. +type Chunk struct { + Begin Offset + End Offset +} + +// Reader implements BGZF blocked gzip decompression. +type Reader struct { + gzip.Header + r io.Reader + + // head serialises access to the underlying + // io.Reader. + head chan *countReader + + // lastChunk is the virtual file offset + // interval of the last successful read + // or seek operation. + lastChunk Chunk + + // Blocked specifies the behaviour of the + // Reader at the end of a BGZF member. + // If the Reader is Blocked, a Read that + // reaches the end of a BGZF block will + // return io.EOF. This error is not sticky, + // so a subsequent Read will progress to + // the next block if it is available. + Blocked bool + + // Non-concurrent work decompressor. + dec *decompressor + + // Concurrent work fields. + waiting chan *decompressor + working chan *decompressor + control chan int64 + + current Block + + // cache is the Reader block cache. If Cache is not nil, + // the cache is queried for blocks before an attempt to + // read from the underlying io.Reader. + mu sync.RWMutex + cache Cache + + err error +} + +// NewReader returns a new BGZF reader. +// +// The number of concurrent read decompressors is specified by rd. +// If rd is 0, GOMAXPROCS concurrent will be created. The returned +// Reader should be closed after use to avoid leaking resources. +func NewReader(r io.Reader, rd int) (*Reader, error) { + if rd == 0 { + rd = runtime.GOMAXPROCS(0) + } + bg := &Reader{ + r: r, + + head: make(chan *countReader, 1), + } + bg.head <- newCountReader(r) + + // Make work loop control structures. + if rd > 1 { + bg.waiting = make(chan *decompressor, rd) + bg.working = make(chan *decompressor, rd) + bg.control = make(chan int64, 1) + for ; rd > 1; rd-- { + bg.waiting <- &decompressor{owner: bg} + } + } + + // Read the first block now so we can fail before + // the first Read call if there is a problem. + bg.dec = &decompressor{owner: bg} + blk, err := bg.dec.nextBlockAt(0, nil).wait() + if err != nil { + return nil, err + } + bg.current = blk + bg.Header = bg.current.header() + + // Set up work loop if rd was > 1. + if bg.control != nil { + bg.waiting <- bg.dec + bg.dec = nil + next := blk.NextBase() + go func() { + defer func() { + bg.mu.Lock() + bg.cache = nil + bg.mu.Unlock() + }() + for dec := range bg.waiting { + var open bool + if next < 0 { + next, open = <-bg.control + if !open { + return + } + } else { + select { + case next, open = <-bg.control: + if !open { + return + } + default: + } + } + dec.nextBlockAt(next, nil) + next = dec.blk.NextBase() + bg.working <- dec + } + }() + } + + return bg, nil +} + +// SetCache sets the cache to be used by the Reader. +func (bg *Reader) SetCache(c Cache) { + bg.mu.Lock() + bg.cache = c + bg.mu.Unlock() +} + +// Seek performs a seek operation to the given virtual offset. +func (bg *Reader) Seek(off Offset) error { + rs, ok := bg.r.(io.ReadSeeker) + if !ok { + return ErrNotASeeker + } + + if off.File != bg.current.Base() || !bg.current.hasData() { + ok := bg.cacheSwap(off.File) + if !ok { + var dec *decompressor + if bg.dec != nil { + dec = bg.dec + } else { + select { + case dec = <-bg.waiting: + case dec = <-bg.working: + blk, err := dec.wait() + if err == nil { + bg.keep(blk) + } + } + } + bg.current, bg.err = dec. + using(bg.current). + nextBlockAt(off.File, rs). + wait() + if bg.dec == nil { + select { + case <-bg.control: + default: + } + bg.control <- bg.current.NextBase() + bg.waiting <- dec + } + bg.Header = bg.current.header() + if bg.err != nil { + return bg.err + } + } + } + + bg.err = bg.current.seek(int64(off.Block)) + if bg.err == nil { + bg.lastChunk = Chunk{Begin: off, End: off} + } + + return bg.err +} + +// LastChunk returns the region of the BGZF file read by the last read +// operation or the resulting virtual offset of the last successful +// seek operation. +func (bg *Reader) LastChunk() Chunk { return bg.lastChunk } + +// BlockLen returns the number of bytes remaining to be read from the +// current BGZF block. +func (bg *Reader) BlockLen() int { return bg.current.len() } + +// Close closes the reader and releases resources. +func (bg *Reader) Close() error { + if bg.control != nil { + close(bg.control) + close(bg.waiting) + } + if bg.err == io.EOF { + return nil + } + return bg.err +} + +// Read implements the io.Reader interface. +func (bg *Reader) Read(p []byte) (int, error) { + if bg.err != nil { + return 0, bg.err + } + + // Discard leading empty blocks. This is an indexing + // optimisation to avoid retaining useless members + // in a BAI/CSI. + for bg.current.len() == 0 { + bg.err = bg.nextBlock() + if bg.err != nil { + return 0, bg.err + } + } + + bg.lastChunk.Begin = bg.current.txOffset() + + var n int + for n < len(p) && bg.err == nil { + var _n int + _n, bg.err = bg.current.Read(p[n:]) + n += _n + if bg.err == io.EOF { + if n == len(p) { + bg.err = nil + break + } + + if bg.Blocked { + bg.err = nil + bg.lastChunk.End = bg.current.txOffset() + return n, io.EOF + } + + bg.err = bg.nextBlock() + if bg.err != nil { + break + } + } + } + + bg.lastChunk.End = bg.current.txOffset() + return n, bg.err +} + +// nextBlock swaps the current decompressed block for the next +// in the stream. If the block is available from the cache +// no additional work is done, otherwise a decompressor is +// used or waited on. +func (bg *Reader) nextBlock() error { + base := bg.current.NextBase() + ok := bg.cacheSwap(base) + if ok { + bg.Header = bg.current.header() + return nil + } + + var err error + if bg.dec != nil { + bg.dec.using(bg.current).nextBlockAt(base, nil) + bg.current, err = bg.dec.wait() + } else { + var ok bool + for i := 0; i < cap(bg.working); i++ { + dec := <-bg.working + bg.current, err = dec.wait() + bg.waiting <- dec + if bg.current.Base() == base { + ok = true + break + } + if err == nil { + bg.keep(bg.current) + bg.current = nil + } + } + if !ok { + panic("bgzf: unexpected block") + } + } + if err != nil { + return err + } + + // Only set header if there was no error. + h := bg.current.header() + if bg.current.isMagicBlock() { + // TODO(kortschak): Do this more carefully. It may be that + // someone actually has extra data in this field that we are + // clobbering. + bg.Header.Extra = h.Extra + } else { + bg.Header = h + } + + return nil +} + +// cacheSwap attempts to swap the current Block for a cached Block +// for the given base offset. It returns true if successful. +func (bg *Reader) cacheSwap(base int64) bool { + bg.mu.RLock() + defer bg.mu.RUnlock() + if bg.cache == nil { + return false + } + + blk, err := bg.cachedBlockFor(base) + if err != nil { + return false + } + if blk != nil { + // TODO(kortschak): Under some conditions, e.g. FIFO + // cache we will be discarding a non-nil evicted Block. + // Consider retaining these in a sync.Pool. + bg.cachePut(bg.current) + bg.current = blk + return true + } + var retained bool + bg.current, retained = bg.cachePut(bg.current) + if retained { + bg.current = nil + } + return false +} + +// cacheHasBlockFor returns whether the Reader's cache has a block +// for the given base offset. If the requested Block exists, the base +// offset of the following Block is returned. +func (bg *Reader) cacheHasBlockFor(base int64) (exists bool, next int64) { + bg.mu.RLock() + defer bg.mu.RUnlock() + if bg.cache == nil { + return false, -1 + } + return bg.cache.Peek(base) +} + +// cachedBlockFor returns a non-nil Block if the Reader has access to a +// cache and the cache holds the block with the given base and the +// correct owner, otherwise it returns nil. If the Block's owner is not +// correct, or the Block cannot seek to the start of its data, a non-nil +// error is returned. +func (bg *Reader) cachedBlockFor(base int64) (Block, error) { + blk := bg.cache.Get(base) + if blk != nil { + if !blk.ownedBy(bg) { + return nil, ErrContaminatedCache + } + err := blk.seek(0) + if err != nil { + return nil, err + } + } + return blk, nil +} + +// cachePut puts the given Block into the cache if it exists, it returns +// the Block that was evicted or b if it was not retained, and whether +// the Block was retained by the cache. +func (bg *Reader) cachePut(b Block) (evicted Block, retained bool) { + if b == nil || !b.hasData() { + return b, false + } + return bg.cache.Put(b) +} + +// keep puts the given Block into the cache if it exists. +func (bg *Reader) keep(b Block) { + if b == nil || !b.hasData() { + return + } + bg.mu.RLock() + defer bg.mu.RUnlock() + if bg.cache != nil { + bg.cache.Put(b) + } +} + +// Begin returns a Tx that starts at the current virtual offset. +func (bg *Reader) Begin() Tx { return Tx{begin: bg.lastChunk.Begin, r: bg} } + +// Tx represents a multi-read transaction. +type Tx struct { + begin Offset + r *Reader +} + +// End returns the Chunk spanning the transaction. After return the Tx is +// no longer valid. +func (t *Tx) End() Chunk { + c := Chunk{Begin: t.begin, End: t.r.lastChunk.End} + t.r = nil + return c +} diff --git a/vendor/github.com/biogo/hts/bgzf/writer.go b/vendor/github.com/biogo/hts/bgzf/writer.go new file mode 100644 index 000000000..8d8991252 --- /dev/null +++ b/vendor/github.com/biogo/hts/bgzf/writer.go @@ -0,0 +1,282 @@ +// Copyright ©2012 The bíogo Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package bgzf + +import ( + "bytes" + "compress/gzip" + "fmt" + "io" + "sync" +) + +// Writer implements BGZF blocked gzip compression. +type Writer struct { + gzip.Header + w io.Writer + + active *compressor + + queue chan *compressor + qwg sync.WaitGroup + + waiting chan *compressor + + wg sync.WaitGroup + + closed bool + + m sync.Mutex + err error +} + +// NewWriter returns a new Writer. Writes to the returned writer are +// compressed and written to w. +// +// The number of concurrent write compressors is specified by wc. +func NewWriter(w io.Writer, wc int) *Writer { + bg, _ := NewWriterLevel(w, gzip.DefaultCompression, wc) + return bg +} + +// NewWriterLevel returns a new Writer using the specified compression level +// instead of gzip.DefaultCompression. Allowable level options are integer +// values between between gzip.BestSpeed and gzip.BestCompression inclusive. +// +// The number of concurrent write compressors is specified by wc. +func NewWriterLevel(w io.Writer, level, wc int) (*Writer, error) { + if level < gzip.DefaultCompression || level > gzip.BestCompression { + return nil, fmt.Errorf("bgzf: invalid compression level: %d", level) + } + wc++ // We count one for the active compressor. + if wc < 2 { + wc = 2 + } + bg := &Writer{ + w: w, + waiting: make(chan *compressor, wc), + queue: make(chan *compressor, wc), + } + + c := make([]compressor, wc) + for i := range c { + c[i].Header = &bg.Header + c[i].level = level + c[i].waiting = bg.waiting + c[i].flush = make(chan *compressor, 1) + c[i].qwg = &bg.qwg + bg.waiting <- &c[i] + } + bg.active = <-bg.waiting + + bg.wg.Add(1) + go func() { + defer bg.wg.Done() + for qw := range bg.queue { + if !writeOK(bg, <-qw.flush) { + break + } + } + }() + + return bg, nil +} + +func writeOK(bg *Writer, c *compressor) bool { + defer func() { bg.waiting <- c }() + + if c.err != nil { + bg.setErr(c.err) + return false + } + if c.buf.Len() == 0 { + return true + } + + _, err := io.Copy(bg.w, &c.buf) + bg.qwg.Done() + if err != nil { + bg.setErr(err) + return false + } + c.next = 0 + + return true +} + +type compressor struct { + *gzip.Header + gz *gzip.Writer + level int + + next int + block [BlockSize]byte + buf bytes.Buffer + + flush chan *compressor + qwg *sync.WaitGroup + + waiting chan *compressor + + err error +} + +func (c *compressor) writeBlock() { + defer func() { c.flush <- c }() + + if c.gz == nil { + c.gz, c.err = gzip.NewWriterLevel(&c.buf, c.level) + if c.err != nil { + return + } + } else { + c.gz.Reset(&c.buf) + } + c.gz.Header = gzip.Header{ + Comment: c.Comment, + Extra: append([]byte(bgzfExtra), c.Extra...), + ModTime: c.ModTime, + Name: c.Name, + OS: c.OS, + } + + _, c.err = c.gz.Write(c.block[:c.next]) + if c.err != nil { + return + } + c.err = c.gz.Close() + if c.err != nil { + return + } + c.next = 0 + + b := c.buf.Bytes() + i := bytes.Index(b, bgzfExtraPrefix) + if i < 0 { + c.err = gzip.ErrHeader + return + } + size := len(b) - 1 + if size >= MaxBlockSize { + c.err = ErrBlockOverflow + return + } + b[i+4], b[i+5] = byte(size), byte(size>>8) +} + +// Next returns the index of the start of the next write within the +// decompressed data block. +func (bg *Writer) Next() (int, error) { + if bg.closed { + return 0, ErrClosed + } + if err := bg.Error(); err != nil { + return 0, err + } + + return bg.active.next, nil +} + +// Write writes the compressed form of b to the underlying io.Writer. +// Decompressed data blocks are limited to BlockSize, so individual +// byte slices may span block boundaries, however the Writer attempts +// to keep each write within a single data block. +func (bg *Writer) Write(b []byte) (int, error) { + if bg.closed { + return 0, ErrClosed + } + err := bg.Error() + if err != nil { + return 0, err + } + + c := bg.active + var n int + for ; len(b) > 0 && err == nil; err = bg.Error() { + var _n int + if c.next == 0 || c.next+len(b) <= len(c.block) { + _n = copy(c.block[c.next:], b) + b = b[_n:] + c.next += _n + n += _n + } + + if c.next == len(c.block) || _n == 0 { + bg.queue <- c + bg.qwg.Add(1) + go c.writeBlock() + c = <-bg.waiting + } + } + bg.active = c + + return n, bg.Error() +} + +// Flush writes unwritten data to the underlying io.Writer. Flush does not block. +func (bg *Writer) Flush() error { + if bg.closed { + return ErrClosed + } + if err := bg.Error(); err != nil { + return err + } + + if bg.active.next == 0 { + return nil + } + + var c *compressor + c, bg.active = bg.active, <-bg.waiting + bg.queue <- c + bg.qwg.Add(1) + go c.writeBlock() + + return bg.Error() +} + +// Wait waits for all pending writes to complete and returns the subsequent +// error state of the Writer. +func (bg *Writer) Wait() error { + if err := bg.Error(); err != nil { + return err + } + bg.qwg.Wait() + return bg.Error() +} + +// Error returns the error state of the Writer. +func (bg *Writer) Error() error { + bg.m.Lock() + defer bg.m.Unlock() + return bg.err +} + +func (bg *Writer) setErr(err error) { + bg.m.Lock() + defer bg.m.Unlock() + if bg.err == nil { + bg.err = err + } +} + +// Close closes the Writer, waiting for any pending writes before returning +// the final error of the Writer. +func (bg *Writer) Close() error { + if !bg.closed { + c := bg.active + bg.queue <- c + bg.qwg.Add(1) + <-bg.waiting + c.writeBlock() + bg.closed = true + close(bg.queue) + bg.wg.Wait() + if bg.err == nil { + _, bg.err = bg.w.Write([]byte(magicBlock)) + } + } + return bg.err +} diff --git a/website/source/docs/post-processors/compress.html.md b/website/source/docs/post-processors/compress.html.md index 1b36774fc..c4ec5193f 100644 --- a/website/source/docs/post-processors/compress.html.md +++ b/website/source/docs/post-processors/compress.html.md @@ -31,6 +31,9 @@ you will need to specify the `output` option. you are executing multiple builders in parallel you should make sure `output` is unique for each one. For example `packer_{{.BuildName}}.zip`. +- `format` (string) - Disable archive format autodetection and use provided + string. + - `compression_level` (integer) - Specify the compression level, for algorithms that support it, from 1 through 9 inclusive. Typically higher compression levels take longer but produce smaller files. Defaults to `6`