blockbook/blockbook.go

432 lines
12 KiB
Go
Raw Normal View History

2017-08-28 09:50:57 -06:00
package main
import (
"context"
2017-08-28 09:50:57 -06:00
"flag"
2018-03-09 09:05:27 -07:00
"log"
"os"
"os/signal"
"syscall"
2017-08-28 09:50:57 -06:00
"time"
2017-09-12 18:50:34 -06:00
"github.com/juju/errors"
2018-01-31 07:23:17 -07:00
"blockbook/bchain"
"blockbook/bchain/coins"
2018-03-13 04:34:49 -06:00
"blockbook/common"
2018-01-18 08:44:31 -07:00
"blockbook/db"
"blockbook/server"
"github.com/golang/glog"
2018-03-09 09:05:27 -07:00
"net/http"
_ "net/http/pprof"
2017-08-28 09:50:57 -06:00
)
2018-02-06 01:12:50 -07:00
// resync index at least each resyncIndexPeriodMs (could be more often if invoked by message from ZeroMQ)
const resyncIndexPeriodMs = 935093
// debounce too close requests for resync
const debounceResyncIndexMs = 1009
// resync mempool at least each resyncIndexPeriodMs (could be more often if invoked by message from ZeroMQ)
const resyncMempoolPeriodMs = 60017
// debounce too close requests for resync mempool (ZeroMQ sends message for each tx, when new block there are many transactions)
const debounceResyncMempoolMs = 1009
2018-05-23 00:54:02 -06:00
// store internal state about once every minute
const storeInternalStatePeriodMs = 59699
2017-08-28 09:50:57 -06:00
var (
blockchain = flag.String("blockchaincfg", "", "path to blockchain RPC service configuration json file")
2017-08-28 09:50:57 -06:00
dbPath = flag.String("datadir", "./data", "path to database directory")
2017-08-28 09:50:57 -06:00
2018-03-01 10:37:01 -07:00
blockFrom = flag.Int("blockheight", -1, "height of the starting block")
2018-01-29 09:27:42 -07:00
blockUntil = flag.Int("blockuntil", -1, "height of the final block")
rollbackHeight = flag.Int("rollback", -1, "rollback to the given height and quit")
2017-10-05 06:35:07 -06:00
2017-08-28 09:50:57 -06:00
queryAddress = flag.String("address", "", "query contents of this address")
2018-01-31 07:03:06 -07:00
synchronize = flag.Bool("sync", false, "synchronizes until tip, if together with zeromq, keeps index synchronized")
repair = flag.Bool("repair", false, "repair the database")
2018-03-09 09:05:27 -07:00
prof = flag.String("prof", "", "http server binding [address]:port of the interface to profiling data /debug/pprof/ (default no profiling)")
2017-10-06 04:57:51 -06:00
2018-02-26 08:44:25 -07:00
syncChunk = flag.Int("chunk", 100, "block chunk size for processing")
2018-05-03 06:49:19 -06:00
syncWorkers = flag.Int("workers", 8, "number of workers to process blocks")
2018-02-26 08:44:25 -07:00
dryRun = flag.Bool("dryrun", false, "do not index blocks, only download")
2018-01-18 08:44:31 -07:00
2018-02-07 14:56:17 -07:00
httpServerBinding = flag.String("httpserver", "", "http server binding [address]:port, (default no http server)")
2018-01-19 07:58:46 -07:00
2018-02-07 14:56:17 -07:00
socketIoBinding = flag.String("socketio", "", "socketio server binding [address]:port[/path], (default no socket.io server)")
2018-02-07 14:56:17 -07:00
certFiles = flag.String("certfile", "", "to enable SSL specify path to certificate files without extension, expecting <certfile>.crt and <certfile>.key, (default no SSL)")
explorerURL = flag.String("explorer", "", "address of blockchain explorer")
2018-05-03 06:49:19 -06:00
coin = flag.String("coin", "btc", "coin name")
2018-05-14 07:49:08 -06:00
noTxCache = flag.Bool("notxcache", false, "disable tx cache")
2017-08-28 09:50:57 -06:00
)
2018-01-31 07:03:06 -07:00
var (
2018-05-23 00:54:02 -06:00
chanSyncIndex = make(chan struct{})
chanSyncMempool = make(chan struct{})
chanStoreInternalState = make(chan struct{})
chanSyncIndexDone = make(chan struct{})
chanSyncMempoolDone = make(chan struct{})
chanStoreInternalStateDone = make(chan struct{})
chain bchain.BlockChain
index *db.RocksDB
txCache *db.TxCache
syncWorker *db.SyncWorker
callbacksOnNewBlockHash []func(hash string)
callbacksOnNewTxAddr []func(txid string, addr string)
chanOsSignal chan os.Signal
2018-01-31 07:03:06 -07:00
)
2018-04-12 07:23:16 -06:00
func init() {
2018-05-17 04:31:45 -06:00
glog.MaxSize = 1024 * 1024 * 8
2018-04-12 07:23:16 -06:00
glog.CopyStandardLogTo("INFO")
}
func getBlockChainWithRetry(coin string, configfile string, pushHandler func(bchain.NotificationType), metrics *common.Metrics, seconds int) (bchain.BlockChain, error) {
var chain bchain.BlockChain
var err error
timer := time.NewTimer(time.Second)
for i := 0; ; i++ {
if chain, err = coins.NewBlockChain(coin, configfile, pushHandler, metrics); err != nil {
if i < seconds {
glog.Error("rpc: ", err, " Retrying...")
select {
case <-chanOsSignal:
return nil, errors.New("Interrupted")
case <-timer.C:
timer.Reset(time.Second)
continue
}
} else {
return nil, err
}
}
return chain, nil
}
}
2017-08-28 09:50:57 -06:00
func main() {
flag.Parse()
2018-01-31 07:03:06 -07:00
defer glog.Flush()
2018-02-28 16:59:25 -07:00
chanOsSignal = make(chan os.Signal, 1)
signal.Notify(chanOsSignal, syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM)
2018-05-22 09:22:22 -06:00
glog.Infof("Blockbook: %+v", common.GetVersionInfo())
2018-03-09 09:05:27 -07:00
if *prof != "" {
go func() {
log.Println(http.ListenAndServe(*prof, nil))
}()
2018-01-19 07:58:46 -07:00
}
2017-09-12 08:53:40 -06:00
if *repair {
if err := db.RepairRocksDB(*dbPath); err != nil {
glog.Fatalf("RepairRocksDB %s: %v", *dbPath, err)
2017-09-12 08:53:40 -06:00
}
return
}
metrics, err := common.GetMetrics(*coin)
2018-03-13 04:34:49 -06:00
if err != nil {
glog.Fatal("GetMetrics: ", err)
}
if *blockchain == "" {
glog.Fatal("Missing blockchaincfg configuration parameter")
}
if chain, err = getBlockChainWithRetry(*coin, *blockchain, pushSynchronizationHandler, metrics, 60); err != nil {
glog.Fatal("rpc: ", err)
2017-10-07 03:05:35 -06:00
}
index, err = db.NewRocksDB(*dbPath, chain.GetChainParser())
2017-08-28 09:50:57 -06:00
if err != nil {
glog.Fatal("rocksDB: ", err)
2017-08-28 09:50:57 -06:00
}
2018-01-31 07:03:06 -07:00
defer index.Close()
2017-08-28 09:50:57 -06:00
common.IS, err = index.LoadInternalState(*coin)
2018-05-22 04:56:51 -06:00
if err != nil {
glog.Fatal("internalState: ", err)
}
if common.IS.DbState != common.DbStateClosed {
glog.Warning("internalState: database in not closed state ", common.IS.DbState, ", possibly previous ungraceful shutdown")
}
syncWorker, err = db.NewSyncWorker(index, chain, *syncWorkers, *syncChunk, *blockFrom, *dryRun, chanOsSignal, metrics)
if err != nil {
glog.Fatalf("NewSyncWorker %v", err)
}
2018-05-22 04:56:51 -06:00
// set the DbState to open at this moment, after all important workers are initialized
common.IS.DbState = common.DbStateOpen
err = index.StoreInternalState(common.IS)
if err != nil {
glog.Fatal("internalState: ", err)
}
2018-01-29 09:27:42 -07:00
if *rollbackHeight >= 0 {
bestHeight, bestHash, err := index.GetBestBlock()
2018-01-29 09:27:42 -07:00
if err != nil {
glog.Error("rollbackHeight: ", err)
return
2018-01-29 09:27:42 -07:00
}
if uint32(*rollbackHeight) > bestHeight {
glog.Infof("nothing to rollback, rollbackHeight %d, bestHeight: %d", *rollbackHeight, bestHeight)
2018-01-29 09:27:42 -07:00
} else {
hashes := []string{bestHash}
for height := bestHeight - 1; height >= uint32(*rollbackHeight); height-- {
hash, err := index.GetBlockHash(height)
if err != nil {
glog.Error("rollbackHeight: ", err)
return
}
hashes = append(hashes, hash)
}
err = syncWorker.DisconnectBlocks(uint32(*rollbackHeight), bestHeight, hashes)
2018-01-29 09:27:42 -07:00
if err != nil {
glog.Error("rollbackHeight: ", err)
return
2018-01-29 09:27:42 -07:00
}
}
return
}
2018-05-14 07:49:08 -06:00
if txCache, err = db.NewTxCache(index, chain, metrics, !*noTxCache); err != nil {
2018-03-06 04:36:24 -07:00
glog.Error("txCache ", err)
return
}
var httpServer *server.HTTPServer
2018-01-24 07:10:35 -07:00
if *httpServerBinding != "" {
httpServer, err = server.NewHTTPServer(*httpServerBinding, *certFiles, index, chain, txCache)
2018-01-18 08:44:31 -07:00
if err != nil {
glog.Error("https: ", err)
return
2018-01-18 08:44:31 -07:00
}
go func() {
err = httpServer.Run()
if err != nil {
2018-01-31 07:34:20 -07:00
if err.Error() == "http: Server closed" {
glog.Info(err)
} else {
glog.Error(err)
return
2018-01-31 07:34:20 -07:00
}
}
}()
2018-01-18 08:44:31 -07:00
}
if *synchronize {
if err := syncWorker.ResyncIndex(nil); err != nil {
glog.Error("resyncIndex ", err)
return
}
if err = chain.ResyncMempool(nil); err != nil {
glog.Error("resyncMempool ", err)
return
}
}
var socketIoServer *server.SocketIoServer
if *socketIoBinding != "" {
2018-03-13 04:34:49 -06:00
socketIoServer, err = server.NewSocketIoServer(
*socketIoBinding, *certFiles, index, chain, txCache, *explorerURL, metrics)
if err != nil {
glog.Error("socketio: ", err)
return
}
go func() {
err = socketIoServer.Run()
if err != nil {
if err.Error() == "http: Server closed" {
glog.Info(err)
} else {
glog.Error(err)
return
}
}
}()
callbacksOnNewBlockHash = append(callbacksOnNewBlockHash, socketIoServer.OnNewBlockHash)
callbacksOnNewTxAddr = append(callbacksOnNewTxAddr, socketIoServer.OnNewTxAddr)
}
if *synchronize {
// start the synchronization loops after the server interfaces are started
go syncIndexLoop()
go syncMempoolLoop()
2018-05-23 00:54:02 -06:00
go storeInternalStateLoop()
}
2018-03-01 10:37:01 -07:00
if *blockFrom >= 0 {
2017-08-28 09:50:57 -06:00
if *blockUntil < 0 {
2018-03-01 10:37:01 -07:00
*blockUntil = *blockFrom
2017-08-28 09:50:57 -06:00
}
2018-03-01 10:37:01 -07:00
height := uint32(*blockFrom)
2017-08-28 09:50:57 -06:00
until := uint32(*blockUntil)
address := *queryAddress
if address != "" {
if err = index.GetTransactions(address, height, until, printResult); err != nil {
glog.Error("GetTransactions ", err)
return
2017-08-28 09:50:57 -06:00
}
2018-01-31 07:03:06 -07:00
} else if !*synchronize {
2018-04-28 16:17:30 -06:00
if err = syncWorker.ConnectBlocksParallel(height, until); err != nil {
glog.Error("connectBlocksParallel ", err)
return
2017-08-28 09:50:57 -06:00
}
}
}
if httpServer != nil || socketIoServer != nil || chain != nil {
waitForSignalAndShutdown(httpServer, socketIoServer, chain, 5*time.Second)
}
2018-01-31 07:03:06 -07:00
2018-02-02 08:17:33 -07:00
if *synchronize {
close(chanSyncIndex)
close(chanSyncMempool)
2018-05-23 00:54:02 -06:00
close(chanStoreInternalState)
2018-02-02 08:17:33 -07:00
<-chanSyncIndexDone
<-chanSyncMempoolDone
2018-05-23 00:54:02 -06:00
<-chanStoreInternalState
2018-02-02 08:17:33 -07:00
}
2018-01-31 07:03:06 -07:00
}
func tickAndDebounce(tickTime time.Duration, debounceTime time.Duration, input chan struct{}, f func()) {
timer := time.NewTimer(tickTime)
var firstDebounce time.Time
Loop:
for {
select {
case _, ok := <-input:
if !timer.Stop() {
<-timer.C
}
// exit loop on closed input channel
if !ok {
break Loop
}
if firstDebounce.IsZero() {
firstDebounce = time.Now()
}
// debounce for up to debounceTime period
// afterwards execute immediately
if firstDebounce.Add(debounceTime).After(time.Now()) {
timer.Reset(debounceTime)
} else {
timer.Reset(0)
}
case <-timer.C:
// do the action and start the loop again
f()
timer.Reset(tickTime)
firstDebounce = time.Time{}
}
}
}
2018-01-31 09:51:48 -07:00
func syncIndexLoop() {
defer close(chanSyncIndexDone)
glog.Info("syncIndexLoop starting")
// resync index about every 15 minutes if there are no chanSyncIndex requests, with debounce 1 second
2018-02-06 01:12:50 -07:00
tickAndDebounce(resyncIndexPeriodMs*time.Millisecond, debounceResyncIndexMs*time.Millisecond, chanSyncIndex, func() {
2018-03-01 10:37:01 -07:00
if err := syncWorker.ResyncIndex(onNewBlockHash); err != nil {
glog.Error("syncIndexLoop ", errors.ErrorStack(err))
2018-01-31 09:51:48 -07:00
}
})
2018-01-31 09:51:48 -07:00
glog.Info("syncIndexLoop stopped")
}
func onNewBlockHash(hash string) {
for _, c := range callbacksOnNewBlockHash {
c(hash)
}
}
2018-01-31 09:51:48 -07:00
func syncMempoolLoop() {
defer close(chanSyncMempoolDone)
glog.Info("syncMempoolLoop starting")
// resync mempool about every minute if there are no chanSyncMempool requests, with debounce 1 second
2018-02-06 01:12:50 -07:00
tickAndDebounce(resyncMempoolPeriodMs*time.Millisecond, debounceResyncMempoolMs*time.Millisecond, chanSyncMempool, func() {
common.IS.StartedMempoolSync()
if err := chain.ResyncMempool(onNewTxAddr); err != nil {
glog.Error("syncMempoolLoop ", errors.ErrorStack(err))
} else {
common.IS.FinishedMempoolSync()
2018-01-31 09:51:48 -07:00
}
})
2018-01-31 09:51:48 -07:00
glog.Info("syncMempoolLoop stopped")
}
2018-05-23 00:54:02 -06:00
func storeInternalStateLoop() {
defer close(chanStoreInternalStateDone)
glog.Info("storeInternalStateLoop starting")
tickAndDebounce(storeInternalStatePeriodMs*time.Millisecond, (storeInternalStatePeriodMs-1)*time.Millisecond, chanStoreInternalState, func() {
if err := index.StoreInternalState(common.IS); err != nil {
glog.Error("storeInternalStateLoop ", errors.ErrorStack(err))
}
})
glog.Info("storeInternalStateLoop stopped")
}
func onNewTxAddr(txid string, addr string) {
for _, c := range callbacksOnNewTxAddr {
c(txid, addr)
}
}
func pushSynchronizationHandler(nt bchain.NotificationType) {
glog.V(1).Infof("MQ: notification ", nt)
if nt == bchain.NotificationNewBlock {
2018-01-31 09:51:48 -07:00
chanSyncIndex <- struct{}{}
} else if nt == bchain.NotificationNewTx {
2018-01-31 09:51:48 -07:00
chanSyncMempool <- struct{}{}
2018-01-31 07:03:06 -07:00
} else {
glog.Error("MQ: unknown notification sent")
2018-01-31 07:03:06 -07:00
}
}
func waitForSignalAndShutdown(https *server.HTTPServer, socketio *server.SocketIoServer, chain bchain.BlockChain, timeout time.Duration) {
2018-02-28 16:59:25 -07:00
sig := <-chanOsSignal
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
glog.Infof("Shutdown: %v", sig)
if https != nil {
if err := https.Shutdown(ctx); err != nil {
glog.Error("HttpServer.Shutdown error: ", err)
2018-01-18 12:32:10 -07:00
}
}
if socketio != nil {
if err := socketio.Shutdown(ctx); err != nil {
glog.Error("SocketIo.Shutdown error: ", err)
}
}
if chain != nil {
if err := chain.Shutdown(); err != nil {
glog.Error("BlockChain.Shutdown error: ", err)
}
}
2017-08-28 09:50:57 -06:00
}
2018-02-03 11:14:27 -07:00
func printResult(txid string, vout uint32, isOutput bool) error {
glog.Info(txid, vout, isOutput)
2017-08-28 09:50:57 -06:00
return nil
}