313 lines
		
	
	
		
			5.4 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			313 lines
		
	
	
		
			5.4 KiB
		
	
	
	
		
			Go
		
	
	
	
package nodb
 | 
						|
 | 
						|
import (
 | 
						|
	"bufio"
 | 
						|
	"bytes"
 | 
						|
	"errors"
 | 
						|
	"io"
 | 
						|
	"os"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"gitea.com/lunny/log"
 | 
						|
	"gitea.com/lunny/nodb/store/driver"
 | 
						|
)
 | 
						|
 | 
						|
// Limits applied by ReadEventsTo to the amount of data returned by one call.
const (
	// maxReplBatchNum is the maximum number of event batches copied per call.
	maxReplBatchNum = 100
	// maxReplLogSize is the byte count (1 MiB) after which no further batch
	// is started within the same call.
	maxReplLogSize = 1 << 20
)
 | 
						|
 | 
						|
// ErrSkipEvent may be returned by a ReadEventFromReader callback to drop the
// current event; reading then continues with the next event instead of
// aborting.
var ErrSkipEvent = errors.New("skip to next event")
 | 
						|
 | 
						|
// Internal errors reported while decoding binlog data.
var (
	// errInvalidBinLogEvent is returned for an empty event or an event whose
	// type byte is not recognised.
	errInvalidBinLogEvent = errors.New("invalid binlog event")
	// errInvalidBinLogFile reports a malformed binlog file.
	errInvalidBinLogFile = errors.New("invalid binlog file")
)
 | 
						|
 | 
						|
type replBatch struct {
 | 
						|
	wb     driver.IWriteBatch
 | 
						|
	events [][]byte
 | 
						|
	l      *Nodb
 | 
						|
 | 
						|
	lastHead *BinLogHead
 | 
						|
}
 | 
						|
 | 
						|
