484 lines
10 KiB
Go
484 lines
10 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"encoding/hex"
|
|
"flag"
|
|
"os"
|
|
"os/signal"
|
|
"sync"
|
|
"syscall"
|
|
"time"
|
|
|
|
"blockbook/bchain"
|
|
"blockbook/db"
|
|
"blockbook/server"
|
|
|
|
"github.com/golang/glog"
|
|
"github.com/pkg/profile"
|
|
)
|
|
|
|
var (
|
|
rpcURL = flag.String("rpcurl", "http://localhost:8332", "url of bitcoin RPC service")
|
|
rpcUser = flag.String("rpcuser", "rpc", "rpc username")
|
|
rpcPass = flag.String("rpcpass", "rpc", "rpc password")
|
|
rpcTimeout = flag.Uint("rpctimeout", 25, "rpc timeout in seconds")
|
|
|
|
dbPath = flag.String("path", "./data", "path to address index directory")
|
|
|
|
blockHeight = flag.Int("blockheight", -1, "height of the starting block")
|
|
blockUntil = flag.Int("blockuntil", -1, "height of the final block")
|
|
rollbackHeight = flag.Int("rollback", -1, "rollback to the given height and quit")
|
|
|
|
queryAddress = flag.String("address", "", "query contents of this address")
|
|
|
|
synchronize = flag.Bool("sync", false, "synchronizes until tip, if together with zeromq, keeps index synchronized")
|
|
repair = flag.Bool("repair", false, "repair the database")
|
|
prof = flag.Bool("prof", false, "profile program execution")
|
|
|
|
syncChunk = flag.Int("chunk", 100, "block chunk size for processing")
|
|
syncWorkers = flag.Int("workers", 8, "number of workers to process blocks")
|
|
dryRun = flag.Bool("dryrun", false, "do not index blocks, only download")
|
|
parse = flag.Bool("parse", false, "use in-process block parsing")
|
|
|
|
httpServerBinding = flag.String("httpserver", "", "http server binding [address]:port, if missing no http server")
|
|
|
|
zeroMQBinding = flag.String("zeromq", "", "binding to zeromq, if missing no zeromq connection")
|
|
)
|
|
|
|
var (
|
|
syncChannel = make(chan struct{})
|
|
chain *bchain.BitcoinRPC
|
|
index *db.RocksDB
|
|
)
|
|
|
|
func main() {
|
|
flag.Parse()
|
|
|
|
// override setting for glog to log only to stderr, to match the http handler
|
|
flag.Lookup("logtostderr").Value.Set("true")
|
|
|
|
defer glog.Flush()
|
|
|
|
if *prof {
|
|
defer profile.Start().Stop()
|
|
}
|
|
|
|
if *repair {
|
|
if err := db.RepairRocksDB(*dbPath); err != nil {
|
|
glog.Fatalf("RepairRocksDB %s: %v", *dbPath, err)
|
|
}
|
|
return
|
|
}
|
|
|
|
chain = bchain.NewBitcoinRPC(
|
|
*rpcURL,
|
|
*rpcUser,
|
|
*rpcPass,
|
|
time.Duration(*rpcTimeout)*time.Second)
|
|
|
|
if *parse {
|
|
chain.Parser = &bchain.BitcoinBlockParser{
|
|
Params: bchain.GetChainParams()[0],
|
|
}
|
|
}
|
|
|
|
var err error
|
|
index, err = db.NewRocksDB(*dbPath)
|
|
if err != nil {
|
|
glog.Fatalf("NewRocksDB %v", err)
|
|
}
|
|
defer index.Close()
|
|
|
|
if *rollbackHeight >= 0 {
|
|
bestHeight, _, err := index.GetBestBlock()
|
|
if err != nil {
|
|
glog.Fatalf("rollbackHeight: %v", err)
|
|
}
|
|
if uint32(*rollbackHeight) > bestHeight {
|
|
glog.Infof("nothing to rollback, rollbackHeight %d, bestHeight: %d", *rollbackHeight, bestHeight)
|
|
} else {
|
|
err = index.DisconnectBlocks(uint32(*rollbackHeight), bestHeight)
|
|
if err != nil {
|
|
glog.Fatalf("rollbackHeight: %v", err)
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
if *synchronize {
|
|
if err := resyncIndex(); err != nil {
|
|
glog.Fatal("resyncIndex ", err)
|
|
}
|
|
}
|
|
|
|
var httpServer *server.HttpServer
|
|
if *httpServerBinding != "" {
|
|
httpServer, err = server.New(*httpServerBinding, index)
|
|
if err != nil {
|
|
glog.Fatal("https: ", err)
|
|
}
|
|
go func() {
|
|
err = httpServer.Run()
|
|
if err != nil {
|
|
if err.Error() == "http: Server closed" {
|
|
glog.Info(err)
|
|
} else {
|
|
glog.Fatal(err)
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
var mq *bchain.MQ
|
|
if *zeroMQBinding != "" {
|
|
if !*synchronize {
|
|
glog.Error("zeromq connection without synchronization does not make sense, ignoring zeromq parameter")
|
|
} else {
|
|
go syncLoop()
|
|
mq, err = bchain.NewMQ(*zeroMQBinding, mqHandler)
|
|
if err != nil {
|
|
glog.Fatal("mq: ", err)
|
|
}
|
|
|
|
}
|
|
}
|
|
|
|
if *blockHeight >= 0 {
|
|
if *blockUntil < 0 {
|
|
*blockUntil = *blockHeight
|
|
}
|
|
height := uint32(*blockHeight)
|
|
until := uint32(*blockUntil)
|
|
address := *queryAddress
|
|
|
|
if address != "" {
|
|
script, err := bchain.AddressToOutputScript(address)
|
|
if err != nil {
|
|
glog.Fatalf("GetTransactions %v", err)
|
|
}
|
|
if err = index.GetTransactions(script, height, until, printResult); err != nil {
|
|
glog.Fatalf("GetTransactions %v", err)
|
|
}
|
|
} else if !*synchronize {
|
|
if err = connectBlocksParallel(
|
|
height,
|
|
until,
|
|
*syncChunk,
|
|
*syncWorkers,
|
|
); err != nil {
|
|
glog.Fatalf("connectBlocksParallel %v", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
if httpServer != nil || mq != nil {
|
|
waitForSignalAndShutdown(httpServer, mq, 5*time.Second)
|
|
}
|
|
|
|
}
|
|
|
|
func syncLoop() {
|
|
for range syncChannel {
|
|
resyncIndex()
|
|
}
|
|
}
|
|
|
|
func mqHandler(m *bchain.MQMessage) {
|
|
body := hex.EncodeToString(m.Body)
|
|
glog.Infof("MQ: %s-%d %s", m.Topic, m.Sequence, body)
|
|
if m.Topic == "hashblock" {
|
|
syncChannel <- struct{}{}
|
|
} else if m.Topic == "hashtx" {
|
|
|
|
} else {
|
|
glog.Errorf("MQ: unknown message %s-%d %s", m.Topic, m.Sequence, body)
|
|
}
|
|
}
|
|
|
|
func waitForSignalAndShutdown(s *server.HttpServer, mq *bchain.MQ, timeout time.Duration) {
|
|
stop := make(chan os.Signal, 1)
|
|
signal.Notify(stop, syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM)
|
|
|
|
sig := <-stop
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
|
defer cancel()
|
|
|
|
glog.Infof("Shutdown: %v", sig)
|
|
|
|
if mq != nil {
|
|
if err := mq.Shutdown(); err != nil {
|
|
glog.Error("MQ.Shutdown error: ", err)
|
|
}
|
|
}
|
|
|
|
if s != nil {
|
|
if err := s.Shutdown(ctx); err != nil {
|
|
glog.Error("HttpServer.Shutdown error: ", err)
|
|
}
|
|
}
|
|
|
|
close(syncChannel)
|
|
}
|
|
|
|
func printResult(txid string) error {
|
|
glog.Info(txid)
|
|
return nil
|
|
}
|
|
|
|
func resyncIndex() error {
|
|
remote, err := chain.GetBestBlockHash()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
localBestHeight, local, err := index.GetBestBlock()
|
|
if err != nil {
|
|
local = ""
|
|
}
|
|
|
|
// If the locally indexed block is the same as the best block on the
|
|
// network, we're done.
|
|
if local == remote {
|
|
glog.Infof("resync: synced on %d %s", localBestHeight, local)
|
|
return nil
|
|
}
|
|
|
|
var header *bchain.BlockHeader
|
|
if local != "" {
|
|
// Is local tip on the best chain?
|
|
header, err = chain.GetBlockHeader(local)
|
|
forked := false
|
|
if err != nil {
|
|
if e, ok := err.(*bchain.RPCError); ok && e.Message == "Block not found" {
|
|
forked = true
|
|
} else {
|
|
return err
|
|
}
|
|
} else {
|
|
if header.Confirmations < 0 {
|
|
forked = true
|
|
}
|
|
}
|
|
|
|
if forked {
|
|
// find and disconnect forked blocks and then synchronize again
|
|
glog.Info("resync: local is forked")
|
|
var height uint32
|
|
for height = localBestHeight - 1; height >= 0; height-- {
|
|
local, err = index.GetBlockHash(height)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
remote, err = chain.GetBlockHash(height)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if local == remote {
|
|
break
|
|
}
|
|
}
|
|
err = index.DisconnectBlocks(height+1, localBestHeight)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return resyncIndex()
|
|
}
|
|
}
|
|
|
|
startHeight := uint32(0)
|
|
var hash string
|
|
if header != nil {
|
|
glog.Info("resync: local is behind")
|
|
hash = header.Next
|
|
startHeight = localBestHeight
|
|
} else {
|
|
// If the local block is missing, we're indexing from the genesis block
|
|
// or from the start block specified by flags
|
|
if *blockHeight > 0 {
|
|
startHeight = uint32(*blockHeight)
|
|
}
|
|
glog.Info("resync: genesis from block ", startHeight)
|
|
hash, err = chain.GetBlockHash(startHeight)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// if parallel operation is enabled and the number of blocks to be connected is large,
|
|
// use parallel routine to load majority of blocks
|
|
if *syncWorkers > 1 {
|
|
chainBestHeight, err := chain.GetBestBlockHeight()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if chainBestHeight-startHeight > uint32(*syncChunk) {
|
|
glog.Infof("resync: parallel sync of blocks %d-%d", startHeight, chainBestHeight)
|
|
err = connectBlocksParallel(
|
|
startHeight,
|
|
chainBestHeight,
|
|
*syncChunk,
|
|
*syncWorkers,
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// after parallel load finish the sync using standard way,
|
|
// new blocks may have been created in the meantime
|
|
return resyncIndex()
|
|
}
|
|
}
|
|
|
|
return connectBlocks(hash)
|
|
}
|
|
|
|
func connectBlocks(
|
|
hash string,
|
|
) error {
|
|
bch := make(chan blockResult, 8)
|
|
done := make(chan struct{})
|
|
defer close(done)
|
|
|
|
go getBlockChain(hash, bch, done)
|
|
|
|
var lastRes blockResult
|
|
for res := range bch {
|
|
lastRes = res
|
|
if res.err != nil {
|
|
return res.err
|
|
}
|
|
err := index.ConnectBlock(res.block)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if lastRes.block != nil {
|
|
glog.Infof("resync: synced on %d %s", lastRes.block.Height, lastRes.block.Hash)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func connectBlocksParallel(
|
|
lower uint32,
|
|
higher uint32,
|
|
chunkSize int,
|
|
numWorkers int,
|
|
) error {
|
|
var wg sync.WaitGroup
|
|
|
|
work := func(i int) {
|
|
defer wg.Done()
|
|
|
|
offset := uint32(chunkSize * i)
|
|
stride := uint32(chunkSize * numWorkers)
|
|
|
|
for low := lower + offset; low <= higher; low += stride {
|
|
high := low + uint32(chunkSize-1)
|
|
if high > higher {
|
|
high = higher
|
|
}
|
|
err := connectBlockChunk(low, high)
|
|
if err != nil {
|
|
if e, ok := err.(*bchain.RPCError); ok && (e.Message == "Block height out of range" || e.Message == "Block not found") {
|
|
break
|
|
}
|
|
glog.Fatalf("connectBlocksParallel %d-%d %v", low, high, err)
|
|
}
|
|
}
|
|
}
|
|
for i := 0; i < numWorkers; i++ {
|
|
wg.Add(1)
|
|
go work(i)
|
|
}
|
|
wg.Wait()
|
|
|
|
return nil
|
|
}
|
|
|
|
func connectBlockChunk(
|
|
lower uint32,
|
|
higher uint32,
|
|
) error {
|
|
connected, err := isBlockConnected(higher)
|
|
if err != nil || connected {
|
|
// if higher is over the best block, continue with lower block, otherwise return error
|
|
if e, ok := err.(*bchain.RPCError); !ok || e.Message != "Block height out of range" {
|
|
return err
|
|
}
|
|
}
|
|
|
|
height := lower
|
|
hash, err := chain.GetBlockHash(lower)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for height <= higher {
|
|
block, err := chain.GetBlock(hash)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
hash = block.Next
|
|
height = block.Height + 1
|
|
if *dryRun {
|
|
continue
|
|
}
|
|
err = index.ConnectBlock(block)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if block.Height%1000 == 0 {
|
|
glog.Info("connected block ", block.Height, " ", block.Hash)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func isBlockConnected(
|
|
height uint32,
|
|
) (bool, error) {
|
|
local, err := index.GetBlockHash(height)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
remote, err := chain.GetBlockHash(height)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
if local != remote {
|
|
return false, nil
|
|
}
|
|
return true, nil
|
|
}
|
|
|
|
type blockResult struct {
|
|
block *bchain.Block
|
|
err error
|
|
}
|
|
|
|
func getBlockChain(
|
|
hash string,
|
|
out chan blockResult,
|
|
done chan struct{},
|
|
) {
|
|
defer close(out)
|
|
|
|
for hash != "" {
|
|
select {
|
|
case <-done:
|
|
return
|
|
default:
|
|
}
|
|
block, err := chain.GetBlock(hash)
|
|
if err != nil {
|
|
out <- blockResult{err: err}
|
|
return
|
|
}
|
|
hash = block.Next
|
|
out <- blockResult{block: block}
|
|
}
|
|
}
|