2017-08-28 09:50:57 -06:00
package main
import (
2018-01-18 09:33:20 -07:00
"context"
2018-01-22 08:46:54 -07:00
"encoding/hex"
2017-08-28 09:50:57 -06:00
"flag"
2018-01-18 09:33:20 -07:00
"os"
"os/signal"
2017-10-05 06:35:07 -06:00
"sync"
2018-01-18 09:33:20 -07:00
"syscall"
2017-08-28 09:50:57 -06:00
"time"
2017-09-12 18:50:34 -06:00
2018-03-01 03:06:10 -07:00
"github.com/juju/errors"
2018-01-31 07:23:17 -07:00
"blockbook/bchain"
2018-01-18 08:44:31 -07:00
"blockbook/db"
"blockbook/server"
2018-01-30 10:22:25 -07:00
"github.com/golang/glog"
2017-09-12 18:50:34 -06:00
"github.com/pkg/profile"
2017-08-28 09:50:57 -06:00
)
2018-02-06 01:12:50 -07:00
// resync index at least each resyncIndexPeriodMs milliseconds (could be more often if invoked by message from ZeroMQ)
const resyncIndexPeriodMs = 935093
// debounce too close requests for resync of the index, in milliseconds
const debounceResyncIndexMs = 1009
// resync mempool at least each resyncMempoolPeriodMs milliseconds (could be more often if invoked by message from ZeroMQ)
const resyncMempoolPeriodMs = 60017
// debounce too close requests for resync of the mempool, in milliseconds
// (ZeroMQ sends a message for each tx, so a new block produces many messages at once)
const debounceResyncMempoolMs = 1009
2017-08-28 09:50:57 -06:00
// Command line flags. Each variable is a pointer that holds the parsed value
// once flag.Parse has been called in main.
var (
// connection to the backend RPC service
rpcURL = flag . String ( "rpcurl" , "http://localhost:8332" , "url of bitcoin RPC service" )
rpcUser = flag . String ( "rpcuser" , "rpc" , "rpc username" )
rpcPass = flag . String ( "rpcpass" , "rpc" , "rpc password" )
rpcTimeout = flag . Uint ( "rpctimeout" , 25 , "rpc timeout in seconds" )
dbPath = flag . String ( "path" , "./data" , "path to address index directory" )
2018-01-29 09:27:42 -07:00
// block range selection; -1 means "not set" for all three flags
blockHeight = flag . Int ( "blockheight" , - 1 , "height of the starting block" )
blockUntil = flag . Int ( "blockuntil" , - 1 , "height of the final block" )
rollbackHeight = flag . Int ( "rollback" , - 1 , "rollback to the given height and quit" )
2017-10-05 06:35:07 -06:00
2017-08-28 09:50:57 -06:00
queryAddress = flag . String ( "address" , "" , "query contents of this address" )
2017-09-11 04:20:21 -06:00
2018-01-31 07:03:06 -07:00
// operating modes
synchronize = flag . Bool ( "sync" , false , "synchronizes until tip, if together with zeromq, keeps index synchronized" )
repair = flag . Bool ( "repair" , false , "repair the database" )
prof = flag . Bool ( "prof" , false , "profile program execution" )
2017-10-06 04:57:51 -06:00
2018-02-26 08:44:25 -07:00
// tuning of the block import
syncChunk = flag . Int ( "chunk" , 100 , "block chunk size for processing" )
syncWorkers = flag . Int ( "workers" , 8 , "number of workers to process blocks" )
dryRun = flag . Bool ( "dryrun" , false , "do not index blocks, only download" )
parse = flag . Bool ( "parse" , false , "use in-process block parsing" )
2018-01-18 08:44:31 -07:00
2018-02-07 14:56:17 -07:00
// server interfaces; an empty binding disables the respective server
httpServerBinding = flag . String ( "httpserver" , "" , "http server binding [address]:port, (default no http server)" )
2018-01-19 07:58:46 -07:00
2018-02-07 14:56:17 -07:00
socketIoBinding = flag . String ( "socketio" , "" , "socketio server binding [address]:port[/path], (default no socket.io server)" )
2018-02-06 04:06:30 -07:00
2018-02-07 14:56:17 -07:00
certFiles = flag . String ( "certfile" , "" , "to enable SSL specify path to certificate files without extension, expecting <certfile>.crt and <certfile>.key, (default no SSL)" )
2018-02-07 12:42:25 -07:00
2018-01-24 07:10:35 -07:00
zeroMQBinding = flag . String ( "zeromq" , "" , "binding to zeromq, if missing no zeromq connection" )
2018-02-26 08:21:58 -07:00
2018-02-26 08:25:40 -07:00
explorerURL = flag . String ( "explorer" , "" , "address of the Bitcoin blockchain explorer" )
2017-08-28 09:50:57 -06:00
)
2018-01-31 07:03:06 -07:00
// Package level state shared between main, the synchronization loops and the
// ZeroMQ handler.
var (
2018-02-22 05:32:06 -07:00
// chanSyncIndex and chanSyncMempool carry resync requests to syncIndexLoop
// and syncMempoolLoop; main closes them to stop the loops
chanSyncIndex = make ( chan struct { } )
chanSyncMempool = make ( chan struct { } )
// the Done channels are closed by the respective loop when it exits
chanSyncIndexDone = make ( chan struct { } )
chanSyncMempoolDone = make ( chan struct { } )
// chain, mempool and index are initialized in main
chain * bchain . BitcoinRPC
mempool * bchain . Mempool
index * db . RocksDB
// callbacks invoked by onNewBlockHash and onNewTxAddr
callbacksOnNewBlockHash [ ] func ( hash string )
callbacksOnNewTxAddr [ ] func ( txid string , addr string )
2018-02-28 16:59:25 -07:00
// chanOsSignal receives the OS signals registered in main
chanOsSignal chan os . Signal
2018-01-31 07:03:06 -07:00
)
2017-08-28 09:50:57 -06:00
func main ( ) {
flag . Parse ( )
2018-01-30 10:22:25 -07:00
// override setting for glog to log only to stderr, to match the http handler
flag . Lookup ( "logtostderr" ) . Value . Set ( "true" )
2018-01-31 07:03:06 -07:00
defer glog . Flush ( )
2018-02-28 16:59:25 -07:00
chanOsSignal = make ( chan os . Signal , 1 )
signal . Notify ( chanOsSignal , syscall . SIGHUP , syscall . SIGINT , syscall . SIGQUIT , syscall . SIGTERM )
2018-01-22 08:46:54 -07:00
if * prof {
defer profile . Start ( ) . Stop ( )
2018-01-19 07:58:46 -07:00
}
2017-09-12 08:53:40 -06:00
if * repair {
2018-01-18 09:33:20 -07:00
if err := db . RepairRocksDB ( * dbPath ) ; err != nil {
2018-01-30 10:22:25 -07:00
glog . Fatalf ( "RepairRocksDB %s: %v" , * dbPath , err )
2017-09-12 08:53:40 -06:00
}
return
}
2018-01-31 07:23:17 -07:00
chain = bchain . NewBitcoinRPC (
2017-10-05 06:35:07 -06:00
* rpcURL ,
* rpcUser ,
* rpcPass ,
time . Duration ( * rpcTimeout ) * time . Second )
2017-08-28 09:50:57 -06:00
2017-10-07 03:05:35 -06:00
if * parse {
2018-01-31 07:23:17 -07:00
chain . Parser = & bchain . BitcoinBlockParser {
Params : bchain . GetChainParams ( ) [ 0 ] ,
2017-10-07 03:05:35 -06:00
}
}
2018-01-31 09:51:48 -07:00
mempool = bchain . NewMempool ( chain )
2018-01-31 07:03:06 -07:00
var err error
index , err = db . NewRocksDB ( * dbPath )
2017-08-28 09:50:57 -06:00
if err != nil {
2018-01-30 10:22:25 -07:00
glog . Fatalf ( "NewRocksDB %v" , err )
2017-08-28 09:50:57 -06:00
}
2018-01-31 07:03:06 -07:00
defer index . Close ( )
2017-08-28 09:50:57 -06:00
2018-01-29 09:27:42 -07:00
if * rollbackHeight >= 0 {
2018-01-31 07:03:06 -07:00
bestHeight , _ , err := index . GetBestBlock ( )
2018-01-29 09:27:42 -07:00
if err != nil {
2018-03-01 03:22:37 -07:00
glog . Error ( "rollbackHeight: " , err )
return
2018-01-29 09:27:42 -07:00
}
if uint32 ( * rollbackHeight ) > bestHeight {
2018-01-30 10:22:25 -07:00
glog . Infof ( "nothing to rollback, rollbackHeight %d, bestHeight: %d" , * rollbackHeight , bestHeight )
2018-01-29 09:27:42 -07:00
} else {
2018-01-31 07:03:06 -07:00
err = index . DisconnectBlocks ( uint32 ( * rollbackHeight ) , bestHeight )
2018-01-29 09:27:42 -07:00
if err != nil {
2018-03-01 03:22:37 -07:00
glog . Error ( "rollbackHeight: " , err )
return
2018-01-29 09:27:42 -07:00
}
}
return
}
2018-01-31 07:03:06 -07:00
if * synchronize {
2018-02-24 08:25:55 -07:00
if err := resyncIndex ( nil ) ; err != nil {
2018-03-01 03:22:37 -07:00
glog . Error ( "resyncIndex " , err )
return
2018-01-27 16:59:54 -07:00
}
}
2018-01-18 09:33:20 -07:00
2018-02-01 04:56:45 -07:00
var httpServer * server . HTTPServer
2018-01-24 07:10:35 -07:00
if * httpServerBinding != "" {
2018-02-07 12:42:25 -07:00
httpServer , err = server . NewHTTPServer ( * httpServerBinding , * certFiles , index , mempool )
2018-01-18 08:44:31 -07:00
if err != nil {
2018-03-01 03:22:37 -07:00
glog . Error ( "https: " , err )
return
2018-01-18 08:44:31 -07:00
}
2018-01-18 09:33:20 -07:00
go func ( ) {
err = httpServer . Run ( )
if err != nil {
2018-01-31 07:34:20 -07:00
if err . Error ( ) == "http: Server closed" {
glog . Info ( err )
} else {
2018-03-01 03:22:37 -07:00
glog . Error ( err )
return
2018-01-31 07:34:20 -07:00
}
2018-01-18 09:33:20 -07:00
}
} ( )
2018-01-18 08:44:31 -07:00
}
2018-02-06 04:06:30 -07:00
var socketIoServer * server . SocketIoServer
if * socketIoBinding != "" {
2018-02-26 08:25:40 -07:00
socketIoServer , err = server . NewSocketIoServer ( * socketIoBinding , * certFiles , index , mempool , chain , * explorerURL )
2018-02-06 04:06:30 -07:00
if err != nil {
2018-03-01 03:22:37 -07:00
glog . Error ( "socketio: " , err )
return
2018-02-06 04:06:30 -07:00
}
go func ( ) {
err = socketIoServer . Run ( )
if err != nil {
if err . Error ( ) == "http: Server closed" {
glog . Info ( err )
} else {
2018-03-01 03:22:37 -07:00
glog . Error ( err )
return
2018-02-06 04:06:30 -07:00
}
}
} ( )
2018-02-22 05:32:06 -07:00
callbacksOnNewBlockHash = append ( callbacksOnNewBlockHash , socketIoServer . OnNewBlockHash )
callbacksOnNewTxAddr = append ( callbacksOnNewTxAddr , socketIoServer . OnNewTxAddr )
2018-02-22 05:01:35 -07:00
}
if * synchronize {
// start the synchronization loops after the server interfaces are started
go syncIndexLoop ( )
go syncMempoolLoop ( )
// sync mempool immediately
chanSyncMempool <- struct { } { }
2018-02-06 04:06:30 -07:00
}
2018-01-31 07:23:17 -07:00
var mq * bchain . MQ
2018-01-24 07:10:35 -07:00
if * zeroMQBinding != "" {
2018-01-31 07:03:06 -07:00
if ! * synchronize {
glog . Error ( "zeromq connection without synchronization does not make sense, ignoring zeromq parameter" )
} else {
2018-01-31 07:23:17 -07:00
mq , err = bchain . NewMQ ( * zeroMQBinding , mqHandler )
2018-01-31 07:03:06 -07:00
if err != nil {
2018-03-01 03:22:37 -07:00
glog . Error ( "mq: " , err )
return
2018-01-31 07:03:06 -07:00
}
2018-01-22 08:46:54 -07:00
}
}
2017-08-28 09:50:57 -06:00
if * blockHeight >= 0 {
if * blockUntil < 0 {
* blockUntil = * blockHeight
}
height := uint32 ( * blockHeight )
until := uint32 ( * blockUntil )
address := * queryAddress
if address != "" {
2018-01-31 07:23:17 -07:00
script , err := bchain . AddressToOutputScript ( address )
2018-01-29 15:25:40 -07:00
if err != nil {
2018-03-01 03:22:37 -07:00
glog . Error ( "GetTransactions " , err )
return
2018-01-29 15:25:40 -07:00
}
2018-01-31 07:03:06 -07:00
if err = index . GetTransactions ( script , height , until , printResult ) ; err != nil {
2018-03-01 03:22:37 -07:00
glog . Error ( "GetTransactions " , err )
return
2017-08-28 09:50:57 -06:00
}
2018-01-31 07:03:06 -07:00
} else if ! * synchronize {
2018-02-04 16:28:15 -07:00
if err = connectBlocksParallelInChunks (
2017-10-06 04:57:51 -06:00
height ,
until ,
* syncChunk ,
* syncWorkers ,
) ; err != nil {
2018-03-01 03:22:37 -07:00
glog . Error ( "connectBlocksParallelInChunks " , err )
return
2017-08-28 09:50:57 -06:00
}
}
}
2018-01-18 09:33:20 -07:00
2018-02-22 05:01:35 -07:00
if httpServer != nil || socketIoServer != nil || mq != nil {
2018-02-06 04:06:30 -07:00
waitForSignalAndShutdown ( httpServer , socketIoServer , mq , 5 * time . Second )
2018-01-18 09:33:20 -07:00
}
2018-01-31 07:03:06 -07:00
2018-02-02 08:17:33 -07:00
if * synchronize {
close ( chanSyncIndex )
close ( chanSyncMempool )
<- chanSyncIndexDone
<- chanSyncMempoolDone
}
2018-01-31 07:03:06 -07:00
}
2018-02-01 03:24:53 -07:00
// tickAndDebounce calls f whenever its internal timer fires and returns when
// input is closed.
//
// The timer is armed with tickTime initially and after every call of f. Each
// value received from input re-arms it with debounceTime instead, so a burst
// of requests leads to a single call of f, debounceTime after the last one.
func tickAndDebounce(tickTime time.Duration, debounceTime time.Duration, input chan struct{}, f func()) {
	t := time.NewTimer(tickTime)
	for {
		select {
		case <-t.C:
			// do the action and start waiting for the next tick
			f()
			t.Reset(tickTime)
		case _, open := <-input:
			// stop the timer and drain its channel if it fired in the meantime
			if stopped := t.Stop(); !stopped {
				<-t.C
			}
			// a closed input channel ends the loop
			if !open {
				return
			}
			// debounce for debounceTime
			t.Reset(debounceTime)
		}
	}
}
2018-01-31 09:51:48 -07:00
func syncIndexLoop ( ) {
defer close ( chanSyncIndexDone )
glog . Info ( "syncIndexLoop starting" )
2018-02-01 03:24:53 -07:00
// resync index about every 15 minutes if there are no chanSyncIndex requests, with debounce 1 second
2018-02-06 01:12:50 -07:00
tickAndDebounce ( resyncIndexPeriodMs * time . Millisecond , debounceResyncIndexMs * time . Millisecond , chanSyncIndex , func ( ) {
2018-02-24 08:25:55 -07:00
if err := resyncIndex ( onNewBlockHash ) ; err != nil {
2018-03-01 03:22:37 -07:00
glog . Error ( "syncIndexLoop " , errors . ErrorStack ( err ) )
2018-01-31 09:51:48 -07:00
}
2018-02-01 03:24:53 -07:00
} )
2018-01-31 09:51:48 -07:00
glog . Info ( "syncIndexLoop stopped" )
}
2018-02-22 05:01:35 -07:00
func onNewBlockHash ( hash string ) {
2018-02-22 05:32:06 -07:00
for _ , c := range callbacksOnNewBlockHash {
2018-02-22 05:01:35 -07:00
c ( hash )
}
}
2018-01-31 09:51:48 -07:00
func syncMempoolLoop ( ) {
defer close ( chanSyncMempoolDone )
glog . Info ( "syncMempoolLoop starting" )
2018-02-01 03:24:53 -07:00
// resync mempool about every minute if there are no chanSyncMempool requests, with debounce 1 second
2018-02-06 01:12:50 -07:00
tickAndDebounce ( resyncMempoolPeriodMs * time . Millisecond , debounceResyncMempoolMs * time . Millisecond , chanSyncMempool , func ( ) {
2018-02-22 05:32:06 -07:00
if err := mempool . Resync ( onNewTxAddr ) ; err != nil {
2018-03-01 03:22:37 -07:00
glog . Error ( "syncMempoolLoop " , errors . ErrorStack ( err ) )
2018-01-31 09:51:48 -07:00
}
2018-02-01 03:24:53 -07:00
} )
2018-01-31 09:51:48 -07:00
glog . Info ( "syncMempoolLoop stopped" )
2018-01-18 09:33:20 -07:00
}
2018-02-22 05:32:06 -07:00
// onNewTxAddr passes txid and addr to every callback registered in
// callbacksOnNewTxAddr, in registration order.
func onNewTxAddr(txid string, addr string) {
	callbacks := callbacksOnNewTxAddr
	for i := 0; i < len(callbacks); i++ {
		notify := callbacks[i]
		notify(txid, addr)
	}
}
2018-01-31 07:23:17 -07:00
func mqHandler ( m * bchain . MQMessage ) {
2018-01-22 08:46:54 -07:00
body := hex . EncodeToString ( m . Body )
2018-02-01 03:24:53 -07:00
glog . V ( 1 ) . Infof ( "MQ: %s-%d %s" , m . Topic , m . Sequence , body )
2018-01-31 07:03:06 -07:00
if m . Topic == "hashblock" {
2018-01-31 09:51:48 -07:00
chanSyncIndex <- struct { } { }
2018-01-31 07:03:06 -07:00
} else if m . Topic == "hashtx" {
2018-01-31 09:51:48 -07:00
chanSyncMempool <- struct { } { }
2018-01-31 07:03:06 -07:00
} else {
glog . Errorf ( "MQ: unknown message %s-%d %s" , m . Topic , m . Sequence , body )
}
2018-01-22 08:46:54 -07:00
}
2018-01-18 09:33:20 -07:00
2018-02-06 04:06:30 -07:00
func waitForSignalAndShutdown ( https * server . HTTPServer , socketio * server . SocketIoServer , mq * bchain . MQ , timeout time . Duration ) {
2018-02-28 16:59:25 -07:00
sig := <- chanOsSignal
2018-01-18 09:33:20 -07:00
ctx , cancel := context . WithTimeout ( context . Background ( ) , timeout )
defer cancel ( )
2018-01-30 10:22:25 -07:00
glog . Infof ( "Shutdown: %v" , sig )
2018-01-22 08:46:54 -07:00
if mq != nil {
if err := mq . Shutdown ( ) ; err != nil {
2018-01-30 10:22:25 -07:00
glog . Error ( "MQ.Shutdown error: " , err )
2018-01-22 08:46:54 -07:00
}
}
2018-01-18 09:33:20 -07:00
2018-02-06 04:06:30 -07:00
if https != nil {
if err := https . Shutdown ( ctx ) ; err != nil {
2018-01-30 10:22:25 -07:00
glog . Error ( "HttpServer.Shutdown error: " , err )
2018-01-18 12:32:10 -07:00
}
2018-01-18 09:33:20 -07:00
}
2018-02-06 04:06:30 -07:00
if socketio != nil {
if err := socketio . Shutdown ( ctx ) ; err != nil {
glog . Error ( "SocketIo.Shutdown error: " , err )
}
}
2017-08-28 09:50:57 -06:00
}
2018-02-03 11:14:27 -07:00
func printResult ( txid string , vout uint32 , isOutput bool ) error {
glog . Info ( txid , vout , isOutput )
2017-08-28 09:50:57 -06:00
return nil
}
2018-02-24 08:25:55 -07:00
func resyncIndex ( onNewBlock func ( hash string ) ) error {
2017-10-05 06:35:07 -06:00
remote , err := chain . GetBestBlockHash ( )
2017-09-11 04:20:21 -06:00
if err != nil {
return err
}
2018-01-27 16:59:54 -07:00
localBestHeight , local , err := index . GetBestBlock ( )
2017-09-11 04:20:21 -06:00
if err != nil {
2017-10-05 06:35:07 -06:00
local = ""
2017-09-11 04:20:21 -06:00
}
2017-08-28 09:50:57 -06:00
2018-01-27 16:59:54 -07:00
// If the locally indexed block is the same as the best block on the
// network, we're done.
if local == remote {
2018-01-30 10:22:25 -07:00
glog . Infof ( "resync: synced on %d %s" , localBestHeight , local )
2018-01-27 16:59:54 -07:00
return nil
}
2018-01-31 07:23:17 -07:00
var header * bchain . BlockHeader
2018-01-30 01:46:28 -07:00
if local != "" {
// Is local tip on the best chain?
header , err = chain . GetBlockHeader ( local )
forked := false
2017-08-28 09:50:57 -06:00
if err != nil {
2018-01-31 07:23:17 -07:00
if e , ok := err . ( * bchain . RPCError ) ; ok && e . Message == "Block not found" {
2018-01-30 01:46:28 -07:00
forked = true
} else {
return err
}
2017-09-11 04:20:21 -06:00
} else {
2018-01-30 01:46:28 -07:00
if header . Confirmations < 0 {
forked = true
}
2017-09-06 03:03:23 -06:00
}
2018-01-30 01:46:28 -07:00
if forked {
// find and disconnect forked blocks and then synchronize again
2018-01-30 10:22:25 -07:00
glog . Info ( "resync: local is forked" )
2018-01-30 01:46:28 -07:00
var height uint32
for height = localBestHeight - 1 ; height >= 0 ; height -- {
local , err = index . GetBlockHash ( height )
if err != nil {
return err
}
remote , err = chain . GetBlockHash ( height )
if err != nil {
return err
}
if local == remote {
break
}
2018-01-27 16:59:54 -07:00
}
2018-01-30 01:46:28 -07:00
err = index . DisconnectBlocks ( height + 1 , localBestHeight )
2018-01-27 16:59:54 -07:00
if err != nil {
return err
}
2018-02-24 08:25:55 -07:00
return resyncIndex ( onNewBlock )
2018-01-30 01:46:28 -07:00
}
}
startHeight := uint32 ( 0 )
var hash string
if header != nil {
2018-01-30 10:22:25 -07:00
glog . Info ( "resync: local is behind" )
2018-01-30 01:46:28 -07:00
hash = header . Next
startHeight = localBestHeight
} else {
// If the local block is missing, we're indexing from the genesis block
// or from the start block specified by flags
if * blockHeight > 0 {
startHeight = uint32 ( * blockHeight )
}
2018-01-30 10:22:25 -07:00
glog . Info ( "resync: genesis from block " , startHeight )
2018-01-30 01:46:28 -07:00
hash , err = chain . GetBlockHash ( startHeight )
if err != nil {
return err
2018-01-27 16:59:54 -07:00
}
2018-01-30 01:46:28 -07:00
}
// if parallel operation is enabled and the number of blocks to be connected is large,
// use parallel routine to load majority of blocks
2018-02-05 09:47:20 -07:00
if * syncWorkers > 1 {
2018-01-30 01:46:28 -07:00
chainBestHeight , err := chain . GetBestBlockHeight ( )
2018-01-27 16:59:54 -07:00
if err != nil {
return err
}
2018-01-30 01:46:28 -07:00
if chainBestHeight - startHeight > uint32 ( * syncChunk ) {
2018-02-01 14:36:57 -07:00
glog . Infof ( "resync: parallel sync of blocks %d-%d, using %d workers" , startHeight , chainBestHeight , * syncWorkers )
2018-01-30 01:46:28 -07:00
err = connectBlocksParallel (
startHeight ,
chainBestHeight ,
* syncWorkers ,
)
if err != nil {
return err
}
// after parallel load finish the sync using standard way,
// new blocks may have been created in the meantime
2018-02-24 08:25:55 -07:00
return resyncIndex ( onNewBlock )
2018-01-30 01:46:28 -07:00
}
2017-09-11 04:20:21 -06:00
}
2018-01-27 16:59:54 -07:00
2018-02-22 05:01:35 -07:00
return connectBlocks ( hash , onNewBlock )
2017-09-11 04:20:21 -06:00
}
2017-08-28 09:50:57 -06:00
2018-02-22 05:01:35 -07:00
func connectBlocks ( hash string , onNewBlock func ( hash string ) ) error {
2017-09-11 07:06:16 -06:00
bch := make ( chan blockResult , 8 )
done := make ( chan struct { } )
defer close ( done )
2018-01-31 07:03:06 -07:00
go getBlockChain ( hash , bch , done )
2017-09-11 07:06:16 -06:00
2018-01-30 10:22:25 -07:00
var lastRes blockResult
2017-09-11 07:06:16 -06:00
for res := range bch {
2018-01-30 10:22:25 -07:00
lastRes = res
2017-10-05 06:35:07 -06:00
if res . err != nil {
return res . err
2017-08-28 09:50:57 -06:00
}
2017-10-05 06:35:07 -06:00
err := index . ConnectBlock ( res . block )
2017-08-28 09:50:57 -06:00
if err != nil {
2017-09-11 04:20:21 -06:00
return err
2017-08-28 09:50:57 -06:00
}
2018-02-22 05:01:35 -07:00
if onNewBlock != nil {
onNewBlock ( res . block . Hash )
}
2017-09-06 07:36:55 -06:00
}
2017-09-11 04:20:21 -06:00
2018-01-30 10:22:25 -07:00
if lastRes . block != nil {
glog . Infof ( "resync: synced on %d %s" , lastRes . block . Height , lastRes . block . Hash )
}
2017-09-11 04:20:21 -06:00
return nil
2017-08-28 09:50:57 -06:00
}
2017-10-05 06:35:07 -06:00
func connectBlocksParallel (
2018-02-04 16:28:15 -07:00
lower uint32 ,
higher uint32 ,
numWorkers int ,
) error {
2018-02-28 16:32:40 -07:00
type hashHeight struct {
hash string
height uint32
}
2018-02-05 09:47:20 -07:00
var err error
2018-02-04 16:28:15 -07:00
var wg sync . WaitGroup
2018-02-28 16:32:40 -07:00
hch := make ( chan hashHeight , numWorkers )
2018-02-05 02:31:22 -07:00
running := make ( [ ] bool , numWorkers )
work := func ( i int ) {
2018-02-04 16:28:15 -07:00
defer wg . Done ( )
2018-02-28 17:15:25 -07:00
var err error
var block * bchain . Block
2018-02-28 16:32:40 -07:00
for hh := range hch {
2018-02-05 02:31:22 -07:00
running [ i ] = true
2018-02-28 17:15:25 -07:00
for {
block , err = chain . GetBlockWithoutHeader ( hh . hash , hh . height )
if err != nil {
2018-03-01 03:06:10 -07:00
glog . Error ( "Connect block error " , err , ". Retrying..." )
2018-02-28 17:15:25 -07:00
time . Sleep ( time . Millisecond * 500 )
} else {
break
}
2018-02-04 16:28:15 -07:00
}
if * dryRun {
2018-02-05 02:31:22 -07:00
running [ i ] = false
2018-02-04 16:28:15 -07:00
continue
}
err = index . ConnectBlock ( block )
if err != nil {
2018-02-28 16:32:40 -07:00
glog . Error ( "Connect block " , hh . height , " " , hh . hash , " error " , err )
2018-02-04 16:28:15 -07:00
}
2018-02-05 02:31:22 -07:00
running [ i ] = false
2018-02-04 16:28:15 -07:00
}
}
for i := 0 ; i < numWorkers ; i ++ {
wg . Add ( 1 )
2018-02-05 02:31:22 -07:00
go work ( i )
2018-02-04 16:28:15 -07:00
}
2018-02-05 04:27:41 -07:00
2018-02-28 16:59:25 -07:00
var hash string
ConnectLoop :
for h := lower ; h <= higher ; {
select {
case <- chanOsSignal :
// wait for the workers to finish block
2018-02-28 17:15:25 -07:00
i := 0
2018-02-28 16:59:25 -07:00
WaitAgain :
2018-02-28 17:15:25 -07:00
for ; i < 60 ; i ++ {
2018-02-28 16:59:25 -07:00
for _ , r := range running {
if r {
2018-02-28 17:15:25 -07:00
glog . Info ( "Waiting " , i , "s for workers to finish " , running )
time . Sleep ( time . Millisecond * 1000 )
2018-02-28 16:59:25 -07:00
continue WaitAgain
}
}
2018-02-28 17:15:25 -07:00
break
2018-02-28 16:59:25 -07:00
}
2018-03-01 03:06:10 -07:00
err = errors . Errorf ( "connectBlocksParallel interrupted at height %d" , h )
2018-02-28 17:15:25 -07:00
break ConnectLoop
2018-02-28 16:59:25 -07:00
default :
hash , err = chain . GetBlockHash ( h )
if err != nil {
2018-03-01 03:06:10 -07:00
glog . Error ( "GetBlockHash error " , err )
2018-02-28 16:59:25 -07:00
time . Sleep ( time . Millisecond * 500 )
continue
}
hch <- hashHeight { hash , h }
if h > 0 && h % 1000 == 0 {
glog . Info ( "connecting block " , h , " " , hash )
}
h ++
2018-02-04 16:28:15 -07:00
}
}
close ( hch )
wg . Wait ( )
return err
}
func connectBlocksParallelInChunks (
2017-09-04 06:16:37 -06:00
lower uint32 ,
higher uint32 ,
2017-10-06 04:57:51 -06:00
chunkSize int ,
numWorkers int ,
2017-09-04 06:16:37 -06:00
) error {
2017-10-05 06:35:07 -06:00
var wg sync . WaitGroup
2017-09-04 06:16:37 -06:00
2017-10-05 06:35:07 -06:00
work := func ( i int ) {
defer wg . Done ( )
offset := uint32 ( chunkSize * i )
stride := uint32 ( chunkSize * numWorkers )
2017-10-07 02:42:31 -06:00
for low := lower + offset ; low <= higher ; low += stride {
2017-10-06 04:57:51 -06:00
high := low + uint32 ( chunkSize - 1 )
2017-10-05 06:35:07 -06:00
if high > higher {
high = higher
}
2018-01-31 07:03:06 -07:00
err := connectBlockChunk ( low , high )
2017-10-05 06:35:07 -06:00
if err != nil {
2018-01-31 07:23:17 -07:00
if e , ok := err . ( * bchain . RPCError ) ; ok && ( e . Message == "Block height out of range" || e . Message == "Block not found" ) {
2018-01-25 08:24:27 -07:00
break
}
2018-01-30 10:22:25 -07:00
glog . Fatalf ( "connectBlocksParallel %d-%d %v" , low , high , err )
2017-10-05 06:35:07 -06:00
}
2017-08-28 09:50:57 -06:00
}
2017-09-04 06:16:37 -06:00
}
2017-10-05 06:35:07 -06:00
for i := 0 ; i < numWorkers ; i ++ {
wg . Add ( 1 )
go work ( i )
}
wg . Wait ( )
2017-09-04 06:16:37 -06:00
2017-10-05 06:35:07 -06:00
return nil
2017-09-04 06:16:37 -06:00
}
2017-10-05 06:35:07 -06:00
func connectBlockChunk (
2017-09-11 04:20:21 -06:00
lower uint32 ,
higher uint32 ,
2017-10-05 06:35:07 -06:00
) error {
2018-01-31 07:03:06 -07:00
connected , err := isBlockConnected ( higher )
2017-10-05 06:35:07 -06:00
if err != nil || connected {
2018-01-25 08:24:27 -07:00
// if higher is over the best block, continue with lower block, otherwise return error
2018-01-31 07:23:17 -07:00
if e , ok := err . ( * bchain . RPCError ) ; ! ok || e . Message != "Block height out of range" {
2018-01-25 08:24:27 -07:00
return err
}
2017-10-05 06:35:07 -06:00
}
2017-09-06 02:59:40 -06:00
height := lower
2017-10-05 06:35:07 -06:00
hash , err := chain . GetBlockHash ( lower )
2017-09-06 03:03:23 -06:00
if err != nil {
2017-10-05 06:35:07 -06:00
return err
2017-09-06 03:03:23 -06:00
}
2017-09-06 02:59:40 -06:00
for height <= higher {
2017-10-05 06:35:07 -06:00
block , err := chain . GetBlock ( hash )
2017-09-04 06:16:37 -06:00
if err != nil {
2017-10-05 06:35:07 -06:00
return err
2017-08-28 09:50:57 -06:00
}
2017-09-06 02:59:40 -06:00
hash = block . Next
height = block . Height + 1
2017-10-06 04:57:51 -06:00
if * dryRun {
continue
}
2017-10-05 06:35:07 -06:00
err = index . ConnectBlock ( block )
if err != nil {
return err
}
2018-01-30 10:22:25 -07:00
if block . Height % 1000 == 0 {
glog . Info ( "connected block " , block . Height , " " , block . Hash )
}
2017-08-28 09:50:57 -06:00
}
2017-10-05 06:35:07 -06:00
return nil
}
// isBlockConnected reports whether the hash stored in the index for the given
// height equals the hash the backend returns for the same height. An error of
// either lookup is returned together with false.
func isBlockConnected(
	height uint32,
) (bool, error) {
	indexed, err := index.GetBlockHash(height)
	if err != nil {
		return false, err
	}
	onChain, err := chain.GetBlockHash(height)
	if err != nil {
		return false, err
	}
	return indexed == onChain, nil
}
// blockResult is the item sent by getBlockChain to its consumer: either a
// downloaded block or the error that stopped the download.
type blockResult struct {
2018-01-31 07:23:17 -07:00
// block is the downloaded block, set when err is nil
block * bchain . Block
2017-10-05 06:35:07 -06:00
// err is the error returned by the download of the block
err error
2017-08-28 09:50:57 -06:00
}
2017-09-11 07:06:16 -06:00
func getBlockChain (
hash string ,
out chan blockResult ,
done chan struct { } ,
) {
defer close ( out )
for hash != "" {
select {
case <- done :
return
default :
}
2017-10-05 06:35:07 -06:00
block , err := chain . GetBlock ( hash )
2017-09-11 07:06:16 -06:00
if err != nil {
out <- blockResult { err : err }
return
}
hash = block . Next
out <- blockResult { block : block }
}
}