520 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			520 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Go
		
	
	
	
// Copyright 2010 The Go Authors. All rights reserved.
 | 
						|
// Use of this source code is governed by a BSD-style
 | 
						|
// license that can be found in the LICENSE file.
 | 
						|
 | 
						|
package pgzip
 | 
						|
 | 
						|
import (
 | 
						|
	"bytes"
 | 
						|
	"errors"
 | 
						|
	"fmt"
 | 
						|
	"hash"
 | 
						|
	"hash/crc32"
 | 
						|
	"io"
 | 
						|
	"runtime"
 | 
						|
	"sync"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/klauspost/compress/flate"
 | 
						|
)
 | 
						|
 | 
						|
const (
	// defaultBlockSize is the default maximum amount of uncompressed input
	// (1 MiB) that is compressed as one block.
	defaultBlockSize = 1 << 20
	// tailSize is the number of trailing bytes of a block (16 KiB) that are
	// carried over as the preset dictionary when compressing the next block.
	tailSize = 16 << 10
	// defaultBlocks is not referenced by the Writer code in this file;
	// NewWriterLevel defaults to runtime.GOMAXPROCS(0) blocks instead.
	defaultBlocks = 4
)
 | 
						|
 | 
						|
// These constants are copied from the flate package, so that code that imports
 | 
						|
// "compress/gzip" does not also have to import "compress/flate".
 | 
						|
const (
 | 
						|
	NoCompression       = flate.NoCompression
 | 
						|
	BestSpeed           = flate.BestSpeed
 | 
						|
	BestCompression     = flate.BestCompression
 | 
						|
	DefaultCompression  = flate.DefaultCompression
 | 
						|
	ConstantCompression = flate.ConstantCompression
 | 
						|
	HuffmanOnly         = flate.HuffmanOnly
 | 
						|
)
 | 
						|
 | 
						|
// A Writer is an io.WriteCloser.
 | 
						|
// Writes to a Writer are compressed and written to w.
 | 
						|
type Writer struct {
 | 
						|
	Header
 | 
						|
	w             io.Writer
 | 
						|
	level         int
 | 
						|
	wroteHeader   bool
 | 
						|
	blockSize     int
 | 
						|
	blocks        int
 | 
						|
	currentBuffer []byte
 | 
						|
	prevTail      []byte
 | 
						|
	digest        hash.Hash32
 | 
						|
	size          int
 | 
						|
	closed        bool
 | 
						|
	buf           [10]byte
 | 
						|
	errMu         sync.RWMutex
 | 
						|
	err           error
 | 
						|
	pushedErr     chan struct{}
 | 
						|
	results       chan result
 | 
						|
	dictFlatePool sync.Pool
 | 
						|
	dstPool       sync.Pool
 | 
						|
	wg            sync.WaitGroup
 | 
						|
}
 | 
						|
 | 
						|
// result is one slot in the Writer's ordered output queue. compressCurrent
// reserves a slot by sending it on Writer.results before it starts the
// compressBlock goroutine that fills it.
type result struct {
	// result carries the compressed block. compressBlock always closes it
	// when done, without sending anything if compression failed.
	result chan []byte
	// notifyWritten is closed by the result-writer goroutine once the
	// block has been written to the output, or skipped after an error.
	notifyWritten chan struct{}
}
 | 
						|
 | 
						|
// Use SetConcurrency to finetune the concurrency level if needed.
 | 
						|
//
 | 
						|
// With this you can control the approximate size of your blocks,
 | 
						|
// as well as how many you want to be processing in parallel.
 | 
						|
//
 | 
						|
// Default values for this is SetConcurrency(defaultBlockSize, runtime.GOMAXPROCS(0)),
 | 
						|
// meaning blocks are split at 1 MB and up to the number of CPU threads
 | 
						|
// can be processing at once before the writer blocks.
 | 
						|
