520 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			520 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Go
		
	
	
	
| // Copyright 2010 The Go Authors. All rights reserved.
 | |
| // Use of this source code is governed by a BSD-style
 | |
| // license that can be found in the LICENSE file.
 | |
| 
 | |
| package pgzip
 | |
| 
 | |
| import (
 | |
| 	"bytes"
 | |
| 	"errors"
 | |
| 	"fmt"
 | |
| 	"hash"
 | |
| 	"hash/crc32"
 | |
| 	"io"
 | |
| 	"runtime"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/klauspost/compress/flate"
 | |
| )
 | |
| 
 | |
const (
	// defaultBlockSize is the default number of uncompressed bytes collected
	// into one block before it is handed to a compression goroutine (1 MiB).
	defaultBlockSize = 1 << 20

	// tailSize is how many trailing bytes of a block are copied and passed as
	// the preset dictionary when compressing the following block. Block sizes
	// must be strictly larger than this (see SetConcurrency).
	tailSize = 16 << 10

	// defaultBlocks is a default count of in-flight blocks.
	// NOTE(review): not referenced by the Writer code visible here; NewWriter
	// and Reset use runtime.GOMAXPROCS(0) instead — confirm other users.
	defaultBlocks = 4
)
 | |
| 
 | |
| // These constants are copied from the flate package, so that code that imports
 | |
| // "compress/gzip" does not also have to import "compress/flate".
 | |
| const (
 | |
| 	NoCompression       = flate.NoCompression
 | |
| 	BestSpeed           = flate.BestSpeed
 | |
| 	BestCompression     = flate.BestCompression
 | |
| 	DefaultCompression  = flate.DefaultCompression
 | |
| 	ConstantCompression = flate.ConstantCompression
 | |
| 	HuffmanOnly         = flate.HuffmanOnly
 | |
| )
 | |
| 
 | |
| // A Writer is an io.WriteCloser.
 | |
| // Writes to a Writer are compressed and written to w.
 | |
| type Writer struct {
 | |
| 	Header
 | |
| 	w             io.Writer
 | |
| 	level         int
 | |
| 	wroteHeader   bool
 | |
| 	blockSize     int
 | |
| 	blocks        int
 | |
| 	currentBuffer []byte
 | |
| 	prevTail      []byte
 | |
| 	digest        hash.Hash32
 | |
| 	size          int
 | |
| 	closed        bool
 | |
| 	buf           [10]byte
 | |
| 	errMu         sync.RWMutex
 | |
| 	err           error
 | |
| 	pushedErr     chan struct{}
 | |
| 	results       chan result
 | |
| 	dictFlatePool sync.Pool
 | |
| 	dstPool       sync.Pool
 | |
| 	wg            sync.WaitGroup
 | |
| }
 | |
| 
 | |
// result tracks one in-flight compressed block. Results are queued on
// Writer.results in the order blocks are submitted, which is the order the
// result-writer goroutine writes them out.
type result struct {
	// result receives the compressed bytes from compressBlock. It is always
	// closed when compressBlock returns; no value is sent when compression
	// failed.
	result chan []byte

	// notifyWritten is closed by the result-writer goroutine once it has
	// finished with this block (written it, or skipped it after an error).
	notifyWritten chan struct{}
}
 | |
| 
 | |
| // Use SetConcurrency to finetune the concurrency level if needed.
 | |
| //
 | |
| // With this you can control the approximate size of your blocks,
 | |
| // as well as how many you want to be processing in parallel.
 | |
| //
 | |
| // Default values for this is SetConcurrency(defaultBlockSize, runtime.GOMAXPROCS(0)),
 | |
| // meaning blocks are split at 1 MB and up to the number of CPU threads
 | |
| // can be processing at once before the writer blocks.
 | |