func (b *replBatch) Commit() error {
 | 
						|
	b.l.commitLock.Lock()
 | 
						|
	defer b.l.commitLock.Unlock()
 | 
						|
 | 
						|
	err := b.wb.Commit()
 | 
						|
	if err != nil {
 | 
						|
		b.Rollback()
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	if b.l.binlog != nil {
 | 
						|
		if err = b.l.binlog.Log(b.events...); err != nil {
 | 
						|
			b.Rollback()
 | 
						|
			return err
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	b.events = [][]byte{}
 | 
						|
	b.lastHead = nil
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (b *replBatch) Rollback() error {
 | 
						|
	b.wb.Rollback()
 | 
						|
	b.events = [][]byte{}
 | 
						|
	b.lastHead = nil
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (l *Nodb) replicateEvent(b *replBatch, event []byte) error {
 | 
						|
	if len(event) == 0 {
 | 
						|
		return errInvalidBinLogEvent
 | 
						|
	}
 | 
						|
 | 
						|
	b.events = append(b.events, event)
 | 
						|
 | 
						|
	logType := uint8(event[0])
 | 
						|
	switch logType {
 | 
						|
	case BinLogTypePut:
 | 
						|
		return l.replicatePutEvent(b, event)
 | 
						|
	case BinLogTypeDeletion:
 | 
						|
		return l.replicateDeleteEvent(b, event)
 | 
						|
	default:
 | 
						|
		return errInvalidBinLogEvent
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (l *Nodb) replicatePutEvent(b *replBatch, event []byte) error {
 | 
						|
	key, value, err := decodeBinLogPut(event)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	b.wb.Put(key, value)
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (l *Nodb) replicateDeleteEvent(b *replBatch, event []byte) error {
 | 
						|
	key, err := decodeBinLogDelete(event)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	b.wb.Delete(key)
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func ReadEventFromReader(rb io.Reader, f func(head *BinLogHead, event []byte) error) error {
 | 
						|
	head := &BinLogHead{}
 | 
						|
	var err error
 | 
						|
 | 
						|
	for {
 | 
						|
		if err = head.Read(rb); err != nil {
 | 
						|
			if err == io.EOF {
 | 
						|
				break
 | 
						|
			} else {
 | 
						|
				return err
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		var dataBuf bytes.Buffer
 | 
						|
 | 
						|
		if _, err = io.CopyN(&dataBuf, rb, int64(head.PayloadLen)); err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
 | 
						|
		err = f(head, dataBuf.Bytes())
 | 
						|
		if err != nil && err != ErrSkipEvent {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (l *Nodb) ReplicateFromReader(rb io.Reader) error {
 | 
						|
	b := new(replBatch)
 | 
						|
 | 
						|
	b.wb = l.ldb.NewWriteBatch()
 | 
						|
	b.l = l
 | 
						|
 | 
						|
	f := func(head *BinLogHead, event []byte) error {
 | 
						|
		if b.lastHead == nil {
 | 
						|
			b.lastHead = head
 | 
						|
		} else if !b.lastHead.InSameBatch(head) {
 | 
						|
			if err := b.Commit(); err != nil {
 | 
						|
				log.Fatalf("replication error %s, skip to next", err.Error())
 | 
						|
				return ErrSkipEvent
 | 
						|
			}
 | 
						|
			b.lastHead = head
 | 
						|
		}
 | 
						|
 | 
						|
		err := l.replicateEvent(b, event)
 | 
						|
		if err != nil {
 | 
						|
			log.Fatalf("replication error %s, skip to next", err.Error())
 | 
						|
			return ErrSkipEvent
 | 
						|
		}
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	err := ReadEventFromReader(rb, f)
 | 
						|
	if err != nil {
 | 
						|
		b.Rollback()
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	return b.Commit()
 | 
						|
}
 | 
						|
 | 
						|
func (l *Nodb) ReplicateFromData(data []byte) error {
 | 
						|
	rb := bytes.NewReader(data)
 | 
						|
 | 
						|
	err := l.ReplicateFromReader(rb)
 | 
						|
 | 
						|
	return err
 | 
						|
}
 | 
						|
 | 
						|
func (l *Nodb) ReplicateFromBinLog(filePath string) error {
 | 
						|
	f, err := os.Open(filePath)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	rb := bufio.NewReaderSize(f, 4096)
 | 
						|
 | 
						|
	err = l.ReplicateFromReader(rb)
 | 
						|
 | 
						|
	f.Close()
 | 
						|
 | 
						|
	return err
 | 
						|
}
 | 
						|
 | 
						|
// try to read events, if no events read, try to wait the new event singal until timeout seconds
 | 
						|
func (l *Nodb) ReadEventsToTimeout(info *BinLogAnchor, w io.Writer, timeout int) (n int, err error) {
 | 
						|
	lastIndex := info.LogFileIndex
 | 
						|
	lastPos := info.LogPos
 | 
						|
 | 
						|
	n = 0
 | 
						|
	if l.binlog == nil {
 | 
						|
		//binlog not supported
 | 
						|
		info.LogFileIndex = 0
 | 
						|
		info.LogPos = 0
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	n, err = l.ReadEventsTo(info, w)
 | 
						|
	if err == nil && info.LogFileIndex == lastIndex && info.LogPos == lastPos {
 | 
						|
		//no events read
 | 
						|
		select {
 | 
						|
		case <-l.binlog.Wait():
 | 
						|
		case <-time.After(time.Duration(timeout) * time.Second):
 | 
						|
		}
 | 
						|
		return l.ReadEventsTo(info, w)
 | 
						|
	}
 | 
						|
	return
 | 
						|
}
 | 
						|
 | 
						|
func (l *Nodb) ReadEventsTo(info *BinLogAnchor, w io.Writer) (n int, err error) {
 | 
						|
	n = 0
 | 
						|
	if l.binlog == nil {
 | 
						|
		//binlog not supported
 | 
						|
		info.LogFileIndex = 0
 | 
						|
		info.LogPos = 0
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	index := info.LogFileIndex
 | 
						|
	offset := info.LogPos
 | 
						|
 | 
						|
	filePath := l.binlog.FormatLogFilePath(index)
 | 
						|
 | 
						|
	var f *os.File
 | 
						|
	f, err = os.Open(filePath)
 | 
						|
	if os.IsNotExist(err) {
 | 
						|
		lastIndex := l.binlog.LogFileIndex()
 | 
						|
 | 
						|
		if index == lastIndex {
 | 
						|
			//no binlog at all
 | 
						|
			info.LogPos = 0
 | 
						|
		} else {
 | 
						|
			//slave binlog info had lost
 | 
						|
			info.LogFileIndex = -1
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	if err != nil {
 | 
						|
		if os.IsNotExist(err) {
 | 
						|
			err = nil
 | 
						|
		}
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	defer f.Close()
 | 
						|
 | 
						|
	var fileSize int64
 | 
						|
	st, _ := f.Stat()
 | 
						|
	fileSize = st.Size()
 | 
						|
 | 
						|
	if fileSize == info.LogPos {
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	if _, err = f.Seek(offset, os.SEEK_SET); err != nil {
 | 
						|
		//may be invliad seek offset
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	var lastHead *BinLogHead = nil
 | 
						|
 | 
						|
	head := &BinLogHead{}
 | 
						|
 | 
						|
	batchNum := 0
 | 
						|
 | 
						|
	for {
 | 
						|
		if err = head.Read(f); err != nil {
 | 
						|
			if err == io.EOF {
 | 
						|
				//we will try to use next binlog
 | 
						|
				if index < l.binlog.LogFileIndex() {
 | 
						|
					info.LogFileIndex += 1
 | 
						|
					info.LogPos = 0
 | 
						|
				}
 | 
						|
				err = nil
 | 
						|
				return
 | 
						|
			} else {
 | 
						|
				return
 | 
						|
			}
 | 
						|
 | 
						|
		}
 | 
						|
 | 
						|
		if lastHead == nil {
 | 
						|
			lastHead = head
 | 
						|
			batchNum++
 | 
						|
		} else if !lastHead.InSameBatch(head) {
 | 
						|
			lastHead = head
 | 
						|
			batchNum++
 | 
						|
			if batchNum > maxReplBatchNum || n > maxReplLogSize {
 | 
						|
				return
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		if err = head.Write(w); err != nil {
 | 
						|
			return
 | 
						|
		}
 | 
						|
 | 
						|
		if _, err = io.CopyN(w, f, int64(head.PayloadLen)); err != nil {
 | 
						|
			return
 | 
						|
		}
 | 
						|
 | 
						|
		n += (head.Len() + int(head.PayloadLen))
 | 
						|
		info.LogPos = info.LogPos + int64(head.Len()) + int64(head.PayloadLen)
 | 
						|
	}
 | 
						|
 | 
						|
	return
 | 
						|
}
 |