Tune RocksDB options and measure memory usage

pull/56/head
Martin Boehm 2018-08-22 16:20:52 +02:00
parent 59497e3c97
commit 41252d33d2
2 changed files with 106 additions and 75 deletions

@@ -6,6 +6,7 @@ import (
"bytes"
"encoding/binary"
"encoding/hex"
"fmt"
"math/big"
"os"
"path/filepath"
@@ -42,6 +43,7 @@ type RocksDB struct {
chainParser bchain.BlockChainParser
is *common.InternalState
metrics *common.Metrics
cache *gorocksdb.Cache
}
const (
@@ -56,57 +58,49 @@
var cfNames = []string{"default", "height", "addresses", "txAddresses", "addressBalance", "blockTxs", "transactions"}
func openDB(path string) (*gorocksdb.DB, []*gorocksdb.ColumnFamilyHandle, error) {
c := gorocksdb.NewLRUCache(8 << 30) // 8GB
fp := gorocksdb.NewBloomFilter(10)
bbto := gorocksdb.NewDefaultBlockBasedTableOptions()
bbto.SetBlockSize(16 << 10) // 16kB
bbto.SetBlockCache(c)
bbto.SetFilterPolicy(fp)
func openDB(path string, c *gorocksdb.Cache) (*gorocksdb.DB, []*gorocksdb.ColumnFamilyHandle, error) {
bloom := gorocksdb.NewBloomFilter(10)
blockOpts := gorocksdb.NewDefaultBlockBasedTableOptions()
blockOpts.SetBlockSize(16 << 10) // 16kB
blockOpts.SetBlockCache(c)
blockOpts.SetFilterPolicy(bloom)
blockOpts.SetCacheIndexAndFilterBlocks(true)
blockOpts.SetPinL0FilterAndIndexBlocksInCache(true)
optsNoCompression := gorocksdb.NewDefaultOptions()
optsNoCompression.SetBlockBasedTableFactory(bbto)
optsNoCompression.SetCreateIfMissing(true)
optsNoCompression.SetCreateIfMissingColumnFamilies(true)
optsNoCompression.SetMaxBackgroundCompactions(4)
optsNoCompression.SetMaxBackgroundFlushes(2)
optsNoCompression.SetBytesPerSync(1 << 20) // 1MB
optsNoCompression.SetWriteBufferSize(1 << 27) // 128MB
optsNoCompression.SetMaxOpenFiles(25000)
optsNoCompression.SetCompression(gorocksdb.NoCompression)
optsLZ4 := gorocksdb.NewDefaultOptions()
optsLZ4.SetBlockBasedTableFactory(bbto)
optsLZ4.SetCreateIfMissing(true)
optsLZ4.SetCreateIfMissingColumnFamilies(true)
optsLZ4.SetMaxBackgroundCompactions(4)
optsLZ4.SetMaxBackgroundFlushes(2)
optsLZ4.SetBytesPerSync(1 << 20) // 1MB
optsLZ4.SetWriteBufferSize(1 << 27) // 128MB
optsLZ4.SetMaxOpenFiles(25000)
optsLZ4.SetCompression(gorocksdb.LZ4HCCompression)
opts := gorocksdb.NewDefaultOptions()
opts.SetBlockBasedTableFactory(blockOpts)
opts.SetCreateIfMissing(true)
opts.SetCreateIfMissingColumnFamilies(true)
opts.SetMaxBackgroundCompactions(6)
opts.SetMaxBackgroundFlushes(6)
opts.SetBytesPerSync(1 << 20) // 1MB
opts.SetWriteBufferSize(1 << 27) // 128MB
opts.SetMaxOpenFiles(25000)
opts.SetCompression(gorocksdb.LZ4HCCompression)
// opts for addresses are different:
// no bloom filter - per the RocksDB documentation, if most of your queries are executed using iterators, you shouldn't set a bloom filter
bbtoAddresses := gorocksdb.NewDefaultBlockBasedTableOptions()
bbtoAddresses.SetBlockSize(16 << 10) // 16kB
bbtoAddresses.SetBlockCache(c) // 8GB
blockOptsAddress := gorocksdb.NewDefaultBlockBasedTableOptions()
blockOptsAddress.SetBlockSize(16 << 10) // 16kB
blockOptsAddress.SetBlockCache(c) // 8GB
blockOptsAddress.SetCacheIndexAndFilterBlocks(true)
blockOptsAddress.SetPinL0FilterAndIndexBlocksInCache(true)
optsAddresses := gorocksdb.NewDefaultOptions()
optsAddresses.SetBlockBasedTableFactory(bbtoAddresses)
optsAddresses.SetBlockBasedTableFactory(blockOptsAddress)
optsAddresses.SetCreateIfMissing(true)
optsAddresses.SetCreateIfMissingColumnFamilies(true)
optsAddresses.SetMaxBackgroundCompactions(4)
optsAddresses.SetMaxBackgroundFlushes(2)
optsAddresses.SetMaxBackgroundCompactions(6)
optsAddresses.SetMaxBackgroundFlushes(6)
optsAddresses.SetBytesPerSync(1 << 20) // 1MB
optsAddresses.SetWriteBufferSize(1 << 27) // 128MB
optsAddresses.SetMaxOpenFiles(25000)
optsAddresses.SetCompression(gorocksdb.LZ4HCCompression)
// default, height, addresses, txAddresses, addressBalance, blockTxs, transactions
fcOptions := []*gorocksdb.Options{optsLZ4, optsLZ4, optsAddresses, optsLZ4, optsLZ4, optsLZ4, optsLZ4}
fcOptions := []*gorocksdb.Options{opts, opts, optsAddresses, opts, opts, opts, opts}
db, cfh, err := gorocksdb.OpenDbColumnFamilies(optsNoCompression, path, cfNames, fcOptions)
db, cfh, err := gorocksdb.OpenDbColumnFamilies(opts, path, cfNames, fcOptions)
if err != nil {
return nil, nil, err
}
@@ -117,11 +111,11 @@ func openDB(path string) (*gorocksdb.DB, []*gorocksdb.ColumnFamilyHandle, error)
// needs to be called to release it.
func NewRocksDB(path string, parser bchain.BlockChainParser, metrics *common.Metrics) (d *RocksDB, err error) {
glog.Infof("rocksdb: open %s, version %v", path, dbVersion)
db, cfh, err := openDB(path)
c := gorocksdb.NewLRUCache(8 << 30) // 8GB
db, cfh, err := openDB(path, c)
wo := gorocksdb.NewDefaultWriteOptions()
ro := gorocksdb.NewDefaultReadOptions()
ro.SetFillCache(false)
return &RocksDB{path, db, wo, ro, cfh, parser, nil, metrics}, nil
return &RocksDB{path, db, wo, ro, cfh, parser, nil, metrics, c}, nil
}
func (d *RocksDB) closeDB() error {
@@ -159,7 +153,7 @@ func (d *RocksDB) Reopen() error {
return err
}
d.db = nil
db, cfh, err := openDB(d.path)
db, cfh, err := openDB(d.path, d.cache)
if err != nil {
return err
}
@@ -167,6 +161,34 @@ func (d *RocksDB) Reopen() error {
return nil
}
func (d *RocksDB) GetMemoryStats() string {
type columnStats struct {
name string
indexAndFilter string
memtable string
}
cs := make([]columnStats, len(cfNames))
for i := 0; i < len(cfNames); i++ {
cs[i].name = cfNames[i]
cs[i].indexAndFilter = d.db.GetPropertyCF("rocksdb.estimate-table-readers-mem", d.cfh[i])
cs[i].memtable = d.db.GetPropertyCF("rocksdb.cur-size-all-mem-tables", d.cfh[i])
}
m := struct {
cacheUsage int
pinnedCacheUsage int
indexAndFilter string
memtable string
columns []columnStats
}{
cacheUsage: d.cache.GetUsage(),
pinnedCacheUsage: d.cache.GetPinnedUsage(),
indexAndFilter: d.db.GetProperty("rocksdb.estimate-table-readers-mem"),
memtable: d.db.GetProperty("rocksdb.cur-size-all-mem-tables"),
columns: cs,
}
return fmt.Sprintf("%+v", m)
}
// GetTransactions finds all input/output transactions for the given address
// Transactions are passed to the callback function.
func (d *RocksDB) GetTransactions(address string, lower uint32, higher uint32, fn func(txid string, vout uint32, isOutput bool) error) (err error) {
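A minimal usage sketch of the new GetMemoryStats, for context; the db path, parser and metrics values below are placeholders and not part of this commit:

    // Hypothetical caller: open the index (NewRocksDB now creates the shared
    // 8GB block cache and hands it to openDB) and log the memory breakdown.
    d, err := NewRocksDB("/path/to/db", parser, metrics) // placeholder arguments
    if err != nil {
        glog.Fatal("rocksdb: ", err)
    }
    // GetMemoryStats combines cache usage (GetUsage/GetPinnedUsage) with the
    // rocksdb.estimate-table-readers-mem and rocksdb.cur-size-all-mem-tables
    // properties, per column family, into one formatted string.
    glog.Info(d.GetMemoryStats())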
@@ -308,8 +330,8 @@ type BulkConnect struct {
}
const (
maxBulkAddresses = 300000
maxBulkTxAddresses = 2000000
maxBulkAddresses = 400000
maxBulkTxAddresses = 1500000
partialStoreAddresses = maxBulkTxAddresses / 10
maxBulkBalances = 2500000
partialStoreBalances = maxBulkBalances / 10
@@ -445,40 +467,36 @@ func (b *BulkConnect) ConnectBlock(block *bchain.Block, storeBlockTxs bool) error
if !b.isUTXO {
return b.d.ConnectBlock(block)
}
wb := gorocksdb.NewWriteBatch()
defer wb.Destroy()
addresses := make(map[string][]outpoint)
if err := b.d.processAddressesUTXO(block, addresses, b.txAddressesMap, b.balances); err != nil {
return err
}
start := time.Now()
var sa bool
var storeAddressesChan, storeBalancesChan chan error
if len(b.txAddressesMap) > maxBulkTxAddresses || len(b.balances) > maxBulkBalances {
sa = true
if len(b.txAddressesMap)+partialStoreAddresses > maxBulkTxAddresses {
storeAddressesChan = make(chan error)
go b.parallelStoreTxAddresses(storeAddressesChan, false)
}
if len(b.balances)+partialStoreBalances > maxBulkBalances {
storeBalancesChan = make(chan error)
go b.parallelStoreBalances(storeBalancesChan, false)
}
}
b.bulkAddresses = append(b.bulkAddresses, bulkAddresses{
height: block.Height,
addresses: addresses,
})
b.bulkAddressesCount += len(addresses)
wb := gorocksdb.NewWriteBatch()
defer wb.Destroy()
start := time.Now()
var count, count1, storeType int
var err error
const (
storeNone = iota
storeTxAddresses
storeBalances
storeBulkAddresses
)
// in one block store maximally one type of data
if len(b.txAddressesMap) > maxBulkTxAddresses {
storeType = storeTxAddresses
count, count1, err = b.storeTxAddresses(wb, false)
} else if len(b.balances) > maxBulkBalances {
storeType = storeBalances
count, err = b.storeBalances(wb, false)
} else if b.bulkAddressesCount > maxBulkAddresses {
storeType = storeBulkAddresses
count = b.bulkAddressesCount
err = b.storeBulkAddresses(wb)
}
if err != nil {
return err
bac := b.bulkAddressesCount
if sa || b.bulkAddressesCount > maxBulkAddresses {
if err := b.storeBulkAddresses(wb); err != nil {
return err
}
}
if storeBlockTxs {
if err := b.d.storeAndCleanupBlockTxs(wb, block); err != nil {
@@ -491,13 +509,18 @@ func (b *BulkConnect) ConnectBlock(block *bchain.Block, storeBlockTxs bool) error
if err := b.d.db.Write(b.d.wo, wb); err != nil {
return err
}
switch storeType {
case storeTxAddresses:
glog.Info("rocksdb: height ", b.height, ", stored ", count, " (", count1, " spent) txAddresses, ", len(b.txAddressesMap), " remaining, done in ", time.Since(start))
case storeBalances:
glog.Info("rocksdb: height ", b.height, ", stored ", count, " balances, ", len(b.balances), " remaining, done in ", time.Since(start))
case storeBulkAddresses:
glog.Info("rocksdb: height ", b.height, ", stored ", count, " addresses, done in ", time.Since(start))
if bac > b.bulkAddressesCount {
glog.Info("rocksdb: height ", b.height, ", stored ", bac, " addresses, done in ", time.Since(start))
}
if storeAddressesChan != nil {
if err := <-storeAddressesChan; err != nil {
return err
}
}
if storeBalancesChan != nil {
if err := <-storeBalancesChan; err != nil {
return err
}
}
return nil
}
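parallelStoreTxAddresses and parallelStoreBalances are started above but their bodies fall outside these hunks; the following is only a sketch of the shape such a helper might take, assuming it flushes the accumulated data through storeTxAddresses into its own write batch and reports the outcome on the channel (coordination with the caller's shared txAddressesMap is omitted):

    // Hypothetical sketch, not the helper from this commit: store accumulated
    // txAddresses in the background and signal completion on the channel.
    func (b *BulkConnect) parallelStoreTxAddresses(c chan error, all bool) {
        start := time.Now()
        wb := gorocksdb.NewWriteBatch()
        defer wb.Destroy()
        count, spent, err := b.storeTxAddresses(wb, all)
        if err == nil {
            err = b.d.db.Write(b.d.wo, wb)
        }
        if err == nil {
            glog.Info("rocksdb: height ", b.height, ", stored ", count, " (", spent,
                " spent) txAddresses in background, done in ", time.Since(start))
        }
        c <- err
    }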
@@ -1628,9 +1651,12 @@ func (d *RocksDB) storeState(is *common.InternalState) error {
func (d *RocksDB) computeColumnSize(col int, stopCompute chan os.Signal) (int64, int64, int64, error) {
var rows, keysSum, valuesSum int64
var seekKey []byte
// do not fill the block cache with data from this scan
ro := gorocksdb.NewDefaultReadOptions()
ro.SetFillCache(false)
for {
var key []byte
it := d.db.NewIteratorCF(d.ro, d.cfh[col])
it := d.db.NewIteratorCF(ro, d.cfh[col])
if rows == 0 {
it.SeekToFirst()
} else {

@@ -286,6 +286,7 @@ func (w *SyncWorker) ConnectBlocksParallel(lower, higher uint32) error {
go writeBlockWorker()
var hash string
start := time.Now()
msTime := time.Now().Add(1 * time.Minute)
ConnectLoop:
for h := lower; h <= higher; {
select {
@@ -305,6 +306,10 @@ ConnectLoop:
glog.Info("connecting block ", h, " ", hash, ", elapsed ", time.Since(start))
start = time.Now()
}
if msTime.Before(time.Now()) {
glog.Info(w.db.GetMemoryStats())
msTime = time.Now().Add(1 * time.Minute)
}
h++
}
}