func (z *Writer) SetConcurrency(blockSize, blocks int) error {
 | 
						|
	if blockSize <= tailSize {
 | 
						|
		return fmt.Errorf("gzip: block size cannot be less than or equal to %d", tailSize)
 | 
						|
	}
 | 
						|
	if blocks <= 0 {
 | 
						|
		return errors.New("gzip: blocks cannot be zero or less")
 | 
						|
	}
 | 
						|
	if blockSize == z.blockSize && blocks == z.blocks {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
	z.blockSize = blockSize
 | 
						|
	z.results = make(chan result, blocks)
 | 
						|
	z.blocks = blocks
 | 
						|
	z.dstPool.New = func() interface{} { return make([]byte, 0, blockSize+(blockSize)>>4) }
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// NewWriter returns a new Writer.
 | 
						|
// Writes to the returned writer are compressed and written to w.
 | 
						|
//
 | 
						|
// It is the caller's responsibility to call Close on the WriteCloser when done.
 | 
						|
// Writes may be buffered and not flushed until Close.
 | 
						|
//
 | 
						|
// Callers that wish to set the fields in Writer.Header must do so before
 | 
						|
// the first call to Write or Close. The Comment and Name header fields are
 | 
						|
// UTF-8 strings in Go, but the underlying format requires NUL-terminated ISO
 | 
						|
// 8859-1 (Latin-1). NUL or non-Latin-1 runes in those strings will lead to an
 | 
						|
// error on Write.
 | 
						|
func NewWriter(w io.Writer) *Writer {
 | 
						|
	z, _ := NewWriterLevel(w, DefaultCompression)
 | 
						|
	return z
 | 
						|
}
 | 
						|
 | 
						|
// NewWriterLevel is like NewWriter but specifies the compression level instead
 | 
						|
// of assuming DefaultCompression.
 | 
						|
//
 | 
						|
// The compression level can be DefaultCompression, NoCompression, or any
 | 
						|
// integer value between BestSpeed and BestCompression inclusive. The error
 | 
						|
// returned will be nil if the level is valid.
 | 
						|
func NewWriterLevel(w io.Writer, level int) (*Writer, error) {
 | 
						|
	if level < ConstantCompression || level > BestCompression {
 | 
						|
		return nil, fmt.Errorf("gzip: invalid compression level: %d", level)
 | 
						|
	}
 | 
						|
	z := new(Writer)
 | 
						|
	z.SetConcurrency(defaultBlockSize, runtime.GOMAXPROCS(0))
 | 
						|
	z.init(w, level)
 | 
						|
	return z, nil
 | 
						|
}
 | 
						|
 | 
						|
// This function must be used by goroutines to set an
 | 
						|
// error condition, since z.err access is restricted
 | 
						|
// to the callers goruotine.
 | 
						|
func (z *Writer) pushError(err error) {
 | 
						|
	z.errMu.Lock()
 | 
						|
	if z.err != nil {
 | 
						|
		z.errMu.Unlock()
 | 
						|
		return
 | 
						|
	}
 | 
						|
	z.err = err
 | 
						|
	close(z.pushedErr)
 | 
						|
	z.errMu.Unlock()
 | 
						|
}
 | 
						|
 | 
						|
func (z *Writer) init(w io.Writer, level int) {
 | 
						|
	z.wg.Wait()
 | 
						|
	digest := z.digest
 | 
						|
	if digest != nil {
 | 
						|
		digest.Reset()
 | 
						|
	} else {
 | 
						|
		digest = crc32.NewIEEE()
 | 
						|
	}
 | 
						|
	z.Header = Header{OS: 255}
 | 
						|
	z.w = w
 | 
						|
	z.level = level
 | 
						|
	z.digest = digest
 | 
						|
	z.pushedErr = make(chan struct{}, 0)
 | 
						|
	z.results = make(chan result, z.blocks)
 | 
						|
	z.err = nil
 | 
						|
	z.closed = false
 | 
						|
	z.Comment = ""
 | 
						|
	z.Extra = nil
 | 
						|
	z.ModTime = time.Time{}
 | 
						|
	z.wroteHeader = false
 | 
						|
	z.currentBuffer = nil
 | 
						|
	z.buf = [10]byte{}
 | 
						|
	z.prevTail = nil
 | 
						|
	z.size = 0
 | 
						|
	if z.dictFlatePool.New == nil {
 | 
						|
		z.dictFlatePool.New = func() interface{} {
 | 
						|
			f, _ := flate.NewWriterDict(w, level, nil)
 | 
						|
			return f
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// Reset discards the Writer z's state and makes it equivalent to the
 | 
						|
// result of its original state from NewWriter or NewWriterLevel, but
 | 
						|
// writing to w instead. This permits reusing a Writer rather than
 | 
						|
// allocating a new one.
 | 
						|
func (z *Writer) Reset(w io.Writer) {
 | 
						|
	if z.results != nil && !z.closed {
 | 
						|
		close(z.results)
 | 
						|
	}
 | 
						|
	z.SetConcurrency(defaultBlockSize, runtime.GOMAXPROCS(0))
 | 
						|
	z.init(w, z.level)
 | 
						|
}
 | 
						|
 | 
						|
// put2 stores v into p[0:2] in little-endian byte order, as GZIP (RFC 1952)
// requires (unlike ZLIB, RFC 1950, which is big-endian).
func put2(p []byte, v uint16) {
	p[0], p[1] = byte(v), byte(v>>8)
}
 | 
						|
 | 
						|
// put4 stores v into p[0:4] in little-endian byte order (least significant
// byte first), as used by the GZIP header and trailer fields.
func put4(p []byte, v uint32) {
	for i := 0; i < 4; i++ {
		p[i] = byte(v >> uint(8*i))
	}
}
 | 
						|
 | 
						|
// writeBytes writes a length-prefixed byte slice to z.w.
 | 
						|
func (z *Writer) writeBytes(b []byte) error {
 | 
						|
	if len(b) > 0xffff {
 | 
						|
		return errors.New("gzip.Write: Extra data is too large")
 | 
						|
	}
 | 
						|
	put2(z.buf[0:2], uint16(len(b)))
 | 
						|
	_, err := z.w.Write(z.buf[0:2])
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	_, err = z.w.Write(b)
 | 
						|
	return err
 | 
						|
}
 | 
						|
 | 
						|
// writeString writes a UTF-8 string s in GZIP's format to z.w.
 | 
						|
// GZIP (RFC 1952) specifies that strings are NUL-terminated ISO 8859-1 (Latin-1).
 | 
						|
func (z *Writer) writeString(s string) (err error) {
 | 
						|
	// GZIP stores Latin-1 strings; error if non-Latin-1; convert if non-ASCII.
 | 
						|
	needconv := false
 | 
						|
	for _, v := range s {
 | 
						|
		if v == 0 || v > 0xff {
 | 
						|
			return errors.New("gzip.Write: non-Latin-1 header string")
 | 
						|
		}
 | 
						|
		if v > 0x7f {
 | 
						|
			needconv = true
 | 
						|
		}
 | 
						|
	}
 | 
						|
	if needconv {
 | 
						|
		b := make([]byte, 0, len(s))
 | 
						|
		for _, v := range s {
 | 
						|
			b = append(b, byte(v))
 | 
						|
		}
 | 
						|
		_, err = z.w.Write(b)
 | 
						|
	} else {
 | 
						|
		_, err = io.WriteString(z.w, s)
 | 
						|
	}
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	// GZIP strings are NUL-terminated.
 | 
						|
	z.buf[0] = 0
 | 
						|
	_, err = z.w.Write(z.buf[0:1])
 | 
						|
	return err
 | 
						|
}
 | 
						|
 | 
						|
// compressCurrent will compress the data currently buffered
 | 
						|
// This should only be called from the main writer/flush/closer
 | 
						|
func (z *Writer) compressCurrent(flush bool) {
 | 
						|
	c := z.currentBuffer
 | 
						|
	if len(c) > z.blockSize {
 | 
						|
		// This can never happen through the public interface.
 | 
						|
		panic("len(z.currentBuffer) > z.blockSize (most likely due to concurrent Write race)")
 | 
						|
	}
 | 
						|
 | 
						|
	r := result{}
 | 
						|
	r.result = make(chan []byte, 1)
 | 
						|
	r.notifyWritten = make(chan struct{}, 0)
 | 
						|
	// Reserve a result slot
 | 
						|
	select {
 | 
						|
	case z.results <- r:
 | 
						|
	case <-z.pushedErr:
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	z.wg.Add(1)
 | 
						|
	tail := z.prevTail
 | 
						|
	if len(c) > tailSize {
 | 
						|
		buf := z.dstPool.Get().([]byte) // Put in .compressBlock
 | 
						|
		// Copy tail from current buffer before handing the buffer over to the
 | 
						|
		// compressBlock goroutine.
 | 
						|
		buf = append(buf[:0], c[len(c)-tailSize:]...)
 | 
						|
		z.prevTail = buf
 | 
						|
	} else {
 | 
						|
		z.prevTail = nil
 | 
						|
	}
 | 
						|
	go z.compressBlock(c, tail, r, z.closed)
 | 
						|
 | 
						|
	z.currentBuffer = z.dstPool.Get().([]byte) // Put in .compressBlock
 | 
						|
	z.currentBuffer = z.currentBuffer[:0]
 | 
						|
 | 
						|
	// Wait if flushing
 | 
						|
	if flush {
 | 
						|
		<-r.notifyWritten
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// Returns an error if it has been set.
 | 
						|
// Cannot be used by functions that are from internal goroutines.
 | 
						|
func (z *Writer) checkError() error {
 | 
						|
	z.errMu.RLock()
 | 
						|
	err := z.err
 | 
						|
	z.errMu.RUnlock()
 | 
						|
	return err
 | 
						|
}
 | 
						|
 | 
						|
// Write writes a compressed form of p to the underlying io.Writer. The
 | 
						|
// compressed bytes are not necessarily flushed to output until
 | 
						|
// the Writer is closed or Flush() is called.
 | 
						|
//
 | 
						|
// The function will return quickly, if there are unused buffers.
 | 
						|
// The sent slice (p) is copied, and the caller is free to re-use the buffer
 | 
						|
// when the function returns.
 | 
						|
//
 | 
						|
// Errors that occur during compression will be reported later, and a nil error
 | 
						|
// does not signify that the compression succeeded (since it is most likely still running)
 | 
						|
// That means that the call that returns an error may not be the call that caused it.
 | 
						|
// Only Flush and Close functions are guaranteed to return any errors up to that point.
 | 
						|
func (z *Writer) Write(p []byte) (int, error) {
 | 
						|
	if err := z.checkError(); err != nil {
 | 
						|
		return 0, err
 | 
						|
	}
 | 
						|
	// Write the GZIP header lazily.
 | 
						|
	if !z.wroteHeader {
 | 
						|
		z.wroteHeader = true
 | 
						|
		z.buf[0] = gzipID1
 | 
						|
		z.buf[1] = gzipID2
 | 
						|
		z.buf[2] = gzipDeflate
 | 
						|
		z.buf[3] = 0
 | 
						|
		if z.Extra != nil {
 | 
						|
			z.buf[3] |= 0x04
 | 
						|
		}
 | 
						|
		if z.Name != "" {
 | 
						|
			z.buf[3] |= 0x08
 | 
						|
		}
 | 
						|
		if z.Comment != "" {
 | 
						|
			z.buf[3] |= 0x10
 | 
						|
		}
 | 
						|
		put4(z.buf[4:8], uint32(z.ModTime.Unix()))
 | 
						|
		if z.level == BestCompression {
 | 
						|
			z.buf[8] = 2
 | 
						|
		} else if z.level == BestSpeed {
 | 
						|
			z.buf[8] = 4
 | 
						|
		} else {
 | 
						|
			z.buf[8] = 0
 | 
						|
		}
 | 
						|
		z.buf[9] = z.OS
 | 
						|
		var n int
 | 
						|
		var err error
 | 
						|
		n, err = z.w.Write(z.buf[0:10])
 | 
						|
		if err != nil {
 | 
						|
			z.pushError(err)
 | 
						|
			return n, err
 | 
						|
		}
 | 
						|
		if z.Extra != nil {
 | 
						|
			err = z.writeBytes(z.Extra)
 | 
						|
			if err != nil {
 | 
						|
				z.pushError(err)
 | 
						|
				return n, err
 | 
						|
			}
 | 
						|
		}
 | 
						|
		if z.Name != "" {
 | 
						|
			err = z.writeString(z.Name)
 | 
						|
			if err != nil {
 | 
						|
				z.pushError(err)
 | 
						|
				return n, err
 | 
						|
			}
 | 
						|
		}
 | 
						|
		if z.Comment != "" {
 | 
						|
			err = z.writeString(z.Comment)
 | 
						|
			if err != nil {
 | 
						|
				z.pushError(err)
 | 
						|
				return n, err
 | 
						|
			}
 | 
						|
		}
 | 
						|
		// Start receiving data from compressors
 | 
						|
		go func() {
 | 
						|
			listen := z.results
 | 
						|
			var failed bool
 | 
						|
			for {
 | 
						|
				r, ok := <-listen
 | 
						|
				// If closed, we are finished.
 | 
						|
				if !ok {
 | 
						|
					return
 | 
						|
				}
 | 
						|
				if failed {
 | 
						|
					close(r.notifyWritten)
 | 
						|
					continue
 | 
						|
				}
 | 
						|
				buf := <-r.result
 | 
						|
				n, err := z.w.Write(buf)
 | 
						|
				if err != nil {
 | 
						|
					z.pushError(err)
 | 
						|
					close(r.notifyWritten)
 | 
						|
					failed = true
 | 
						|
					continue
 | 
						|
				}
 | 
						|
				if n != len(buf) {
 | 
						|
					z.pushError(fmt.Errorf("gzip: short write %d should be %d", n, len(buf)))
 | 
						|
					failed = true
 | 
						|
					close(r.notifyWritten)
 | 
						|
					continue
 | 
						|
				}
 | 
						|
				z.dstPool.Put(buf)
 | 
						|
				close(r.notifyWritten)
 | 
						|
			}
 | 
						|
		}()
 | 
						|
		z.currentBuffer = z.dstPool.Get().([]byte)
 | 
						|
		z.currentBuffer = z.currentBuffer[:0]
 | 
						|
	}
 | 
						|
	q := p
 | 
						|
	for len(q) > 0 {
 | 
						|
		length := len(q)
 | 
						|
		if length+len(z.currentBuffer) > z.blockSize {
 | 
						|
			length = z.blockSize - len(z.currentBuffer)
 | 
						|
		}
 | 
						|
		z.digest.Write(q[:length])
 | 
						|
		z.currentBuffer = append(z.currentBuffer, q[:length]...)
 | 
						|
		if len(z.currentBuffer) > z.blockSize {
 | 
						|
			panic("z.currentBuffer too large (most likely due to concurrent Write race)")
 | 
						|
		}
 | 
						|
		if len(z.currentBuffer) == z.blockSize {
 | 
						|
			z.compressCurrent(false)
 | 
						|
			if err := z.checkError(); err != nil {
 | 
						|
				return len(p) - len(q), err
 | 
						|
			}
 | 
						|
		}
 | 
						|
		z.size += length
 | 
						|
		q = q[length:]
 | 
						|
	}
 | 
						|
	return len(p), z.checkError()
 | 
						|
}
 | 
						|
 | 
						|
// Step 1: compresses buffer to buffer
 | 
						|
// Step 2: send writer to channel
 | 
						|
// Step 3: Close result channel to indicate we are done
 | 
						|
func (z *Writer) compressBlock(p, prevTail []byte, r result, closed bool) {
 | 
						|
	defer func() {
 | 
						|
		close(r.result)
 | 
						|
		z.wg.Done()
 | 
						|
	}()
 | 
						|
	buf := z.dstPool.Get().([]byte) // Corresponding Put in .Write's result writer
 | 
						|
	dest := bytes.NewBuffer(buf[:0])
 | 
						|
 | 
						|
	compressor := z.dictFlatePool.Get().(*flate.Writer) // Put below
 | 
						|
	compressor.ResetDict(dest, prevTail)
 | 
						|
	compressor.Write(p)
 | 
						|
	z.dstPool.Put(p) // Corresponding Get in .Write and .compressCurrent
 | 
						|
 | 
						|
	err := compressor.Flush()
 | 
						|
	if err != nil {
 | 
						|
		z.pushError(err)
 | 
						|
		return
 | 
						|
	}
 | 
						|
	if closed {
 | 
						|
		err = compressor.Close()
 | 
						|
		if err != nil {
 | 
						|
			z.pushError(err)
 | 
						|
			return
 | 
						|
		}
 | 
						|
	}
 | 
						|
	z.dictFlatePool.Put(compressor) // Get above
 | 
						|
 | 
						|
	if prevTail != nil {
 | 
						|
		z.dstPool.Put(prevTail) // Get in .compressCurrent
 | 
						|
	}
 | 
						|
 | 
						|
	// Read back buffer
 | 
						|
	buf = dest.Bytes()
 | 
						|
	r.result <- buf
 | 
						|
}
 | 
						|
 | 
						|
// Flush flushes any pending compressed data to the underlying writer.
 | 
						|
//
 | 
						|
// It is useful mainly in compressed network protocols, to ensure that
 | 
						|
// a remote reader has enough data to reconstruct a packet. Flush does
 | 
						|
// not return until the data has been written. If the underlying
 | 
						|
// writer returns an error, Flush returns that error.
 | 
						|
//
 | 
						|
// In the terminology of the zlib library, Flush is equivalent to Z_SYNC_FLUSH.
 | 
						|
func (z *Writer) Flush() error {
 | 
						|
	if err := z.checkError(); err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	if z.closed {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
	if !z.wroteHeader {
 | 
						|
		_, err := z.Write(nil)
 | 
						|
		if err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
	}
 | 
						|
	// We send current block to compression
 | 
						|
	z.compressCurrent(true)
 | 
						|
 | 
						|
	return z.checkError()
 | 
						|
}
 | 
						|
 | 
						|
// UncompressedSize will return the number of bytes written.
 | 
						|
// pgzip only, not a function in the official gzip package.
 | 
						|
func (z *Writer) UncompressedSize() int {
 | 
						|
	return z.size
 | 
						|
}
 | 
						|
 | 
						|
// Close closes the Writer, flushing any unwritten data to the underlying
 | 
						|
// io.Writer, but does not close the underlying io.Writer.
 | 
						|
func (z *Writer) Close() error {
 | 
						|
	if err := z.checkError(); err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	if z.closed {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	z.closed = true
 | 
						|
	if !z.wroteHeader {
 | 
						|
		z.Write(nil)
 | 
						|
		if err := z.checkError(); err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
	}
 | 
						|
	z.compressCurrent(true)
 | 
						|
	if err := z.checkError(); err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	close(z.results)
 | 
						|
	put4(z.buf[0:4], z.digest.Sum32())
 | 
						|
	put4(z.buf[4:8], uint32(z.size))
 | 
						|
	_, err := z.w.Write(z.buf[0:8])
 | 
						|
	if err != nil {
 | 
						|
		z.pushError(err)
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 |