| func (z *Writer) SetConcurrency(blockSize, blocks int) error {
 | |
| 	if blockSize <= tailSize {
 | |
| 		return fmt.Errorf("gzip: block size cannot be less than or equal to %d", tailSize)
 | |
| 	}
 | |
| 	if blocks <= 0 {
 | |
| 		return errors.New("gzip: blocks cannot be zero or less")
 | |
| 	}
 | |
| 	if blockSize == z.blockSize && blocks == z.blocks {
 | |
| 		return nil
 | |
| 	}
 | |
| 	z.blockSize = blockSize
 | |
| 	z.results = make(chan result, blocks)
 | |
| 	z.blocks = blocks
 | |
| 	z.dstPool.New = func() interface{} { return make([]byte, 0, blockSize+(blockSize)>>4) }
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // NewWriter returns a new Writer.
 | |
| // Writes to the returned writer are compressed and written to w.
 | |
| //
 | |
| // It is the caller's responsibility to call Close on the WriteCloser when done.
 | |
| // Writes may be buffered and not flushed until Close.
 | |
| //
 | |
| // Callers that wish to set the fields in Writer.Header must do so before
 | |
| // the first call to Write or Close. The Comment and Name header fields are
 | |
| // UTF-8 strings in Go, but the underlying format requires NUL-terminated ISO
 | |
| // 8859-1 (Latin-1). NUL or non-Latin-1 runes in those strings will lead to an
 | |
| // error on Write.
 | |
| func NewWriter(w io.Writer) *Writer {
 | |
| 	z, _ := NewWriterLevel(w, DefaultCompression)
 | |
| 	return z
 | |
| }
 | |
| 
 | |
| // NewWriterLevel is like NewWriter but specifies the compression level instead
 | |
| // of assuming DefaultCompression.
 | |
| //
 | |
| // The compression level can be DefaultCompression, NoCompression, or any
 | |
| // integer value between BestSpeed and BestCompression inclusive. The error
 | |
| // returned will be nil if the level is valid.
 | |
| func NewWriterLevel(w io.Writer, level int) (*Writer, error) {
 | |
| 	if level < ConstantCompression || level > BestCompression {
 | |
| 		return nil, fmt.Errorf("gzip: invalid compression level: %d", level)
 | |
| 	}
 | |
| 	z := new(Writer)
 | |
| 	z.SetConcurrency(defaultBlockSize, runtime.GOMAXPROCS(0))
 | |
| 	z.init(w, level)
 | |
| 	return z, nil
 | |
| }
 | |
| 
 | |
| // This function must be used by goroutines to set an
 | |
| // error condition, since z.err access is restricted
 | |
| // to the callers goruotine.
 | |
| func (z *Writer) pushError(err error) {
 | |
| 	z.errMu.Lock()
 | |
| 	if z.err != nil {
 | |
| 		z.errMu.Unlock()
 | |
| 		return
 | |
| 	}
 | |
| 	z.err = err
 | |
| 	close(z.pushedErr)
 | |
| 	z.errMu.Unlock()
 | |
| }
 | |
| 
 | |
| func (z *Writer) init(w io.Writer, level int) {
 | |
| 	z.wg.Wait()
 | |
| 	digest := z.digest
 | |
| 	if digest != nil {
 | |
| 		digest.Reset()
 | |
| 	} else {
 | |
| 		digest = crc32.NewIEEE()
 | |
| 	}
 | |
| 	z.Header = Header{OS: 255}
 | |
| 	z.w = w
 | |
| 	z.level = level
 | |
| 	z.digest = digest
 | |
| 	z.pushedErr = make(chan struct{}, 0)
 | |
| 	z.results = make(chan result, z.blocks)
 | |
| 	z.err = nil
 | |
| 	z.closed = false
 | |
| 	z.Comment = ""
 | |
| 	z.Extra = nil
 | |
| 	z.ModTime = time.Time{}
 | |
| 	z.wroteHeader = false
 | |
| 	z.currentBuffer = nil
 | |
| 	z.buf = [10]byte{}
 | |
| 	z.prevTail = nil
 | |
| 	z.size = 0
 | |
| 	if z.dictFlatePool.New == nil {
 | |
| 		z.dictFlatePool.New = func() interface{} {
 | |
| 			f, _ := flate.NewWriterDict(w, level, nil)
 | |
| 			return f
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Reset discards the Writer z's state and makes it equivalent to the
 | |
| // result of its original state from NewWriter or NewWriterLevel, but
 | |
| // writing to w instead. This permits reusing a Writer rather than
 | |
| // allocating a new one.
 | |
| func (z *Writer) Reset(w io.Writer) {
 | |
| 	if z.results != nil && !z.closed {
 | |
| 		close(z.results)
 | |
| 	}
 | |
| 	z.SetConcurrency(defaultBlockSize, runtime.GOMAXPROCS(0))
 | |
| 	z.init(w, z.level)
 | |
| }
 | |
| 
 | |
// put2 stores v into p[0:2] in little-endian byte order. GZIP (RFC 1952) is
// little-endian, unlike ZLIB (RFC 1950).
func put2(p []byte, v uint16) {
	lo, hi := byte(v), byte(v>>8)
	p[0], p[1] = lo, hi
}
 | |
| 
 | |
// put4 stores v into p[0:4] in little-endian byte order, as GZIP requires for
// its 32-bit header and trailer fields.
func put4(p []byte, v uint32) {
	for i := range p[:4] {
		p[i] = byte(v >> (8 * uint(i)))
	}
}
 | |
| 
 | |
| // writeBytes writes a length-prefixed byte slice to z.w.
 | |
| func (z *Writer) writeBytes(b []byte) error {
 | |
| 	if len(b) > 0xffff {
 | |
| 		return errors.New("gzip.Write: Extra data is too large")
 | |
| 	}
 | |
| 	put2(z.buf[0:2], uint16(len(b)))
 | |
| 	_, err := z.w.Write(z.buf[0:2])
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	_, err = z.w.Write(b)
 | |
| 	return err
 | |
| }
 | |
| 
 | |
| // writeString writes a UTF-8 string s in GZIP's format to z.w.
 | |
| // GZIP (RFC 1952) specifies that strings are NUL-terminated ISO 8859-1 (Latin-1).
 | |
| func (z *Writer) writeString(s string) (err error) {
 | |
| 	// GZIP stores Latin-1 strings; error if non-Latin-1; convert if non-ASCII.
 | |
| 	needconv := false
 | |
| 	for _, v := range s {
 | |
| 		if v == 0 || v > 0xff {
 | |
| 			return errors.New("gzip.Write: non-Latin-1 header string")
 | |
| 		}
 | |
| 		if v > 0x7f {
 | |
| 			needconv = true
 | |
| 		}
 | |
| 	}
 | |
| 	if needconv {
 | |
| 		b := make([]byte, 0, len(s))
 | |
| 		for _, v := range s {
 | |
| 			b = append(b, byte(v))
 | |
| 		}
 | |
| 		_, err = z.w.Write(b)
 | |
| 	} else {
 | |
| 		_, err = io.WriteString(z.w, s)
 | |
| 	}
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	// GZIP strings are NUL-terminated.
 | |
| 	z.buf[0] = 0
 | |
| 	_, err = z.w.Write(z.buf[0:1])
 | |
| 	return err
 | |
| }
 | |
| 
 | |
// compressCurrent hands the data buffered in z.currentBuffer to a new
// compressBlock goroutine and installs a fresh pooled buffer in its place.
// It must only be called from the caller-facing Write, Flush and Close
// methods, never from the internal goroutines.
//
// It first reserves a slot on z.results, blocking while that channel's buffer
// is full. If z.pushedErr has been closed, the select may take that case
// instead; the function then returns without starting a compressor, and the
// buffered data is dropped.
//
// For a block longer than tailSize, its last tailSize bytes are copied into
// z.prevTail to become the dictionary of the next block. Otherwise the next
// block gets no dictionary.
//
// If flush is true, it waits until the result-writer goroutine has finished
// with this block (written it, or skipped it after an error).
func (z *Writer) compressCurrent(flush bool) {
	c := z.currentBuffer
	if len(c) > z.blockSize {
		// This can never happen through the public interface.
		panic("len(z.currentBuffer) > z.blockSize (most likely due to concurrent Write race)")
	}

	r := result{}
	r.result = make(chan []byte, 1)
	r.notifyWritten = make(chan struct{}, 0)
	// Reserve a result slot. The result writer consumes slots in this order,
	// which keeps the output in submission order.
	select {
	case z.results <- r:
	case <-z.pushedErr:
		return
	}

	z.wg.Add(1)
	tail := z.prevTail
	if len(c) > tailSize {
		buf := z.dstPool.Get().([]byte) // Put in .compressBlock
		// Copy tail from current buffer before handing the buffer over to the
		// compressBlock goroutine.
		buf = append(buf[:0], c[len(c)-tailSize:]...)
		z.prevTail = buf
	} else {
		z.prevTail = nil
	}
	// c and the previous block's tail are handed to compressBlock, which puts
	// c back into dstPool (and tail too, when compression succeeds).
	go z.compressBlock(c, tail, r, z.closed)

	z.currentBuffer = z.dstPool.Get().([]byte) // Put in .compressBlock
	z.currentBuffer = z.currentBuffer[:0]

	// Wait if flushing
	if flush {
		<-r.notifyWritten
	}
}
 | |
| 
 | |
| // Returns an error if it has been set.
 | |
| // Cannot be used by functions that are from internal goroutines.
 | |
| func (z *Writer) checkError() error {
 | |
| 	z.errMu.RLock()
 | |
| 	err := z.err
 | |
| 	z.errMu.RUnlock()
 | |
| 	return err
 | |
| }
 | |
| 
 | |
| // Write writes a compressed form of p to the underlying io.Writer. The
 | |
| // compressed bytes are not necessarily flushed to output until
 | |
| // the Writer is closed or Flush() is called.
 | |
| //
 | |
| // The function will return quickly, if there are unused buffers.
 | |
| // The sent slice (p) is copied, and the caller is free to re-use the buffer
 | |
| // when the function returns.
 | |
| //
 | |
| // Errors that occur during compression will be reported later, and a nil error
 | |
| // does not signify that the compression succeeded (since it is most likely still running)
 | |
| // That means that the call that returns an error may not be the call that caused it.
 | |
| // Only Flush and Close functions are guaranteed to return any errors up to that point.
 | |
| func (z *Writer) Write(p []byte) (int, error) {
 | |
| 	if err := z.checkError(); err != nil {
 | |
| 		return 0, err
 | |
| 	}
 | |
| 	// Write the GZIP header lazily.
 | |
| 	if !z.wroteHeader {
 | |
| 		z.wroteHeader = true
 | |
| 		z.buf[0] = gzipID1
 | |
| 		z.buf[1] = gzipID2
 | |
| 		z.buf[2] = gzipDeflate
 | |
| 		z.buf[3] = 0
 | |
| 		if z.Extra != nil {
 | |
| 			z.buf[3] |= 0x04
 | |
| 		}
 | |
| 		if z.Name != "" {
 | |
| 			z.buf[3] |= 0x08
 | |
| 		}
 | |
| 		if z.Comment != "" {
 | |
| 			z.buf[3] |= 0x10
 | |
| 		}
 | |
| 		put4(z.buf[4:8], uint32(z.ModTime.Unix()))
 | |
| 		if z.level == BestCompression {
 | |
| 			z.buf[8] = 2
 | |
| 		} else if z.level == BestSpeed {
 | |
| 			z.buf[8] = 4
 | |
| 		} else {
 | |
| 			z.buf[8] = 0
 | |
| 		}
 | |
| 		z.buf[9] = z.OS
 | |
| 		var n int
 | |
| 		var err error
 | |
| 		n, err = z.w.Write(z.buf[0:10])
 | |
| 		if err != nil {
 | |
| 			z.pushError(err)
 | |
| 			return n, err
 | |
| 		}
 | |
| 		if z.Extra != nil {
 | |
| 			err = z.writeBytes(z.Extra)
 | |
| 			if err != nil {
 | |
| 				z.pushError(err)
 | |
| 				return n, err
 | |
| 			}
 | |
| 		}
 | |
| 		if z.Name != "" {
 | |
| 			err = z.writeString(z.Name)
 | |
| 			if err != nil {
 | |
| 				z.pushError(err)
 | |
| 				return n, err
 | |
| 			}
 | |
| 		}
 | |
| 		if z.Comment != "" {
 | |
| 			err = z.writeString(z.Comment)
 | |
| 			if err != nil {
 | |
| 				z.pushError(err)
 | |
| 				return n, err
 | |
| 			}
 | |
| 		}
 | |
| 		// Start receiving data from compressors
 | |
| 		go func() {
 | |
| 			listen := z.results
 | |
| 			var failed bool
 | |
| 			for {
 | |
| 				r, ok := <-listen
 | |
| 				// If closed, we are finished.
 | |
| 				if !ok {
 | |
| 					return
 | |
| 				}
 | |
| 				if failed {
 | |
| 					close(r.notifyWritten)
 | |
| 					continue
 | |
| 				}
 | |
| 				buf := <-r.result
 | |
| 				n, err := z.w.Write(buf)
 | |
| 				if err != nil {
 | |
| 					z.pushError(err)
 | |
| 					close(r.notifyWritten)
 | |
| 					failed = true
 | |
| 					continue
 | |
| 				}
 | |
| 				if n != len(buf) {
 | |
| 					z.pushError(fmt.Errorf("gzip: short write %d should be %d", n, len(buf)))
 | |
| 					failed = true
 | |
| 					close(r.notifyWritten)
 | |
| 					continue
 | |
| 				}
 | |
| 				z.dstPool.Put(buf)
 | |
| 				close(r.notifyWritten)
 | |
| 			}
 | |
| 		}()
 | |
| 		z.currentBuffer = z.dstPool.Get().([]byte)
 | |
| 		z.currentBuffer = z.currentBuffer[:0]
 | |
| 	}
 | |
| 	q := p
 | |
| 	for len(q) > 0 {
 | |
| 		length := len(q)
 | |
| 		if length+len(z.currentBuffer) > z.blockSize {
 | |
| 			length = z.blockSize - len(z.currentBuffer)
 | |
| 		}
 | |
| 		z.digest.Write(q[:length])
 | |
| 		z.currentBuffer = append(z.currentBuffer, q[:length]...)
 | |
| 		if len(z.currentBuffer) > z.blockSize {
 | |
| 			panic("z.currentBuffer too large (most likely due to concurrent Write race)")
 | |
| 		}
 | |
| 		if len(z.currentBuffer) == z.blockSize {
 | |
| 			z.compressCurrent(false)
 | |
| 			if err := z.checkError(); err != nil {
 | |
| 				return len(p) - len(q), err
 | |
| 			}
 | |
| 		}
 | |
| 		z.size += length
 | |
| 		q = q[length:]
 | |
| 	}
 | |
| 	return len(p), z.checkError()
 | |
| }
 | |
| 
 | |
| // Step 1: compresses buffer to buffer
 | |
| // Step 2: send writer to channel
 | |
| // Step 3: Close result channel to indicate we are done
 | |
| func (z *Writer) compressBlock(p, prevTail []byte, r result, closed bool) {
 | |
| 	defer func() {
 | |
| 		close(r.result)
 | |
| 		z.wg.Done()
 | |
| 	}()
 | |
| 	buf := z.dstPool.Get().([]byte) // Corresponding Put in .Write's result writer
 | |
| 	dest := bytes.NewBuffer(buf[:0])
 | |
| 
 | |
| 	compressor := z.dictFlatePool.Get().(*flate.Writer) // Put below
 | |
| 	compressor.ResetDict(dest, prevTail)
 | |
| 	compressor.Write(p)
 | |
| 	z.dstPool.Put(p) // Corresponding Get in .Write and .compressCurrent
 | |
| 
 | |
| 	err := compressor.Flush()
 | |
| 	if err != nil {
 | |
| 		z.pushError(err)
 | |
| 		return
 | |
| 	}
 | |
| 	if closed {
 | |
| 		err = compressor.Close()
 | |
| 		if err != nil {
 | |
| 			z.pushError(err)
 | |
| 			return
 | |
| 		}
 | |
| 	}
 | |
| 	z.dictFlatePool.Put(compressor) // Get above
 | |
| 
 | |
| 	if prevTail != nil {
 | |
| 		z.dstPool.Put(prevTail) // Get in .compressCurrent
 | |
| 	}
 | |
| 
 | |
| 	// Read back buffer
 | |
| 	buf = dest.Bytes()
 | |
| 	r.result <- buf
 | |
| }
 | |
| 
 | |
| // Flush flushes any pending compressed data to the underlying writer.
 | |
| //
 | |
| // It is useful mainly in compressed network protocols, to ensure that
 | |
| // a remote reader has enough data to reconstruct a packet. Flush does
 | |
| // not return until the data has been written. If the underlying
 | |
| // writer returns an error, Flush returns that error.
 | |
| //
 | |
| // In the terminology of the zlib library, Flush is equivalent to Z_SYNC_FLUSH.
 | |
| func (z *Writer) Flush() error {
 | |
| 	if err := z.checkError(); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	if z.closed {
 | |
| 		return nil
 | |
| 	}
 | |
| 	if !z.wroteHeader {
 | |
| 		_, err := z.Write(nil)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 	}
 | |
| 	// We send current block to compression
 | |
| 	z.compressCurrent(true)
 | |
| 
 | |
| 	return z.checkError()
 | |
| }
 | |
| 
 | |
| // UncompressedSize will return the number of bytes written.
 | |
| // pgzip only, not a function in the official gzip package.
 | |
| func (z *Writer) UncompressedSize() int {
 | |
| 	return z.size
 | |
| }
 | |
| 
 | |
| // Close closes the Writer, flushing any unwritten data to the underlying
 | |
| // io.Writer, but does not close the underlying io.Writer.
 | |
| func (z *Writer) Close() error {
 | |
| 	if err := z.checkError(); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	if z.closed {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	z.closed = true
 | |
| 	if !z.wroteHeader {
 | |
| 		z.Write(nil)
 | |
| 		if err := z.checkError(); err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 	}
 | |
| 	z.compressCurrent(true)
 | |
| 	if err := z.checkError(); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	close(z.results)
 | |
| 	put4(z.buf[0:4], z.digest.Sum32())
 | |
| 	put4(z.buf[4:8], uint32(z.size))
 | |
| 	_, err := z.w.Write(z.buf[0:8])
 | |
| 	if err != nil {
 | |
| 		z.pushError(err)
 | |
| 		return err
 | |
| 	}
 | |
| 	return nil
 | |
| }
 |