Use block data in disconnect blocks, remove disconnected txs from cache

indexv1
Martin Boehm 2018-03-14 12:34:13 +01:00
parent f03131eb79
commit 96bfdfd74e
3 changed files with 72 additions and 22 deletions

View File

@ -125,8 +125,13 @@ func main() {
}
defer index.Close()
syncWorker, err = db.NewSyncWorker(index, chain, *syncWorkers, *syncChunk, *blockFrom, *dryRun, chanOsSignal, metrics)
if err != nil {
glog.Fatalf("NewSyncWorker %v", err)
}
if *rollbackHeight >= 0 {
bestHeight, _, err := index.GetBestBlock()
bestHeight, bestHash, err := index.GetBestBlock()
if err != nil {
glog.Error("rollbackHeight: ", err)
return
@ -134,7 +139,16 @@ func main() {
if uint32(*rollbackHeight) > bestHeight {
glog.Infof("nothing to rollback, rollbackHeight %d, bestHeight: %d", *rollbackHeight, bestHeight)
} else {
err = index.DisconnectBlocks(uint32(*rollbackHeight), bestHeight)
hashes := []string{bestHash}
for height := bestHeight - 1; height >= uint32(*rollbackHeight); height-- {
hash, err := index.GetBlockHash(height)
if err != nil {
glog.Error("rollbackHeight: ", err)
return
}
hashes = append(hashes, hash)
}
err = syncWorker.DisconnectBlocks(uint32(*rollbackHeight), bestHeight, hashes)
if err != nil {
glog.Error("rollbackHeight: ", err)
return
@ -168,11 +182,6 @@ func main() {
}()
}
syncWorker, err = db.NewSyncWorker(index, chain, *syncWorkers, *syncChunk, *blockFrom, *dryRun, chanOsSignal, metrics)
if err != nil {
glog.Fatalf("NewSyncWorker %v", err)
}
if *synchronize {
if err := syncWorker.ResyncIndex(nil); err != nil {
glog.Error("resyncIndex ", err)

View File

@ -235,11 +235,7 @@ type outpoint struct {
vout uint32
}
func (d *RocksDB) writeOutputs(
wb *gorocksdb.WriteBatch,
block *bchain.Block,
op int,
) error {
func (d *RocksDB) writeOutputs(wb *gorocksdb.WriteBatch, block *bchain.Block, op int) error {
records := make(map[string][]outpoint)
for _, tx := range block.Txs {
@ -253,6 +249,14 @@ func (d *RocksDB) writeOutputs(
txid: tx.Txid,
vout: output.N,
})
if op == opDelete {
// remove transactions from cache
b, err := packTxid(tx.Txid)
if err != nil {
return err
}
wb.DeleteCF(d.cfh[cfTransactions], b)
}
}
}
}
@ -448,12 +452,10 @@ func (d *RocksDB) writeHeight(
return nil
}
// DisconnectBlocks removes all data belonging to blocks in range lower-higher
func (d *RocksDB) DisconnectBlocks(
lower uint32,
higher uint32,
) error {
glog.Infof("rocksdb: disconnecting blocks %d-%d", lower, higher)
// DisconnectBlocksFullScan removes all data belonging to blocks in range lower-higher
// it finds the data by doing full scan of outputs column, therefore it is quite slow
func (d *RocksDB) DisconnectBlocksFullScan(lower uint32, higher uint32) error {
glog.Infof("db: disconnecting blocks %d-%d using full scan", lower, higher)
outputKeys := [][]byte{}
outputValues := [][]byte{}
var totalOutputs, count uint64
@ -506,6 +508,7 @@ func (d *RocksDB) DisconnectBlocks(
return err
}
for _, o := range outpoints {
// delete from inputs
boutpoint, err := packOutpoint(o.txid, o.vout)
if err != nil {
return err
@ -514,6 +517,12 @@ func (d *RocksDB) DisconnectBlocks(
glog.Info("input ", hex.EncodeToString(boutpoint))
}
wb.DeleteCF(d.cfh[cfInputs], boutpoint)
// delete from txCache
b, err := packTxid(o.txid)
if err != nil {
return err
}
wb.DeleteCF(d.cfh[cfTransactions], b)
}
}
for height := lower; height <= higher; height++ {

View File

@ -4,6 +4,7 @@ import (
"blockbook/bchain"
"blockbook/common"
"os"
"strings"
"sync"
"sync/atomic"
"time"
@ -66,6 +67,10 @@ func (w *SyncWorker) ResyncIndex(onNewBlock func(hash string)) error {
return err
}
func isError(err error, s string) bool {
return strings.Contains(err.Error(), s)
}
func (w *SyncWorker) resyncIndex(onNewBlock func(hash string)) error {
remote, err := w.chain.GetBestBlockHash()
if err != nil {
@ -89,7 +94,7 @@ func (w *SyncWorker) resyncIndex(onNewBlock func(hash string)) error {
header, err = w.chain.GetBlockHeader(local)
forked := false
if err != nil {
if e, ok := err.(*bchain.RPCError); ok && e.Message == "Block not found" {
if isError(err, "Block not found") {
forked = true
} else {
return err
@ -104,6 +109,7 @@ func (w *SyncWorker) resyncIndex(onNewBlock func(hash string)) error {
// find and disconnect forked blocks and then synchronize again
glog.Info("resync: local is forked")
var height uint32
hashes := []string{local}
for height = localBestHeight - 1; height >= 0; height-- {
local, err = w.db.GetBlockHash(height)
if err != nil {
@ -116,8 +122,9 @@ func (w *SyncWorker) resyncIndex(onNewBlock func(hash string)) error {
if local == remote {
break
}
hashes = append(hashes, local)
}
err = w.db.DisconnectBlocks(height+1, localBestHeight)
err = w.DisconnectBlocks(height+1, localBestHeight, hashes)
if err != nil {
return err
}
@ -269,7 +276,7 @@ func (w *SyncWorker) connectBlockChunk(lower, higher uint32) error {
connected, err := w.isBlockConnected(higher)
if err != nil || connected {
// if higher is over the best block, continue with lower block, otherwise return error
if e, ok := err.(*bchain.RPCError); !ok || e.Message != "Block height out of range" {
if isError(err, "Block height out of range") {
return err
}
}
@ -319,7 +326,7 @@ func (w *SyncWorker) ConnectBlocksParallelInChunks(lower, higher uint32) error {
}
err := w.connectBlockChunk(low, high)
if err != nil {
if e, ok := err.(*bchain.RPCError); ok && (e.Message == "Block height out of range" || e.Message == "Block not found") {
if isError(err, "Block height out of range") || isError(err, "Block not found") {
break
}
glog.Fatalf("connectBlocksParallel %d-%d %v", low, high, err)
@ -373,3 +380,28 @@ func (w *SyncWorker) getBlockChain(hash string, out chan blockResult, done chan
out <- blockResult{block: block}
}
}
// DisconnectBlocks removes all data belonging to blocks in range lower-higher,
// using block data from blockchain, if they are available,
// otherwise doing full scan
func (w *SyncWorker) DisconnectBlocks(lower uint32, higher uint32, hashes []string) error {
glog.Infof("sync: disconnecting blocks %d-%d", lower, higher)
blocks := make([]*bchain.Block, len(hashes))
var err error
// get all blocks first to see if we can avoid full scan
for i, hash := range hashes {
blocks[i], err = w.chain.GetBlock(hash, 0)
if err != nil {
// cannot get block, do full range scan
return w.db.DisconnectBlocksFullScan(lower, higher)
}
}
// then disconnect one after another
for i, block := range blocks {
glog.Info("Disconnecting block ", (int(higher) - i), " ", block.Hash)
if err = w.db.DisconnectBlock(block); err != nil {
return err
}
}
return nil
}