455 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			455 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Go
		
	
	
	
| package hbase
 | |
| 
 | |
| import (
 | |
| 	"bytes"
 | |
| 	"crypto/md5"
 | |
| 	"encoding/binary"
 | |
| 	"encoding/hex"
 | |
| 	"fmt"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| 
 | |
| 	pb "github.com/golang/protobuf/proto"
 | |
| 	"github.com/juju/errors"
 | |
| 	"github.com/ngaut/go-zookeeper/zk"
 | |
| 	"github.com/ngaut/log"
 | |
| 	"github.com/pingcap/go-hbase/proto"
 | |
| )
 | |
| 
 | |
// ZooKeeper znode paths (appended to client.zkRoot), wire-format constants
// and retry/timeout tuning values.
const (
	// zkRootRegionPath holds the location of the server hosting hbase:meta
	// (read by init into client.rootServerName).
	zkRootRegionPath = "/meta-region-server"
	// zkMasterAddrPath holds the location of the master (read by init into
	// client.masterServerName).
	zkMasterAddrPath = "/master"

	// Znode payload framing parsed by decodeMeta: a magicHeadByte occupying
	// magicHeadSize byte(s), then an idLengthSize-byte big-endian length of
	// the id that precedes the protobuf payload.
	magicHeadByte           = 0xff
	magicHeadSize           = 1
	idLengthSize            = 4
	// md5HexSize is the length of a hex-encoded MD5 digest (16 bytes).
	md5HexSize              = 32
	servernameSeparator     = ","
	// NOTE(review): the timeouts below are not used in the visible code;
	// presumably milliseconds — confirm at their use sites.
	rpcTimeout              = 30000
	pingTimeout             = 30000
	callTimeout             = 5000
	// defaultMaxActionRetries is the initial client.maxRetries (see NewClient).
	defaultMaxActionRetries = 3
	// Some operations can take a long time such as disable of big table.
	// numRetries is for 'normal' stuff... Multiply by this factor when
	// want to wait a long time.
	retryLongerMultiplier    = 31
	socketDefaultRetryWaitMs = 200
	defaultRetryWaitMs       = 100
	// always >= any unix timestamp(hbase version); used as the region id
	// when LocateRegion builds the meta lookup key, so that key sorts after
	// every real region name with the same table and start key.
	beyondMaxTimestamp = "99999999999999"
)
 | |
| 
 | |
// Package-level byte slices and tables shared across the client.
var (
	// hbaseHeaderBytes is presumably the RPC connection preamble (its use
	// site is not visible here).
	hbaseHeaderBytes = []byte("HBas")
	// metaTableName identifies the catalog table; LocateRegion short-circuits on it.
	metaTableName = []byte("hbase:meta")
	// metaRegionName is the name of the (single) hbase:meta region.
	metaRegionName = []byte("hbase:meta,,1")

	// retryPauseTime lists backoff multipliers, presumably indexed by
	// attempt number (its use site is not visible here).
	retryPauseTime = []int64{1, 2, 3, 5, 10, 20, 40, 100, 100, 100, 100, 200, 200}
)
 | |
| 
 | |
// RegionInfo describes one region of a table, either parsed from an
// hbase:meta row by parseRegion or synthesized for the meta region itself
// by getMetaRegion.
type RegionInfo struct {
	// Server is the "info:server" value of the meta row when present; for
	// the meta region it is the "host:port" of the root server.
	Server         string
	// StartKey and EndKey bound the region's row range [StartKey, EndKey);
	// the region cache treats an empty key as unbounded on that side.
	StartKey       []byte
	EndKey         []byte
	// Name is the region name, i.e. the hbase:meta row key.
	Name           string
	// NOTE(review): Ts is never set in the visible code; confirm its use.
	Ts             string
	TableNamespace string
	TableName      string
	Offline        bool
	Split          bool
}
 | |
| 
 | |
// tableInfo pairs a table name with its families (presumably column family
// names).
// NOTE(review): not referenced in the visible code; confirm it is still used.
type tableInfo struct {
	tableName string
	families  []string
}
 | |
| 
 | |
