From 71d669c0b98a1525244e1c83452403d372bd8231 Mon Sep 17 00:00:00 2001 From: Martin Boehm Date: Wed, 31 Jan 2018 15:03:06 +0100 Subject: [PATCH] Sync index on message from zeroMQ --- bitcoin/mq.go | 6 ++- blockbook.go | 116 +++++++++++++++++++++++++------------------------- 2 files changed, 62 insertions(+), 60 deletions(-) diff --git a/bitcoin/mq.go b/bitcoin/mq.go index 75ddeafc..fbbd02bb 100644 --- a/bitcoin/mq.go +++ b/bitcoin/mq.go @@ -35,8 +35,10 @@ func New(binding string, callback func(*MQMessage)) (*MQ, error) { } socket.SetSubscribe("hashblock") socket.SetSubscribe("hashtx") - socket.SetSubscribe("rawblock") - socket.SetSubscribe("rawtx") + // for now do not use raw subscriptions - we would have to handle skipped/lost notifications from zeromq + // on each notification we do sync or syncmempool respectively + // socket.SetSubscribe("rawblock") + // socket.SetSubscribe("rawtx") socket.Connect(binding) glog.Info("MQ listening to ", binding) mq := &MQ{context, socket, true, make(chan bool)} diff --git a/blockbook.go b/blockbook.go index 2d915f83..da0fd6df 100644 --- a/blockbook.go +++ b/blockbook.go @@ -18,23 +18,6 @@ import ( "github.com/pkg/profile" ) -type Blockchain interface { - GetBestBlockHash() (string, error) - GetBestBlockHeight() (uint32, error) - GetBlockHash(height uint32) (string, error) - GetBlockHeader(hash string) (*bitcoin.BlockHeader, error) - GetBlock(hash string) (*bitcoin.Block, error) -} - -type Index interface { - GetBestBlock() (uint32, string, error) - GetBlockHash(height uint32) (string, error) - GetTransactions(outputScript []byte, lower uint32, higher uint32, fn func(txid string) error) error - ConnectBlock(block *bitcoin.Block) error - DisconnectBlock(block *bitcoin.Block) error - DisconnectBlocks(lower uint32, higher uint32) error -} - var ( rpcURL = flag.String("rpcurl", "http://localhost:8332", "url of bitcoin RPC service") rpcUser = flag.String("rpcuser", "rpc", "rpc username") @@ -49,9 +32,9 @@ var ( queryAddress = flag.String("address", "", "query contents of this address") - resync = flag.Bool("resync", false, "resync until tip") - repair = flag.Bool("repair", false, "repair the database") - prof = flag.Bool("prof", false, "profile program execution") + synchronize = flag.Bool("sync", false, "synchronizes until tip, if together with zeromq, keeps index synchronized") + repair = flag.Bool("repair", false, "repair the database") + prof = flag.Bool("prof", false, "profile program execution") syncChunk = flag.Int("chunk", 100, "block chunk size for processing") syncWorkers = flag.Int("workers", 8, "number of workers to process blocks") @@ -63,12 +46,20 @@ var ( zeroMQBinding = flag.String("zeromq", "", "binding to zeromq, if missing no zeromq connection") ) +var ( + syncChannel = make(chan struct{}) + chain *bitcoin.BitcoinRPC + index *db.RocksDB +) + func main() { flag.Parse() // override setting for glog to log only to stderr, to match the http handler flag.Lookup("logtostderr").Value.Set("true") + defer glog.Flush() + if *prof { defer profile.Start().Stop() } @@ -80,33 +71,34 @@ func main() { return } - rpc := bitcoin.NewBitcoinRPC( + chain = bitcoin.NewBitcoinRPC( *rpcURL, *rpcUser, *rpcPass, time.Duration(*rpcTimeout)*time.Second) if *parse { - rpc.Parser = &bitcoin.BitcoinBlockParser{ + chain.Parser = &bitcoin.BitcoinBlockParser{ Params: bitcoin.GetChainParams()[0], } } - db, err := db.NewRocksDB(*dbPath) + var err error + index, err = db.NewRocksDB(*dbPath) if err != nil { glog.Fatalf("NewRocksDB %v", err) } - defer db.Close() + defer index.Close() if *rollbackHeight >= 0 { - bestHeight, _, err := db.GetBestBlock() + bestHeight, _, err := index.GetBestBlock() if err != nil { glog.Fatalf("rollbackHeight: %v", err) } if uint32(*rollbackHeight) > bestHeight { glog.Infof("nothing to rollback, rollbackHeight %d, bestHeight: %d", *rollbackHeight, bestHeight) } else { - err = db.DisconnectBlocks(uint32(*rollbackHeight), bestHeight) + err = index.DisconnectBlocks(uint32(*rollbackHeight), bestHeight) if err != nil { glog.Fatalf("rollbackHeight: %v", err) } @@ -114,31 +106,37 @@ func main() { return } - if *resync { - if err := resyncIndex(rpc, db); err != nil { + if *synchronize { + if err := resyncIndex(); err != nil { glog.Fatal("resyncIndex ", err) } } var httpServer *server.HttpServer if *httpServerBinding != "" { - httpServer, err = server.New(*httpServerBinding, db) + httpServer, err = server.New(*httpServerBinding, index) if err != nil { - glog.Fatalf("https: %v", err) + glog.Fatal("https: ", err) } go func() { err = httpServer.Run() if err != nil { - glog.Fatalf("https: %v", err) + glog.Fatal("https: ", err) } }() } var mq *bitcoin.MQ if *zeroMQBinding != "" { - mq, err = bitcoin.New(*zeroMQBinding, mqHandler) - if err != nil { - glog.Fatalf("mq: %v", err) + if !*synchronize { + glog.Error("zeromq connection without synchronization does not make sense, ignoring zeromq parameter") + } else { + go syncLoop() + mq, err = bitcoin.New(*zeroMQBinding, mqHandler) + if err != nil { + glog.Fatal("mq: ", err) + } + } } @@ -155,13 +153,11 @@ func main() { if err != nil { glog.Fatalf("GetTransactions %v", err) } - if err = db.GetTransactions(script, height, until, printResult); err != nil { + if err = index.GetTransactions(script, height, until, printResult); err != nil { glog.Fatalf("GetTransactions %v", err) } - } else if !*resync { + } else if !*synchronize { if err = connectBlocksParallel( - rpc, - db, height, until, *syncChunk, @@ -172,14 +168,28 @@ func main() { } } - if httpServer != nil { + if httpServer != nil || mq != nil { waitForSignalAndShutdown(httpServer, mq, 5*time.Second) } + +} + +func syncLoop() { + for range syncChannel { + resyncIndex() + } } func mqHandler(m *bitcoin.MQMessage) { body := hex.EncodeToString(m.Body) glog.Infof("MQ: %s-%d %s", m.Topic, m.Sequence, body) + if m.Topic == "hashblock" { + syncChannel <- struct{}{} + } else if m.Topic == "hashtx" { + + } else { + glog.Errorf("MQ: unknown message %s-%d %s", m.Topic, m.Sequence, body) + } } func waitForSignalAndShutdown(s *server.HttpServer, mq *bitcoin.MQ, timeout time.Duration) { @@ -205,7 +215,8 @@ func waitForSignalAndShutdown(s *server.HttpServer, mq *bitcoin.MQ, timeout time } } - glog.Flush() + close(syncChannel) + } func printResult(txid string) error { @@ -213,7 +224,7 @@ func printResult(txid string) error { return nil } -func resyncIndex(chain Blockchain, index Index) error { +func resyncIndex() error { remote, err := chain.GetBestBlockHash() if err != nil { return err @@ -268,7 +279,7 @@ func resyncIndex(chain Blockchain, index Index) error { if err != nil { return err } - return resyncIndex(chain, index) + return resyncIndex() } } @@ -301,8 +312,6 @@ func resyncIndex(chain Blockchain, index Index) error { if chainBestHeight-startHeight > uint32(*syncChunk) { glog.Infof("resync: parallel sync of blocks %d-%d", startHeight, chainBestHeight) err = connectBlocksParallel( - chain, - index, startHeight, chainBestHeight, *syncChunk, @@ -313,23 +322,21 @@ func resyncIndex(chain Blockchain, index Index) error { } // after parallel load finish the sync using standard way, // new blocks may have been created in the meantime - return resyncIndex(chain, index) + return resyncIndex() } } - return connectBlocks(chain, index, hash) + return connectBlocks(hash) } func connectBlocks( - chain Blockchain, - index Index, hash string, ) error { bch := make(chan blockResult, 8) done := make(chan struct{}) defer close(done) - go getBlockChain(hash, chain, bch, done) + go getBlockChain(hash, bch, done) var lastRes blockResult for res := range bch { @@ -351,8 +358,6 @@ func connectBlocks( } func connectBlocksParallel( - chain Blockchain, - index Index, lower uint32, higher uint32, chunkSize int, @@ -371,7 +376,7 @@ func connectBlocksParallel( if high > higher { high = higher } - err := connectBlockChunk(chain, index, low, high) + err := connectBlockChunk(low, high) if err != nil { if e, ok := err.(*bitcoin.RPCError); ok && (e.Message == "Block height out of range" || e.Message == "Block not found") { break @@ -390,12 +395,10 @@ func connectBlocksParallel( } func connectBlockChunk( - chain Blockchain, - index Index, lower uint32, higher uint32, ) error { - connected, err := isBlockConnected(chain, index, higher) + connected, err := isBlockConnected(higher) if err != nil || connected { // if higher is over the best block, continue with lower block, otherwise return error if e, ok := err.(*bitcoin.RPCError); !ok || e.Message != "Block height out of range" { @@ -432,8 +435,6 @@ func connectBlockChunk( } func isBlockConnected( - chain Blockchain, - index Index, height uint32, ) (bool, error) { local, err := index.GetBlockHash(height) @@ -457,7 +458,6 @@ type blockResult struct { func getBlockChain( hash string, - chain Blockchain, out chan blockResult, done chan struct{}, ) {