129 lines
		
	
	
		
			3.4 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			129 lines
		
	
	
		
			3.4 KiB
		
	
	
	
		
			Go
		
	
	
	
| package bufpipe
 | |
| 
 | |
| import (
 | |
| 	"bytes"
 | |
| 	"errors"
 | |
| 	"io"
 | |
| 	"sync"
 | |
| )
 | |
| 
 | |
// ErrClosedPipe is reported by Write once the read half of the pipe has been
// closed without a more specific error.
var ErrClosedPipe error = errors.New("bufpipe: read/write on closed pipe")
 | |
| 
 | |
// pipe is the state shared by the two halves of a buffered pipe.
type pipe struct {
	// cond carries the mutex (cond.L) that every method locks before touching
	// the fields below; readers wait on it while the buffer is empty.
	cond *sync.Cond
	// buf holds bytes that have been written but not yet read.
	buf *bytes.Buffer
	// rerr is what Read returns once buf is drained; it is set when the write
	// half is closed.
	rerr error
	// werr is what Write returns; it is set when the read half is closed.
	werr error
}
 | |
| 
 | |
| // A PipeReader is the read half of a pipe.
 | |
| type PipeReader struct {
 | |
| 	*pipe
 | |
| }
 | |
| 
 | |
| // A PipeWriter is the write half of a pipe.
 | |
| type PipeWriter struct {
 | |
| 	*pipe
 | |
| }
 | |
| 
 | |
| // New creates a synchronous pipe using buf as its initial contents. It can be
 | |
| // used to connect code expecting an io.Reader with code expecting an io.Writer.
 | |
| //
 | |
| // Unlike io.Pipe, writes never block because the internal buffer has variable
 | |
| // size. Reads block only when the buffer is empty.
 | |
| //
 | |
| // It is safe to call Read and Write in parallel with each other or with Close.
 | |
| // Parallel calls to Read and parallel calls to Write are also safe: the
 | |
| // individual calls will be gated sequentially.
 | |
| //
 | |
| // The new pipe takes ownership of buf, and the caller should not use buf after
 | |
| // this call. New is intended to prepare a PipeReader to read existing data. It
 | |
| // can also be used to set the initial size of the internal buffer for writing.
 | |
| // To do that, buf should have the desired capacity but a length of zero.
 | |
| func New(buf []byte) (*PipeReader, *PipeWriter) {
 | |
| 	p := &pipe{
 | |
| 		buf:  bytes.NewBuffer(buf),
 | |
| 		cond: sync.NewCond(new(sync.Mutex)),
 | |
| 	}
 | |
| 	return &PipeReader{
 | |
| 			pipe: p,
 | |
| 		}, &PipeWriter{
 | |
| 			pipe: p,
 | |
| 		}
 | |
| }
 | |
| 
 | |
| // Read implements the standard Read interface: it reads data from the pipe,
 | |
| // reading from the internal buffer, otherwise blocking until a writer arrives
 | |
| // or the write end is closed. If the write end is closed with an error, that
 | |
| // error is returned as err; otherwise err is io.EOF.
 | |
| func (r *PipeReader) Read(data []byte) (int, error) {
 | |
| 	r.cond.L.Lock()
 | |
| 	defer r.cond.L.Unlock()
 | |
| 
 | |
| RETRY:
 | |
| 	n, err := r.buf.Read(data)
 | |
| 	// If not closed and no read, wait for writing.
 | |
| 	if err == io.EOF && r.rerr == nil && n == 0 {
 | |
| 		r.cond.Wait()
 | |
| 		goto RETRY
 | |
| 	}
 | |
| 	if err == io.EOF {
 | |
| 		return n, r.rerr
 | |
| 	}
 | |
| 	return n, err
 | |
| }
 | |
| 
 | |
| // Close closes the reader; subsequent writes from the write half of the pipe
 | |
| // will return error ErrClosedPipe.
 | |
| func (r *PipeReader) Close() error {
 | |
| 	return r.CloseWithError(nil)
 | |
| }
 | |
| 
 | |
| // CloseWithError closes the reader; subsequent writes to the write half of the
 | |
| // pipe will return the error err.
 | |
| func (r *PipeReader) CloseWithError(err error) error {
 | |
| 	r.cond.L.Lock()
 | |
| 	defer r.cond.L.Unlock()
 | |
| 
 | |
| 	if err == nil {
 | |
| 		err = ErrClosedPipe
 | |
| 	}
 | |
| 	r.werr = err
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Write implements the standard Write interface: it writes data to the internal
 | |
| // buffer. If the read end is closed with an error, that err is returned as err;
 | |
| // otherwise err is ErrClosedPipe.
 | |
| func (w *PipeWriter) Write(data []byte) (int, error) {
 | |
| 	w.cond.L.Lock()
 | |
| 	defer w.cond.L.Unlock()
 | |
| 
 | |
| 	if w.werr != nil {
 | |
| 		return 0, w.werr
 | |
| 	}
 | |
| 
 | |
| 	n, err := w.buf.Write(data)
 | |
| 	w.cond.Signal()
 | |
| 	return n, err
 | |
| }
 | |
| 
 | |
| // Close closes the writer; subsequent reads from the read half of the pipe will
 | |
| // return io.EOF once the internal buffer get empty.
 | |
| func (w *PipeWriter) Close() error {
 | |
| 	return w.CloseWithError(nil)
 | |
| }
 | |
| 
 | |
| // Close closes the writer; subsequent reads from the read half of the pipe will
 | |
| // return err once the internal buffer get empty.
 | |
| func (w *PipeWriter) CloseWithError(err error) error {
 | |
| 	w.cond.L.Lock()
 | |
| 	defer w.cond.L.Unlock()
 | |
| 
 | |
| 	if err == nil {
 | |
| 		err = io.EOF
 | |
| 	}
 | |
| 	w.rerr = err
 | |
| 	return nil
 | |
| }
 |