// HBaseClient is the exported client interface; NewClient returns an
// implementation of it.
type HBaseClient interface {
	// NOTE(review): Get, Put, Delete, TableExists, DropTable, DisableTable,
	// EnableTable, CreateTable, ServiceCall and Split are not implemented in
	// the visible source; their contracts are documented where implemented.
	Get(tbl string, g *Get) (*ResultRow, error)
	Put(tbl string, p *Put) (bool, error)
	Delete(tbl string, d *Delete) (bool, error)
	TableExists(tbl string) (bool, error)
	DropTable(t string) error
	DisableTable(t string) error
	EnableTable(t string) error
	CreateTable(t *TableDescriptor, splits [][]byte) error
	ServiceCall(table string, call *CoprocessorServiceCall) (*proto.CoprocessorServiceResponse, error)
	// LocateRegion returns the region of table containing row, consulting
	// the client's region cache first when useCache is set.
	LocateRegion(table, row []byte, useCache bool) (*RegionInfo, error)
	// GetRegions returns every region of table in key order.
	GetRegions(table []byte, useCache bool) ([]*RegionInfo, error)
	Split(tblOrRegion, splitPoint string) error
	// CleanRegionCache drops the cached regions of table.
	CleanRegionCache(table []byte)
	// CleanAllRegionCache drops the cached regions of every table.
	CleanAllRegionCache()
	// Close releases the ZooKeeper session and the cached connections.
	Close() error
}
 | |
| 
 | |
// Compile-time check that *client implements HBaseClient.
var _ HBaseClient = (*client)(nil)
 | |
| 
 | |
// client is the HBaseClient implementation. It holds the ZooKeeper session
// that init uses to discover the hbase:meta region server and the master,
// a cache of RPC connections keyed by cachedConnKey, and a per-table cache
// of region locations.
type client struct {
	// mu guards cachedRegionInfo; getConn also takes it around cachedConns.
	mu               sync.RWMutex
	zkClient         *zk.Conn
	zkHosts          []string
	// zkRoot is prepended to the znode paths read in init.
	zkRoot           string
	// NOTE(review): prefetched is initialized in NewClient but not otherwise
	// used in the visible code.
	prefetched       map[string]bool
	// cachedConns maps cachedConnKey(addr, service type) to an open connection.
	cachedConns      map[string]*connection
	// cachedRegionInfo maps table name -> region name -> region.
	cachedRegionInfo map[string]map[string]*RegionInfo
	// maxRetries starts at defaultMaxActionRetries; its use is not visible here.
	maxRetries       int
	// rootServerName is the server hosting hbase:meta, read in init.
	rootServerName   *proto.ServerName
	// masterServerName is the master's server, read in init.
	masterServerName *proto.ServerName
}
 | |
| 
 | |
| func serverNameToAddr(server *proto.ServerName) string {
 | |
| 	return fmt.Sprintf("%s:%d", server.GetHostName(), server.GetPort())
 | |
| }
 | |
| 
 | |
| func cachedConnKey(addr string, srvType ServiceType) string {
 | |
| 	return fmt.Sprintf("%s|%d", addr, srvType)
 | |
| }
 | |
| 
 | |
| func NewClient(zkHosts []string, zkRoot string) (HBaseClient, error) {
 | |
| 	cl := &client{
 | |
| 		zkHosts:          zkHosts,
 | |
| 		zkRoot:           zkRoot,
 | |
| 		cachedConns:      make(map[string]*connection),
 | |
| 		cachedRegionInfo: make(map[string]map[string]*RegionInfo),
 | |
| 		prefetched:       make(map[string]bool),
 | |
| 		maxRetries:       defaultMaxActionRetries,
 | |
| 	}
 | |
| 
 | |
| 	err := cl.init()
 | |
| 	if err != nil {
 | |
| 		return nil, errors.Trace(err)
 | |
| 	}
 | |
| 
 | |
| 	return cl, nil
 | |
| }
 | |
| 
 | |
| func (c *client) decodeMeta(data []byte) (*proto.ServerName, error) {
 | |
| 	if data[0] != magicHeadByte {
 | |
| 		return nil, errors.New("unknown packet")
 | |
| 	}
 | |
| 
 | |
| 	var n int32
 | |
| 	err := binary.Read(bytes.NewBuffer(data[1:]), binary.BigEndian, &n)
 | |
| 	if err != nil {
 | |
| 		return nil, errors.Trace(err)
 | |
| 	}
 | |
| 
 | |
| 	dataOffset := magicHeadSize + idLengthSize + int(n)
 | |
| 	data = data[(dataOffset + 4):]
 | |
| 
 | |
| 	var mrs proto.MetaRegionServer
 | |
| 	err = pb.Unmarshal(data, &mrs)
 | |
| 	if err != nil {
 | |
| 		return nil, errors.Trace(err)
 | |
| 	}
 | |
| 
 | |
| 	return mrs.GetServer(), nil
 | |
| }
 | |
