Synchronize with chain using parallel operations

pull/1/head
Martin Boehm 2018-01-30 09:46:28 +01:00
parent 9356e41730
commit 496d6ff2c9
1 changed files with 78 additions and 40 deletions

View File

@ -20,6 +20,7 @@ import (
type Blockchain interface {
GetBestBlockHash() (string, error)
GetBestBlockHeight() (uint32, error)
GetBlockHash(height uint32) (string, error)
GetBlockHeader(hash string) (*bitcoin.BlockHeader, error)
GetBlock(hash string) (*bitcoin.Block, error)
@ -154,7 +155,7 @@ func main() {
if err = db.GetTransactions(script, height, until, printResult); err != nil {
log.Fatalf("GetTransactions %v", err)
}
} else {
} else if !*resync {
if err = connectBlocksParallel(
rpc,
db,
@ -225,57 +226,94 @@ func resyncIndex(chain Blockchain, index Index) error {
return nil
}
// If the local block is missing, we're indexing from the genesis block.
if local == "" {
log.Printf("resync: genesis")
hash, err := chain.GetBlockHash(0)
var header *bitcoin.BlockHeader
if local != "" {
// Is local tip on the best chain?
header, err = chain.GetBlockHeader(local)
forked := false
if err != nil {
return err
}
return connectBlocks(chain, index, hash)
}
// Is local tip on the best chain?
header, err := chain.GetBlockHeader(local)
forked := false
if err != nil {
if e, ok := err.(*bitcoin.RPCError); ok && e.Message == "Block not found" {
forked = true
if e, ok := err.(*bitcoin.RPCError); ok && e.Message == "Block not found" {
forked = true
} else {
return err
}
} else {
return err
if header.Confirmations < 0 {
forked = true
}
}
} else {
if header.Confirmations < 0 {
forked = true
if forked {
// find and disconnect forked blocks and then synchronize again
log.Printf("resync: local is forked")
var height uint32
for height = localBestHeight - 1; height >= 0; height-- {
local, err = index.GetBlockHash(height)
if err != nil {
return err
}
remote, err = chain.GetBlockHash(height)
if err != nil {
return err
}
if local == remote {
break
}
}
err = index.DisconnectBlocks(height+1, localBestHeight)
if err != nil {
return err
}
return resyncIndex(chain, index)
}
}
if forked {
log.Printf("resync: local is forked")
var height uint32
for height = localBestHeight - 1; height >= 0; height-- {
local, err = index.GetBlockHash(height)
if err != nil {
return err
}
remote, err = chain.GetBlockHash(height)
if err != nil {
return err
}
if local == remote {
break
}
startHeight := uint32(0)
var hash string
if header != nil {
log.Printf("resync: local is behind")
hash = header.Next
startHeight = localBestHeight
} else {
// If the local block is missing, we're indexing from the genesis block
// or from the start block specified by flags
if *blockHeight > 0 {
startHeight = uint32(*blockHeight)
}
err = index.DisconnectBlocks(height+1, localBestHeight)
log.Printf("resync: genesis from block %d", startHeight)
hash, err = chain.GetBlockHash(startHeight)
if err != nil {
return err
}
return resyncIndex(chain, index)
}
log.Printf("resync: local is behind")
return connectBlocks(chain, index, header.Next)
// if parallel operation is enabled and the number of blocks to be connected is large,
// use parallel routine to load majority of blocks
if *syncWorkers > 1 {
chainBestHeight, err := chain.GetBestBlockHeight()
if err != nil {
return err
}
if chainBestHeight-startHeight > uint32(*syncChunk) {
log.Printf("resync: parallel sync of blocks %d-%d", startHeight, chainBestHeight)
err = connectBlocksParallel(
chain,
index,
startHeight,
chainBestHeight,
*syncChunk,
*syncWorkers,
)
if err != nil {
return err
}
// after parallel load finish the sync using standard way,
// new blocks may have been created in the meantime
return resyncIndex(chain, index)
}
}
return connectBlocks(chain, index, hash)
}
func connectBlocks(