129 lines
		
	
	
		
			2.0 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			129 lines
		
	
	
		
			2.0 KiB
		
	
	
	
		
			Go
		
	
	
	
| package nodb
 | |
| 
 | |
| import (
 | |
| 	"fmt"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/lunny/log"
 | |
| 	"github.com/lunny/nodb/config"
 | |
| 	"github.com/lunny/nodb/store"
 | |
| )
 | |
| 
 | |
| type Nodb struct {
 | |
| 	cfg *config.Config
 | |
| 
 | |
| 	ldb *store.DB
 | |
| 	dbs [MaxDBNumber]*DB
 | |
| 
 | |
| 	quit chan struct{}
 | |
| 	jobs *sync.WaitGroup
 | |
| 
 | |
| 	binlog *BinLog
 | |
| 
 | |
| 	wLock      sync.RWMutex //allow one write at same time
 | |
| 	commitLock sync.Mutex   //allow one write commit at same time
 | |
| }
 | |
| 
 | |
| func Open(cfg *config.Config) (*Nodb, error) {
 | |
| 	if len(cfg.DataDir) == 0 {
 | |
| 		cfg.DataDir = config.DefaultDataDir
 | |
| 	}
 | |
| 
 | |
| 	ldb, err := store.Open(cfg)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	l := new(Nodb)
 | |
| 
 | |
| 	l.quit = make(chan struct{})
 | |
| 	l.jobs = new(sync.WaitGroup)
 | |
| 
 | |
| 	l.ldb = ldb
 | |
| 
 | |
| 	if cfg.BinLog.MaxFileNum > 0 && cfg.BinLog.MaxFileSize > 0 {
 | |
| 		l.binlog, err = NewBinLog(cfg)
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 	} else {
 | |
| 		l.binlog = nil
 | |
| 	}
 | |
| 
 | |
| 	for i := uint8(0); i < MaxDBNumber; i++ {
 | |
| 		l.dbs[i] = l.newDB(i)
 | |
| 	}
 | |
| 
 | |
| 	l.activeExpireCycle()
 | |
| 
 | |
| 	return l, nil
 | |
| }
 | |
| 
 | |
| func (l *Nodb) Close() {
 | |
| 	close(l.quit)
 | |
| 	l.jobs.Wait()
 | |
| 
 | |
| 	l.ldb.Close()
 | |
| 
 | |
| 	if l.binlog != nil {
 | |
| 		l.binlog.Close()
 | |
| 		l.binlog = nil
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (l *Nodb) Select(index int) (*DB, error) {
 | |
| 	if index < 0 || index >= int(MaxDBNumber) {
 | |
| 		return nil, fmt.Errorf("invalid db index %d", index)
 | |
| 	}
 | |
| 
 | |
| 	return l.dbs[index], nil
 | |
| }
 | |
| 
 | |
| func (l *Nodb) FlushAll() error {
 | |
| 	for index, db := range l.dbs {
 | |
| 		if _, err := db.FlushAll(); err != nil {
 | |
| 			log.Error("flush db %d error %s", index, err.Error())
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // very dangerous to use
 | |
| func (l *Nodb) DataDB() *store.DB {
 | |
| 	return l.ldb
 | |
| }
 | |
| 
 | |
| func (l *Nodb) activeExpireCycle() {
 | |
| 	var executors []*elimination = make([]*elimination, len(l.dbs))
 | |
| 	for i, db := range l.dbs {
 | |
| 		executors[i] = db.newEliminator()
 | |
| 	}
 | |
| 
 | |
| 	l.jobs.Add(1)
 | |
| 	go func() {
 | |
| 		tick := time.NewTicker(1 * time.Second)
 | |
| 		end := false
 | |
| 		done := make(chan struct{})
 | |
| 		for !end {
 | |
| 			select {
 | |
| 			case <-tick.C:
 | |
| 				go func() {
 | |
| 					for _, eli := range executors {
 | |
| 						eli.active()
 | |
| 					}
 | |
| 					done <- struct{}{}
 | |
| 				}()
 | |
| 				<-done
 | |
| 			case <-l.quit:
 | |
| 				end = true
 | |
| 				break
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		tick.Stop()
 | |
| 		l.jobs.Done()
 | |
| 	}()
 | |
| }
 |