Try to load mempool inputs from db to speed up mempool sync

utxostat
Martin Boehm 2019-03-29 17:01:20 +01:00
parent d2928b3516
commit c813f76336
10 changed files with 72 additions and 31 deletions

View File

@ -152,8 +152,8 @@ func (c *blockChainWithMetrics) CreateMempool() (bchain.Mempool, error) {
return c.b.CreateMempool()
}
func (c *blockChainWithMetrics) InitializeMempool() error {
return c.b.InitializeMempool()
func (c *blockChainWithMetrics) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOutpointFunc) error {
return c.b.InitializeMempool(addrDescForOutpoint)
}
func (c *blockChainWithMetrics) Shutdown(ctx context.Context) error {

View File

@ -138,8 +138,12 @@ func (b *BitcoinRPC) CreateMempool() (bchain.Mempool, error) {
return b.Mempool, nil
}
// InitializeMempool creates ZeroMQ subscription
func (b *BitcoinRPC) InitializeMempool() error {
// InitializeMempool creates ZeroMQ subscription and sets AddrDescForOutpointFunc to the Mempool
func (b *BitcoinRPC) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOutpointFunc) error {
if b.Mempool == nil {
return errors.New("Mempool not created")
}
b.Mempool.AddrDescForOutpoint = addrDescForOutpoint
if b.mq == nil {
mq, err := bchain.NewMQ(b.ChainConfig.MessageQueueBinding, b.pushHandler)
if err != nil {

View File

@ -172,7 +172,10 @@ func (b *EthereumRPC) CreateMempool() (bchain.Mempool, error) {
}
// InitializeMempool creates subscriptions to newHeads and newPendingTransactions
func (b *EthereumRPC) InitializeMempool() error {
func (b *EthereumRPC) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOutpointFunc) error {
if b.Mempool == nil {
return errors.New("Mempool not created")
}
if b.isETC {
glog.Info(b.ChainConfig.CoinName, " does not support subscription to newHeads")
} else {

View File

@ -5,9 +5,8 @@ import (
"blockbook/bchain/coins/btc"
"bytes"
"encoding/base64"
"encoding/hex"
"encoding/json"
"github.com/gobuffalo/packr/v2/file/resolver/encoding/hex"
"github.com/juju/errors"
"io"
"io/ioutil"
"math/big"
@ -17,6 +16,8 @@ import (
"strconv"
"time"
"github.com/juju/errors"
"github.com/golang/glog"
)

View File

@ -19,13 +19,14 @@ type txidio struct {
// MempoolBitcoinType is mempool handle.
type MempoolBitcoinType struct {
chain BlockChain
mux sync.Mutex
txToInputOutput map[string][]addrIndex
addrDescToTx map[string][]Outpoint
chanTxid chan string
chanAddrIndex chan txidio
onNewTxAddr OnNewTxAddrFunc
chain BlockChain
mux sync.Mutex
txToInputOutput map[string][]addrIndex
addrDescToTx map[string][]Outpoint
chanTxid chan string
chanAddrIndex chan txidio
onNewTxAddr OnNewTxAddrFunc
AddrDescForOutpoint AddrDescForOutpointFunc
}
// NewMempoolBitcoinType creates new mempool handler.
@ -86,19 +87,25 @@ func (m *MempoolBitcoinType) updateMappings(newTxToInputOutput map[string][]addr
}
func (m *MempoolBitcoinType) getInputAddress(input Outpoint) *addrIndex {
itx, err := m.chain.GetTransactionForMempool(input.Txid)
if err != nil {
glog.Error("cannot get transaction ", input.Txid, ": ", err)
return nil
var addrDesc AddressDescriptor
if m.AddrDescForOutpoint != nil {
addrDesc = m.AddrDescForOutpoint(input)
}
if int(input.Vout) >= len(itx.Vout) {
glog.Error("Vout len in transaction ", input.Txid, " ", len(itx.Vout), " input.Vout=", input.Vout)
return nil
}
addrDesc, err := m.chain.GetChainParser().GetAddrDescFromVout(&itx.Vout[input.Vout])
if err != nil {
glog.Error("error in addrDesc in ", input.Txid, " ", input.Vout, ": ", err)
return nil
if addrDesc == nil {
itx, err := m.chain.GetTransactionForMempool(input.Txid)
if err != nil {
glog.Error("cannot get transaction ", input.Txid, ": ", err)
return nil
}
if int(input.Vout) >= len(itx.Vout) {
glog.Error("Vout len in transaction ", input.Txid, " ", len(itx.Vout), " input.Vout=", input.Vout)
return nil
}
addrDesc, err = m.chain.GetChainParser().GetAddrDescFromVout(&itx.Vout[input.Vout])
if err != nil {
glog.Error("error in addrDesc in ", input.Txid, " ", input.Vout, ": ", err)
return nil
}
}
return &addrIndex{string(addrDesc), ^input.Vout}

View File

@ -191,15 +191,18 @@ type OnNewBlockFunc func(hash string, height uint32)
// OnNewTxAddrFunc is used to send notification about a new transaction/address
type OnNewTxAddrFunc func(tx *Tx, desc AddressDescriptor)
// AddrDescForOutpointFunc defines function that returns address descriptorfor given outpoint or nil if outpoint not found
type AddrDescForOutpointFunc func(outpoint Outpoint) AddressDescriptor
// BlockChain defines common interface to block chain daemon
type BlockChain interface {
// life-cycle methods
// intialize the block chain connector
// initialize the block chain connector
Initialize() error
// create mempool but do not initialize it
CreateMempool() (Mempool, error)
// initialize mempool, create ZeroMQ (or other) subscription
InitializeMempool() error
InitializeMempool(AddrDescForOutpointFunc) error
// shutdown mempool, ZeroMQ and block chain connections
Shutdown(ctx context.Context) error
// chain info

View File

@ -227,7 +227,11 @@ func main() {
return
}
// initialize mempool after the initial sync is complete
err = chain.InitializeMempool()
var addrDescForOutpoint bchain.AddrDescForOutpointFunc
if chain.GetChainParser().GetChainType() == bchain.ChainBitcoinType {
addrDescForOutpoint = index.AddrDescForOutpoint
}
err = chain.InitializeMempool(addrDescForOutpoint)
if err != nil {
glog.Error("initializeMempool ", err)
return

View File

@ -757,6 +757,25 @@ func (d *RocksDB) GetTxAddresses(txid string) (*TxAddresses, error) {
return d.getTxAddresses(btxID)
}
// AddrDescForOutpoint defines function that returns address descriptorfor given outpoint or nil if outpoint not found
func (d *RocksDB) AddrDescForOutpoint(outpoint bchain.Outpoint) bchain.AddressDescriptor {
ta, err := d.GetTxAddresses(outpoint.Txid)
if err != nil || ta == nil {
return nil
}
if outpoint.Vout < 0 {
vin := ^outpoint.Vout
if len(ta.Inputs) <= int(vin) {
return nil
}
return ta.Inputs[vin].AddrDesc
}
if len(ta.Outputs) <= int(outpoint.Vout) {
return nil
}
return ta.Outputs[outpoint.Vout].AddrDesc
}
func packTxAddresses(ta *TxAddresses, buf []byte, varBuf []byte) []byte {
buf = buf[:0]
l := packVaruint(uint(ta.Height), varBuf)

View File

@ -25,7 +25,7 @@ func (c *fakeBlockChain) Initialize() error {
return nil
}
func (c *fakeBlockChain) InitializeMempool() error {
func (c *fakeBlockChain) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOutpointFunc) error {
return nil
}

View File

@ -174,7 +174,7 @@ func initBlockChain(coinName string, cfg json.RawMessage) (bchain.BlockChain, bc
return nil, nil, fmt.Errorf("Mempool creation failed: %s", err)
}
err = cli.InitializeMempool()
err = cli.InitializeMempool(nil)
if err != nil {
return nil, nil, fmt.Errorf("Mempool initialization failed: %s", err)
}