From c813f763369b2c4c1e30d389014de3fbb64d5ea6 Mon Sep 17 00:00:00 2001 From: Martin Boehm Date: Fri, 29 Mar 2019 17:01:20 +0100 Subject: [PATCH] Try to load mempool inputs from db to speed up mempool sync --- bchain/coins/blockchain.go | 4 +-- bchain/coins/btc/bitcoinrpc.go | 8 ++++-- bchain/coins/eth/ethrpc.go | 5 +++- bchain/coins/nuls/nulsrpc.go | 5 ++-- bchain/mempool_bitcoin_type.go | 45 ++++++++++++++++++++-------------- bchain/types.go | 7 ++++-- blockbook.go | 6 ++++- db/rocksdb.go | 19 ++++++++++++++ tests/dbtestdata/fakechain.go | 2 +- tests/integration.go | 2 +- 10 files changed, 72 insertions(+), 31 deletions(-) diff --git a/bchain/coins/blockchain.go b/bchain/coins/blockchain.go index 6d1cdcb6..624b5d75 100644 --- a/bchain/coins/blockchain.go +++ b/bchain/coins/blockchain.go @@ -152,8 +152,8 @@ func (c *blockChainWithMetrics) CreateMempool() (bchain.Mempool, error) { return c.b.CreateMempool() } -func (c *blockChainWithMetrics) InitializeMempool() error { - return c.b.InitializeMempool() +func (c *blockChainWithMetrics) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOutpointFunc) error { + return c.b.InitializeMempool(addrDescForOutpoint) } func (c *blockChainWithMetrics) Shutdown(ctx context.Context) error { diff --git a/bchain/coins/btc/bitcoinrpc.go b/bchain/coins/btc/bitcoinrpc.go index f26830d2..a6c4cdf0 100644 --- a/bchain/coins/btc/bitcoinrpc.go +++ b/bchain/coins/btc/bitcoinrpc.go @@ -138,8 +138,12 @@ func (b *BitcoinRPC) CreateMempool() (bchain.Mempool, error) { return b.Mempool, nil } -// InitializeMempool creates ZeroMQ subscription -func (b *BitcoinRPC) InitializeMempool() error { +// InitializeMempool creates ZeroMQ subscription and sets AddrDescForOutpointFunc to the Mempool +func (b *BitcoinRPC) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOutpointFunc) error { + if b.Mempool == nil { + return errors.New("Mempool not created") + } + b.Mempool.AddrDescForOutpoint = addrDescForOutpoint if b.mq == nil { mq, err := bchain.NewMQ(b.ChainConfig.MessageQueueBinding, b.pushHandler) if err != nil { diff --git a/bchain/coins/eth/ethrpc.go b/bchain/coins/eth/ethrpc.go index bac05d97..18643370 100644 --- a/bchain/coins/eth/ethrpc.go +++ b/bchain/coins/eth/ethrpc.go @@ -172,7 +172,10 @@ func (b *EthereumRPC) CreateMempool() (bchain.Mempool, error) { } // InitializeMempool creates subscriptions to newHeads and newPendingTransactions -func (b *EthereumRPC) InitializeMempool() error { +func (b *EthereumRPC) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOutpointFunc) error { + if b.Mempool == nil { + return errors.New("Mempool not created") + } if b.isETC { glog.Info(b.ChainConfig.CoinName, " does not support subscription to newHeads") } else { diff --git a/bchain/coins/nuls/nulsrpc.go b/bchain/coins/nuls/nulsrpc.go index 3bd7b24e..49e4178f 100644 --- a/bchain/coins/nuls/nulsrpc.go +++ b/bchain/coins/nuls/nulsrpc.go @@ -5,9 +5,8 @@ import ( "blockbook/bchain/coins/btc" "bytes" "encoding/base64" + "encoding/hex" "encoding/json" - "github.com/gobuffalo/packr/v2/file/resolver/encoding/hex" - "github.com/juju/errors" "io" "io/ioutil" "math/big" @@ -17,6 +16,8 @@ import ( "strconv" "time" + "github.com/juju/errors" + "github.com/golang/glog" ) diff --git a/bchain/mempool_bitcoin_type.go b/bchain/mempool_bitcoin_type.go index e87cc937..ae13459e 100644 --- a/bchain/mempool_bitcoin_type.go +++ b/bchain/mempool_bitcoin_type.go @@ -19,13 +19,14 @@ type txidio struct { // MempoolBitcoinType is mempool handle. type MempoolBitcoinType struct { - chain BlockChain - mux sync.Mutex - txToInputOutput map[string][]addrIndex - addrDescToTx map[string][]Outpoint - chanTxid chan string - chanAddrIndex chan txidio - onNewTxAddr OnNewTxAddrFunc + chain BlockChain + mux sync.Mutex + txToInputOutput map[string][]addrIndex + addrDescToTx map[string][]Outpoint + chanTxid chan string + chanAddrIndex chan txidio + onNewTxAddr OnNewTxAddrFunc + AddrDescForOutpoint AddrDescForOutpointFunc } // NewMempoolBitcoinType creates new mempool handler. @@ -86,19 +87,25 @@ func (m *MempoolBitcoinType) updateMappings(newTxToInputOutput map[string][]addr } func (m *MempoolBitcoinType) getInputAddress(input Outpoint) *addrIndex { - itx, err := m.chain.GetTransactionForMempool(input.Txid) - if err != nil { - glog.Error("cannot get transaction ", input.Txid, ": ", err) - return nil + var addrDesc AddressDescriptor + if m.AddrDescForOutpoint != nil { + addrDesc = m.AddrDescForOutpoint(input) } - if int(input.Vout) >= len(itx.Vout) { - glog.Error("Vout len in transaction ", input.Txid, " ", len(itx.Vout), " input.Vout=", input.Vout) - return nil - } - addrDesc, err := m.chain.GetChainParser().GetAddrDescFromVout(&itx.Vout[input.Vout]) - if err != nil { - glog.Error("error in addrDesc in ", input.Txid, " ", input.Vout, ": ", err) - return nil + if addrDesc == nil { + itx, err := m.chain.GetTransactionForMempool(input.Txid) + if err != nil { + glog.Error("cannot get transaction ", input.Txid, ": ", err) + return nil + } + if int(input.Vout) >= len(itx.Vout) { + glog.Error("Vout len in transaction ", input.Txid, " ", len(itx.Vout), " input.Vout=", input.Vout) + return nil + } + addrDesc, err = m.chain.GetChainParser().GetAddrDescFromVout(&itx.Vout[input.Vout]) + if err != nil { + glog.Error("error in addrDesc in ", input.Txid, " ", input.Vout, ": ", err) + return nil + } } return &addrIndex{string(addrDesc), ^input.Vout} diff --git a/bchain/types.go b/bchain/types.go index 044122e6..0415f4b8 100644 --- a/bchain/types.go +++ b/bchain/types.go @@ -191,15 +191,18 @@ type OnNewBlockFunc func(hash string, height uint32) // OnNewTxAddrFunc is used to send notification about a new transaction/address type OnNewTxAddrFunc func(tx *Tx, desc AddressDescriptor) +// AddrDescForOutpointFunc defines function that returns address descriptorfor given outpoint or nil if outpoint not found +type AddrDescForOutpointFunc func(outpoint Outpoint) AddressDescriptor + // BlockChain defines common interface to block chain daemon type BlockChain interface { // life-cycle methods - // intialize the block chain connector + // initialize the block chain connector Initialize() error // create mempool but do not initialize it CreateMempool() (Mempool, error) // initialize mempool, create ZeroMQ (or other) subscription - InitializeMempool() error + InitializeMempool(AddrDescForOutpointFunc) error // shutdown mempool, ZeroMQ and block chain connections Shutdown(ctx context.Context) error // chain info diff --git a/blockbook.go b/blockbook.go index f5b62377..e694bcc2 100644 --- a/blockbook.go +++ b/blockbook.go @@ -227,7 +227,11 @@ func main() { return } // initialize mempool after the initial sync is complete - err = chain.InitializeMempool() + var addrDescForOutpoint bchain.AddrDescForOutpointFunc + if chain.GetChainParser().GetChainType() == bchain.ChainBitcoinType { + addrDescForOutpoint = index.AddrDescForOutpoint + } + err = chain.InitializeMempool(addrDescForOutpoint) if err != nil { glog.Error("initializeMempool ", err) return diff --git a/db/rocksdb.go b/db/rocksdb.go index 0ec55b03..735402e3 100644 --- a/db/rocksdb.go +++ b/db/rocksdb.go @@ -757,6 +757,25 @@ func (d *RocksDB) GetTxAddresses(txid string) (*TxAddresses, error) { return d.getTxAddresses(btxID) } +// AddrDescForOutpoint defines function that returns address descriptorfor given outpoint or nil if outpoint not found +func (d *RocksDB) AddrDescForOutpoint(outpoint bchain.Outpoint) bchain.AddressDescriptor { + ta, err := d.GetTxAddresses(outpoint.Txid) + if err != nil || ta == nil { + return nil + } + if outpoint.Vout < 0 { + vin := ^outpoint.Vout + if len(ta.Inputs) <= int(vin) { + return nil + } + return ta.Inputs[vin].AddrDesc + } + if len(ta.Outputs) <= int(outpoint.Vout) { + return nil + } + return ta.Outputs[outpoint.Vout].AddrDesc +} + func packTxAddresses(ta *TxAddresses, buf []byte, varBuf []byte) []byte { buf = buf[:0] l := packVaruint(uint(ta.Height), varBuf) diff --git a/tests/dbtestdata/fakechain.go b/tests/dbtestdata/fakechain.go index 2f05bd4b..270403bb 100644 --- a/tests/dbtestdata/fakechain.go +++ b/tests/dbtestdata/fakechain.go @@ -25,7 +25,7 @@ func (c *fakeBlockChain) Initialize() error { return nil } -func (c *fakeBlockChain) InitializeMempool() error { +func (c *fakeBlockChain) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOutpointFunc) error { return nil } diff --git a/tests/integration.go b/tests/integration.go index 789f71fc..d1e70130 100644 --- a/tests/integration.go +++ b/tests/integration.go @@ -174,7 +174,7 @@ func initBlockChain(coinName string, cfg json.RawMessage) (bchain.BlockChain, bc return nil, nil, fmt.Errorf("Mempool creation failed: %s", err) } - err = cli.InitializeMempool() + err = cli.InitializeMempool(nil) if err != nil { return nil, nil, fmt.Errorf("Mempool initialization failed: %s", err) }