Synchronize mempool

pull/1/head
Martin Boehm 2018-01-31 17:51:48 +01:00
parent dec566e817
commit 7185060f62
3 changed files with 118 additions and 15 deletions

73
bchain/mempool.go 100644
View File

@ -0,0 +1,73 @@
package bchain
import (
"encoding/hex"
"sync"
"github.com/golang/glog"
)
// Mempool is mempool handle.
type Mempool struct {
chain *BitcoinRPC
mux sync.Mutex
scriptToTx map[string][]string
txToScript map[string][]string
}
// NewMempool creates new mempool handler.
func NewMempool(chain *BitcoinRPC) *Mempool {
return &Mempool{chain: chain}
}
// GetTransactions returns slice of mempool transactions for given output script.
func (m *Mempool) GetTransactions(outputScript []byte) ([]string, error) {
m.mux.Lock()
defer m.mux.Unlock()
scriptHex := hex.EncodeToString(outputScript)
return m.scriptToTx[scriptHex], nil
}
func (m *Mempool) updateMaps(newScriptToTx map[string][]string, newTxToScript map[string][]string) {
m.mux.Lock()
defer m.mux.Unlock()
m.scriptToTx = newScriptToTx
m.txToScript = newTxToScript
}
// Resync gets mempool transactions and maps output scripts to transactions.
// Resync is not reentrant, it should be called from a single thread.
// Read operations (GetTransactions) are safe.
func (m *Mempool) Resync() error {
glog.Info("Mempool: resync")
txs, err := m.chain.GetMempool()
if err != nil {
return err
}
newScriptToTx := make(map[string][]string)
newTxToScript := make(map[string][]string)
for _, txid := range txs {
scripts := m.txToScript[txid]
if scripts == nil {
tx, err := m.chain.GetTransaction(txid)
if err != nil {
glog.Error("cannot get transaction ", txid, ": ", err)
continue
}
scripts = make([]string, 0, len(tx.Vout))
for _, output := range tx.Vout {
outputScript := output.ScriptPubKey.Hex
if outputScript != "" {
scripts = append(scripts, outputScript)
}
}
}
newTxToScript[txid] = scripts
for _, script := range scripts {
newScriptToTx[script] = append(newScriptToTx[script], txid)
}
}
m.updateMaps(newScriptToTx, newTxToScript)
glog.Info("Mempool: resync finished, ", len(m.txToScript), " transactions in mempool")
return nil
}

View File

@ -22,7 +22,7 @@ type MQMessage struct {
Body []byte
}
// New creates new Bitcoind ZeroMQ listener
// NewMQ creates new Bitcoind ZeroMQ listener
// callback function receives messages
func NewMQ(binding string, callback func(*MQMessage)) (*MQ, error) {
context, err := zmq.NewContext()

View File

@ -47,9 +47,13 @@ var (
)
var (
syncChannel = make(chan struct{})
chain *bchain.BitcoinRPC
index *db.RocksDB
chanSyncIndex = make(chan struct{})
chanSyncMempool = make(chan struct{})
chanSyncIndexDone = make(chan struct{})
chanSyncMempoolDone = make(chan struct{})
chain *bchain.BitcoinRPC
mempool *bchain.Mempool
index *db.RocksDB
)
func main() {
@ -83,6 +87,8 @@ func main() {
}
}
mempool = bchain.NewMempool(chain)
var err error
index, err = db.NewRocksDB(*dbPath)
if err != nil {
@ -106,6 +112,9 @@ func main() {
return
}
go syncIndexLoop()
go syncMempoolLoop()
if *synchronize {
if err := resyncIndex(); err != nil {
glog.Fatal("resyncIndex ", err)
@ -135,12 +144,10 @@ func main() {
if !*synchronize {
glog.Error("zeromq connection without synchronization does not make sense, ignoring zeromq parameter")
} else {
go syncLoop()
mq, err = bchain.NewMQ(*zeroMQBinding, mqHandler)
if err != nil {
glog.Fatal("mq: ", err)
}
}
}
@ -176,21 +183,41 @@ func main() {
waitForSignalAndShutdown(httpServer, mq, 5*time.Second)
}
close(chanSyncIndex)
close(chanSyncMempool)
<-chanSyncIndexDone
<-chanSyncMempoolDone
}
func syncLoop() {
for range syncChannel {
resyncIndex()
func syncIndexLoop() {
defer close(chanSyncIndexDone)
glog.Info("syncIndexLoop starting")
for range chanSyncIndex {
if err := resyncIndex(); err != nil {
glog.Error(err)
}
}
glog.Info("syncIndexLoop stopped")
}
func syncMempoolLoop() {
defer close(chanSyncMempoolDone)
glog.Info("syncMempoolLoop starting")
for range chanSyncMempool {
if err := mempool.Resync(); err != nil {
glog.Error(err)
}
}
glog.Info("syncMempoolLoop stopped")
}
func mqHandler(m *bchain.MQMessage) {
body := hex.EncodeToString(m.Body)
glog.Infof("MQ: %s-%d %s", m.Topic, m.Sequence, body)
glog.V(2).Infof("MQ: %s-%d %s", m.Topic, m.Sequence, body)
if m.Topic == "hashblock" {
syncChannel <- struct{}{}
chanSyncIndex <- struct{}{}
} else if m.Topic == "hashtx" {
chanSyncMempool <- struct{}{}
} else {
glog.Errorf("MQ: unknown message %s-%d %s", m.Topic, m.Sequence, body)
}
@ -218,8 +245,6 @@ func waitForSignalAndShutdown(s *server.HttpServer, mq *bchain.MQ, timeout time.
glog.Error("HttpServer.Shutdown error: ", err)
}
}
close(syncChannel)
}
func printResult(txid string) error {
@ -329,7 +354,12 @@ func resyncIndex() error {
}
}
return connectBlocks(hash)
err = connectBlocks(hash)
if err != nil {
return err
}
chanSyncMempool <- struct{}{}
return nil
}
func connectBlocks(