| 
 | |
| // init and get root region server addr and master addr
 | |
| func (c *client) init() error {
 | |
| 	zkclient, _, err := zk.Connect(c.zkHosts, time.Second*30)
 | |
| 	if err != nil {
 | |
| 		return errors.Trace(err)
 | |
| 	}
 | |
| 	c.zkClient = zkclient
 | |
| 
 | |
| 	res, _, _, err := c.zkClient.GetW(c.zkRoot + zkRootRegionPath)
 | |
| 	if err != nil {
 | |
| 		return errors.Trace(err)
 | |
| 	}
 | |
| 
 | |
| 	c.rootServerName, err = c.decodeMeta(res)
 | |
| 	if err != nil {
 | |
| 		return errors.Trace(err)
 | |
| 	}
 | |
| 
 | |
| 	log.Debug("connect root region server...", c.rootServerName)
 | |
| 	serverAddr := serverNameToAddr(c.rootServerName)
 | |
| 	conn, err := newConnection(serverAddr, ClientService)
 | |
| 	if err != nil {
 | |
| 		return errors.Trace(err)
 | |
| 	}
 | |
| 
 | |
| 	// Set buffered regionserver conn.
 | |
| 	cachedKey := cachedConnKey(serverAddr, ClientService)
 | |
| 	c.cachedConns[cachedKey] = conn
 | |
| 
 | |
| 	res, _, _, err = c.zkClient.GetW(c.zkRoot + zkMasterAddrPath)
 | |
| 	if err != nil {
 | |
| 		return errors.Trace(err)
 | |
| 	}
 | |
| 
 | |
| 	c.masterServerName, err = c.decodeMeta(res)
 | |
| 	if err != nil {
 | |
| 		return errors.Trace(err)
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // get connection
 | |
| func (c *client) getConn(addr string, srvType ServiceType) (*connection, error) {
 | |
| 	connKey := cachedConnKey(addr, srvType)
 | |
| 	c.mu.RLock()
 | |
| 	conn, ok := c.cachedConns[connKey]
 | |
| 	c.mu.RUnlock()
 | |
| 
 | |
| 	if ok {
 | |
| 		return conn, nil
 | |
| 	}
 | |
| 
 | |
| 	var err error
 | |
| 	conn, err = newConnection(addr, srvType)
 | |
| 	if err != nil {
 | |
| 		return nil, errors.Errorf("create new connection failed - %v", errors.ErrorStack(err))
 | |
| 	}
 | |
| 	c.mu.Lock()
 | |
| 	c.cachedConns[connKey] = conn
 | |
| 	c.mu.Unlock()
 | |
| 	return conn, nil
 | |
| }
 | |
| 
 | |
| func (c *client) getAdminConn(addr string) (*connection, error) {
 | |
| 	return c.getConn(addr, AdminService)
 | |
| }
 | |
| 
 | |
| func (c *client) getClientConn(addr string) (*connection, error) {
 | |
| 	return c.getConn(addr, ClientService)
 | |
| }
 | |
| 
 | |
| func (c *client) getMasterConn() (*connection, error) {
 | |
| 	return c.getConn(serverNameToAddr(c.masterServerName), MasterService)
 | |
| }
 | |
| 
 | |
| func (c *client) doAction(conn *connection, req pb.Message) (chan pb.Message, error) {
 | |
| 	cl := newCall(req)
 | |
| 	err := conn.call(cl)
 | |
| 	if err != nil {
 | |
| 		return nil, errors.Trace(err)
 | |
| 	}
 | |
| 
 | |
| 	return cl.responseCh, nil
 | |
| }
 | |
| 
 | |
| func (c *client) adminAction(req pb.Message) (chan pb.Message, error) {
 | |
| 	conn, err := c.getMasterConn()
 | |
| 	if err != nil {
 | |
| 		return nil, errors.Trace(err)
 | |
| 	}
 | |
| 	return c.doAction(conn, req)
 | |
| }
 | |
| 
 | |
| func (c *client) regionAction(addr string, req pb.Message) (chan pb.Message, error) {
 | |
| 	conn, err := c.getAdminConn(addr)
 | |
| 	if err != nil {
 | |
| 		return nil, errors.Trace(err)
 | |
| 	}
 | |
| 	return c.doAction(conn, req)
 | |
| }
 | |
| 
 | |
| // http://stackoverflow.com/questions/27602013/correct-way-to-get-region-name-by-using-hbase-api
 | |
| func (c *client) createRegionName(table, startKey []byte, id string, newFormat bool) []byte {
 | |
| 	if len(startKey) == 0 {
 | |
| 		startKey = make([]byte, 1)
 | |
| 	}
 | |
| 
 | |
| 	b := bytes.Join([][]byte{table, startKey, []byte(id)}, []byte{','})
 | |
| 
 | |
| 	if newFormat {
 | |
| 		m := md5.Sum(b)
 | |
| 		mhex := []byte(hex.EncodeToString(m[:]))
 | |
| 		b = append(bytes.Join([][]byte{b, mhex}, []byte{'.'}), '.')
 | |
| 	}
 | |
| 
 | |
| 	return b
 | |
| }
 | |
| 
 | |
| func (c *client) parseRegion(rr *ResultRow) (*RegionInfo, error) {
 | |
| 	regionInfoCol, ok := rr.Columns["info:regioninfo"]
 | |
| 	if !ok {
 | |
| 		return nil, errors.Errorf("Unable to parse region location (no regioninfo column): %#v", rr)
 | |
| 	}
 | |
| 
 | |
| 	offset := bytes.Index(regionInfoCol.Value, []byte("PBUF")) + 4
 | |
| 	regionInfoBytes := regionInfoCol.Value[offset:]
 | |
| 
 | |
| 	var info proto.RegionInfo
 | |
| 	err := pb.Unmarshal(regionInfoBytes, &info)
 | |
| 	if err != nil {
 | |
| 		return nil, errors.Errorf("Unable to parse region location: %#v", err)
 | |
| 	}
 | |
| 
 | |
| 	ri := &RegionInfo{
 | |
| 		StartKey:       info.GetStartKey(),
 | |
| 		EndKey:         info.GetEndKey(),
 | |
| 		Name:           bytes.NewBuffer(rr.Row).String(),
 | |
| 		TableNamespace: string(info.GetTableName().GetNamespace()),
 | |
| 		TableName:      string(info.GetTableName().GetQualifier()),
 | |
| 		Offline:        info.GetOffline(),
 | |
| 		Split:          info.GetSplit(),
 | |
| 	}
 | |
| 
 | |
| 	if v, ok := rr.Columns["info:server"]; ok {
 | |
| 		ri.Server = string(v.Value)
 | |
| 	}
 | |
| 
 | |
| 	return ri, nil
 | |
| }
 | |
| 
 | |
| func (c *client) getMetaRegion() *RegionInfo {
 | |
| 	return &RegionInfo{
 | |
| 		StartKey: []byte{},
 | |
| 		EndKey:   []byte{},
 | |
| 		Name:     string(metaRegionName),
 | |
| 		Server:   serverNameToAddr(c.rootServerName),
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (c *client) getCachedLocation(table, row []byte) *RegionInfo {
 | |
| 	c.mu.RLock()
 | |
| 	defer c.mu.RUnlock()
 | |
| 
 | |
| 	tableStr := string(table)
 | |
| 	if regions, ok := c.cachedRegionInfo[tableStr]; ok {
 | |
| 		for _, region := range regions {
 | |
| 			if (len(region.EndKey) == 0 ||
 | |
| 				bytes.Compare(row, region.EndKey) < 0) &&
 | |
| 				(len(region.StartKey) == 0 ||
 | |
| 					bytes.Compare(row, region.StartKey) >= 0) {
 | |
| 				return region
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (c *client) updateRegionCache(table []byte, region *RegionInfo) {
 | |
| 	c.mu.Lock()
 | |
| 	defer c.mu.Unlock()
 | |
| 
 | |
| 	tableStr := string(table)
 | |
| 	if _, ok := c.cachedRegionInfo[tableStr]; !ok {
 | |
| 		c.cachedRegionInfo[tableStr] = make(map[string]*RegionInfo)
 | |
| 	}
 | |
| 	c.cachedRegionInfo[tableStr][region.Name] = region
 | |
| }
 | |
| 
 | |
| func (c *client) CleanRegionCache(table []byte) {
 | |
| 	c.mu.Lock()
 | |
| 	defer c.mu.Unlock()
 | |
| 	delete(c.cachedRegionInfo, string(table))
 | |
| }
 | |
| 
 | |
| func (c *client) CleanAllRegionCache() {
 | |
| 	c.mu.Lock()
 | |
| 	defer c.mu.Unlock()
 | |
| 	c.cachedRegionInfo = map[string]map[string]*RegionInfo{}
 | |
| }
 | |
| 
 | |
| func (c *client) LocateRegion(table, row []byte, useCache bool) (*RegionInfo, error) {
 | |
| 	// If user wants to locate metaregion, just return it.
 | |
| 	if bytes.Equal(table, metaTableName) {
 | |
| 		return c.getMetaRegion(), nil
 | |
| 	}
 | |
| 
 | |
| 	// Try to get location from cache.
 | |
| 	if useCache {
 | |
| 		if r := c.getCachedLocation(table, row); r != nil {
 | |
| 			return r, nil
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// If cache missed or not using cache, try to get and update region info.
 | |
| 	metaRegion := c.getMetaRegion()
 | |
| 	conn, err := c.getClientConn(metaRegion.Server)
 | |
| 	if err != nil {
 | |
| 		return nil, errors.Trace(err)
 | |
| 	}
 | |
| 
 | |
| 	regionRow := c.createRegionName(table, row, beyondMaxTimestamp, true)
 | |
| 
 | |
| 	call := newCall(&proto.GetRequest{
 | |
| 		Region: &proto.RegionSpecifier{
 | |
| 			Type:  proto.RegionSpecifier_REGION_NAME.Enum(),
 | |
| 			Value: metaRegionName,
 | |
| 		},
 | |
| 		Get: &proto.Get{
 | |
| 			Row: regionRow,
 | |
| 			Column: []*proto.Column{&proto.Column{
 | |
| 				Family: []byte("info"),
 | |
| 			}},
 | |
| 			ClosestRowBefore: pb.Bool(true),
 | |
| 		},
 | |
| 	})
 | |
| 
 | |
| 	err = conn.call(call)
 | |
| 	if err != nil {
 | |
| 		return nil, errors.Trace(err)
 | |
| 	}
 | |
| 
 | |
| 	response := <-call.responseCh
 | |
| 
 | |
| 	switch r := response.(type) {
 | |
| 	case *proto.GetResponse:
 | |
| 		res := r.GetResult()
 | |
| 		if res == nil {
 | |
| 			return nil, errors.Errorf("Empty region: [table=%s] [row=%q] [region_row=%q]", table, row, regionRow)
 | |
| 		}
 | |
| 
 | |
| 		rr := NewResultRow(res)
 | |
| 		region, err := c.parseRegion(rr)
 | |
| 		if err != nil {
 | |
| 			return nil, errors.Trace(err)
 | |
| 		}
 | |
| 
 | |
| 		c.updateRegionCache(table, region)
 | |
| 		return region, nil
 | |
| 	case *exception:
 | |
| 		return nil, errors.New(r.msg)
 | |
| 	default:
 | |
| 		log.Warnf("Unknown response - %T - %v", r, r)
 | |
| 	}
 | |
| 
 | |
| 	return nil, errors.Errorf("Couldn't find the region: [table=%s] [row=%q] [region_row=%q]", table, row, regionRow)
 | |
| }
 | |
| 
 | |
| func (c *client) GetRegions(table []byte, useCache bool) ([]*RegionInfo, error) {
 | |
| 	var regions []*RegionInfo
 | |
| 	startKey := []byte("")
 | |
| 	// Get first region.
 | |
| 	region, err := c.LocateRegion(table, []byte(startKey), useCache)
 | |
| 	if err != nil {
 | |
| 		return nil, errors.Errorf("couldn't find any region: [table=%s] [useCache=%t]", table, useCache)
 | |
| 	}
 | |
| 	regions = append(regions, region)
 | |
| 	startKey = region.EndKey
 | |
| 	for len(startKey) > 0 {
 | |
| 		region, err = c.LocateRegion(table, []byte(startKey), useCache)
 | |
| 		if err != nil {
 | |
| 			return nil, errors.Trace(err)
 | |
| 		}
 | |
| 		regions = append(regions, region)
 | |
| 		startKey = region.EndKey
 | |
| 	}
 | |
| 	return regions, nil
 | |
| }
 | |
| 
 | |
| func (c *client) Close() error {
 | |
| 	if c.zkClient != nil {
 | |
| 		c.zkClient.Close()
 | |
| 	}
 | |
| 
 | |
| 	for _, conn := range c.cachedConns {
 | |
| 		err := conn.close()
 | |
| 		if err != nil {
 | |
| 			return errors.Trace(err)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 |