Sync index on message from zeroMQ

pull/1/head
Martin Boehm 2018-01-31 15:03:06 +01:00
parent e22e1a946d
commit 71d669c0b9
2 changed files with 62 additions and 60 deletions

View File

@ -35,8 +35,10 @@ func New(binding string, callback func(*MQMessage)) (*MQ, error) {
}
socket.SetSubscribe("hashblock")
socket.SetSubscribe("hashtx")
socket.SetSubscribe("rawblock")
socket.SetSubscribe("rawtx")
// for now do not use raw subscriptions - we would have to handle skipped/lost notifications from zeromq
// on each notification we do sync or syncmempool respectively
// socket.SetSubscribe("rawblock")
// socket.SetSubscribe("rawtx")
socket.Connect(binding)
glog.Info("MQ listening to ", binding)
mq := &MQ{context, socket, true, make(chan bool)}

View File

@ -18,23 +18,6 @@ import (
"github.com/pkg/profile"
)
type Blockchain interface {
GetBestBlockHash() (string, error)
GetBestBlockHeight() (uint32, error)
GetBlockHash(height uint32) (string, error)
GetBlockHeader(hash string) (*bitcoin.BlockHeader, error)
GetBlock(hash string) (*bitcoin.Block, error)
}
type Index interface {
GetBestBlock() (uint32, string, error)
GetBlockHash(height uint32) (string, error)
GetTransactions(outputScript []byte, lower uint32, higher uint32, fn func(txid string) error) error
ConnectBlock(block *bitcoin.Block) error
DisconnectBlock(block *bitcoin.Block) error
DisconnectBlocks(lower uint32, higher uint32) error
}
var (
rpcURL = flag.String("rpcurl", "http://localhost:8332", "url of bitcoin RPC service")
rpcUser = flag.String("rpcuser", "rpc", "rpc username")
@ -49,9 +32,9 @@ var (
queryAddress = flag.String("address", "", "query contents of this address")
resync = flag.Bool("resync", false, "resync until tip")
repair = flag.Bool("repair", false, "repair the database")
prof = flag.Bool("prof", false, "profile program execution")
synchronize = flag.Bool("sync", false, "synchronizes until tip, if together with zeromq, keeps index synchronized")
repair = flag.Bool("repair", false, "repair the database")
prof = flag.Bool("prof", false, "profile program execution")
syncChunk = flag.Int("chunk", 100, "block chunk size for processing")
syncWorkers = flag.Int("workers", 8, "number of workers to process blocks")
@ -63,12 +46,20 @@ var (
zeroMQBinding = flag.String("zeromq", "", "binding to zeromq, if missing no zeromq connection")
)
var (
syncChannel = make(chan struct{})
chain *bitcoin.BitcoinRPC
index *db.RocksDB
)
func main() {
flag.Parse()
// override setting for glog to log only to stderr, to match the http handler
flag.Lookup("logtostderr").Value.Set("true")
defer glog.Flush()
if *prof {
defer profile.Start().Stop()
}
@ -80,33 +71,34 @@ func main() {
return
}
rpc := bitcoin.NewBitcoinRPC(
chain = bitcoin.NewBitcoinRPC(
*rpcURL,
*rpcUser,
*rpcPass,
time.Duration(*rpcTimeout)*time.Second)
if *parse {
rpc.Parser = &bitcoin.BitcoinBlockParser{
chain.Parser = &bitcoin.BitcoinBlockParser{
Params: bitcoin.GetChainParams()[0],
}
}
db, err := db.NewRocksDB(*dbPath)
var err error
index, err = db.NewRocksDB(*dbPath)
if err != nil {
glog.Fatalf("NewRocksDB %v", err)
}
defer db.Close()
defer index.Close()
if *rollbackHeight >= 0 {
bestHeight, _, err := db.GetBestBlock()
bestHeight, _, err := index.GetBestBlock()
if err != nil {
glog.Fatalf("rollbackHeight: %v", err)
}
if uint32(*rollbackHeight) > bestHeight {
glog.Infof("nothing to rollback, rollbackHeight %d, bestHeight: %d", *rollbackHeight, bestHeight)
} else {
err = db.DisconnectBlocks(uint32(*rollbackHeight), bestHeight)
err = index.DisconnectBlocks(uint32(*rollbackHeight), bestHeight)
if err != nil {
glog.Fatalf("rollbackHeight: %v", err)
}
@ -114,31 +106,37 @@ func main() {
return
}
if *resync {
if err := resyncIndex(rpc, db); err != nil {
if *synchronize {
if err := resyncIndex(); err != nil {
glog.Fatal("resyncIndex ", err)
}
}
var httpServer *server.HttpServer
if *httpServerBinding != "" {
httpServer, err = server.New(*httpServerBinding, db)
httpServer, err = server.New(*httpServerBinding, index)
if err != nil {
glog.Fatalf("https: %v", err)
glog.Fatal("https: ", err)
}
go func() {
err = httpServer.Run()
if err != nil {
glog.Fatalf("https: %v", err)
glog.Fatal("https: ", err)
}
}()
}
var mq *bitcoin.MQ
if *zeroMQBinding != "" {
mq, err = bitcoin.New(*zeroMQBinding, mqHandler)
if err != nil {
glog.Fatalf("mq: %v", err)
if !*synchronize {
glog.Error("zeromq connection without synchronization does not make sense, ignoring zeromq parameter")
} else {
go syncLoop()
mq, err = bitcoin.New(*zeroMQBinding, mqHandler)
if err != nil {
glog.Fatal("mq: ", err)
}
}
}
@ -155,13 +153,11 @@ func main() {
if err != nil {
glog.Fatalf("GetTransactions %v", err)
}
if err = db.GetTransactions(script, height, until, printResult); err != nil {
if err = index.GetTransactions(script, height, until, printResult); err != nil {
glog.Fatalf("GetTransactions %v", err)
}
} else if !*resync {
} else if !*synchronize {
if err = connectBlocksParallel(
rpc,
db,
height,
until,
*syncChunk,
@ -172,14 +168,28 @@ func main() {
}
}
if httpServer != nil {
if httpServer != nil || mq != nil {
waitForSignalAndShutdown(httpServer, mq, 5*time.Second)
}
}
func syncLoop() {
for range syncChannel {
resyncIndex()
}
}
func mqHandler(m *bitcoin.MQMessage) {
body := hex.EncodeToString(m.Body)
glog.Infof("MQ: %s-%d %s", m.Topic, m.Sequence, body)
if m.Topic == "hashblock" {
syncChannel <- struct{}{}
} else if m.Topic == "hashtx" {
} else {
glog.Errorf("MQ: unknown message %s-%d %s", m.Topic, m.Sequence, body)
}
}
func waitForSignalAndShutdown(s *server.HttpServer, mq *bitcoin.MQ, timeout time.Duration) {
@ -205,7 +215,8 @@ func waitForSignalAndShutdown(s *server.HttpServer, mq *bitcoin.MQ, timeout time
}
}
glog.Flush()
close(syncChannel)
}
func printResult(txid string) error {
@ -213,7 +224,7 @@ func printResult(txid string) error {
return nil
}
func resyncIndex(chain Blockchain, index Index) error {
func resyncIndex() error {
remote, err := chain.GetBestBlockHash()
if err != nil {
return err
@ -268,7 +279,7 @@ func resyncIndex(chain Blockchain, index Index) error {
if err != nil {
return err
}
return resyncIndex(chain, index)
return resyncIndex()
}
}
@ -301,8 +312,6 @@ func resyncIndex(chain Blockchain, index Index) error {
if chainBestHeight-startHeight > uint32(*syncChunk) {
glog.Infof("resync: parallel sync of blocks %d-%d", startHeight, chainBestHeight)
err = connectBlocksParallel(
chain,
index,
startHeight,
chainBestHeight,
*syncChunk,
@ -313,23 +322,21 @@ func resyncIndex(chain Blockchain, index Index) error {
}
// after parallel load finish the sync using standard way,
// new blocks may have been created in the meantime
return resyncIndex(chain, index)
return resyncIndex()
}
}
return connectBlocks(chain, index, hash)
return connectBlocks(hash)
}
func connectBlocks(
chain Blockchain,
index Index,
hash string,
) error {
bch := make(chan blockResult, 8)
done := make(chan struct{})
defer close(done)
go getBlockChain(hash, chain, bch, done)
go getBlockChain(hash, bch, done)
var lastRes blockResult
for res := range bch {
@ -351,8 +358,6 @@ func connectBlocks(
}
func connectBlocksParallel(
chain Blockchain,
index Index,
lower uint32,
higher uint32,
chunkSize int,
@ -371,7 +376,7 @@ func connectBlocksParallel(
if high > higher {
high = higher
}
err := connectBlockChunk(chain, index, low, high)
err := connectBlockChunk(low, high)
if err != nil {
if e, ok := err.(*bitcoin.RPCError); ok && (e.Message == "Block height out of range" || e.Message == "Block not found") {
break
@ -390,12 +395,10 @@ func connectBlocksParallel(
}
func connectBlockChunk(
chain Blockchain,
index Index,
lower uint32,
higher uint32,
) error {
connected, err := isBlockConnected(chain, index, higher)
connected, err := isBlockConnected(higher)
if err != nil || connected {
// if higher is over the best block, continue with lower block, otherwise return error
if e, ok := err.(*bitcoin.RPCError); !ok || e.Message != "Block height out of range" {
@ -432,8 +435,6 @@ func connectBlockChunk(
}
func isBlockConnected(
chain Blockchain,
index Index,
height uint32,
) (bool, error) {
local, err := index.GetBlockHash(height)
@ -457,7 +458,6 @@ type blockResult struct {
func getBlockChain(
hash string,
chain Blockchain,
out chan blockResult,
done chan struct{},
) {