2017-08-28 09:50:57 -06:00
package main
import (
2018-01-18 09:33:20 -07:00
"context"
2018-01-22 08:46:54 -07:00
"encoding/hex"
2017-08-28 09:50:57 -06:00
"flag"
2018-03-09 09:05:27 -07:00
"log"
2018-01-18 09:33:20 -07:00
"os"
"os/signal"
"syscall"
2017-08-28 09:50:57 -06:00
"time"
2017-09-12 18:50:34 -06:00
2018-03-01 03:06:10 -07:00
"github.com/juju/errors"
2018-01-31 07:23:17 -07:00
"blockbook/bchain"
2018-03-08 10:36:01 -07:00
"blockbook/bchain/coins"
2018-03-13 04:34:49 -06:00
"blockbook/common"
2018-01-18 08:44:31 -07:00
"blockbook/db"
"blockbook/server"
2018-01-30 10:22:25 -07:00
"github.com/golang/glog"
2018-03-09 09:05:27 -07:00
"net/http"
_ "net/http/pprof"
2017-08-28 09:50:57 -06:00
)
2018-02-06 01:12:50 -07:00
// resync index at least each resyncIndexPeriodMs (could be more often if invoked by message from ZeroMQ)
const resyncIndexPeriodMs = 935093
// debounce too close requests for resync
const debounceResyncIndexMs = 1009
// resync mempool at least each resyncIndexPeriodMs (could be more often if invoked by message from ZeroMQ)
const resyncMempoolPeriodMs = 60017
// debounce too close requests for resync mempool (ZeroMQ sends message for each tx, when new block there are many transactions)
const debounceResyncMempoolMs = 1009
2017-08-28 09:50:57 -06:00
var (
2018-03-15 05:34:29 -06:00
blockchain = flag . String ( "blockchaincfg" , "" , "path to blockchain RPC service configuration json file" )
2017-08-28 09:50:57 -06:00
2018-03-15 05:34:29 -06:00
dbPath = flag . String ( "datadir" , "./data" , "path to database directory" )
2017-08-28 09:50:57 -06:00
2018-03-01 10:37:01 -07:00
blockFrom = flag . Int ( "blockheight" , - 1 , "height of the starting block" )
2018-01-29 09:27:42 -07:00
blockUntil = flag . Int ( "blockuntil" , - 1 , "height of the final block" )
rollbackHeight = flag . Int ( "rollback" , - 1 , "rollback to the given height and quit" )
2017-10-05 06:35:07 -06:00
2017-08-28 09:50:57 -06:00
queryAddress = flag . String ( "address" , "" , "query contents of this address" )
2017-09-11 04:20:21 -06:00
2018-01-31 07:03:06 -07:00
synchronize = flag . Bool ( "sync" , false , "synchronizes until tip, if together with zeromq, keeps index synchronized" )
repair = flag . Bool ( "repair" , false , "repair the database" )
2018-03-09 09:05:27 -07:00
prof = flag . String ( "prof" , "" , "http server binding [address]:port of the interface to profiling data /debug/pprof/ (default no profiling)" )
2017-10-06 04:57:51 -06:00
2018-02-26 08:44:25 -07:00
syncChunk = flag . Int ( "chunk" , 100 , "block chunk size for processing" )
syncWorkers = flag . Int ( "workers" , 8 , "number of workers to process blocks" )
dryRun = flag . Bool ( "dryrun" , false , "do not index blocks, only download" )
2018-01-18 08:44:31 -07:00
2018-02-07 14:56:17 -07:00
httpServerBinding = flag . String ( "httpserver" , "" , "http server binding [address]:port, (default no http server)" )
2018-01-19 07:58:46 -07:00
2018-02-07 14:56:17 -07:00
socketIoBinding = flag . String ( "socketio" , "" , "socketio server binding [address]:port[/path], (default no socket.io server)" )
2018-02-06 04:06:30 -07:00
2018-02-07 14:56:17 -07:00
certFiles = flag . String ( "certfile" , "" , "to enable SSL specify path to certificate files without extension, expecting <certfile>.crt and <certfile>.key, (default no SSL)" )
2018-02-07 12:42:25 -07:00
2018-03-08 10:36:01 -07:00
explorerURL = flag . String ( "explorer" , "" , "address of blockchain explorer" )
coin = flag . String ( "coin" , "btc" , "coin name (default btc)" )
2017-08-28 09:50:57 -06:00
)
2018-01-31 07:03:06 -07:00
var (
2018-02-22 05:32:06 -07:00
chanSyncIndex = make ( chan struct { } )
chanSyncMempool = make ( chan struct { } )
chanSyncIndexDone = make ( chan struct { } )
chanSyncMempoolDone = make ( chan struct { } )
2018-03-08 04:59:37 -07:00
chain bchain . BlockChain
2018-02-22 05:32:06 -07:00
index * db . RocksDB
2018-03-06 04:36:24 -07:00
txCache * db . TxCache
2018-03-01 10:37:01 -07:00
syncWorker * db . SyncWorker
2018-02-22 05:32:06 -07:00
callbacksOnNewBlockHash [ ] func ( hash string )
callbacksOnNewTxAddr [ ] func ( txid string , addr string )
2018-02-28 16:59:25 -07:00
chanOsSignal chan os . Signal
2018-01-31 07:03:06 -07:00
)
2017-08-28 09:50:57 -06:00
func main ( ) {
flag . Parse ( )
2018-01-30 10:22:25 -07:00
// override setting for glog to log only to stderr, to match the http handler
flag . Lookup ( "logtostderr" ) . Value . Set ( "true" )
2018-01-31 07:03:06 -07:00
defer glog . Flush ( )
2018-02-28 16:59:25 -07:00
chanOsSignal = make ( chan os . Signal , 1 )
signal . Notify ( chanOsSignal , syscall . SIGHUP , syscall . SIGINT , syscall . SIGQUIT , syscall . SIGTERM )
2018-03-09 09:05:27 -07:00
if * prof != "" {
go func ( ) {
log . Println ( http . ListenAndServe ( * prof , nil ) )
} ( )
2018-01-19 07:58:46 -07:00
}
2017-09-12 08:53:40 -06:00
if * repair {
2018-01-18 09:33:20 -07:00
if err := db . RepairRocksDB ( * dbPath ) ; err != nil {
2018-01-30 10:22:25 -07:00
glog . Fatalf ( "RepairRocksDB %s: %v" , * dbPath , err )
2017-09-12 08:53:40 -06:00
}
return
}
2018-03-14 02:43:17 -06:00
metrics , err := common . GetMetrics ( * coin )
2018-03-13 04:34:49 -06:00
if err != nil {
glog . Fatal ( "GetMetrics: " , err )
}
2018-03-15 05:34:29 -06:00
if * blockchain == "" {
glog . Fatal ( "Missing blockchaincfg configuration parameter" )
}
if chain , err = coins . NewBlockChain ( * coin , * blockchain , pushSynchronizationHandler , metrics ) ; err != nil {
2018-03-10 17:31:09 -07:00
glog . Fatal ( "rpc: " , err )
2017-10-07 03:05:35 -06:00
}
2018-03-08 04:59:37 -07:00
index , err = db . NewRocksDB ( * dbPath , chain . GetChainParser ( ) )
2017-08-28 09:50:57 -06:00
if err != nil {
2018-03-10 17:31:09 -07:00
glog . Fatal ( "rocksDB: " , err )
2017-08-28 09:50:57 -06:00
}
2018-01-31 07:03:06 -07:00
defer index . Close ( )
2017-08-28 09:50:57 -06:00
2018-03-14 05:34:13 -06:00
syncWorker , err = db . NewSyncWorker ( index , chain , * syncWorkers , * syncChunk , * blockFrom , * dryRun , chanOsSignal , metrics )
if err != nil {
glog . Fatalf ( "NewSyncWorker %v" , err )
}
2018-01-29 09:27:42 -07:00
if * rollbackHeight >= 0 {
2018-03-14 05:34:13 -06:00
bestHeight , bestHash , err := index . GetBestBlock ( )
2018-01-29 09:27:42 -07:00
if err != nil {
2018-03-01 03:22:37 -07:00
glog . Error ( "rollbackHeight: " , err )
return
2018-01-29 09:27:42 -07:00
}
if uint32 ( * rollbackHeight ) > bestHeight {
2018-01-30 10:22:25 -07:00
glog . Infof ( "nothing to rollback, rollbackHeight %d, bestHeight: %d" , * rollbackHeight , bestHeight )
2018-01-29 09:27:42 -07:00
} else {
2018-03-14 05:34:13 -06:00
hashes := [ ] string { bestHash }
for height := bestHeight - 1 ; height >= uint32 ( * rollbackHeight ) ; height -- {
hash , err := index . GetBlockHash ( height )
if err != nil {
glog . Error ( "rollbackHeight: " , err )
return
}
hashes = append ( hashes , hash )
}
err = syncWorker . DisconnectBlocks ( uint32 ( * rollbackHeight ) , bestHeight , hashes )
2018-01-29 09:27:42 -07:00
if err != nil {
2018-03-01 03:22:37 -07:00
glog . Error ( "rollbackHeight: " , err )
return
2018-01-29 09:27:42 -07:00
}
}
return
}
2018-03-13 04:34:49 -06:00
if txCache , err = db . NewTxCache ( index , chain , metrics ) ; err != nil {
2018-03-06 04:36:24 -07:00
glog . Error ( "txCache " , err )
return
}
2018-02-01 04:56:45 -07:00
var httpServer * server . HTTPServer
2018-01-24 07:10:35 -07:00
if * httpServerBinding != "" {
2018-03-08 04:59:37 -07:00
httpServer , err = server . NewHTTPServer ( * httpServerBinding , * certFiles , index , chain , txCache )
2018-01-18 08:44:31 -07:00
if err != nil {
2018-03-01 03:22:37 -07:00
glog . Error ( "https: " , err )
return
2018-01-18 08:44:31 -07:00
}
2018-01-18 09:33:20 -07:00
go func ( ) {
err = httpServer . Run ( )
if err != nil {
2018-01-31 07:34:20 -07:00
if err . Error ( ) == "http: Server closed" {
glog . Info ( err )
} else {
2018-03-01 03:22:37 -07:00
glog . Error ( err )
return
2018-01-31 07:34:20 -07:00
}
2018-01-18 09:33:20 -07:00
}
} ( )
2018-01-18 08:44:31 -07:00
}
2018-03-13 09:10:38 -06:00
if * synchronize {
if err := syncWorker . ResyncIndex ( nil ) ; err != nil {
glog . Error ( "resyncIndex " , err )
return
}
if err = chain . ResyncMempool ( nil ) ; err != nil {
glog . Error ( "resyncMempool " , err )
return
}
}
2018-02-06 04:06:30 -07:00
var socketIoServer * server . SocketIoServer
if * socketIoBinding != "" {
2018-03-13 04:34:49 -06:00
socketIoServer , err = server . NewSocketIoServer (
* socketIoBinding , * certFiles , index , chain , txCache , * explorerURL , metrics )
2018-02-06 04:06:30 -07:00
if err != nil {
2018-03-01 03:22:37 -07:00
glog . Error ( "socketio: " , err )
return
2018-02-06 04:06:30 -07:00
}
go func ( ) {
err = socketIoServer . Run ( )
if err != nil {
if err . Error ( ) == "http: Server closed" {
glog . Info ( err )
} else {
2018-03-01 03:22:37 -07:00
glog . Error ( err )
return
2018-02-06 04:06:30 -07:00
}
}
} ( )
2018-02-22 05:32:06 -07:00
callbacksOnNewBlockHash = append ( callbacksOnNewBlockHash , socketIoServer . OnNewBlockHash )
callbacksOnNewTxAddr = append ( callbacksOnNewTxAddr , socketIoServer . OnNewTxAddr )
2018-02-22 05:01:35 -07:00
}
if * synchronize {
// start the synchronization loops after the server interfaces are started
go syncIndexLoop ( )
go syncMempoolLoop ( )
2018-02-06 04:06:30 -07:00
}
2018-03-01 10:37:01 -07:00
if * blockFrom >= 0 {
2017-08-28 09:50:57 -06:00
if * blockUntil < 0 {
2018-03-01 10:37:01 -07:00
* blockUntil = * blockFrom
2017-08-28 09:50:57 -06:00
}
2018-03-01 10:37:01 -07:00
height := uint32 ( * blockFrom )
2017-08-28 09:50:57 -06:00
until := uint32 ( * blockUntil )
address := * queryAddress
if address != "" {
2018-03-20 08:58:35 -06:00
if err = index . GetTransactions ( address , height , until , printResult ) ; err != nil {
2018-03-01 03:22:37 -07:00
glog . Error ( "GetTransactions " , err )
return
2017-08-28 09:50:57 -06:00
}
2018-01-31 07:03:06 -07:00
} else if ! * synchronize {
2018-03-01 10:37:01 -07:00
if err = syncWorker . ConnectBlocksParallelInChunks ( height , until ) ; err != nil {
2018-03-01 03:22:37 -07:00
glog . Error ( "connectBlocksParallelInChunks " , err )
return
2017-08-28 09:50:57 -06:00
}
}
}
2018-01-18 09:33:20 -07:00
2018-03-15 05:34:29 -06:00
if httpServer != nil || socketIoServer != nil || chain != nil {
waitForSignalAndShutdown ( httpServer , socketIoServer , chain , 5 * time . Second )
2018-01-18 09:33:20 -07:00
}
2018-01-31 07:03:06 -07:00
2018-02-02 08:17:33 -07:00
if * synchronize {
close ( chanSyncIndex )
close ( chanSyncMempool )
<- chanSyncIndexDone
<- chanSyncMempoolDone
}
2018-01-31 07:03:06 -07:00
}
2018-02-01 03:24:53 -07:00
func tickAndDebounce ( tickTime time . Duration , debounceTime time . Duration , input chan struct { } , f func ( ) ) {
timer := time . NewTimer ( tickTime )
Loop :
for {
select {
case _ , ok := <- input :
2018-02-01 04:26:12 -07:00
if ! timer . Stop ( ) {
<- timer . C
}
// exit loop on closed input channel
2018-02-01 03:24:53 -07:00
if ! ok {
break Loop
}
// debounce for debounceTime
2018-02-01 04:26:12 -07:00
timer . Reset ( debounceTime )
case <- timer . C :
2018-02-01 03:24:53 -07:00
// do the action and start the loop again
f ( )
2018-02-01 04:26:12 -07:00
timer . Reset ( tickTime )
2018-02-01 03:24:53 -07:00
}
}
}
2018-01-31 09:51:48 -07:00
func syncIndexLoop ( ) {
defer close ( chanSyncIndexDone )
glog . Info ( "syncIndexLoop starting" )
2018-02-01 03:24:53 -07:00
// resync index about every 15 minutes if there are no chanSyncIndex requests, with debounce 1 second
2018-02-06 01:12:50 -07:00
tickAndDebounce ( resyncIndexPeriodMs * time . Millisecond , debounceResyncIndexMs * time . Millisecond , chanSyncIndex , func ( ) {
2018-03-01 10:37:01 -07:00
if err := syncWorker . ResyncIndex ( onNewBlockHash ) ; err != nil {
2018-03-01 03:22:37 -07:00
glog . Error ( "syncIndexLoop " , errors . ErrorStack ( err ) )
2018-01-31 09:51:48 -07:00
}
2018-02-01 03:24:53 -07:00
} )
2018-01-31 09:51:48 -07:00
glog . Info ( "syncIndexLoop stopped" )
}
2018-02-22 05:01:35 -07:00
func onNewBlockHash ( hash string ) {
2018-02-22 05:32:06 -07:00
for _ , c := range callbacksOnNewBlockHash {
2018-02-22 05:01:35 -07:00
c ( hash )
}
}
2018-01-31 09:51:48 -07:00
func syncMempoolLoop ( ) {
defer close ( chanSyncMempoolDone )
glog . Info ( "syncMempoolLoop starting" )
2018-02-01 03:24:53 -07:00
// resync mempool about every minute if there are no chanSyncMempool requests, with debounce 1 second
2018-02-06 01:12:50 -07:00
tickAndDebounce ( resyncMempoolPeriodMs * time . Millisecond , debounceResyncMempoolMs * time . Millisecond , chanSyncMempool , func ( ) {
2018-03-08 04:59:37 -07:00
if err := chain . ResyncMempool ( onNewTxAddr ) ; err != nil {
2018-03-01 03:22:37 -07:00
glog . Error ( "syncMempoolLoop " , errors . ErrorStack ( err ) )
2018-01-31 09:51:48 -07:00
}
2018-02-01 03:24:53 -07:00
} )
2018-01-31 09:51:48 -07:00
glog . Info ( "syncMempoolLoop stopped" )
2018-01-18 09:33:20 -07:00
}
2018-02-22 05:32:06 -07:00
func onNewTxAddr ( txid string , addr string ) {
for _ , c := range callbacksOnNewTxAddr {
c ( txid , addr )
}
}
2018-03-15 05:34:29 -06:00
func pushSynchronizationHandler ( m * bchain . MQMessage ) {
2018-03-08 04:59:37 -07:00
// TODO - is coin specific, item for abstraction
2018-01-22 08:46:54 -07:00
body := hex . EncodeToString ( m . Body )
2018-02-01 03:24:53 -07:00
glog . V ( 1 ) . Infof ( "MQ: %s-%d %s" , m . Topic , m . Sequence , body )
2018-01-31 07:03:06 -07:00
if m . Topic == "hashblock" {
2018-01-31 09:51:48 -07:00
chanSyncIndex <- struct { } { }
2018-01-31 07:03:06 -07:00
} else if m . Topic == "hashtx" {
2018-01-31 09:51:48 -07:00
chanSyncMempool <- struct { } { }
2018-01-31 07:03:06 -07:00
} else {
glog . Errorf ( "MQ: unknown message %s-%d %s" , m . Topic , m . Sequence , body )
}
2018-01-22 08:46:54 -07:00
}
2018-01-18 09:33:20 -07:00
2018-03-15 05:34:29 -06:00
func waitForSignalAndShutdown ( https * server . HTTPServer , socketio * server . SocketIoServer , chain bchain . BlockChain , timeout time . Duration ) {
2018-02-28 16:59:25 -07:00
sig := <- chanOsSignal
2018-01-18 09:33:20 -07:00
ctx , cancel := context . WithTimeout ( context . Background ( ) , timeout )
defer cancel ( )
2018-01-30 10:22:25 -07:00
glog . Infof ( "Shutdown: %v" , sig )
2018-01-22 08:46:54 -07:00
2018-02-06 04:06:30 -07:00
if https != nil {
if err := https . Shutdown ( ctx ) ; err != nil {
2018-01-30 10:22:25 -07:00
glog . Error ( "HttpServer.Shutdown error: " , err )
2018-01-18 12:32:10 -07:00
}
2018-01-18 09:33:20 -07:00
}
2018-02-06 04:06:30 -07:00
if socketio != nil {
if err := socketio . Shutdown ( ctx ) ; err != nil {
glog . Error ( "SocketIo.Shutdown error: " , err )
}
}
2018-03-15 05:34:29 -06:00
if chain != nil {
if err := chain . Shutdown ( ) ; err != nil {
glog . Error ( "BlockChain.Shutdown error: " , err )
}
}
2017-08-28 09:50:57 -06:00
}
2018-02-03 11:14:27 -07:00
func printResult ( txid string , vout uint32 , isOutput bool ) error {
glog . Info ( txid , vout , isOutput )
2017-08-28 09:50:57 -06:00
return nil
}