blockbook/blockbook.go

468 lines
10 KiB
Go
Raw Normal View History

2017-08-28 09:50:57 -06:00
package main
import (
"context"
"encoding/hex"
2017-08-28 09:50:57 -06:00
"flag"
"log"
"os"
"os/signal"
2017-10-05 06:35:07 -06:00
"sync"
"syscall"
2017-08-28 09:50:57 -06:00
"time"
2017-09-12 18:50:34 -06:00
"blockbook/bitcoin"
2018-01-18 08:44:31 -07:00
"blockbook/db"
"blockbook/server"
2017-09-12 18:50:34 -06:00
"github.com/pkg/profile"
2017-08-28 09:50:57 -06:00
)
2017-10-05 06:35:07 -06:00
type Blockchain interface {
2017-09-06 02:59:40 -06:00
GetBestBlockHash() (string, error)
GetBestBlockHeight() (uint32, error)
2017-08-28 09:50:57 -06:00
GetBlockHash(height uint32) (string, error)
GetBlockHeader(hash string) (*bitcoin.BlockHeader, error)
GetBlock(hash string) (*bitcoin.Block, error)
2017-08-28 09:50:57 -06:00
}
2017-10-05 06:35:07 -06:00
type Index interface {
2018-01-24 10:02:46 -07:00
GetBestBlock() (uint32, string, error)
2017-10-05 06:35:07 -06:00
GetBlockHash(height uint32) (string, error)
GetTransactions(outputScript []byte, lower uint32, higher uint32, fn func(txid string) error) error
ConnectBlock(block *bitcoin.Block) error
DisconnectBlock(block *bitcoin.Block) error
2018-01-27 16:59:54 -07:00
DisconnectBlocks(lower uint32, higher uint32) error
2017-08-28 09:50:57 -06:00
}
// Command-line flags. Every value is a pointer that is only meaningful after
// flag.Parse has run (main calls it first).
var (
	// Connection to the bitcoin RPC service.
	rpcURL     = flag.String("rpcurl", "http://localhost:8332", "url of bitcoin RPC service")
	rpcUser    = flag.String("rpcuser", "rpc", "rpc username")
	rpcPass    = flag.String("rpcpass", "rpc", "rpc password")
	rpcTimeout = flag.Uint("rpctimeout", 25, "rpc timeout in seconds")

	// Location of the local index.
	dbPath = flag.String("path", "./data", "path to address index directory")

	// Block range selection and rollback; -1 means "not set".
	blockHeight    = flag.Int("blockheight", -1, "height of the starting block")
	blockUntil     = flag.Int("blockuntil", -1, "height of the final block")
	rollbackHeight = flag.Int("rollback", -1, "rollback to the given height and quit")

	// Operating modes.
	queryAddress = flag.String("address", "", "query contents of this address")
	resync       = flag.Bool("resync", false, "resync until tip")
	repair       = flag.Bool("repair", false, "repair the database")
	prof         = flag.Bool("prof", false, "profile program execution")

	// Tuning of the block synchronization.
	syncChunk   = flag.Int("chunk", 100, "block chunk size for processing")
	syncWorkers = flag.Int("workers", 8, "number of workers to process blocks")
	dryRun      = flag.Bool("dryrun", false, "do not index blocks, only download")
	parse       = flag.Bool("parse", false, "use in-process block parsing")

	// Optional network services; an empty binding disables the service.
	httpServerBinding = flag.String("httpserver", "", "http server binding [address]:port, if missing no http server")
	zeroMQBinding     = flag.String("zeromq", "", "binding to zeromq, if missing no zeromq connection")
)
func main() {
flag.Parse()
if *prof {
defer profile.Start().Stop()
2018-01-19 07:58:46 -07:00
}
2017-09-12 08:53:40 -06:00
if *repair {
if err := db.RepairRocksDB(*dbPath); err != nil {
2018-01-25 04:31:57 -07:00
log.Fatalf("RepairRocksDB %s: %v", *dbPath, err)
2017-09-12 08:53:40 -06:00
}
return
}
rpc := bitcoin.NewBitcoinRPC(
2017-10-05 06:35:07 -06:00
*rpcURL,
*rpcUser,
*rpcPass,
time.Duration(*rpcTimeout)*time.Second)
2017-08-28 09:50:57 -06:00
2017-10-07 03:05:35 -06:00
if *parse {
rpc.Parser = &bitcoin.BitcoinBlockParser{
Params: bitcoin.GetChainParams()[0],
2017-10-07 03:05:35 -06:00
}
}
2018-01-18 08:44:31 -07:00
db, err := db.NewRocksDB(*dbPath)
2017-08-28 09:50:57 -06:00
if err != nil {
2018-01-25 04:31:57 -07:00
log.Fatalf("NewRocksDB %v", err)
2017-08-28 09:50:57 -06:00
}
defer db.Close()
2018-01-29 09:27:42 -07:00
if *rollbackHeight >= 0 {
bestHeight, _, err := db.GetBestBlock()
if err != nil {
log.Fatalf("rollbackHeight: %v", err)
}
if uint32(*rollbackHeight) > bestHeight {
log.Printf("nothing to rollback, rollbackHeight %d, bestHeight: %d", *rollbackHeight, bestHeight)
} else {
err = db.DisconnectBlocks(uint32(*rollbackHeight), bestHeight)
if err != nil {
log.Fatalf("rollbackHeight: %v", err)
}
}
return
}
2018-01-27 16:59:54 -07:00
if *resync {
if err := resyncIndex(rpc, db); err != nil {
log.Fatalf("resyncIndex %v", err)
}
}
2018-01-27 16:59:54 -07:00
var httpServer *server.HttpServer
2018-01-24 07:10:35 -07:00
if *httpServerBinding != "" {
2018-01-18 16:18:08 -07:00
httpServer, err = server.New(*httpServerBinding, db)
2018-01-18 08:44:31 -07:00
if err != nil {
log.Fatalf("https: %v", err)
2018-01-18 08:44:31 -07:00
}
go func() {
err = httpServer.Run()
if err != nil {
log.Fatalf("https: %v", err)
}
}()
2018-01-18 08:44:31 -07:00
}
var mq *bitcoin.MQ
2018-01-24 07:10:35 -07:00
if *zeroMQBinding != "" {
mq, err = bitcoin.New(*zeroMQBinding, mqHandler)
if err != nil {
log.Fatalf("mq: %v", err)
}
}
2017-08-28 09:50:57 -06:00
if *blockHeight >= 0 {
if *blockUntil < 0 {
*blockUntil = *blockHeight
}
height := uint32(*blockHeight)
until := uint32(*blockUntil)
address := *queryAddress
if address != "" {
script, err := bitcoin.AddressToOutputScript(address)
if err != nil {
log.Fatalf("GetTransactions %v", err)
}
if err = db.GetTransactions(script, height, until, printResult); err != nil {
2018-01-25 04:31:57 -07:00
log.Fatalf("GetTransactions %v", err)
2017-08-28 09:50:57 -06:00
}
} else if !*resync {
2017-10-06 04:57:51 -06:00
if err = connectBlocksParallel(
rpc,
db,
height,
until,
*syncChunk,
*syncWorkers,
); err != nil {
2018-01-25 04:31:57 -07:00
log.Fatalf("connectBlocksParallel %v", err)
2017-08-28 09:50:57 -06:00
}
}
}
if httpServer != nil {
waitForSignalAndShutdown(httpServer, mq, 5*time.Second)
}
}
// mqHandler is the callback registered with the message queue in main. It
// logs the topic, the sequence number and the hex-encoded body of every
// received message.
func mqHandler(m *bitcoin.MQMessage) {
	log.Printf("MQ: %s-%d %s", m.Topic, m.Sequence, hex.EncodeToString(m.Body))
}
func waitForSignalAndShutdown(s *server.HttpServer, mq *bitcoin.MQ, timeout time.Duration) {
stop := make(chan os.Signal, 1)
signal.Notify(stop, syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM)
2018-01-18 12:32:10 -07:00
sig := <-stop
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
2018-01-27 16:59:54 -07:00
log.Printf("Shutdown: %v", sig)
if mq != nil {
if err := mq.Shutdown(); err != nil {
log.Printf("MQ.Shutdown error: %v", err)
}
}
2018-01-18 12:32:10 -07:00
if s != nil {
if err := s.Shutdown(ctx); err != nil {
log.Printf("HttpServer.Shutdown error: %v", err)
2018-01-18 12:32:10 -07:00
}
}
2017-08-28 09:50:57 -06:00
}
2017-10-05 06:35:07 -06:00
// printResult is the callback main passes to Index.GetTransactions: it logs
// the transaction id and never fails.
func printResult(txid string) error {
	log.Print(txid)
	return nil
}
2017-10-05 06:35:07 -06:00
func resyncIndex(chain Blockchain, index Index) error {
remote, err := chain.GetBestBlockHash()
if err != nil {
return err
}
2018-01-27 16:59:54 -07:00
localBestHeight, local, err := index.GetBestBlock()
if err != nil {
2017-10-05 06:35:07 -06:00
local = ""
}
2017-08-28 09:50:57 -06:00
2018-01-27 16:59:54 -07:00
// If the locally indexed block is the same as the best block on the
// network, we're done.
if local == remote {
log.Printf("resync: synced on %d %s", localBestHeight, local)
return nil
}
var header *bitcoin.BlockHeader
if local != "" {
// Is local tip on the best chain?
header, err = chain.GetBlockHeader(local)
forked := false
2017-08-28 09:50:57 -06:00
if err != nil {
if e, ok := err.(*bitcoin.RPCError); ok && e.Message == "Block not found" {
forked = true
} else {
return err
}
} else {
if header.Confirmations < 0 {
forked = true
}
}
if forked {
// find and disconnect forked blocks and then synchronize again
log.Printf("resync: local is forked")
var height uint32
for height = localBestHeight - 1; height >= 0; height-- {
local, err = index.GetBlockHash(height)
if err != nil {
return err
}
remote, err = chain.GetBlockHash(height)
if err != nil {
return err
}
if local == remote {
break
}
2018-01-27 16:59:54 -07:00
}
err = index.DisconnectBlocks(height+1, localBestHeight)
2018-01-27 16:59:54 -07:00
if err != nil {
return err
}
return resyncIndex(chain, index)
}
}
startHeight := uint32(0)
var hash string
if header != nil {
log.Printf("resync: local is behind")
hash = header.Next
startHeight = localBestHeight
} else {
// If the local block is missing, we're indexing from the genesis block
// or from the start block specified by flags
if *blockHeight > 0 {
startHeight = uint32(*blockHeight)
}
log.Printf("resync: genesis from block %d", startHeight)
hash, err = chain.GetBlockHash(startHeight)
if err != nil {
return err
2018-01-27 16:59:54 -07:00
}
}
// if parallel operation is enabled and the number of blocks to be connected is large,
// use parallel routine to load majority of blocks
if *syncWorkers > 1 {
chainBestHeight, err := chain.GetBestBlockHeight()
2018-01-27 16:59:54 -07:00
if err != nil {
return err
}
if chainBestHeight-startHeight > uint32(*syncChunk) {
log.Printf("resync: parallel sync of blocks %d-%d", startHeight, chainBestHeight)
err = connectBlocksParallel(
chain,
index,
startHeight,
chainBestHeight,
*syncChunk,
*syncWorkers,
)
if err != nil {
return err
}
// after parallel load finish the sync using standard way,
// new blocks may have been created in the meantime
return resyncIndex(chain, index)
}
}
2018-01-27 16:59:54 -07:00
return connectBlocks(chain, index, hash)
}
2017-08-28 09:50:57 -06:00
2018-01-27 16:59:54 -07:00
func connectBlocks(
2017-10-05 06:35:07 -06:00
chain Blockchain,
index Index,
hash string,
) error {
2017-09-11 07:06:16 -06:00
bch := make(chan blockResult, 8)
done := make(chan struct{})
defer close(done)
2017-10-05 06:35:07 -06:00
go getBlockChain(hash, chain, bch, done)
2017-09-11 07:06:16 -06:00
for res := range bch {
2017-10-05 06:35:07 -06:00
if res.err != nil {
return res.err
2017-08-28 09:50:57 -06:00
}
2017-10-05 06:35:07 -06:00
err := index.ConnectBlock(res.block)
2017-08-28 09:50:57 -06:00
if err != nil {
return err
2017-08-28 09:50:57 -06:00
}
2017-09-06 07:36:55 -06:00
}
return nil
2017-08-28 09:50:57 -06:00
}
2017-10-05 06:35:07 -06:00
func connectBlocksParallel(
chain Blockchain,
index Index,
2017-09-04 06:16:37 -06:00
lower uint32,
higher uint32,
2017-10-06 04:57:51 -06:00
chunkSize int,
numWorkers int,
2017-09-04 06:16:37 -06:00
) error {
2017-10-05 06:35:07 -06:00
var wg sync.WaitGroup
2017-09-04 06:16:37 -06:00
2017-10-05 06:35:07 -06:00
work := func(i int) {
defer wg.Done()
offset := uint32(chunkSize * i)
stride := uint32(chunkSize * numWorkers)
2017-10-07 02:42:31 -06:00
for low := lower + offset; low <= higher; low += stride {
2017-10-06 04:57:51 -06:00
high := low + uint32(chunkSize-1)
2017-10-05 06:35:07 -06:00
if high > higher {
high = higher
}
err := connectBlockChunk(chain, index, low, high)
if err != nil {
if e, ok := err.(*bitcoin.RPCError); ok && (e.Message == "Block height out of range" || e.Message == "Block not found") {
break
}
log.Fatalf("connectBlocksParallel %d-%d %v", low, high, err)
2017-10-05 06:35:07 -06:00
}
2017-08-28 09:50:57 -06:00
}
2017-09-04 06:16:37 -06:00
}
2017-10-05 06:35:07 -06:00
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go work(i)
}
wg.Wait()
2017-09-04 06:16:37 -06:00
2017-10-05 06:35:07 -06:00
return nil
2017-09-04 06:16:37 -06:00
}
2017-10-05 06:35:07 -06:00
func connectBlockChunk(
chain Blockchain,
index Index,
lower uint32,
higher uint32,
2017-10-05 06:35:07 -06:00
) error {
connected, err := isBlockConnected(chain, index, higher)
if err != nil || connected {
// if higher is over the best block, continue with lower block, otherwise return error
if e, ok := err.(*bitcoin.RPCError); !ok || e.Message != "Block height out of range" {
return err
}
2017-10-05 06:35:07 -06:00
}
2017-09-06 02:59:40 -06:00
height := lower
2017-10-05 06:35:07 -06:00
hash, err := chain.GetBlockHash(lower)
if err != nil {
2017-10-05 06:35:07 -06:00
return err
}
2017-09-06 02:59:40 -06:00
for height <= higher {
2017-10-05 06:35:07 -06:00
block, err := chain.GetBlock(hash)
2017-09-04 06:16:37 -06:00
if err != nil {
2017-10-05 06:35:07 -06:00
return err
2017-08-28 09:50:57 -06:00
}
2017-09-06 02:59:40 -06:00
hash = block.Next
height = block.Height + 1
2017-10-06 04:57:51 -06:00
if *dryRun {
continue
}
2017-10-05 06:35:07 -06:00
err = index.ConnectBlock(block)
if err != nil {
return err
}
2017-08-28 09:50:57 -06:00
}
2017-10-05 06:35:07 -06:00
return nil
}
// isBlockConnected reports whether the hash stored in the index for the given
// height equals the hash the chain reports for that height. The index is
// asked first; an error from either lookup is returned together with false.
func isBlockConnected(chain Blockchain, index Index, height uint32) (bool, error) {
	indexed, err := index.GetBlockHash(height)
	if err != nil {
		return false, err
	}

	onChain, err := chain.GetBlockHash(height)
	if err != nil {
		return false, err
	}

	return indexed == onChain, nil
}
type blockResult struct {
block *bitcoin.Block
2017-10-05 06:35:07 -06:00
err error
2017-08-28 09:50:57 -06:00
}
2017-09-11 07:06:16 -06:00
func getBlockChain(
hash string,
2017-10-05 06:35:07 -06:00
chain Blockchain,
2017-09-11 07:06:16 -06:00
out chan blockResult,
done chan struct{},
) {
defer close(out)
for hash != "" {
select {
case <-done:
return
default:
}
2017-10-05 06:35:07 -06:00
block, err := chain.GetBlock(hash)
2017-09-11 07:06:16 -06:00
if err != nil {
out <- blockResult{err: err}
return
}
hash = block.Next
out <- blockResult{block: block}
}
}