Rework resyncIndex to handle eth rollbacks

indexv1
Martin Boehm 2018-04-09 15:43:42 +02:00
parent cc2c7d5112
commit a117bd2abd
1 changed files with 64 additions and 73 deletions

View File

@ -41,7 +41,7 @@ func NewSyncWorker(db *RocksDB, chain bchain.BlockChain, syncWorkers, syncChunk
}, nil
}
var synced = errors.New("synced")
var errSynced = errors.New("synced")
// ResyncIndex synchronizes index to the top of the blockchain
// onNewBlock is called when new block is connected, but not in initial parallel sync
@ -57,7 +57,7 @@ func (w *SyncWorker) ResyncIndex(onNewBlock func(hash string)) error {
w.metrics.IndexResyncDuration.Observe(float64(d) / 1e6) // in milliseconds
w.metrics.IndexDBSize.Set(float64(w.db.DatabaseSizeOnDisk()))
fallthrough
case synced:
case errSynced:
// this is not actually error but flag that resync wasn't necessary
return nil
}
@ -68,90 +68,54 @@ func (w *SyncWorker) ResyncIndex(onNewBlock func(hash string)) error {
}
func (w *SyncWorker) resyncIndex(onNewBlock func(hash string)) error {
remote, err := w.chain.GetBestBlockHash()
remoteBestHash, err := w.chain.GetBestBlockHash()
if err != nil {
return err
}
localBestHeight, local, err := w.db.GetBestBlock()
localBestHeight, localBestHash, err := w.db.GetBestBlock()
if err != nil {
local = ""
return err
}
// If the locally indexed block is the same as the best block on the
// network, we're done.
if local == remote {
glog.Infof("resync: synced on %d %s", localBestHeight, local)
return synced
// If the locally indexed block is the same as the best block on the network, we're done.
if localBestHash == remoteBestHash {
glog.Infof("resync: synced at %d %s", localBestHeight, localBestHash)
return errSynced
}
var header *bchain.BlockHeader
if local != "" {
// Is local tip on the best chain?
header, err = w.chain.GetBlockHeader(local)
forked := false
if err != nil {
if err == bchain.ErrBlockNotFound {
forked = true
} else {
return err
}
} else {
if header.Confirmations < 0 {
forked = true
}
}
if forked {
// find and disconnect forked blocks and then synchronize again
glog.Info("resync: local is forked")
var height uint32
hashes := []string{local}
for height = localBestHeight - 1; height >= 0; height-- {
local, err = w.db.GetBlockHash(height)
if err != nil {
return err
}
remote, err = w.chain.GetBlockHash(height)
if err != nil {
return err
}
if local == remote {
break
}
hashes = append(hashes, local)
}
err = w.DisconnectBlocks(height+1, localBestHeight, hashes)
if err != nil {
return err
}
return w.resyncIndex(onNewBlock)
}
}
if header != nil {
glog.Info("resync: local is behind")
w.startHash = header.Next
w.startHeight = localBestHeight + 1
} else {
// If the local block is missing, we're indexing from the genesis block
// or from the start block specified by flags
glog.Info("resync: genesis from block ", w.startHeight)
w.startHash, err = w.chain.GetBlockHash(w.startHeight)
if err != nil {
if localBestHash != "" {
remoteHash, err := w.chain.GetBlockHash(localBestHeight)
// for some coins (eth) remote can be at lower best height after rollback
if err != nil && err != bchain.ErrBlockNotFound {
return err
}
if remoteHash != localBestHash {
// forked - the remote hash differs from the local hash at the same height
glog.Info("resync: local is forked at height ", localBestHeight, ", local hash ", localBestHash, ", remote hash", remoteHash)
return w.handleFork(localBestHeight, localBestHash, onNewBlock)
}
glog.Info("resync: local at ", localBestHeight, " is behind")
w.startHeight = localBestHeight + 1
} else {
// database is empty, start genesis
glog.Info("resync: genesis from block ", w.startHeight)
}
w.startHash, err = w.chain.GetBlockHash(w.startHeight)
if err != nil {
return err
}
// if parallel operation is enabled and the number of blocks to be connected is large,
// use parallel routine to load majority of blocks
if w.syncWorkers > 1 {
chainBestHeight, err := w.chain.GetBestBlockHeight()
remoteBestHeight, err := w.chain.GetBestBlockHeight()
if err != nil {
return err
}
if chainBestHeight-w.startHeight > uint32(w.syncChunk) {
glog.Infof("resync: parallel sync of blocks %d-%d, using %d workers", w.startHeight, chainBestHeight, w.syncWorkers)
err = w.connectBlocksParallel(w.startHeight, chainBestHeight)
if remoteBestHeight < w.startHeight {
glog.Error("resync: error - remote best height ", remoteBestHeight, " less than sync start height ", w.startHeight)
return errors.New("resync: remote best height error")
}
if remoteBestHeight-w.startHeight > uint32(w.syncChunk) {
glog.Infof("resync: parallel sync of blocks %d-%d, using %d workers", w.startHeight, remoteBestHeight, w.syncWorkers)
err = w.connectBlocksParallel(w.startHeight, remoteBestHeight)
if err != nil {
return err
}
@ -160,10 +124,37 @@ func (w *SyncWorker) resyncIndex(onNewBlock func(hash string)) error {
return w.resyncIndex(onNewBlock)
}
}
return w.connectBlocks(onNewBlock)
}
func (w *SyncWorker) handleFork(localBestHeight uint32, localBestHash string, onNewBlock func(hash string)) error {
// find forked blocks, disconnect them and then synchronize again
var height uint32
hashes := []string{localBestHash}
for height = localBestHeight - 1; height >= 0; height-- {
local, err := w.db.GetBlockHash(height)
if err != nil {
return err
}
if local == "" {
break
}
remote, err := w.chain.GetBlockHash(height)
// for some coins (eth) remote can be at lower best height after rollback
if err != nil && err != bchain.ErrBlockNotFound {
return err
}
if local == remote {
break
}
hashes = append(hashes, local)
}
if err := w.DisconnectBlocks(height+1, localBestHeight, hashes); err != nil {
return err
}
return w.resyncIndex(onNewBlock)
}
func (w *SyncWorker) connectBlocks(onNewBlock func(hash string)) error {
bch := make(chan blockResult, 8)
done := make(chan struct{})
@ -187,7 +178,7 @@ func (w *SyncWorker) connectBlocks(onNewBlock func(hash string)) error {
}
if lastRes.block != nil {
glog.Infof("resync: synced on %d %s", lastRes.block.Height, lastRes.block.Hash)
glog.Infof("resync: synced at %d %s", lastRes.block.Height, lastRes.block.Hash)
}
return nil