Tune RocksDB.

Remove the bulk import mode - it consumes a lot of disk space.
Disable data compression.
Reduce the write buffer from 2 GB to 0.5 GB.
indexv1
Martin Boehm 2018-02-24 16:25:55 +01:00
parent 1c8386f05d
commit 7a1ff7241f
2 changed files with 11 additions and 86 deletions
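
In code terms, the tuning amounts to the following sketch, assuming the tecbot/gorocksdb binding whose calls appear in the diff below (package name and import path are assumptions; the rest of the openDB setup - block cache, bloom filter, background jobs - is unchanged and omitted):

    package db

    import "github.com/tecbot/gorocksdb"

    // tunedOptions mirrors the option changes in this commit. Illustrative sketch only.
    func tunedOptions() *gorocksdb.Options {
        opts := gorocksdb.NewDefaultOptions()
        opts.SetWriteBufferSize(1 << 29)             // 0.5 GB memtable, down from 2 GB (2 << 30)
        opts.SetCompression(gorocksdb.NoCompression) // store data uncompressed
        return opts
    }

Bulk-load mode (PrepareForBulkLoad plus the ReopenWithBulk and CompactDatabase helpers) is removed entirely rather than tuned, since, per the commit message, it consumes a lot of disk space.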

Changed file 1 of 2

@@ -132,7 +132,7 @@ func main() {
     }
     if *synchronize {
-        if err := resyncIndex(true, nil); err != nil {
+        if err := resyncIndex(nil); err != nil {
             glog.Fatal("resyncIndex ", err)
         }
     }
@@ -263,7 +263,7 @@ func syncIndexLoop() {
     glog.Info("syncIndexLoop starting")
     // resync index about every 15 minutes if there are no chanSyncIndex requests, with debounce 1 second
     tickAndDebounce(resyncIndexPeriodMs*time.Millisecond, debounceResyncIndexMs*time.Millisecond, chanSyncIndex, func() {
-        if err := resyncIndex(false, onNewBlockHash); err != nil {
+        if err := resyncIndex(onNewBlockHash); err != nil {
            glog.Error("syncIndexLoop", err)
         }
     })
@@ -341,7 +341,7 @@ func printResult(txid string, vout uint32, isOutput bool) error {
     return nil
 }

-func resyncIndex(bulk bool, onNewBlock func(hash string)) error {
+func resyncIndex(onNewBlock func(hash string)) error {
     remote, err := chain.GetBestBlockHash()
     if err != nil {
         return err
@@ -396,7 +396,7 @@ func resyncIndex(bulk bool, onNewBlock func(hash string)) error {
             if err != nil {
                 return err
             }
-            return resyncIndex(false, onNewBlock)
+            return resyncIndex(onNewBlock)
         }
     }
@@ -406,8 +406,6 @@ func resyncIndex(bulk bool, onNewBlock func(hash string)) error {
         glog.Info("resync: local is behind")
         hash = header.Next
         startHeight = localBestHeight
-        // bulk load is allowed only for empty db, otherwise we could get rocksdb "error db has more levels than options.num_levels"
-        bulk = false
     } else {
         // If the local block is missing, we're indexing from the genesis block
         // or from the start block specified by flags
@@ -434,14 +432,13 @@ func resyncIndex(bulk bool, onNewBlock func(hash string)) error {
                 startHeight,
                 chainBestHeight,
                 *syncWorkers,
-                bulk,
             )
             if err != nil {
                 return err
             }
             // after parallel load finish the sync using standard way,
             // new blocks may have been created in the meantime
-            return resyncIndex(false, onNewBlock)
+            return resyncIndex(onNewBlock)
         }
     }
@@ -481,16 +478,8 @@ func connectBlocksParallel(
     lower uint32,
     higher uint32,
     numWorkers int,
-    bulk bool,
 ) error {
     var err error
-    if bulk {
-        err = index.ReopenWithBulk(true)
-        if err != nil {
-            return err
-        }
-    }
     var wg sync.WaitGroup
     hch := make(chan string, numWorkers)
     running := make([]bool, numWorkers)
@@ -529,38 +518,10 @@ func connectBlocksParallel(
         hch <- hash
         if h > 0 && h%1000 == 0 {
             glog.Info("connecting block ", h, " ", hash)
-            if bulk && *compactDBTriggerMB > 0 {
-                size, err := index.DatabaseSizeOnDisk()
-                if err != nil {
-                    break
-                }
-                if size > *compactDBTriggerMB*1048576 {
-                    // wait for the workers to finish block
-                WaitAgain:
-                    for {
-                        for _, r := range running {
-                            if r {
-                                glog.Info("Waiting ", running)
-                                time.Sleep(time.Millisecond * 500)
-                                continue WaitAgain
-                            }
-                        }
-                        break
-                    }
-                    if err = index.CompactDatabase(bulk); err != nil {
-                        break
-                    }
-                }
-            }
         }
     }
     close(hch)
     wg.Wait()
-    if err == nil && bulk {
-        err = index.ReopenWithBulk(false)
-    }
     return err
 }
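
The comment in syncIndexLoop above describes the intended behavior: resync roughly every 15 minutes while the chanSyncIndex channel is quiet, and debounce bursts of sync requests by one second. tickAndDebounce itself is defined outside this diff; the following is a hypothetical sketch of that behavior (names, channel type, and details are assumptions, not the repository's actual implementation):

    package main

    import "time"

    // tickAndDebounceSketch calls f once per tick while c stays quiet; when a
    // request arrives on c it waits out the debounce interval (absorbing any
    // further requests) and then calls f. Hypothetical sketch only.
    func tickAndDebounceSketch(tick, debounce time.Duration, c chan struct{}, f func()) {
        t := time.NewTimer(tick)
        defer t.Stop()
        for {
            select {
            case _, ok := <-c:
                if !ok {
                    return // channel closed, stop the loop
                }
                // restart the timer with the short debounce interval
                if !t.Stop() {
                    <-t.C // drain a timer that fired concurrently
                }
                t.Reset(debounce)
                continue
            case <-t.C:
            }
            f()
            t.Reset(tick)
        }
    }

    func main() {
        requests := make(chan struct{}, 1)
        // roughly what syncIndexLoop does: resync about every 15 minutes,
        // debouncing bursts of requests by one second
        tickAndDebounceSketch(15*time.Minute, time.Second, requests, func() {
            // resyncIndex(onNewBlockHash) would be called here
        })
    }

In the hunk above, the real call passes resyncIndexPeriodMs and debounceResyncIndexMs (each multiplied by time.Millisecond) together with chanSyncIndex, and the resyncIndex call is the callback.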

Changed file 2 of 2

@@ -43,7 +43,7 @@ const (
 var cfNames = []string{"default", "height", "outputs", "inputs"}

-func openDB(path string, bulk bool) (*gorocksdb.DB, []*gorocksdb.ColumnFamilyHandle, error) {
+func openDB(path string) (*gorocksdb.DB, []*gorocksdb.ColumnFamilyHandle, error) {
     c := gorocksdb.NewLRUCache(8 << 30) // 8 gb
     fp := gorocksdb.NewBloomFilter(10)
     bbto := gorocksdb.NewDefaultBlockBasedTableOptions()
@@ -58,8 +58,9 @@ func openDB(path string, bulk bool) (*gorocksdb.DB, []*gorocksdb.ColumnFamilyHandle, error) {
     opts.SetMaxBackgroundCompactions(4)
     opts.SetMaxBackgroundFlushes(2)
     opts.SetBytesPerSync(1 << 20) // 1mb
-    opts.SetWriteBufferSize(2 << 30) // 2 gb
+    opts.SetWriteBufferSize(1 << 29) // .5 gb
     opts.SetMaxOpenFiles(25000)
+    opts.SetCompression(gorocksdb.NoCompression)
     // opts for outputs are different:
     // no bloom filter - from documentation: If most of your queries are executed using iterators, you shouldn't set bloom filter
@@ -74,13 +75,9 @@ func openDB(path string, bulk bool) (*gorocksdb.DB, []*gorocksdb.ColumnFamilyHandle, error) {
     optsOutputs.SetMaxBackgroundCompactions(4)
     optsOutputs.SetMaxBackgroundFlushes(2)
     optsOutputs.SetBytesPerSync(1 << 20) // 1mb
-    optsOutputs.SetWriteBufferSize(2 << 30) // 2 gb
+    optsOutputs.SetWriteBufferSize(1 << 29) // 0.5 gb
     optsOutputs.SetMaxOpenFiles(25000)
-    if bulk {
-        opts.PrepareForBulkLoad()
-        optsOutputs.PrepareForBulkLoad()
-    }
+    optsOutputs.SetCompression(gorocksdb.NoCompression)
     fcOptions := []*gorocksdb.Options{opts, opts, optsOutputs, opts}
@@ -95,7 +92,7 @@ func openDB(path string, bulk bool) (*gorocksdb.DB, []*gorocksdb.ColumnFamilyHandle, error) {
 // needs to be called to release it.
 func NewRocksDB(path string) (d *RocksDB, err error) {
     glog.Infof("rocksdb: open %s", path)
-    db, cfh, err := openDB(path, false)
+    db, cfh, err := openDB(path)
     wo := gorocksdb.NewDefaultWriteOptions()
     ro := gorocksdb.NewDefaultReadOptions()
     ro.SetFillCache(false)
@@ -527,39 +524,6 @@ func (d *RocksDB) DatabaseSizeOnDisk() (int64, error) {
     return dirSize(d.path)
 }

-// CompactDatabase compacts the database
-// After unsuccessful experiment with CompactRange method (slow and actually fragmenting the db without compacting)
-// the method now closes the db instance and opens it again.
-// This means that during compact nobody can access the dababase!
-func (d *RocksDB) CompactDatabase(bulk bool) error {
-    size, _ := dirSize(d.path)
-    glog.Info("Compacting database, db size ", size)
-    if err := d.ReopenWithBulk(bulk); err != nil {
-        return err
-    }
-    size, _ = dirSize(d.path)
-    glog.Info("Compacting database finished, db size ", size)
-    return nil
-}
-
-// ReopenWithBulk reopens the database with different settings:
-// if bulk==true, reopens for bulk load
-// if bulk==false, reopens for normal operation
-// It closes and reopens db, nobody can access the database during the operation!
-func (d *RocksDB) ReopenWithBulk(bulk bool) error {
-    err := d.closeDB()
-    if err != nil {
-        return err
-    }
-    d.db = nil
-    db, cfh, err := openDB(d.path, bulk)
-    if err != nil {
-        return err
-    }
-    d.db, d.cfh = db, cfh
-    return nil
-}

 // Helpers
 const txIdUnpackedLen = 32
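
The openDB hunks above configure two option sets for the four column families in cfNames: opts for most families and optsOutputs without a bloom filter, following the quoted RocksDB advice that iterator-heavy workloads do not benefit from one. The call that actually opens the column families lies outside the visible hunks; the following is a hedged sketch of the usual gorocksdb pattern (the function name openWithColumnFamilies, the import path, and the exact wiring are assumptions):

    package db

    import "github.com/tecbot/gorocksdb"

    // openWithColumnFamilies illustrates how one *Options per column family is
    // typically passed to gorocksdb, matching cfNames and fcOptions in the diff.
    // Sketch only; not necessarily the repository's exact openDB tail.
    func openWithColumnFamilies(path string, opts, optsOutputs *gorocksdb.Options) (*gorocksdb.DB, []*gorocksdb.ColumnFamilyHandle, error) {
        cfNames := []string{"default", "height", "outputs", "inputs"}
        fcOptions := []*gorocksdb.Options{opts, opts, optsOutputs, opts}
        // one ColumnFamilyHandle is returned per name, in the same order as cfNames
        return gorocksdb.OpenDbColumnFamilies(opts, path, cfNames, fcOptions)
    }

Presumably opts keeps the bloom filter configured earlier in openDB while optsOutputs does not, which is the distinction the comment about iterator-based queries is pointing at.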