diff --git a/blockbook.go b/blockbook.go index a15d9546..90627325 100644 --- a/blockbook.go +++ b/blockbook.go @@ -125,8 +125,13 @@ func main() { } defer index.Close() + syncWorker, err = db.NewSyncWorker(index, chain, *syncWorkers, *syncChunk, *blockFrom, *dryRun, chanOsSignal, metrics) + if err != nil { + glog.Fatalf("NewSyncWorker %v", err) + } + if *rollbackHeight >= 0 { - bestHeight, _, err := index.GetBestBlock() + bestHeight, bestHash, err := index.GetBestBlock() if err != nil { glog.Error("rollbackHeight: ", err) return @@ -134,7 +139,16 @@ func main() { if uint32(*rollbackHeight) > bestHeight { glog.Infof("nothing to rollback, rollbackHeight %d, bestHeight: %d", *rollbackHeight, bestHeight) } else { - err = index.DisconnectBlocks(uint32(*rollbackHeight), bestHeight) + hashes := []string{bestHash} + for height := bestHeight - 1; height >= uint32(*rollbackHeight); height-- { + hash, err := index.GetBlockHash(height) + if err != nil { + glog.Error("rollbackHeight: ", err) + return + } + hashes = append(hashes, hash) + } + err = syncWorker.DisconnectBlocks(uint32(*rollbackHeight), bestHeight, hashes) if err != nil { glog.Error("rollbackHeight: ", err) return @@ -168,11 +182,6 @@ func main() { }() } - syncWorker, err = db.NewSyncWorker(index, chain, *syncWorkers, *syncChunk, *blockFrom, *dryRun, chanOsSignal, metrics) - if err != nil { - glog.Fatalf("NewSyncWorker %v", err) - } - if *synchronize { if err := syncWorker.ResyncIndex(nil); err != nil { glog.Error("resyncIndex ", err) diff --git a/db/rocksdb.go b/db/rocksdb.go index 34e6b09e..2098b77c 100644 --- a/db/rocksdb.go +++ b/db/rocksdb.go @@ -235,11 +235,7 @@ type outpoint struct { vout uint32 } -func (d *RocksDB) writeOutputs( - wb *gorocksdb.WriteBatch, - block *bchain.Block, - op int, -) error { +func (d *RocksDB) writeOutputs(wb *gorocksdb.WriteBatch, block *bchain.Block, op int) error { records := make(map[string][]outpoint) for _, tx := range block.Txs { @@ -253,6 +249,14 @@ func (d *RocksDB) writeOutputs( txid: tx.Txid, vout: output.N, }) + if op == opDelete { + // remove transactions from cache + b, err := packTxid(tx.Txid) + if err != nil { + return err + } + wb.DeleteCF(d.cfh[cfTransactions], b) + } } } } @@ -448,12 +452,10 @@ func (d *RocksDB) writeHeight( return nil } -// DisconnectBlocks removes all data belonging to blocks in range lower-higher -func (d *RocksDB) DisconnectBlocks( - lower uint32, - higher uint32, -) error { - glog.Infof("rocksdb: disconnecting blocks %d-%d", lower, higher) +// DisconnectBlocksFullScan removes all data belonging to blocks in range lower-higher +// it finds the data by doing full scan of outputs column, therefore it is quite slow +func (d *RocksDB) DisconnectBlocksFullScan(lower uint32, higher uint32) error { + glog.Infof("db: disconnecting blocks %d-%d using full scan", lower, higher) outputKeys := [][]byte{} outputValues := [][]byte{} var totalOutputs, count uint64 @@ -506,6 +508,7 @@ func (d *RocksDB) DisconnectBlocks( return err } for _, o := range outpoints { + // delete from inputs boutpoint, err := packOutpoint(o.txid, o.vout) if err != nil { return err @@ -514,6 +517,12 @@ func (d *RocksDB) DisconnectBlocks( glog.Info("input ", hex.EncodeToString(boutpoint)) } wb.DeleteCF(d.cfh[cfInputs], boutpoint) + // delete from txCache + b, err := packTxid(o.txid) + if err != nil { + return err + } + wb.DeleteCF(d.cfh[cfTransactions], b) } } for height := lower; height <= higher; height++ { diff --git a/db/sync.go b/db/sync.go index fc778c87..517d0f6e 100644 --- a/db/sync.go +++ b/db/sync.go @@ -4,6 +4,7 @@ import ( "blockbook/bchain" "blockbook/common" "os" + "strings" "sync" "sync/atomic" "time" @@ -66,6 +67,10 @@ func (w *SyncWorker) ResyncIndex(onNewBlock func(hash string)) error { return err } +func isError(err error, s string) bool { + return strings.Contains(err.Error(), s) +} + func (w *SyncWorker) resyncIndex(onNewBlock func(hash string)) error { remote, err := w.chain.GetBestBlockHash() if err != nil { @@ -89,7 +94,7 @@ func (w *SyncWorker) resyncIndex(onNewBlock func(hash string)) error { header, err = w.chain.GetBlockHeader(local) forked := false if err != nil { - if e, ok := err.(*bchain.RPCError); ok && e.Message == "Block not found" { + if isError(err, "Block not found") { forked = true } else { return err @@ -104,6 +109,7 @@ func (w *SyncWorker) resyncIndex(onNewBlock func(hash string)) error { // find and disconnect forked blocks and then synchronize again glog.Info("resync: local is forked") var height uint32 + hashes := []string{local} for height = localBestHeight - 1; height >= 0; height-- { local, err = w.db.GetBlockHash(height) if err != nil { @@ -116,8 +122,9 @@ func (w *SyncWorker) resyncIndex(onNewBlock func(hash string)) error { if local == remote { break } + hashes = append(hashes, local) } - err = w.db.DisconnectBlocks(height+1, localBestHeight) + err = w.DisconnectBlocks(height+1, localBestHeight, hashes) if err != nil { return err } @@ -269,7 +276,7 @@ func (w *SyncWorker) connectBlockChunk(lower, higher uint32) error { connected, err := w.isBlockConnected(higher) if err != nil || connected { // if higher is over the best block, continue with lower block, otherwise return error - if e, ok := err.(*bchain.RPCError); !ok || e.Message != "Block height out of range" { + if isError(err, "Block height out of range") { return err } } @@ -319,7 +326,7 @@ func (w *SyncWorker) ConnectBlocksParallelInChunks(lower, higher uint32) error { } err := w.connectBlockChunk(low, high) if err != nil { - if e, ok := err.(*bchain.RPCError); ok && (e.Message == "Block height out of range" || e.Message == "Block not found") { + if isError(err, "Block height out of range") || isError(err, "Block not found") { break } glog.Fatalf("connectBlocksParallel %d-%d %v", low, high, err) @@ -373,3 +380,28 @@ func (w *SyncWorker) getBlockChain(hash string, out chan blockResult, done chan out <- blockResult{block: block} } } + +// DisconnectBlocks removes all data belonging to blocks in range lower-higher, +// using block data from blockchain, if they are available, +// otherwise doing full scan +func (w *SyncWorker) DisconnectBlocks(lower uint32, higher uint32, hashes []string) error { + glog.Infof("sync: disconnecting blocks %d-%d", lower, higher) + blocks := make([]*bchain.Block, len(hashes)) + var err error + // get all blocks first to see if we can avoid full scan + for i, hash := range hashes { + blocks[i], err = w.chain.GetBlock(hash, 0) + if err != nil { + // cannot get block, do full range scan + return w.db.DisconnectBlocksFullScan(lower, higher) + } + } + // then disconnect one after another + for i, block := range blocks { + glog.Info("Disconnecting block ", (int(higher) - i), " ", block.Hash) + if err = w.db.DisconnectBlock(block); err != nil { + return err + } + } + return nil +}