From ce3c7c5e6678107e7d9c09264754a25e7286e592 Mon Sep 17 00:00:00 2001 From: Martin Boehm Date: Mon, 25 Mar 2019 16:43:57 +0100 Subject: [PATCH] Extract mempool interface from blockchain --- api/worker.go | 6 +- api/xpub.go | 2 +- bchain/basechain.go | 5 ++ bchain/coins/bch/bcashrpc.go | 3 +- bchain/coins/bellcoin/bellcoinrpc.go | 3 +- bchain/coins/blockchain.go | 85 ++++++++++++++-------- bchain/coins/btc/bitcoinrpc.go | 75 ++++++++----------- bchain/coins/btg/bgoldrpc.go | 3 +- bchain/coins/dash/dashrpc.go | 3 +- bchain/coins/digibyte/digibyterpc.go | 3 +- bchain/coins/dogecoin/dogecoinrpc.go | 3 +- bchain/coins/eth/ethrpc.go | 48 +++++------- bchain/coins/flo/florpc.go | 4 +- bchain/coins/fujicoin/fujicoinrpc.go | 3 +- bchain/coins/gamecredits/gamecreditsrpc.go | 3 +- bchain/coins/grs/grsrpc.go | 3 +- bchain/coins/koto/kotorpc.go | 3 +- bchain/coins/liquid/liquidrpc.go | 3 +- bchain/coins/litecoin/litecoinrpc.go | 3 +- bchain/coins/monacoin/monacoinrpc.go | 3 +- bchain/coins/myriad/myriadrpc.go | 3 +- bchain/coins/namecoin/namecoinrpc.go | 3 +- bchain/coins/pivx/pivxrpc.go | 3 +- bchain/coins/qtum/qtumrpc.go | 4 +- bchain/coins/vertcoin/vertcoinrpc.go | 3 +- bchain/coins/xzc/zcoinrpc.go | 3 +- bchain/coins/zec/zcashrpc.go | 3 +- bchain/mempool_bitcoin_type.go | 2 +- bchain/mempool_ethereum_type.go | 2 +- bchain/types.go | 19 +++-- blockbook.go | 80 +++++++++++--------- server/internal.go | 6 +- server/public.go | 10 ++- server/public_test.go | 7 +- server/socketio.go | 8 +- server/websocket.go | 6 +- tests/dbtestdata/fakechain.go | 34 ++++----- tests/integration.go | 46 +++++++----- tests/rpc/rpc.go | 15 ++-- tests/sync/sync.go | 2 +- 40 files changed, 298 insertions(+), 225 deletions(-) diff --git a/api/worker.go b/api/worker.go index c8632758..806bcf2c 100644 --- a/api/worker.go +++ b/api/worker.go @@ -23,17 +23,19 @@ type Worker struct { chain bchain.BlockChain chainParser bchain.BlockChainParser chainType bchain.ChainType + mempool bchain.Mempool is *common.InternalState } // NewWorker creates new api worker -func NewWorker(db *db.RocksDB, chain bchain.BlockChain, txCache *db.TxCache, is *common.InternalState) (*Worker, error) { +func NewWorker(db *db.RocksDB, chain bchain.BlockChain, mempool bchain.Mempool, txCache *db.TxCache, is *common.InternalState) (*Worker, error) { w := &Worker{ db: db, txCache: txCache, chain: chain, chainParser: chain.GetChainParser(), chainType: chain.GetChainParser().GetChainType(), + mempool: mempool, is: is, } return w, nil @@ -348,7 +350,7 @@ func (w *Worker) getAddressTxids(addrDesc bchain.AddressDescriptor, mempool bool } if mempool { uniqueTxs := make(map[string]struct{}) - o, err := w.chain.GetMempoolTransactionsForAddrDesc(addrDesc) + o, err := w.mempool.GetAddrDescTransactions(addrDesc) if err != nil { return nil, err } diff --git a/api/xpub.go b/api/xpub.go index 79bb2fe5..7508f24a 100644 --- a/api/xpub.go +++ b/api/xpub.go @@ -92,7 +92,7 @@ func (w *Worker) xpubGetAddressTxids(addrDesc bchain.AddressDescriptor, mempool } if mempool { uniqueTxs := make(map[string]int) - o, err := w.chain.GetMempoolTransactionsForAddrDesc(addrDesc) + o, err := w.mempool.GetAddrDescTransactions(addrDesc) if err != nil { return nil, false, err } diff --git a/bchain/basechain.go b/bchain/basechain.go index 1de31a48..f1a58e35 100644 --- a/bchain/basechain.go +++ b/bchain/basechain.go @@ -29,6 +29,11 @@ func (b *BaseChain) GetNetworkName() string { return b.Network } +// GetMempoolEntry is not supported by default +func (b *BaseChain) GetMempoolEntry(txid string) (*MempoolEntry, error) { + return nil, errors.New("GetMempoolEntry: not supported") +} + // EthereumTypeGetBalance is not supported func (b *BaseChain) EthereumTypeGetBalance(addrDesc AddressDescriptor) (*big.Int, error) { return nil, errors.New("Not supported") diff --git a/bchain/coins/bch/bcashrpc.go b/bchain/coins/bch/bcashrpc.go index 9a3c7d4b..36a955c1 100644 --- a/bchain/coins/bch/bcashrpc.go +++ b/bchain/coins/bch/bcashrpc.go @@ -33,10 +33,11 @@ func NewBCashRPC(config json.RawMessage, pushHandler func(bchain.NotificationTyp // Initialize initializes BCashRPC instance. func (b *BCashRPC) Initialize() error { - chainName, err := b.GetChainInfoAndInitializeMempool(b) + ci, err := b.GetChainInfo() if err != nil { return err } + chainName := ci.Chain params := GetChainParams(chainName) diff --git a/bchain/coins/bellcoin/bellcoinrpc.go b/bchain/coins/bellcoin/bellcoinrpc.go index bd359fd2..2317e063 100644 --- a/bchain/coins/bellcoin/bellcoinrpc.go +++ b/bchain/coins/bellcoin/bellcoinrpc.go @@ -31,10 +31,11 @@ func NewBellcoinRPC(config json.RawMessage, pushHandler func(bchain.Notification // Initialize initializes BellcoinRPC instance. func (b *BellcoinRPC) Initialize() error { - chainName, err := b.GetChainInfoAndInitializeMempool(b) + ci, err := b.GetChainInfo() if err != nil { return err } + chainName := ci.Chain glog.Info("Chain name ", chainName) params := GetChainParams(chainName) diff --git a/bchain/coins/blockchain.go b/bchain/coins/blockchain.go index 4c251f59..34871e75 100644 --- a/bchain/coins/blockchain.go +++ b/bchain/coins/blockchain.go @@ -99,30 +99,34 @@ func GetCoinNameFromConfig(configfile string) (string, string, string, error) { return cn.CoinName, cn.CoinShortcut, cn.CoinLabel, nil } -// NewBlockChain creates bchain.BlockChain of type defined by parameter coin -func NewBlockChain(coin string, configfile string, pushHandler func(bchain.NotificationType), metrics *common.Metrics) (bchain.BlockChain, error) { +// NewBlockChain creates bchain.BlockChain and bchain.Mempool for the coin passed by the parameter coin +func NewBlockChain(coin string, configfile string, pushHandler func(bchain.NotificationType), metrics *common.Metrics) (bchain.BlockChain, bchain.Mempool, error) { data, err := ioutil.ReadFile(configfile) if err != nil { - return nil, errors.Annotatef(err, "Error reading file %v", configfile) + return nil, nil, errors.Annotatef(err, "Error reading file %v", configfile) } var config json.RawMessage err = json.Unmarshal(data, &config) if err != nil { - return nil, errors.Annotatef(err, "Error parsing file %v", configfile) + return nil, nil, errors.Annotatef(err, "Error parsing file %v", configfile) } bcf, ok := BlockChainFactories[coin] if !ok { - return nil, errors.New(fmt.Sprint("Unsupported coin '", coin, "'. Must be one of ", reflect.ValueOf(BlockChainFactories).MapKeys())) + return nil, nil, errors.New(fmt.Sprint("Unsupported coin '", coin, "'. Must be one of ", reflect.ValueOf(BlockChainFactories).MapKeys())) } bc, err := bcf(config, pushHandler) if err != nil { - return nil, err + return nil, nil, err } err = bc.Initialize() if err != nil { - return nil, err + return nil, nil, err } - return &blockChainWithMetrics{b: bc, m: metrics}, nil + mempool, err := bc.CreateMempool() + if err != nil { + return nil, nil, err + } + return &blockChainWithMetrics{b: bc, m: metrics}, &mempoolWithMetrics{mempool: mempool, m: metrics}, nil } type blockChainWithMetrics struct { @@ -142,6 +146,14 @@ func (c *blockChainWithMetrics) Initialize() error { return c.b.Initialize() } +func (c *blockChainWithMetrics) CreateMempool() (bchain.Mempool, error) { + return c.b.CreateMempool() +} + +func (c *blockChainWithMetrics) InitializeMempool() error { + return c.b.InitializeMempool() +} + func (c *blockChainWithMetrics) Shutdown(ctx context.Context) error { return c.b.Shutdown(ctx) } @@ -197,9 +209,9 @@ func (c *blockChainWithMetrics) GetBlockInfo(hash string) (v *bchain.BlockInfo, return c.b.GetBlockInfo(hash) } -func (c *blockChainWithMetrics) GetMempool() (v []string, err error) { - defer func(s time.Time) { c.observeRPCLatency("GetMempool", s, err) }(time.Now()) - return c.b.GetMempool() +func (c *blockChainWithMetrics) GetMempoolTransactions() (v []string, err error) { + defer func(s time.Time) { c.observeRPCLatency("GetMempoolTransactions", s, err) }(time.Now()) + return c.b.GetMempoolTransactions() } func (c *blockChainWithMetrics) GetTransaction(txid string) (v *bchain.Tx, err error) { @@ -232,25 +244,6 @@ func (c *blockChainWithMetrics) SendRawTransaction(tx string) (v string, err err return c.b.SendRawTransaction(tx) } -func (c *blockChainWithMetrics) ResyncMempool(onNewTxAddr bchain.OnNewTxAddrFunc) (count int, err error) { - defer func(s time.Time) { c.observeRPCLatency("ResyncMempool", s, err) }(time.Now()) - count, err = c.b.ResyncMempool(onNewTxAddr) - if err == nil { - c.m.MempoolSize.Set(float64(count)) - } - return count, err -} - -func (c *blockChainWithMetrics) GetMempoolTransactions(address string) (v []bchain.Outpoint, err error) { - defer func(s time.Time) { c.observeRPCLatency("GetMempoolTransactions", s, err) }(time.Now()) - return c.b.GetMempoolTransactions(address) -} - -func (c *blockChainWithMetrics) GetMempoolTransactionsForAddrDesc(addrDesc bchain.AddressDescriptor) (v []bchain.Outpoint, err error) { - defer func(s time.Time) { c.observeRPCLatency("GetMempoolTransactionsForAddrDesc", s, err) }(time.Now()) - return c.b.GetMempoolTransactionsForAddrDesc(addrDesc) -} - func (c *blockChainWithMetrics) GetMempoolEntry(txid string) (v *bchain.MempoolEntry, err error) { defer func(s time.Time) { c.observeRPCLatency("GetMempoolEntry", s, err) }(time.Now()) return c.b.GetMempoolEntry(txid) @@ -284,3 +277,35 @@ func (c *blockChainWithMetrics) EthereumTypeGetErc20ContractBalance(addrDesc, co defer func(s time.Time) { c.observeRPCLatency("EthereumTypeGetErc20ContractInfo", s, err) }(time.Now()) return c.b.EthereumTypeGetErc20ContractBalance(addrDesc, contractDesc) } + +type mempoolWithMetrics struct { + mempool bchain.Mempool + m *common.Metrics +} + +func (c *mempoolWithMetrics) observeRPCLatency(method string, start time.Time, err error) { + var e string + if err != nil { + e = err.Error() + } + c.m.RPCLatency.With(common.Labels{"method": method, "error": e}).Observe(float64(time.Since(start)) / 1e6) // in milliseconds +} + +func (c *mempoolWithMetrics) Resync(onNewTxAddr bchain.OnNewTxAddrFunc) (count int, err error) { + defer func(s time.Time) { c.observeRPCLatency("ResyncMempool", s, err) }(time.Now()) + count, err = c.mempool.Resync(onNewTxAddr) + if err == nil { + c.m.MempoolSize.Set(float64(count)) + } + return count, err +} + +func (c *mempoolWithMetrics) GetTransactions(address string) (v []bchain.Outpoint, err error) { + defer func(s time.Time) { c.observeRPCLatency("GetMempoolTransactions", s, err) }(time.Now()) + return c.mempool.GetTransactions(address) +} + +func (c *mempoolWithMetrics) GetAddrDescTransactions(addrDesc bchain.AddressDescriptor) (v []bchain.Outpoint, err error) { + defer func(s time.Time) { c.observeRPCLatency("GetMempoolTransactionsForAddrDesc", s, err) }(time.Now()) + return c.mempool.GetAddrDescTransactions(addrDesc) +} diff --git a/bchain/coins/btc/bitcoinrpc.go b/bchain/coins/btc/bitcoinrpc.go index ef7574cd..f26830d2 100644 --- a/bchain/coins/btc/bitcoinrpc.go +++ b/bchain/coins/btc/bitcoinrpc.go @@ -101,37 +101,15 @@ func NewBitcoinRPC(config json.RawMessage, pushHandler func(bchain.NotificationT return s, nil } -// GetChainInfoAndInitializeMempool is called by Initialize and reused by other coins -// it contacts the blockchain rpc interface for the first time -// and if successful it connects to ZeroMQ and creates mempool handler -func (b *BitcoinRPC) GetChainInfoAndInitializeMempool(bc bchain.BlockChain) (string, error) { - // try to connect to block chain and get some info - ci, err := bc.GetChainInfo() - if err != nil { - return "", err - } - chainName := ci.Chain - - mq, err := bchain.NewMQ(b.ChainConfig.MessageQueueBinding, b.pushHandler) - if err != nil { - glog.Error("mq: ", err) - return "", err - } - b.mq = mq - - b.Mempool = bchain.NewMempoolBitcoinType(bc, b.ChainConfig.MempoolWorkers, b.ChainConfig.MempoolSubWorkers) - - return chainName, nil -} - // Initialize initializes BitcoinRPC instance. func (b *BitcoinRPC) Initialize() error { b.ChainConfig.SupportsEstimateFee = false - chainName, err := b.GetChainInfoAndInitializeMempool(b) + ci, err := b.GetChainInfo() if err != nil { return err } + chainName := ci.Chain params := GetChainParams(chainName) @@ -152,6 +130,28 @@ func (b *BitcoinRPC) Initialize() error { return nil } +// CreateMempool creates mempool if not already created, however does not initialize it +func (b *BitcoinRPC) CreateMempool() (bchain.Mempool, error) { + if b.Mempool == nil { + b.Mempool = bchain.NewMempoolBitcoinType(b, b.ChainConfig.MempoolWorkers, b.ChainConfig.MempoolSubWorkers) + } + return b.Mempool, nil +} + +// InitializeMempool creates ZeroMQ subscription +func (b *BitcoinRPC) InitializeMempool() error { + if b.mq == nil { + mq, err := bchain.NewMQ(b.ChainConfig.MessageQueueBinding, b.pushHandler) + if err != nil { + glog.Error("mq: ", err) + return err + } + b.mq = mq + } + return nil +} + +// Shutdown ZeroMQ and other resources func (b *BitcoinRPC) Shutdown(ctx context.Context) error { if b.mq != nil { if err := b.mq.Shutdown(ctx); err != nil { @@ -162,10 +162,12 @@ func (b *BitcoinRPC) Shutdown(ctx context.Context) error { return nil } +// GetCoinName returns the coin name func (b *BitcoinRPC) GetCoinName() string { return b.ChainConfig.CoinName } +// GetSubversion returns the backend subversion func (b *BitcoinRPC) GetSubversion() string { return b.ChainConfig.Subversion } @@ -458,6 +460,7 @@ func (b *BitcoinRPC) GetChainInfo() (*bchain.ChainInfo, error) { return rv, nil } +// IsErrBlockNotFound returns true if error means block was not found func IsErrBlockNotFound(err *bchain.RPCError) bool { return err.Message == "Block not found" || err.Message == "Block height out of range" @@ -634,8 +637,8 @@ func (b *BitcoinRPC) GetBlockFull(hash string) (*bchain.Block, error) { return &res.Result, nil } -// GetMempool returns transactions in mempool -func (b *BitcoinRPC) GetMempool() ([]string, error) { +// GetMempoolTransactions returns transactions in mempool +func (b *BitcoinRPC) GetMempoolTransactions() ([]string, error) { glog.V(1).Info("rpc: getrawmempool") res := ResGetMempool{} @@ -651,6 +654,7 @@ func (b *BitcoinRPC) GetMempool() ([]string, error) { return res.Result, nil } +// IsMissingTx return true if error means missing tx func IsMissingTx(err *bchain.RPCError) bool { if err.Code == -5 { // "No such mempool or blockchain transaction" return true @@ -732,23 +736,6 @@ func (b *BitcoinRPC) getRawTransaction(txid string) (json.RawMessage, error) { return res.Result, nil } -// ResyncMempool gets mempool transactions and maps output scripts to transactions. -// ResyncMempool is not reentrant, it should be called from a single thread. -// Return value is number of transactions in mempool -func (b *BitcoinRPC) ResyncMempool(onNewTxAddr bchain.OnNewTxAddrFunc) (int, error) { - return b.Mempool.Resync(onNewTxAddr) -} - -// GetMempoolTransactions returns slice of mempool transactions for given address -func (b *BitcoinRPC) GetMempoolTransactions(address string) ([]bchain.Outpoint, error) { - return b.Mempool.GetTransactions(address) -} - -// GetMempoolTransactionsForAddrDesc returns slice of mempool transactions for given address descriptor -func (b *BitcoinRPC) GetMempoolTransactionsForAddrDesc(addrDesc bchain.AddressDescriptor) ([]bchain.Outpoint, error) { - return b.Mempool.GetAddrDescTransactions(addrDesc) -} - // EstimateSmartFee returns fee estimation func (b *BitcoinRPC) EstimateSmartFee(blocks int, conservative bool) (big.Int, error) { // use EstimateFee if EstimateSmartFee is not supported @@ -810,7 +797,7 @@ func (b *BitcoinRPC) EstimateFee(blocks int) (big.Int, error) { return r, nil } -// SendRawTransaction sends raw transaction. +// SendRawTransaction sends raw transaction func (b *BitcoinRPC) SendRawTransaction(tx string) (string, error) { glog.V(1).Info("rpc: sendrawtransaction") diff --git a/bchain/coins/btg/bgoldrpc.go b/bchain/coins/btg/bgoldrpc.go index b158ba32..a590de97 100644 --- a/bchain/coins/btg/bgoldrpc.go +++ b/bchain/coins/btg/bgoldrpc.go @@ -29,10 +29,11 @@ func NewBGoldRPC(config json.RawMessage, pushHandler func(bchain.NotificationTyp // Initialize initializes BGoldRPC instance. func (b *BGoldRPC) Initialize() error { - chainName, err := b.GetChainInfoAndInitializeMempool(b) + ci, err := b.GetChainInfo() if err != nil { return err } + chainName := ci.Chain params := GetChainParams(chainName) diff --git a/bchain/coins/dash/dashrpc.go b/bchain/coins/dash/dashrpc.go index 66629d79..2320113d 100644 --- a/bchain/coins/dash/dashrpc.go +++ b/bchain/coins/dash/dashrpc.go @@ -34,10 +34,11 @@ func NewDashRPC(config json.RawMessage, pushHandler func(bchain.NotificationType // Initialize initializes DashRPC instance. func (b *DashRPC) Initialize() error { - chainName, err := b.GetChainInfoAndInitializeMempool(b) + ci, err := b.GetChainInfo() if err != nil { return err } + chainName := ci.Chain params := GetChainParams(chainName) diff --git a/bchain/coins/digibyte/digibyterpc.go b/bchain/coins/digibyte/digibyterpc.go index 0cc47e9e..d0f6f32c 100644 --- a/bchain/coins/digibyte/digibyterpc.go +++ b/bchain/coins/digibyte/digibyterpc.go @@ -31,10 +31,11 @@ func NewDigiByteRPC(config json.RawMessage, pushHandler func(bchain.Notification // Initialize initializes DigiByteRPC instance. func (b *DigiByteRPC) Initialize() error { - chainName, err := b.GetChainInfoAndInitializeMempool(b) + ci, err := b.GetChainInfo() if err != nil { return err } + chainName := ci.Chain glog.Info("Chain name ", chainName) params := GetChainParams(chainName) diff --git a/bchain/coins/dogecoin/dogecoinrpc.go b/bchain/coins/dogecoin/dogecoinrpc.go index 1fc21618..9d6b4e63 100644 --- a/bchain/coins/dogecoin/dogecoinrpc.go +++ b/bchain/coins/dogecoin/dogecoinrpc.go @@ -31,10 +31,11 @@ func NewDogecoinRPC(config json.RawMessage, pushHandler func(bchain.Notification // Initialize initializes DogecoinRPC instance. func (b *DogecoinRPC) Initialize() error { - chainName, err := b.GetChainInfoAndInitializeMempool(b) + ci, err := b.GetChainInfo() if err != nil { return err } + chainName := ci.Chain glog.Info("Chain name ", chainName) params := GetChainParams(chainName) diff --git a/bchain/coins/eth/ethrpc.go b/bchain/coins/eth/ethrpc.go index 2a34c077..bac05d97 100644 --- a/bchain/coins/eth/ethrpc.go +++ b/bchain/coins/eth/ethrpc.go @@ -160,11 +160,24 @@ func (b *EthereumRPC) Initialize() error { } glog.Info("rpc: block chain ", b.Network) + return nil +} + +// CreateMempool creates mempool if not already created, however does not initialize it +func (b *EthereumRPC) CreateMempool() (bchain.Mempool, error) { + if b.Mempool == nil { + b.Mempool = bchain.NewMempoolEthereumType(b) + } + return b.Mempool, nil +} + +// InitializeMempool creates subscriptions to newHeads and newPendingTransactions +func (b *EthereumRPC) InitializeMempool() error { if b.isETC { glog.Info(b.ChainConfig.CoinName, " does not support subscription to newHeads") } else { // subscriptions - if err = b.subscribe(func() (*rpc.ClientSubscription, error) { + if err := b.subscribe(func() (*rpc.ClientSubscription, error) { // invalidate the previous subscription - it is either the first one or there was an error b.newBlockSubscription = nil ctx, cancel := context.WithTimeout(context.Background(), b.timeout) @@ -180,7 +193,8 @@ func (b *EthereumRPC) Initialize() error { return err } } - if err = b.subscribe(func() (*rpc.ClientSubscription, error) { + + if err := b.subscribe(func() (*rpc.ClientSubscription, error) { // invalidate the previous subscription - it is either the first one or there was an error b.newTxSubscription = nil ctx, cancel := context.WithTimeout(context.Background(), b.timeout) @@ -195,10 +209,6 @@ func (b *EthereumRPC) Initialize() error { }); err != nil { return err } - - // create mempool - b.Mempool = bchain.NewMempoolEthereumType(b) - return nil } @@ -628,8 +638,8 @@ func (b *EthereumRPC) GetTransactionSpecific(tx *bchain.Tx) (json.RawMessage, er return json.RawMessage(m), err } -// GetMempool returns transactions in mempool -func (b *EthereumRPC) GetMempool() ([]string, error) { +// GetMempoolTransactions returns transactions in mempool +func (b *EthereumRPC) GetMempoolTransactions() ([]string, error) { raw, err := b.getBlockRaw("pending", 0, false) if err != nil { return nil, err @@ -737,28 +747,6 @@ func (b *EthereumRPC) EthereumTypeGetNonce(addrDesc bchain.AddressDescriptor) (u return b.client.NonceAt(ctx, ethcommon.BytesToAddress(addrDesc), nil) } -// ResyncMempool gets mempool transactions and maps output scripts to transactions. -// ResyncMempool is not reentrant, it should be called from a single thread. -// Return value is number of transactions in mempool -func (b *EthereumRPC) ResyncMempool(onNewTxAddr bchain.OnNewTxAddrFunc) (int, error) { - return b.Mempool.Resync(onNewTxAddr) -} - -// GetMempoolTransactions returns slice of mempool transactions for given address -func (b *EthereumRPC) GetMempoolTransactions(address string) ([]bchain.Outpoint, error) { - return b.Mempool.GetTransactions(address) -} - -// GetMempoolTransactionsForAddrDesc returns slice of mempool transactions for given address descriptor -func (b *EthereumRPC) GetMempoolTransactionsForAddrDesc(addrDesc bchain.AddressDescriptor) ([]bchain.Outpoint, error) { - return b.Mempool.GetAddrDescTransactions(addrDesc) -} - -// GetMempoolEntry is not supported by etherem -func (b *EthereumRPC) GetMempoolEntry(txid string) (*bchain.MempoolEntry, error) { - return nil, errors.New("GetMempoolEntry: not supported") -} - // GetChainParser returns ethereum BlockChainParser func (b *EthereumRPC) GetChainParser() bchain.BlockChainParser { return b.Parser diff --git a/bchain/coins/flo/florpc.go b/bchain/coins/flo/florpc.go index 94f831bb..42549034 100644 --- a/bchain/coins/flo/florpc.go +++ b/bchain/coins/flo/florpc.go @@ -4,6 +4,7 @@ import ( "blockbook/bchain" "blockbook/bchain/coins/btc" "encoding/json" + "github.com/juju/errors" "github.com/golang/glog" @@ -32,10 +33,11 @@ func NewFloRPC(config json.RawMessage, pushHandler func(bchain.NotificationType) // Initialize initializes FloRPC instance. func (f *FloRPC) Initialize() error { - chainName, err := f.GetChainInfoAndInitializeMempool(f) + ci, err := f.GetChainInfo() if err != nil { return err } + chainName := ci.Chain glog.Info("Chain name ", chainName) params := GetChainParams(chainName) diff --git a/bchain/coins/fujicoin/fujicoinrpc.go b/bchain/coins/fujicoin/fujicoinrpc.go index 16c562cc..d7588812 100644 --- a/bchain/coins/fujicoin/fujicoinrpc.go +++ b/bchain/coins/fujicoin/fujicoinrpc.go @@ -31,10 +31,11 @@ func NewFujicoinRPC(config json.RawMessage, pushHandler func(bchain.Notification // Initialize initializes FujicoinRPC instance. func (b *FujicoinRPC) Initialize() error { - chainName, err := b.GetChainInfoAndInitializeMempool(b) + ci, err := b.GetChainInfo() if err != nil { return err } + chainName := ci.Chain glog.Info("Chain name ", chainName) params := GetChainParams(chainName) diff --git a/bchain/coins/gamecredits/gamecreditsrpc.go b/bchain/coins/gamecredits/gamecreditsrpc.go index bb3cecb0..763ff018 100644 --- a/bchain/coins/gamecredits/gamecreditsrpc.go +++ b/bchain/coins/gamecredits/gamecreditsrpc.go @@ -31,10 +31,11 @@ func NewGameCreditsRPC(config json.RawMessage, pushHandler func(bchain.Notificat // Initialize initializes GameCreditsRPC instance. func (b *GameCreditsRPC) Initialize() error { - chainName, err := b.GetChainInfoAndInitializeMempool(b) + ci, err := b.GetChainInfo() if err != nil { return err } + chainName := ci.Chain glog.Info("Chain name ", chainName) params := GetChainParams(chainName) diff --git a/bchain/coins/grs/grsrpc.go b/bchain/coins/grs/grsrpc.go index d3afb9f0..8d4495fa 100644 --- a/bchain/coins/grs/grsrpc.go +++ b/bchain/coins/grs/grsrpc.go @@ -27,10 +27,11 @@ func NewGroestlcoinRPC(config json.RawMessage, pushHandler func(bchain.Notificat // Initialize initializes GroestlcoinRPC instance. func (g *GroestlcoinRPC) Initialize() error { - chainName, err := g.GetChainInfoAndInitializeMempool(g) + ci, err := g.GetChainInfo() if err != nil { return err } + chainName := ci.Chain params := GetChainParams(chainName) diff --git a/bchain/coins/koto/kotorpc.go b/bchain/coins/koto/kotorpc.go index 7bb35868..5dc87295 100644 --- a/bchain/coins/koto/kotorpc.go +++ b/bchain/coins/koto/kotorpc.go @@ -28,10 +28,11 @@ func NewKotoRPC(config json.RawMessage, pushHandler func(bchain.NotificationType // Initialize initializes KotoRPC instance. func (z *KotoRPC) Initialize() error { - chainName, err := z.GetChainInfoAndInitializeMempool(z) + ci, err := z.GetChainInfo() if err != nil { return err } + chainName := ci.Chain params := GetChainParams(chainName) diff --git a/bchain/coins/liquid/liquidrpc.go b/bchain/coins/liquid/liquidrpc.go index 7bfc54a7..1ec45fb2 100644 --- a/bchain/coins/liquid/liquidrpc.go +++ b/bchain/coins/liquid/liquidrpc.go @@ -31,10 +31,11 @@ func NewLiquidRPC(config json.RawMessage, pushHandler func(bchain.NotificationTy // Initialize initializes GameCreditsRPC instance. func (b *LiquidRPC) Initialize() error { - chainName, err := b.GetChainInfoAndInitializeMempool(b) + ci, err := b.GetChainInfo() if err != nil { return err } + chainName := ci.Chain glog.Info("Chain name ", chainName) params := GetChainParams(chainName) diff --git a/bchain/coins/litecoin/litecoinrpc.go b/bchain/coins/litecoin/litecoinrpc.go index e7345251..274e3054 100644 --- a/bchain/coins/litecoin/litecoinrpc.go +++ b/bchain/coins/litecoin/litecoinrpc.go @@ -31,10 +31,11 @@ func NewLitecoinRPC(config json.RawMessage, pushHandler func(bchain.Notification // Initialize initializes LitecoinRPC instance. func (b *LitecoinRPC) Initialize() error { - chainName, err := b.GetChainInfoAndInitializeMempool(b) + ci, err := b.GetChainInfo() if err != nil { return err } + chainName := ci.Chain glog.Info("Chain name ", chainName) params := GetChainParams(chainName) diff --git a/bchain/coins/monacoin/monacoinrpc.go b/bchain/coins/monacoin/monacoinrpc.go index dacddb4c..77accd43 100644 --- a/bchain/coins/monacoin/monacoinrpc.go +++ b/bchain/coins/monacoin/monacoinrpc.go @@ -31,10 +31,11 @@ func NewMonacoinRPC(config json.RawMessage, pushHandler func(bchain.Notification // Initialize initializes MonacoinRPC instance. func (b *MonacoinRPC) Initialize() error { - chainName, err := b.GetChainInfoAndInitializeMempool(b) + ci, err := b.GetChainInfo() if err != nil { return err } + chainName := ci.Chain glog.Info("Chain name ", chainName) params := GetChainParams(chainName) diff --git a/bchain/coins/myriad/myriadrpc.go b/bchain/coins/myriad/myriadrpc.go index f0bd201a..cafa00b6 100644 --- a/bchain/coins/myriad/myriadrpc.go +++ b/bchain/coins/myriad/myriadrpc.go @@ -31,10 +31,11 @@ func NewMyriadRPC(config json.RawMessage, pushHandler func(bchain.NotificationTy // Initialize initializes MyriadRPC instance. func (b *MyriadRPC) Initialize() error { - chainName, err := b.GetChainInfoAndInitializeMempool(b) + ci, err := b.GetChainInfo() if err != nil { return err } + chainName := ci.Chain glog.Info("Chain name ", chainName) params := GetChainParams(chainName) diff --git a/bchain/coins/namecoin/namecoinrpc.go b/bchain/coins/namecoin/namecoinrpc.go index 89dc245c..927c095d 100644 --- a/bchain/coins/namecoin/namecoinrpc.go +++ b/bchain/coins/namecoin/namecoinrpc.go @@ -31,10 +31,11 @@ func NewNamecoinRPC(config json.RawMessage, pushHandler func(bchain.Notification // Initialize initializes NamecoinRPC instance. func (b *NamecoinRPC) Initialize() error { - chainName, err := b.GetChainInfoAndInitializeMempool(b) + ci, err := b.GetChainInfo() if err != nil { return err } + chainName := ci.Chain glog.Info("Chain name ", chainName) params := GetChainParams(chainName) diff --git a/bchain/coins/pivx/pivxrpc.go b/bchain/coins/pivx/pivxrpc.go index 8979ee4f..38f2c2cc 100644 --- a/bchain/coins/pivx/pivxrpc.go +++ b/bchain/coins/pivx/pivxrpc.go @@ -32,10 +32,11 @@ func NewPivXRPC(config json.RawMessage, pushHandler func(bchain.NotificationType // Initialize initializes PivXRPC instance. func (b *PivXRPC) Initialize() error { - chainName, err := b.GetChainInfoAndInitializeMempool(b) + ci, err := b.GetChainInfo() if err != nil { return err } + chainName := ci.Chain glog.Info("Chain name ", chainName) params := GetChainParams(chainName) diff --git a/bchain/coins/qtum/qtumrpc.go b/bchain/coins/qtum/qtumrpc.go index b5a8cafc..a942ce3b 100644 --- a/bchain/coins/qtum/qtumrpc.go +++ b/bchain/coins/qtum/qtumrpc.go @@ -4,6 +4,7 @@ import ( "blockbook/bchain" "blockbook/bchain/coins/btc" "encoding/json" + "github.com/golang/glog" ) @@ -30,10 +31,11 @@ func NewQtumRPC(config json.RawMessage, pushHandler func(bchain.NotificationType // Initialize initializes QtumRPC instance. func (b *QtumRPC) Initialize() error { - chainName, err := b.GetChainInfoAndInitializeMempool(b) + ci, err := b.GetChainInfo() if err != nil { return err } + chainName := ci.Chain params := GetChainParams(chainName) diff --git a/bchain/coins/vertcoin/vertcoinrpc.go b/bchain/coins/vertcoin/vertcoinrpc.go index f282dc78..ec1e1ba2 100644 --- a/bchain/coins/vertcoin/vertcoinrpc.go +++ b/bchain/coins/vertcoin/vertcoinrpc.go @@ -31,10 +31,11 @@ func NewVertcoinRPC(config json.RawMessage, pushHandler func(bchain.Notification // Initialize initializes VertcoinRPC instance. func (b *VertcoinRPC) Initialize() error { - chainName, err := b.GetChainInfoAndInitializeMempool(b) + ci, err := b.GetChainInfo() if err != nil { return err } + chainName := ci.Chain glog.Info("Chain name ", chainName) params := GetChainParams(chainName) diff --git a/bchain/coins/xzc/zcoinrpc.go b/bchain/coins/xzc/zcoinrpc.go index 1117cbc3..22dfbe92 100644 --- a/bchain/coins/xzc/zcoinrpc.go +++ b/bchain/coins/xzc/zcoinrpc.go @@ -36,10 +36,11 @@ func NewZcoinRPC(config json.RawMessage, pushHandler func(bchain.NotificationTyp } func (zc *ZcoinRPC) Initialize() error { - chainName, err := zc.GetChainInfoAndInitializeMempool(zc) + ci, err := zc.GetChainInfo() if err != nil { return err } + chainName := ci.Chain params := GetChainParams(chainName) diff --git a/bchain/coins/zec/zcashrpc.go b/bchain/coins/zec/zcashrpc.go index 3ecffea3..4da5f8d4 100644 --- a/bchain/coins/zec/zcashrpc.go +++ b/bchain/coins/zec/zcashrpc.go @@ -30,10 +30,11 @@ func NewZCashRPC(config json.RawMessage, pushHandler func(bchain.NotificationTyp // Initialize initializes ZCashRPC instance func (z *ZCashRPC) Initialize() error { - chainName, err := z.GetChainInfoAndInitializeMempool(z) + ci, err := z.GetChainInfo() if err != nil { return err } + chainName := ci.Chain params := GetChainParams(chainName) diff --git a/bchain/mempool_bitcoin_type.go b/bchain/mempool_bitcoin_type.go index db12ffd4..e87cc937 100644 --- a/bchain/mempool_bitcoin_type.go +++ b/bchain/mempool_bitcoin_type.go @@ -163,7 +163,7 @@ func (m *MempoolBitcoinType) Resync(onNewTxAddr OnNewTxAddrFunc) (int, error) { start := time.Now() glog.V(1).Info("mempool: resync") m.onNewTxAddr = onNewTxAddr - txs, err := m.chain.GetMempool() + txs, err := m.chain.GetMempoolTransactions() if err != nil { return 0, err } diff --git a/bchain/mempool_ethereum_type.go b/bchain/mempool_ethereum_type.go index eff4bc53..1f05c9c5 100644 --- a/bchain/mempool_ethereum_type.go +++ b/bchain/mempool_ethereum_type.go @@ -62,7 +62,7 @@ func appendAddress(io []addrIndex, i int32, a string, parser BlockChainParser) [ func (m *MempoolEthereumType) Resync(onNewTxAddr OnNewTxAddrFunc) (int, error) { start := time.Now() glog.V(1).Info("Mempool: resync") - txs, err := m.chain.GetMempool() + txs, err := m.chain.GetMempoolTransactions() if err != nil { return 0, err } diff --git a/bchain/types.go b/bchain/types.go index ead62535..044122e6 100644 --- a/bchain/types.go +++ b/bchain/types.go @@ -194,7 +194,13 @@ type OnNewTxAddrFunc func(tx *Tx, desc AddressDescriptor) // BlockChain defines common interface to block chain daemon type BlockChain interface { // life-cycle methods + // intialize the block chain connector Initialize() error + // create mempool but do not initialize it + CreateMempool() (Mempool, error) + // initialize mempool, create ZeroMQ (or other) subscription + InitializeMempool() error + // shutdown mempool, ZeroMQ and block chain connections Shutdown(ctx context.Context) error // chain info IsTestnet() bool @@ -209,17 +215,13 @@ type BlockChain interface { GetBlockHeader(hash string) (*BlockHeader, error) GetBlock(hash string, height uint32) (*Block, error) GetBlockInfo(hash string) (*BlockInfo, error) - GetMempool() ([]string, error) + GetMempoolTransactions() ([]string, error) GetTransaction(txid string) (*Tx, error) GetTransactionForMempool(txid string) (*Tx, error) GetTransactionSpecific(tx *Tx) (json.RawMessage, error) EstimateSmartFee(blocks int, conservative bool) (big.Int, error) EstimateFee(blocks int) (big.Int, error) SendRawTransaction(tx string) (string, error) - // mempool - ResyncMempool(onNewTxAddr OnNewTxAddrFunc) (int, error) - GetMempoolTransactions(address string) ([]Outpoint, error) - GetMempoolTransactionsForAddrDesc(addrDesc AddressDescriptor) ([]Outpoint, error) GetMempoolEntry(txid string) (*MempoolEntry, error) // parser GetChainParser() BlockChainParser @@ -270,3 +272,10 @@ type BlockChainParser interface { // EthereumType specific EthereumTypeGetErc20FromTx(tx *Tx) ([]Erc20Transfer, error) } + +// Mempool defines common interface to mempool +type Mempool interface { + Resync(onNewTxAddr OnNewTxAddrFunc) (int, error) + GetTransactions(address string) ([]Outpoint, error) + GetAddrDescTransactions(addrDesc AddressDescriptor) ([]Outpoint, error) +} diff --git a/blockbook.go b/blockbook.go index 44d8def9..6315ad52 100644 --- a/blockbook.go +++ b/blockbook.go @@ -82,6 +82,7 @@ var ( chanSyncMempoolDone = make(chan struct{}) chanStoreInternalStateDone = make(chan struct{}) chain bchain.BlockChain + mempool bchain.Mempool index *db.RocksDB txCache *db.TxCache metrics *common.Metrics @@ -98,26 +99,27 @@ func init() { glog.CopyStandardLogTo("INFO") } -func getBlockChainWithRetry(coin string, configfile string, pushHandler func(bchain.NotificationType), metrics *common.Metrics, seconds int) (bchain.BlockChain, error) { +func getBlockChainWithRetry(coin string, configfile string, pushHandler func(bchain.NotificationType), metrics *common.Metrics, seconds int) (bchain.BlockChain, bchain.Mempool, error) { var chain bchain.BlockChain + var mempool bchain.Mempool var err error timer := time.NewTimer(time.Second) for i := 0; ; i++ { - if chain, err = coins.NewBlockChain(coin, configfile, pushHandler, metrics); err != nil { + if chain, mempool, err = coins.NewBlockChain(coin, configfile, pushHandler, metrics); err != nil { if i < seconds { glog.Error("rpc: ", err, " Retrying...") select { case <-chanOsSignal: - return nil, errors.New("Interrupted") + return nil, nil, errors.New("Interrupted") case <-timer.C: timer.Reset(time.Second) continue } } else { - return nil, err + return nil, nil, err } } - return chain, nil + return chain, mempool, nil } } @@ -162,7 +164,7 @@ func main() { glog.Fatal("metrics: ", err) } - if chain, err = getBlockChainWithRetry(coin, *blockchain, pushSynchronizationHandler, metrics, 60); err != nil { + if chain, mempool, err = getBlockChainWithRetry(coin, *blockchain, pushSynchronizationHandler, metrics, 60); err != nil { glog.Fatal("rpc: ", err) } @@ -209,29 +211,7 @@ func main() { } if *rollbackHeight >= 0 { - bestHeight, bestHash, err := index.GetBestBlock() - if err != nil { - glog.Error("rollbackHeight: ", err) - return - } - if uint32(*rollbackHeight) > bestHeight { - glog.Infof("nothing to rollback, rollbackHeight %d, bestHeight: %d", *rollbackHeight, bestHeight) - } else { - hashes := []string{bestHash} - for height := bestHeight - 1; height >= uint32(*rollbackHeight); height-- { - hash, err := index.GetBlockHash(height) - if err != nil { - glog.Error("rollbackHeight: ", err) - return - } - hashes = append(hashes, hash) - } - err = syncWorker.DisconnectBlocks(uint32(*rollbackHeight), bestHeight, hashes) - if err != nil { - glog.Error("rollbackHeight: ", err) - return - } - } + performRollback() return } @@ -247,7 +227,7 @@ func main() { var internalServer *server.InternalServer if *internalBinding != "" { - internalServer, err = server.NewInternalServer(*internalBinding, *certFiles, index, chain, txCache, internalState) + internalServer, err = server.NewInternalServer(*internalBinding, *certFiles, index, chain, mempool, txCache, internalState) if err != nil { glog.Error("https: ", err) return @@ -268,7 +248,7 @@ func main() { var publicServer *server.PublicServer if *publicBinding != "" { // start public server in limited functionality, extend it after sync is finished by calling ConnectFullPublicInterface - publicServer, err = server.NewPublicServer(*publicBinding, *certFiles, index, chain, txCache, *explorerURL, metrics, internalState, *debugMode) + publicServer, err = server.NewPublicServer(*publicBinding, *certFiles, index, chain, mempool, txCache, *explorerURL, metrics, internalState, *debugMode) if err != nil { glog.Error("socketio: ", err) return @@ -295,8 +275,14 @@ func main() { glog.Error("resyncIndex ", err) return } + // initialize mempool after the initial sync is complete + err = chain.InitializeMempool() + if err != nil { + glog.Error("initializeMempool ", err) + return + } var mempoolCount int - if mempoolCount, err = chain.ResyncMempool(nil); err != nil { + if mempoolCount, err = mempool.Resync(nil); err != nil { glog.Error("resyncMempool ", err) return } @@ -341,8 +327,34 @@ func main() { } } +func performRollback() { + bestHeight, bestHash, err := index.GetBestBlock() + if err != nil { + glog.Error("rollbackHeight: ", err) + return + } + if uint32(*rollbackHeight) > bestHeight { + glog.Infof("nothing to rollback, rollbackHeight %d, bestHeight: %d", *rollbackHeight, bestHeight) + } else { + hashes := []string{bestHash} + for height := bestHeight - 1; height >= uint32(*rollbackHeight); height-- { + hash, err := index.GetBlockHash(height) + if err != nil { + glog.Error("rollbackHeight: ", err) + return + } + hashes = append(hashes, hash) + } + err = syncWorker.DisconnectBlocks(uint32(*rollbackHeight), bestHeight, hashes) + if err != nil { + glog.Error("rollbackHeight: ", err) + return + } + } +} + func blockbookAppInfoMetric(db *db.RocksDB, chain bchain.BlockChain, txCache *db.TxCache, is *common.InternalState, metrics *common.Metrics) error { - api, err := api.NewWorker(db, chain, txCache, is) + api, err := api.NewWorker(db, chain, mempool, txCache, is) if err != nil { return err } @@ -442,7 +454,7 @@ func syncMempoolLoop() { // resync mempool about every minute if there are no chanSyncMempool requests, with debounce 1 second tickAndDebounce(time.Duration(*resyncMempoolPeriodMs)*time.Millisecond, debounceResyncMempoolMs*time.Millisecond, chanSyncMempool, func() { internalState.StartedMempoolSync() - if count, err := chain.ResyncMempool(onNewTxAddr); err != nil { + if count, err := mempool.Resync(onNewTxAddr); err != nil { glog.Error("syncMempoolLoop ", errors.ErrorStack(err)) } else { internalState.FinishedMempoolSync(count) diff --git a/server/internal.go b/server/internal.go index 6ef09f1e..1f7c3b89 100644 --- a/server/internal.go +++ b/server/internal.go @@ -23,13 +23,14 @@ type InternalServer struct { txCache *db.TxCache chain bchain.BlockChain chainParser bchain.BlockChainParser + mempool bchain.Mempool is *common.InternalState api *api.Worker } // NewInternalServer creates new internal http interface to blockbook and returns its handle -func NewInternalServer(binding, certFiles string, db *db.RocksDB, chain bchain.BlockChain, txCache *db.TxCache, is *common.InternalState) (*InternalServer, error) { - api, err := api.NewWorker(db, chain, txCache, is) +func NewInternalServer(binding, certFiles string, db *db.RocksDB, chain bchain.BlockChain, mempool bchain.Mempool, txCache *db.TxCache, is *common.InternalState) (*InternalServer, error) { + api, err := api.NewWorker(db, chain, mempool, txCache, is) if err != nil { return nil, err } @@ -47,6 +48,7 @@ func NewInternalServer(binding, certFiles string, db *db.RocksDB, chain bchain.B txCache: txCache, chain: chain, chainParser: chain.GetChainParser(), + mempool: mempool, is: is, api: api, } diff --git a/server/public.go b/server/public.go index 12deb354..7543ec2a 100644 --- a/server/public.go +++ b/server/public.go @@ -45,6 +45,7 @@ type PublicServer struct { txCache *db.TxCache chain bchain.BlockChain chainParser bchain.BlockChainParser + mempool bchain.Mempool api *api.Worker explorerURL string internalExplorer bool @@ -56,19 +57,19 @@ type PublicServer struct { // NewPublicServer creates new public server http interface to blockbook and returns its handle // only basic functionality is mapped, to map all functions, call -func NewPublicServer(binding string, certFiles string, db *db.RocksDB, chain bchain.BlockChain, txCache *db.TxCache, explorerURL string, metrics *common.Metrics, is *common.InternalState, debugMode bool) (*PublicServer, error) { +func NewPublicServer(binding string, certFiles string, db *db.RocksDB, chain bchain.BlockChain, mempool bchain.Mempool, txCache *db.TxCache, explorerURL string, metrics *common.Metrics, is *common.InternalState, debugMode bool) (*PublicServer, error) { - api, err := api.NewWorker(db, chain, txCache, is) + api, err := api.NewWorker(db, chain, mempool, txCache, is) if err != nil { return nil, err } - socketio, err := NewSocketIoServer(db, chain, txCache, metrics, is) + socketio, err := NewSocketIoServer(db, chain, mempool, txCache, metrics, is) if err != nil { return nil, err } - websocket, err := NewWebsocketServer(db, chain, txCache, metrics, is) + websocket, err := NewWebsocketServer(db, chain, mempool, txCache, metrics, is) if err != nil { return nil, err } @@ -91,6 +92,7 @@ func NewPublicServer(binding string, certFiles string, db *db.RocksDB, chain bch txCache: txCache, chain: chain, chainParser: chain.GetChainParser(), + mempool: mempool, explorerURL: explorerURL, internalExplorer: explorerURL == "", metrics: metrics, diff --git a/server/public_test.go b/server/public_test.go index e203d947..aa64044e 100644 --- a/server/public_test.go +++ b/server/public_test.go @@ -85,6 +85,11 @@ func setupPublicHTTPServer(t *testing.T) (*PublicServer, string) { glog.Fatal("fakechain: ", err) } + mempool, err := chain.CreateMempool() + if err != nil { + glog.Fatal("mempool: ", err) + } + // caching is switched off because test transactions do not have hex data txCache, err := db.NewTxCache(d, chain, metrics, is, false) if err != nil { @@ -92,7 +97,7 @@ func setupPublicHTTPServer(t *testing.T) (*PublicServer, string) { } // s.Run is never called, binding can be to any port - s, err := NewPublicServer("localhost:12345", "", d, chain, txCache, "", metrics, is, false) + s, err := NewPublicServer("localhost:12345", "", d, chain, mempool, txCache, "", metrics, is, false) if err != nil { t.Fatal(err) } diff --git a/server/socketio.go b/server/socketio.go index fc70510d..a1790762 100644 --- a/server/socketio.go +++ b/server/socketio.go @@ -26,14 +26,15 @@ type SocketIoServer struct { txCache *db.TxCache chain bchain.BlockChain chainParser bchain.BlockChainParser + mempool bchain.Mempool metrics *common.Metrics is *common.InternalState api *api.Worker } // NewSocketIoServer creates new SocketIo interface to blockbook and returns its handle -func NewSocketIoServer(db *db.RocksDB, chain bchain.BlockChain, txCache *db.TxCache, metrics *common.Metrics, is *common.InternalState) (*SocketIoServer, error) { - api, err := api.NewWorker(db, chain, txCache, is) +func NewSocketIoServer(db *db.RocksDB, chain bchain.BlockChain, mempool bchain.Mempool, txCache *db.TxCache, metrics *common.Metrics, is *common.InternalState) (*SocketIoServer, error) { + api, err := api.NewWorker(db, chain, mempool, txCache, is) if err != nil { return nil, err } @@ -64,6 +65,7 @@ func NewSocketIoServer(db *db.RocksDB, chain bchain.BlockChain, txCache *db.TxCa txCache: txCache, chain: chain, chainParser: chain.GetChainParser(), + mempool: mempool, metrics: metrics, is: is, api: api, @@ -224,7 +226,7 @@ func (s *SocketIoServer) getAddressTxids(addr []string, opts *addrOpts) (res res return res, err } } else { - o, err := s.chain.GetMempoolTransactions(address) + o, err := s.mempool.GetTransactions(address) if err != nil { return res, err } diff --git a/server/websocket.go b/server/websocket.go index 0c9ec060..3f504ea3 100644 --- a/server/websocket.go +++ b/server/websocket.go @@ -59,6 +59,7 @@ type WebsocketServer struct { txCache *db.TxCache chain bchain.BlockChain chainParser bchain.BlockChainParser + mempool bchain.Mempool metrics *common.Metrics is *common.InternalState api *api.Worker @@ -70,8 +71,8 @@ type WebsocketServer struct { } // NewWebsocketServer creates new websocket interface to blockbook and returns its handle -func NewWebsocketServer(db *db.RocksDB, chain bchain.BlockChain, txCache *db.TxCache, metrics *common.Metrics, is *common.InternalState) (*WebsocketServer, error) { - api, err := api.NewWorker(db, chain, txCache, is) +func NewWebsocketServer(db *db.RocksDB, chain bchain.BlockChain, mempool bchain.Mempool, txCache *db.TxCache, metrics *common.Metrics, is *common.InternalState) (*WebsocketServer, error) { + api, err := api.NewWorker(db, chain, mempool, txCache, is) if err != nil { return nil, err } @@ -89,6 +90,7 @@ func NewWebsocketServer(db *db.RocksDB, chain bchain.BlockChain, txCache *db.TxC txCache: txCache, chain: chain, chainParser: chain.GetChainParser(), + mempool: mempool, metrics: metrics, is: is, api: api, diff --git a/tests/dbtestdata/fakechain.go b/tests/dbtestdata/fakechain.go index 71515bbb..2f05bd4b 100644 --- a/tests/dbtestdata/fakechain.go +++ b/tests/dbtestdata/fakechain.go @@ -17,10 +17,18 @@ func NewFakeBlockChain(parser bchain.BlockChainParser) (bchain.BlockChain, error return &fakeBlockChain{&bchain.BaseChain{Parser: parser}}, nil } +func (b *fakeBlockChain) CreateMempool() (bchain.Mempool, error) { + return bchain.NewMempoolBitcoinType(b, 1, 1), nil +} + func (c *fakeBlockChain) Initialize() error { return nil } +func (c *fakeBlockChain) InitializeMempool() error { + return nil +} + func (c *fakeBlockChain) Shutdown(ctx context.Context) error { return nil } @@ -118,10 +126,6 @@ func (c *fakeBlockChain) GetBlockInfo(hash string) (v *bchain.BlockInfo, err err return nil, bchain.ErrBlockNotFound } -func (c *fakeBlockChain) GetMempool() (v []string, err error) { - return nil, errors.New("Not implemented") -} - func getTxInBlock(b *bchain.Block, txid string) *bchain.Tx { for _, tx := range b.Txs { if tx.Txid == txid { @@ -179,22 +183,12 @@ func (c *fakeBlockChain) SendRawTransaction(tx string) (v string, err error) { return "", errors.New("Invalid data") } -func (c *fakeBlockChain) ResyncMempool(onNewTxAddr bchain.OnNewTxAddrFunc) (count int, err error) { - return 0, errors.New("Not implemented") -} - -func (c *fakeBlockChain) GetMempoolTransactions(address string) (v []bchain.Outpoint, err error) { - return nil, errors.New("Not implemented") -} - -func (c *fakeBlockChain) GetMempoolTransactionsForAddrDesc(addrDesc bchain.AddressDescriptor) (v []bchain.Outpoint, err error) { - return []bchain.Outpoint{}, nil -} - -func (c *fakeBlockChain) GetMempoolEntry(txid string) (v *bchain.MempoolEntry, err error) { - return nil, errors.New("Not implemented") -} - +// GetChainParser returns parser for the blockchain func (c *fakeBlockChain) GetChainParser() bchain.BlockChainParser { return c.Parser } + +// GetMempoolTransactions returns transactions in mempool +func (b *fakeBlockChain) GetMempoolTransactions() ([]string, error) { + return nil, errors.New("Not implemented") +} diff --git a/tests/integration.go b/tests/integration.go index ad0815f4..789f71fc 100644 --- a/tests/integration.go +++ b/tests/integration.go @@ -5,7 +5,7 @@ package tests import ( "blockbook/bchain" "blockbook/bchain/coins" - "blockbook/build/tools" + build "blockbook/build/tools" "blockbook/tests/rpc" "blockbook/tests/sync" "encoding/json" @@ -23,7 +23,7 @@ import ( "github.com/martinboehm/btcutil/chaincfg" ) -type TestFunc func(t *testing.T, coin string, chain bchain.BlockChain, testConfig json.RawMessage) +type TestFunc func(t *testing.T, coin string, chain bchain.BlockChain, mempool bchain.Mempool, testConfig json.RawMessage) var integrationTests = map[string]TestFunc{ "rpc": rpc.IntegrationTest, @@ -76,7 +76,7 @@ func runTests(t *testing.T, coin string, cfg map[string]json.RawMessage) { } defer chaincfg.ResetParams() - bc, err := makeBlockChain(coin) + bc, m, err := makeBlockChain(coin) if err != nil { if err == notConnectedError { t.Fatal(err) @@ -86,44 +86,44 @@ func runTests(t *testing.T, coin string, cfg map[string]json.RawMessage) { for test, c := range cfg { if fn, found := integrationTests[test]; found { - t.Run(test, func(t *testing.T) { fn(t, coin, bc, c) }) + t.Run(test, func(t *testing.T) { fn(t, coin, bc, m, c) }) } else { t.Errorf("Test not found: %s", test) } } } -func makeBlockChain(coin string) (bchain.BlockChain, error) { +func makeBlockChain(coin string) (bchain.BlockChain, bchain.Mempool, error) { c, err := build.LoadConfig("../configs", coin) if err != nil { - return nil, err + return nil, nil, err } outputDir, err := ioutil.TempDir("", "integration_test") if err != nil { - return nil, err + return nil, nil, err } defer os.RemoveAll(outputDir) err = build.GeneratePackageDefinitions(c, "../build/templates", outputDir) if err != nil { - return nil, err + return nil, nil, err } b, err := ioutil.ReadFile(filepath.Join(outputDir, "blockbook", "blockchaincfg.json")) if err != nil { - return nil, err + return nil, nil, err } var cfg json.RawMessage err = json.Unmarshal(b, &cfg) if err != nil { - return nil, err + return nil, nil, err } coinName, err := getName(cfg) if err != nil { - return nil, err + return nil, nil, err } return initBlockChain(coinName, cfg) @@ -147,29 +147,39 @@ func getName(raw json.RawMessage) (string, error) { } } -func initBlockChain(coinName string, cfg json.RawMessage) (bchain.BlockChain, error) { +func initBlockChain(coinName string, cfg json.RawMessage) (bchain.BlockChain, bchain.Mempool, error) { factory, found := coins.BlockChainFactories[coinName] if !found { - return nil, fmt.Errorf("Factory function not found") + return nil, nil, fmt.Errorf("Factory function not found") } cli, err := factory(cfg, func(_ bchain.NotificationType) {}) if err != nil { if isNetError(err) { - return nil, notConnectedError + return nil, nil, notConnectedError } - return nil, fmt.Errorf("Factory function failed: %s", err) + return nil, nil, fmt.Errorf("Factory function failed: %s", err) } err = cli.Initialize() if err != nil { if isNetError(err) { - return nil, notConnectedError + return nil, nil, notConnectedError } - return nil, fmt.Errorf("BlockChain initialization failed: %s", err) + return nil, nil, fmt.Errorf("BlockChain initialization failed: %s", err) } - return cli, nil + mempool, err := cli.CreateMempool() + if err != nil { + return nil, nil, fmt.Errorf("Mempool creation failed: %s", err) + } + + err = cli.InitializeMempool() + if err != nil { + return nil, nil, fmt.Errorf("Mempool initialization failed: %s", err) + } + + return cli, mempool, nil } func isNetError(err error) bool { diff --git a/tests/rpc/rpc.go b/tests/rpc/rpc.go index e8722062..c4122d3f 100644 --- a/tests/rpc/rpc.go +++ b/tests/rpc/rpc.go @@ -30,6 +30,7 @@ var testMap = map[string]func(t *testing.T, th *TestHandler){ type TestHandler struct { Chain bchain.BlockChain + Mempool bchain.Mempool TestData *TestData } @@ -42,7 +43,7 @@ type TestData struct { TxDetails map[string]*bchain.Tx `json:"txDetails"` } -func IntegrationTest(t *testing.T, coin string, chain bchain.BlockChain, testConfig json.RawMessage) { +func IntegrationTest(t *testing.T, coin string, chain bchain.BlockChain, mempool bchain.Mempool, testConfig json.RawMessage) { tests, err := getTests(testConfig) if err != nil { t.Fatalf("Failed loading of test list: %s", err) @@ -54,7 +55,11 @@ func IntegrationTest(t *testing.T, coin string, chain bchain.BlockChain, testCon t.Fatalf("Failed loading of test data: %s", err) } - h := TestHandler{Chain: chain, TestData: td} + h := TestHandler{ + Chain: chain, + Mempool: mempool, + TestData: td, + } for _, test := range tests { if f, found := testMap[test]; found { @@ -195,7 +200,7 @@ func testMempoolSync(t *testing.T, h *TestHandler) { for i := 0; i < 3; i++ { txs := getMempool(t, h) - n, err := h.Chain.ResyncMempool(nil) + n, err := h.Mempool.Resync(nil) if err != nil { t.Fatal(err) } @@ -217,7 +222,7 @@ func testMempoolSync(t *testing.T, h *TestHandler) { for txid, addrs := range txid2addrs { for _, a := range addrs { - got, err := h.Chain.GetMempoolTransactions(a) + got, err := h.Mempool.GetTransactions(a) if err != nil { t.Fatalf("address %q: %s", a, err) } @@ -337,7 +342,7 @@ func testGetBlockHeader(t *testing.T, h *TestHandler) { } func getMempool(t *testing.T, h *TestHandler) []string { - txs, err := h.Chain.GetMempool() + txs, err := h.Chain.GetMempoolTransactions() if err != nil { t.Fatal(err) } diff --git a/tests/sync/sync.go b/tests/sync/sync.go index 6c01bfc0..2364da44 100644 --- a/tests/sync/sync.go +++ b/tests/sync/sync.go @@ -54,7 +54,7 @@ type BlockInfo struct { TxDetails []*bchain.Tx `json:"txDetails"` } -func IntegrationTest(t *testing.T, coin string, chain bchain.BlockChain, testConfig json.RawMessage) { +func IntegrationTest(t *testing.T, coin string, chain bchain.BlockChain, mempool bchain.Mempool, testConfig json.RawMessage) { tests, err := getTests(testConfig) if err != nil { t.Fatalf("Failed loading of test list: %s", err)