Merge branch 'mempool'

pull/155/head v0.2.2
Martin Boehm 2019-04-15 12:27:24 +02:00
commit 9642e306ac
58 changed files with 948 additions and 546 deletions

View File

@ -337,3 +337,16 @@ type SystemInfo struct {
Blockbook *BlockbookInfo `json:"blockbook"`
Backend *bchain.ChainInfo `json:"backend"`
}
// MempoolTxid contains information about a transaction in mempool
type MempoolTxid struct {
Time int64 `json:"time"`
Txid string `json:"txid"`
}
// MempoolTxids contains a list of mempool txids with paging information
type MempoolTxids struct {
Paging
Mempool []MempoolTxid `json:"mempool"`
MempoolSize int `json:"mempoolSize"`
}

View File

@ -23,17 +23,19 @@ type Worker struct {
chain bchain.BlockChain
chainParser bchain.BlockChainParser
chainType bchain.ChainType
mempool bchain.Mempool
is *common.InternalState
}
// NewWorker creates new api worker
func NewWorker(db *db.RocksDB, chain bchain.BlockChain, txCache *db.TxCache, is *common.InternalState) (*Worker, error) {
func NewWorker(db *db.RocksDB, chain bchain.BlockChain, mempool bchain.Mempool, txCache *db.TxCache, is *common.InternalState) (*Worker, error) {
w := &Worker{
db: db,
txCache: txCache,
chain: chain,
chainParser: chain.GetChainParser(),
chainType: chain.GetChainParser().GetChainType(),
mempool: mempool,
is: is,
}
return w, nil
@ -292,6 +294,10 @@ func (w *Worker) GetTransactionFromBchainTx(bchainTx *bchain.Tx, height uint32,
return nil, err
}
}
// for mempool transaction get first seen time
if bchainTx.Confirmations == 0 {
bchainTx.Blocktime = int64(w.mempool.GetTransactionTime(bchainTx.Txid))
}
r := &Tx{
Blockhash: blockhash,
Blockheight: int(height),
@ -348,7 +354,7 @@ func (w *Worker) getAddressTxids(addrDesc bchain.AddressDescriptor, mempool bool
}
if mempool {
uniqueTxs := make(map[string]struct{})
o, err := w.chain.GetMempoolTransactionsForAddrDesc(addrDesc)
o, err := w.mempool.GetAddrDescTransactions(addrDesc)
if err != nil {
return nil, err
}
@ -1068,3 +1074,26 @@ func (w *Worker) GetSystemInfo(internal bool) (*SystemInfo, error) {
glog.Info("GetSystemInfo finished in ", time.Since(start))
return &SystemInfo{bi, ci}, nil
}
// GetMempool returns a page of mempool txids
func (w *Worker) GetMempool(page int, itemsOnPage int) (*MempoolTxids, error) {
page--
if page < 0 {
page = 0
}
entries := w.mempool.GetAllEntries()
pg, from, to, page := computePaging(len(entries), page, itemsOnPage)
r := &MempoolTxids{
Paging: pg,
MempoolSize: len(entries),
}
r.Mempool = make([]MempoolTxid, to-from)
for i := from; i < to; i++ {
entry := &entries[i]
r.Mempool[i-from] = MempoolTxid{
Txid: entry.Txid,
Time: int64(entry.Time),
}
}
return r, nil
}

View File

@ -92,7 +92,7 @@ func (w *Worker) xpubGetAddressTxids(addrDesc bchain.AddressDescriptor, mempool
}
if mempool {
uniqueTxs := make(map[string]int)
o, err := w.chain.GetMempoolTransactionsForAddrDesc(addrDesc)
o, err := w.mempool.GetAddrDescTransactions(addrDesc)
if err != nil {
return nil, false, err
}
@ -383,13 +383,13 @@ func (w *Worker) GetXpubAddress(xpub string, page int, txsOnPage int, option Acc
return nil, err
}
// setup filtering of txids
var useTxids func(txid *xpubTxid, ad *xpubAddress) bool
var txidFilter func(txid *xpubTxid, ad *xpubAddress) bool
if !(filter.FromHeight == 0 && filter.ToHeight == 0 && filter.Vout == AddressFilterVoutOff) {
toHeight := maxUint32
if filter.ToHeight != 0 {
toHeight = filter.ToHeight
}
useTxids = func(txid *xpubTxid, ad *xpubAddress) bool {
txidFilter = func(txid *xpubTxid, ad *xpubAddress) bool {
if txid.height < filter.FromHeight || txid.height > toHeight {
return false
}
@ -406,6 +406,7 @@ func (w *Worker) GetXpubAddress(xpub string, page int, txsOnPage int, option Acc
// process mempool, only if ToHeight is not specified
if filter.ToHeight == 0 && !filter.OnlyConfirmed {
txmMap = make(map[string]*Tx)
mempoolEntries := make(bchain.MempoolTxidEntries, 0)
for _, da := range [][]xpubAddress{data.addresses, data.changeAddresses} {
for i := range da {
ad := &da[i]
@ -432,18 +433,23 @@ func (w *Worker) GetXpubAddress(xpub string, page int, txsOnPage int, option Acc
}
uBalSat.Add(&uBalSat, tx.getAddrVoutValue(ad.addrDesc))
uBalSat.Sub(&uBalSat, tx.getAddrVinValue(ad.addrDesc))
if page == 0 && !foundTx && (useTxids == nil || useTxids(&txid, ad)) {
if option == AccountDetailsTxidHistory {
txids = append(txids, tx.Txid)
} else if option >= AccountDetailsTxHistoryLight {
txs = append(txs, tx)
}
// mempool txs are returned only on the first page, uniquely and filtered
if page == 0 && !foundTx && (txidFilter == nil || txidFilter(&txid, ad)) {
mempoolEntries = append(mempoolEntries, bchain.MempoolTxidEntry{Txid: txid.txid, Time: uint32(tx.Blocktime)})
}
}
}
}
}
// sort the entries by time descending
sort.Sort(mempoolEntries)
for _, entry := range mempoolEntries {
if option == AccountDetailsTxidHistory {
txids = append(txids, entry.Txid)
} else if option >= AccountDetailsTxHistoryLight {
txs = append(txs, txmMap[entry.Txid])
}
}
}
if option >= AccountDetailsTxidHistory {
txcMap := make(map[string]bool)
@ -459,7 +465,7 @@ func (w *Worker) GetXpubAddress(xpub string, page int, txsOnPage int, option Acc
}
// add tx only once
if !added {
add := useTxids == nil || useTxids(&txid, ad)
add := txidFilter == nil || txidFilter(&txid, ad)
txcMap[txid.txid] = add
if add {
txc = append(txc, txid)

View File

@ -29,6 +29,11 @@ func (b *BaseChain) GetNetworkName() string {
return b.Network
}
// GetMempoolEntry is not supported by default
func (b *BaseChain) GetMempoolEntry(txid string) (*MempoolEntry, error) {
return nil, errors.New("GetMempoolEntry: not supported")
}
// EthereumTypeGetBalance is not supported
func (b *BaseChain) EthereumTypeGetBalance(addrDesc AddressDescriptor) (*big.Int, error) {
return nil, errors.New("Not supported")

View File

@ -0,0 +1,115 @@
package bchain
import (
"sort"
"sync"
)
type addrIndex struct {
addrDesc string
n int32
}
type txEntry struct {
addrIndexes []addrIndex
time uint32
}
type txidio struct {
txid string
io []addrIndex
}
// BaseMempool is mempool base handle
type BaseMempool struct {
chain BlockChain
mux sync.Mutex
txEntries map[string]txEntry
addrDescToTx map[string][]Outpoint
OnNewTxAddr OnNewTxAddrFunc
}
// GetTransactions returns slice of mempool transactions for given address
func (m *BaseMempool) GetTransactions(address string) ([]Outpoint, error) {
parser := m.chain.GetChainParser()
addrDesc, err := parser.GetAddrDescFromAddress(address)
if err != nil {
return nil, err
}
return m.GetAddrDescTransactions(addrDesc)
}
// GetAddrDescTransactions returns slice of mempool transactions for given address descriptor, in reverse order
func (m *BaseMempool) GetAddrDescTransactions(addrDesc AddressDescriptor) ([]Outpoint, error) {
m.mux.Lock()
defer m.mux.Unlock()
outpoints := m.addrDescToTx[string(addrDesc)]
rv := make([]Outpoint, len(outpoints))
for i, j := len(outpoints)-1, 0; i >= 0; i-- {
rv[j] = outpoints[i]
j++
}
return rv, nil
}
func (a MempoolTxidEntries) Len() int { return len(a) }
func (a MempoolTxidEntries) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a MempoolTxidEntries) Less(i, j int) bool {
// if the Time is equal, sort by txid to make the order defined
hi := a[i].Time
hj := a[j].Time
if hi == hj {
return a[i].Txid > a[j].Txid
}
// order in reverse
return hi > hj
}
// removeEntryFromMempool removes entry from mempool structs. The caller is responsible for locking!
func (m *BaseMempool) removeEntryFromMempool(txid string, entry txEntry) {
delete(m.txEntries, txid)
for _, si := range entry.addrIndexes {
outpoints, found := m.addrDescToTx[si.addrDesc]
if found {
newOutpoints := make([]Outpoint, 0, len(outpoints)-1)
for _, o := range outpoints {
if o.Txid != txid {
newOutpoints = append(newOutpoints, o)
}
}
if len(newOutpoints) > 0 {
m.addrDescToTx[si.addrDesc] = newOutpoints
} else {
delete(m.addrDescToTx, si.addrDesc)
}
}
}
}
// GetAllEntries returns all mempool entries sorted by fist seen time in descending order
func (m *BaseMempool) GetAllEntries() MempoolTxidEntries {
i := 0
m.mux.Lock()
entries := make(MempoolTxidEntries, len(m.txEntries))
for txid, entry := range m.txEntries {
entries[i] = MempoolTxidEntry{
Txid: txid,
Time: entry.time,
}
i++
}
m.mux.Unlock()
sort.Sort(entries)
return entries
}
// GetTransactionTime returns first seen time of a transaction
func (m *BaseMempool) GetTransactionTime(txid string) uint32 {
m.mux.Lock()
e, found := m.txEntries[txid]
m.mux.Unlock()
if !found {
return 0
}
return e.time
}

View File

@ -34,10 +34,11 @@ func NewBCashRPC(config json.RawMessage, pushHandler func(bchain.NotificationTyp
// Initialize initializes BCashRPC instance.
func (b *BCashRPC) Initialize() error {
chainName, err := b.GetChainInfoAndInitializeMempool(b)
ci, err := b.GetChainInfo()
if err != nil {
return err
}
chainName := ci.Chain
params := GetChainParams(chainName)

View File

@ -31,10 +31,11 @@ func NewBellcoinRPC(config json.RawMessage, pushHandler func(bchain.Notification
// Initialize initializes BellcoinRPC instance.
func (b *BellcoinRPC) Initialize() error {
chainName, err := b.GetChainInfoAndInitializeMempool(b)
ci, err := b.GetChainInfo()
if err != nil {
return err
}
chainName := ci.Chain
glog.Info("Chain name ", chainName)
params := GetChainParams(chainName)

View File

@ -103,30 +103,34 @@ func GetCoinNameFromConfig(configfile string) (string, string, string, error) {
return cn.CoinName, cn.CoinShortcut, cn.CoinLabel, nil
}
// NewBlockChain creates bchain.BlockChain of type defined by parameter coin
func NewBlockChain(coin string, configfile string, pushHandler func(bchain.NotificationType), metrics *common.Metrics) (bchain.BlockChain, error) {
// NewBlockChain creates bchain.BlockChain and bchain.Mempool for the coin passed by the parameter coin
func NewBlockChain(coin string, configfile string, pushHandler func(bchain.NotificationType), metrics *common.Metrics) (bchain.BlockChain, bchain.Mempool, error) {
data, err := ioutil.ReadFile(configfile)
if err != nil {
return nil, errors.Annotatef(err, "Error reading file %v", configfile)
return nil, nil, errors.Annotatef(err, "Error reading file %v", configfile)
}
var config json.RawMessage
err = json.Unmarshal(data, &config)
if err != nil {
return nil, errors.Annotatef(err, "Error parsing file %v", configfile)
return nil, nil, errors.Annotatef(err, "Error parsing file %v", configfile)
}
bcf, ok := BlockChainFactories[coin]
if !ok {
return nil, errors.New(fmt.Sprint("Unsupported coin '", coin, "'. Must be one of ", reflect.ValueOf(BlockChainFactories).MapKeys()))
return nil, nil, errors.New(fmt.Sprint("Unsupported coin '", coin, "'. Must be one of ", reflect.ValueOf(BlockChainFactories).MapKeys()))
}
bc, err := bcf(config, pushHandler)
if err != nil {
return nil, err
return nil, nil, err
}
err = bc.Initialize()
if err != nil {
return nil, err
return nil, nil, err
}
return &blockChainWithMetrics{b: bc, m: metrics}, nil
mempool, err := bc.CreateMempool(bc)
if err != nil {
return nil, nil, err
}
return &blockChainWithMetrics{b: bc, m: metrics}, &mempoolWithMetrics{mempool: mempool, m: metrics}, nil
}
type blockChainWithMetrics struct {
@ -146,6 +150,14 @@ func (c *blockChainWithMetrics) Initialize() error {
return c.b.Initialize()
}
func (c *blockChainWithMetrics) CreateMempool(chain bchain.BlockChain) (bchain.Mempool, error) {
return c.b.CreateMempool(chain)
}
func (c *blockChainWithMetrics) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOutpointFunc, onNewTxAddr bchain.OnNewTxAddrFunc) error {
return c.b.InitializeMempool(addrDescForOutpoint, onNewTxAddr)
}
func (c *blockChainWithMetrics) Shutdown(ctx context.Context) error {
return c.b.Shutdown(ctx)
}
@ -201,9 +213,9 @@ func (c *blockChainWithMetrics) GetBlockInfo(hash string) (v *bchain.BlockInfo,
return c.b.GetBlockInfo(hash)
}
func (c *blockChainWithMetrics) GetMempool() (v []string, err error) {
defer func(s time.Time) { c.observeRPCLatency("GetMempool", s, err) }(time.Now())
return c.b.GetMempool()
func (c *blockChainWithMetrics) GetMempoolTransactions() (v []string, err error) {
defer func(s time.Time) { c.observeRPCLatency("GetMempoolTransactions", s, err) }(time.Now())
return c.b.GetMempoolTransactions()
}
func (c *blockChainWithMetrics) GetTransaction(txid string) (v *bchain.Tx, err error) {
@ -236,25 +248,6 @@ func (c *blockChainWithMetrics) SendRawTransaction(tx string) (v string, err err
return c.b.SendRawTransaction(tx)
}
func (c *blockChainWithMetrics) ResyncMempool(onNewTxAddr bchain.OnNewTxAddrFunc) (count int, err error) {
defer func(s time.Time) { c.observeRPCLatency("ResyncMempool", s, err) }(time.Now())
count, err = c.b.ResyncMempool(onNewTxAddr)
if err == nil {
c.m.MempoolSize.Set(float64(count))
}
return count, err
}
func (c *blockChainWithMetrics) GetMempoolTransactions(address string) (v []bchain.Outpoint, err error) {
defer func(s time.Time) { c.observeRPCLatency("GetMempoolTransactions", s, err) }(time.Now())
return c.b.GetMempoolTransactions(address)
}
func (c *blockChainWithMetrics) GetMempoolTransactionsForAddrDesc(addrDesc bchain.AddressDescriptor) (v []bchain.Outpoint, err error) {
defer func(s time.Time) { c.observeRPCLatency("GetMempoolTransactionsForAddrDesc", s, err) }(time.Now())
return c.b.GetMempoolTransactionsForAddrDesc(addrDesc)
}
func (c *blockChainWithMetrics) GetMempoolEntry(txid string) (v *bchain.MempoolEntry, err error) {
defer func(s time.Time) { c.observeRPCLatency("GetMempoolEntry", s, err) }(time.Now())
return c.b.GetMempoolEntry(txid)
@ -288,3 +281,44 @@ func (c *blockChainWithMetrics) EthereumTypeGetErc20ContractBalance(addrDesc, co
defer func(s time.Time) { c.observeRPCLatency("EthereumTypeGetErc20ContractInfo", s, err) }(time.Now())
return c.b.EthereumTypeGetErc20ContractBalance(addrDesc, contractDesc)
}
type mempoolWithMetrics struct {
mempool bchain.Mempool
m *common.Metrics
}
func (c *mempoolWithMetrics) observeRPCLatency(method string, start time.Time, err error) {
var e string
if err != nil {
e = err.Error()
}
c.m.RPCLatency.With(common.Labels{"method": method, "error": e}).Observe(float64(time.Since(start)) / 1e6) // in milliseconds
}
func (c *mempoolWithMetrics) Resync() (count int, err error) {
defer func(s time.Time) { c.observeRPCLatency("ResyncMempool", s, err) }(time.Now())
count, err = c.mempool.Resync()
if err == nil {
c.m.MempoolSize.Set(float64(count))
}
return count, err
}
func (c *mempoolWithMetrics) GetTransactions(address string) (v []bchain.Outpoint, err error) {
defer func(s time.Time) { c.observeRPCLatency("GetMempoolTransactions", s, err) }(time.Now())
return c.mempool.GetTransactions(address)
}
func (c *mempoolWithMetrics) GetAddrDescTransactions(addrDesc bchain.AddressDescriptor) (v []bchain.Outpoint, err error) {
defer func(s time.Time) { c.observeRPCLatency("GetMempoolTransactionsForAddrDesc", s, err) }(time.Now())
return c.mempool.GetAddrDescTransactions(addrDesc)
}
func (c *mempoolWithMetrics) GetAllEntries() (v bchain.MempoolTxidEntries) {
defer func(s time.Time) { c.observeRPCLatency("GetAllEntries", s, nil) }(time.Now())
return c.mempool.GetAllEntries()
}
func (c *mempoolWithMetrics) GetTransactionTime(txid string) uint32 {
return c.mempool.GetTransactionTime(txid)
}

View File

@ -218,7 +218,7 @@ func TestGetAddressesFromAddrDesc(t *testing.T) {
{
name: "OP_RETURN omni simple send tether",
args: args{script: "6a146f6d6e69000000000000001f00000709bb647351"},
want: []string{"OMNI Simple Send 77383.80022609 TetherUS (#31)"},
want: []string{"OMNI Simple Send: 77383.80022609 TetherUS (#31)"},
want2: false,
wantErr: false,
},

View File

@ -101,37 +101,15 @@ func NewBitcoinRPC(config json.RawMessage, pushHandler func(bchain.NotificationT
return s, nil
}
// GetChainInfoAndInitializeMempool is called by Initialize and reused by other coins
// it contacts the blockchain rpc interface for the first time
// and if successful it connects to ZeroMQ and creates mempool handler
func (b *BitcoinRPC) GetChainInfoAndInitializeMempool(bc bchain.BlockChain) (string, error) {
// try to connect to block chain and get some info
ci, err := bc.GetChainInfo()
if err != nil {
return "", err
}
chainName := ci.Chain
mq, err := bchain.NewMQ(b.ChainConfig.MessageQueueBinding, b.pushHandler)
if err != nil {
glog.Error("mq: ", err)
return "", err
}
b.mq = mq
b.Mempool = bchain.NewMempoolBitcoinType(bc, b.ChainConfig.MempoolWorkers, b.ChainConfig.MempoolSubWorkers)
return chainName, nil
}
// Initialize initializes BitcoinRPC instance.
func (b *BitcoinRPC) Initialize() error {
b.ChainConfig.SupportsEstimateFee = false
chainName, err := b.GetChainInfoAndInitializeMempool(b)
ci, err := b.GetChainInfo()
if err != nil {
return err
}
chainName := ci.Chain
params := GetChainParams(chainName)
@ -152,6 +130,33 @@ func (b *BitcoinRPC) Initialize() error {
return nil
}
// CreateMempool creates mempool if not already created, however does not initialize it
func (b *BitcoinRPC) CreateMempool(chain bchain.BlockChain) (bchain.Mempool, error) {
if b.Mempool == nil {
b.Mempool = bchain.NewMempoolBitcoinType(chain, b.ChainConfig.MempoolWorkers, b.ChainConfig.MempoolSubWorkers)
}
return b.Mempool, nil
}
// InitializeMempool creates ZeroMQ subscription and sets AddrDescForOutpointFunc to the Mempool
func (b *BitcoinRPC) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOutpointFunc, onNewTxAddr bchain.OnNewTxAddrFunc) error {
if b.Mempool == nil {
return errors.New("Mempool not created")
}
b.Mempool.AddrDescForOutpoint = addrDescForOutpoint
b.Mempool.OnNewTxAddr = onNewTxAddr
if b.mq == nil {
mq, err := bchain.NewMQ(b.ChainConfig.MessageQueueBinding, b.pushHandler)
if err != nil {
glog.Error("mq: ", err)
return err
}
b.mq = mq
}
return nil
}
// Shutdown ZeroMQ and other resources
func (b *BitcoinRPC) Shutdown(ctx context.Context) error {
if b.mq != nil {
if err := b.mq.Shutdown(ctx); err != nil {
@ -162,10 +167,12 @@ func (b *BitcoinRPC) Shutdown(ctx context.Context) error {
return nil
}
// GetCoinName returns the coin name
func (b *BitcoinRPC) GetCoinName() string {
return b.ChainConfig.CoinName
}
// GetSubversion returns the backend subversion
func (b *BitcoinRPC) GetSubversion() string {
return b.ChainConfig.Subversion
}
@ -458,6 +465,7 @@ func (b *BitcoinRPC) GetChainInfo() (*bchain.ChainInfo, error) {
return rv, nil
}
// IsErrBlockNotFound returns true if error means block was not found
func IsErrBlockNotFound(err *bchain.RPCError) bool {
return err.Message == "Block not found" ||
err.Message == "Block height out of range"
@ -634,8 +642,8 @@ func (b *BitcoinRPC) GetBlockFull(hash string) (*bchain.Block, error) {
return &res.Result, nil
}
// GetMempool returns transactions in mempool
func (b *BitcoinRPC) GetMempool() ([]string, error) {
// GetMempoolTransactions returns transactions in mempool
func (b *BitcoinRPC) GetMempoolTransactions() ([]string, error) {
glog.V(1).Info("rpc: getrawmempool")
res := ResGetMempool{}
@ -651,6 +659,7 @@ func (b *BitcoinRPC) GetMempool() ([]string, error) {
return res.Result, nil
}
// IsMissingTx return true if error means missing tx
func IsMissingTx(err *bchain.RPCError) bool {
if err.Code == -5 { // "No such mempool or blockchain transaction"
return true
@ -732,23 +741,6 @@ func (b *BitcoinRPC) getRawTransaction(txid string) (json.RawMessage, error) {
return res.Result, nil
}
// ResyncMempool gets mempool transactions and maps output scripts to transactions.
// ResyncMempool is not reentrant, it should be called from a single thread.
// Return value is number of transactions in mempool
func (b *BitcoinRPC) ResyncMempool(onNewTxAddr bchain.OnNewTxAddrFunc) (int, error) {
return b.Mempool.Resync(onNewTxAddr)
}
// GetMempoolTransactions returns slice of mempool transactions for given address
func (b *BitcoinRPC) GetMempoolTransactions(address string) ([]bchain.Outpoint, error) {
return b.Mempool.GetTransactions(address)
}
// GetMempoolTransactionsForAddrDesc returns slice of mempool transactions for given address descriptor
func (b *BitcoinRPC) GetMempoolTransactionsForAddrDesc(addrDesc bchain.AddressDescriptor) ([]bchain.Outpoint, error) {
return b.Mempool.GetAddrDescTransactions(addrDesc)
}
// EstimateSmartFee returns fee estimation
func (b *BitcoinRPC) EstimateSmartFee(blocks int, conservative bool) (big.Int, error) {
// use EstimateFee if EstimateSmartFee is not supported
@ -810,7 +802,7 @@ func (b *BitcoinRPC) EstimateFee(blocks int) (big.Int, error) {
return r, nil
}
// SendRawTransaction sends raw transaction.
// SendRawTransaction sends raw transaction
func (b *BitcoinRPC) SendRawTransaction(tx string) (string, error) {
glog.V(1).Info("rpc: sendrawtransaction")

View File

@ -29,10 +29,11 @@ func NewBGoldRPC(config json.RawMessage, pushHandler func(bchain.NotificationTyp
// Initialize initializes BGoldRPC instance.
func (b *BGoldRPC) Initialize() error {
chainName, err := b.GetChainInfoAndInitializeMempool(b)
ci, err := b.GetChainInfo()
if err != nil {
return err
}
chainName := ci.Chain
params := GetChainParams(chainName)

View File

@ -34,10 +34,11 @@ func NewDashRPC(config json.RawMessage, pushHandler func(bchain.NotificationType
// Initialize initializes DashRPC instance.
func (b *DashRPC) Initialize() error {
chainName, err := b.GetChainInfoAndInitializeMempool(b)
ci, err := b.GetChainInfo()
if err != nil {
return err
}
chainName := ci.Chain
params := GetChainParams(chainName)

View File

@ -31,10 +31,11 @@ func NewDigiByteRPC(config json.RawMessage, pushHandler func(bchain.Notification
// Initialize initializes DigiByteRPC instance.
func (b *DigiByteRPC) Initialize() error {
chainName, err := b.GetChainInfoAndInitializeMempool(b)
ci, err := b.GetChainInfo()
if err != nil {
return err
}
chainName := ci.Chain
glog.Info("Chain name ", chainName)
params := GetChainParams(chainName)

View File

@ -31,10 +31,11 @@ func NewDogecoinRPC(config json.RawMessage, pushHandler func(bchain.Notification
// Initialize initializes DogecoinRPC instance.
func (b *DogecoinRPC) Initialize() error {
chainName, err := b.GetChainInfoAndInitializeMempool(b)
ci, err := b.GetChainInfo()
if err != nil {
return err
}
chainName := ci.Chain
glog.Info("Chain name ", chainName)
params := GetChainParams(chainName)

View File

@ -31,32 +31,33 @@ const (
// Configuration represents json config file
type Configuration struct {
CoinName string `json:"coin_name"`
CoinShortcut string `json:"coin_shortcut"`
RPCURL string `json:"rpc_url"`
RPCTimeout int `json:"rpc_timeout"`
BlockAddressesToKeep int `json:"block_addresses_to_keep"`
CoinName string `json:"coin_name"`
CoinShortcut string `json:"coin_shortcut"`
RPCURL string `json:"rpc_url"`
RPCTimeout int `json:"rpc_timeout"`
BlockAddressesToKeep int `json:"block_addresses_to_keep"`
MempoolTxTimeoutHours int `json:"mempoolTxTimeoutHours"`
QueryBackendOnMempoolResync bool `json:"queryBackendOnMempoolResync"`
}
// EthereumRPC is an interface to JSON-RPC eth service.
type EthereumRPC struct {
*bchain.BaseChain
client *ethclient.Client
rpc *rpc.Client
timeout time.Duration
Parser *EthereumParser
Mempool *bchain.MempoolEthereumType
bestHeaderLock sync.Mutex
bestHeader *ethtypes.Header
bestHeaderTime time.Time
chanNewBlock chan *ethtypes.Header
newBlockSubscription *rpc.ClientSubscription
chanNewTx chan ethcommon.Hash
newTxSubscription *rpc.ClientSubscription
pendingTransactions map[string]struct{}
pendingTransactionsLock sync.Mutex
ChainConfig *Configuration
isETC bool
client *ethclient.Client
rpc *rpc.Client
timeout time.Duration
Parser *EthereumParser
Mempool *bchain.MempoolEthereumType
mempoolInitialized bool
bestHeaderLock sync.Mutex
bestHeader *ethtypes.Header
bestHeaderTime time.Time
chanNewBlock chan *ethtypes.Header
newBlockSubscription *rpc.ClientSubscription
chanNewTx chan ethcommon.Hash
newTxSubscription *rpc.ClientSubscription
ChainConfig *Configuration
isETC bool
}
// NewEthereumRPC returns new EthRPC instance.
@ -78,11 +79,10 @@ func NewEthereumRPC(config json.RawMessage, pushHandler func(bchain.Notification
ec := ethclient.NewClient(rc)
s := &EthereumRPC{
BaseChain: &bchain.BaseChain{},
client: ec,
rpc: rc,
ChainConfig: &c,
pendingTransactions: make(map[string]struct{}),
BaseChain: &bchain.BaseChain{},
client: ec,
rpc: rc,
ChainConfig: &c,
}
// always create parser
@ -125,9 +125,7 @@ func NewEthereumRPC(config json.RawMessage, pushHandler func(bchain.Notification
if glog.V(2) {
glog.Info("rpc: new tx ", hex)
}
s.pendingTransactionsLock.Lock()
s.pendingTransactions[hex] = struct{}{}
s.pendingTransactionsLock.Unlock()
s.Mempool.AddTransactionToMempool(hex)
pushHandler(bchain.NotificationNewTx)
}
}()
@ -160,11 +158,40 @@ func (b *EthereumRPC) Initialize() error {
}
glog.Info("rpc: block chain ", b.Network)
return nil
}
// CreateMempool creates mempool if not already created, however does not initialize it
func (b *EthereumRPC) CreateMempool(chain bchain.BlockChain) (bchain.Mempool, error) {
if b.Mempool == nil {
b.Mempool = bchain.NewMempoolEthereumType(chain, b.ChainConfig.MempoolTxTimeoutHours, b.ChainConfig.QueryBackendOnMempoolResync)
glog.Info("mempool created, MempoolTxTimeoutHours=", b.ChainConfig.MempoolTxTimeoutHours, ", QueryBackendOnMempoolResync=", b.ChainConfig.QueryBackendOnMempoolResync)
}
return b.Mempool, nil
}
// InitializeMempool creates subscriptions to newHeads and newPendingTransactions
func (b *EthereumRPC) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOutpointFunc, onNewTxAddr bchain.OnNewTxAddrFunc) error {
if b.Mempool == nil {
return errors.New("Mempool not created")
}
// get initial mempool transactions
txs, err := b.GetMempoolTransactions()
if err != nil {
return err
}
for _, txid := range txs {
b.Mempool.AddTransactionToMempool(txid)
}
b.Mempool.OnNewTxAddr = onNewTxAddr
if b.isETC {
glog.Info(b.ChainConfig.CoinName, " does not support subscription to newHeads")
} else {
// subscriptions
if err = b.subscribe(func() (*rpc.ClientSubscription, error) {
if err := b.subscribe(func() (*rpc.ClientSubscription, error) {
// invalidate the previous subscription - it is either the first one or there was an error
b.newBlockSubscription = nil
ctx, cancel := context.WithTimeout(context.Background(), b.timeout)
@ -180,7 +207,8 @@ func (b *EthereumRPC) Initialize() error {
return err
}
}
if err = b.subscribe(func() (*rpc.ClientSubscription, error) {
if err := b.subscribe(func() (*rpc.ClientSubscription, error) {
// invalidate the previous subscription - it is either the first one or there was an error
b.newTxSubscription = nil
ctx, cancel := context.WithTimeout(context.Background(), b.timeout)
@ -196,8 +224,7 @@ func (b *EthereumRPC) Initialize() error {
return err
}
// create mempool
b.Mempool = bchain.NewMempoolEthereumType(b)
b.mempoolInitialized = true
return nil
}
@ -481,9 +508,9 @@ func (b *EthereumRPC) GetBlock(hash string, height uint32) (*bchain.Block, error
return nil, errors.Annotatef(err, "hash %v, height %v, txid %v", hash, height, tx.Hash)
}
btxs[i] = *btx
b.pendingTransactionsLock.Lock()
delete(b.pendingTransactions, tx.Hash)
b.pendingTransactionsLock.Unlock()
if b.mempoolInitialized {
b.Mempool.RemoveTransactionFromMempool(tx.Hash)
}
}
bbk := bchain.Block{
BlockHeader: *bbh,
@ -521,14 +548,7 @@ func (b *EthereumRPC) GetBlockInfo(hash string) (*bchain.BlockInfo, error) {
// GetTransactionForMempool returns a transaction by the transaction ID.
// It could be optimized for mempool, i.e. without block time and confirmations
func (b *EthereumRPC) GetTransactionForMempool(txid string) (*bchain.Tx, error) {
tx, err := b.GetTransaction(txid)
// if there is an error getting the tx or the tx is confirmed, remove it from pending transactions
if err == bchain.ErrTxNotFound || (tx != nil && tx.Confirmations > 0) {
b.pendingTransactionsLock.Lock()
delete(b.pendingTransactions, txid)
b.pendingTransactionsLock.Unlock()
}
return tx, err
return b.GetTransaction(txid)
}
// GetTransaction returns a transaction by the transaction ID.
@ -541,6 +561,9 @@ func (b *EthereumRPC) GetTransaction(txid string) (*bchain.Tx, error) {
if err != nil {
return nil, err
} else if tx == nil {
if b.mempoolInitialized {
b.Mempool.RemoveTransactionFromMempool(txid)
}
return nil, bchain.ErrTxNotFound
}
var btx *bchain.Tx
@ -607,6 +630,10 @@ func (b *EthereumRPC) GetTransaction(txid string) (*bchain.Tx, error) {
if err != nil {
return nil, errors.Annotatef(err, "txid %v", txid)
}
// remove tx from mempool if it is there
if b.mempoolInitialized {
b.Mempool.RemoveTransactionFromMempool(txid)
}
}
return btx, nil
}
@ -628,8 +655,8 @@ func (b *EthereumRPC) GetTransactionSpecific(tx *bchain.Tx) (json.RawMessage, er
return json.RawMessage(m), err
}
// GetMempool returns transactions in mempool
func (b *EthereumRPC) GetMempool() ([]string, error) {
// GetMempoolTransactions returns transactions in mempool
func (b *EthereumRPC) GetMempoolTransactions() ([]string, error) {
raw, err := b.getBlockRaw("pending", 0, false)
if err != nil {
return nil, err
@ -640,19 +667,7 @@ func (b *EthereumRPC) GetMempool() ([]string, error) {
return nil, err
}
}
b.pendingTransactionsLock.Lock()
// join transactions returned by getBlockRaw with pendingTransactions from subscription
for _, txid := range body.Transactions {
b.pendingTransactions[txid] = struct{}{}
}
txids := make([]string, len(b.pendingTransactions))
i := 0
for txid := range b.pendingTransactions {
txids[i] = txid
i++
}
b.pendingTransactionsLock.Unlock()
return txids, nil
return body.Transactions, nil
}
// EstimateFee returns fee estimation
@ -737,28 +752,6 @@ func (b *EthereumRPC) EthereumTypeGetNonce(addrDesc bchain.AddressDescriptor) (u
return b.client.NonceAt(ctx, ethcommon.BytesToAddress(addrDesc), nil)
}
// ResyncMempool gets mempool transactions and maps output scripts to transactions.
// ResyncMempool is not reentrant, it should be called from a single thread.
// Return value is number of transactions in mempool
func (b *EthereumRPC) ResyncMempool(onNewTxAddr bchain.OnNewTxAddrFunc) (int, error) {
return b.Mempool.Resync(onNewTxAddr)
}
// GetMempoolTransactions returns slice of mempool transactions for given address
func (b *EthereumRPC) GetMempoolTransactions(address string) ([]bchain.Outpoint, error) {
return b.Mempool.GetTransactions(address)
}
// GetMempoolTransactionsForAddrDesc returns slice of mempool transactions for given address descriptor
func (b *EthereumRPC) GetMempoolTransactionsForAddrDesc(addrDesc bchain.AddressDescriptor) ([]bchain.Outpoint, error) {
return b.Mempool.GetAddrDescTransactions(addrDesc)
}
// GetMempoolEntry is not supported by etherem
func (b *EthereumRPC) GetMempoolEntry(txid string) (*bchain.MempoolEntry, error) {
return nil, errors.New("GetMempoolEntry: not supported")
}
// GetChainParser returns ethereum BlockChainParser
func (b *EthereumRPC) GetChainParser() bchain.BlockChainParser {
return b.Parser

View File

@ -4,6 +4,7 @@ import (
"blockbook/bchain"
"blockbook/bchain/coins/btc"
"encoding/json"
"github.com/juju/errors"
"github.com/golang/glog"
@ -32,10 +33,11 @@ func NewFloRPC(config json.RawMessage, pushHandler func(bchain.NotificationType)
// Initialize initializes FloRPC instance.
func (f *FloRPC) Initialize() error {
chainName, err := f.GetChainInfoAndInitializeMempool(f)
ci, err := f.GetChainInfo()
if err != nil {
return err
}
chainName := ci.Chain
glog.Info("Chain name ", chainName)
params := GetChainParams(chainName)

View File

@ -31,10 +31,11 @@ func NewFujicoinRPC(config json.RawMessage, pushHandler func(bchain.Notification
// Initialize initializes FujicoinRPC instance.
func (b *FujicoinRPC) Initialize() error {
chainName, err := b.GetChainInfoAndInitializeMempool(b)
ci, err := b.GetChainInfo()
if err != nil {
return err
}
chainName := ci.Chain
glog.Info("Chain name ", chainName)
params := GetChainParams(chainName)

View File

@ -31,10 +31,11 @@ func NewGameCreditsRPC(config json.RawMessage, pushHandler func(bchain.Notificat
// Initialize initializes GameCreditsRPC instance.
func (b *GameCreditsRPC) Initialize() error {
chainName, err := b.GetChainInfoAndInitializeMempool(b)
ci, err := b.GetChainInfo()
if err != nil {
return err
}
chainName := ci.Chain
glog.Info("Chain name ", chainName)
params := GetChainParams(chainName)

View File

@ -27,10 +27,11 @@ func NewGroestlcoinRPC(config json.RawMessage, pushHandler func(bchain.Notificat
// Initialize initializes GroestlcoinRPC instance.
func (g *GroestlcoinRPC) Initialize() error {
chainName, err := g.GetChainInfoAndInitializeMempool(g)
ci, err := g.GetChainInfo()
if err != nil {
return err
}
chainName := ci.Chain
params := GetChainParams(chainName)

View File

@ -28,10 +28,11 @@ func NewKotoRPC(config json.RawMessage, pushHandler func(bchain.NotificationType
// Initialize initializes KotoRPC instance.
func (z *KotoRPC) Initialize() error {
chainName, err := z.GetChainInfoAndInitializeMempool(z)
ci, err := z.GetChainInfo()
if err != nil {
return err
}
chainName := ci.Chain
params := GetChainParams(chainName)

View File

@ -31,10 +31,11 @@ func NewLiquidRPC(config json.RawMessage, pushHandler func(bchain.NotificationTy
// Initialize initializes GameCreditsRPC instance.
func (b *LiquidRPC) Initialize() error {
chainName, err := b.GetChainInfoAndInitializeMempool(b)
ci, err := b.GetChainInfo()
if err != nil {
return err
}
chainName := ci.Chain
glog.Info("Chain name ", chainName)
params := GetChainParams(chainName)

View File

@ -31,10 +31,11 @@ func NewLitecoinRPC(config json.RawMessage, pushHandler func(bchain.Notification
// Initialize initializes LitecoinRPC instance.
func (b *LitecoinRPC) Initialize() error {
chainName, err := b.GetChainInfoAndInitializeMempool(b)
ci, err := b.GetChainInfo()
if err != nil {
return err
}
chainName := ci.Chain
glog.Info("Chain name ", chainName)
params := GetChainParams(chainName)

View File

@ -31,10 +31,11 @@ func NewMonacoinRPC(config json.RawMessage, pushHandler func(bchain.Notification
// Initialize initializes MonacoinRPC instance.
func (b *MonacoinRPC) Initialize() error {
chainName, err := b.GetChainInfoAndInitializeMempool(b)
ci, err := b.GetChainInfo()
if err != nil {
return err
}
chainName := ci.Chain
glog.Info("Chain name ", chainName)
params := GetChainParams(chainName)

View File

@ -31,10 +31,11 @@ func NewMyriadRPC(config json.RawMessage, pushHandler func(bchain.NotificationTy
// Initialize initializes MyriadRPC instance.
func (b *MyriadRPC) Initialize() error {
chainName, err := b.GetChainInfoAndInitializeMempool(b)
ci, err := b.GetChainInfo()
if err != nil {
return err
}
chainName := ci.Chain
glog.Info("Chain name ", chainName)
params := GetChainParams(chainName)

View File

@ -31,10 +31,11 @@ func NewNamecoinRPC(config json.RawMessage, pushHandler func(bchain.Notification
// Initialize initializes NamecoinRPC instance.
func (b *NamecoinRPC) Initialize() error {
chainName, err := b.GetChainInfoAndInitializeMempool(b)
ci, err := b.GetChainInfo()
if err != nil {
return err
}
chainName := ci.Chain
glog.Info("Chain name ", chainName)
params := GetChainParams(chainName)

View File

@ -5,9 +5,8 @@ import (
"blockbook/bchain/coins/btc"
"bytes"
"encoding/base64"
"encoding/hex"
"encoding/json"
"github.com/gobuffalo/packr/v2/file/resolver/encoding/hex"
"github.com/juju/errors"
"io"
"io/ioutil"
"math/big"
@ -17,6 +16,8 @@ import (
"strconv"
"time"
"github.com/juju/errors"
"github.com/golang/glog"
)

View File

@ -32,10 +32,11 @@ func NewPivXRPC(config json.RawMessage, pushHandler func(bchain.NotificationType
// Initialize initializes PivXRPC instance.
func (b *PivXRPC) Initialize() error {
chainName, err := b.GetChainInfoAndInitializeMempool(b)
ci, err := b.GetChainInfo()
if err != nil {
return err
}
chainName := ci.Chain
glog.Info("Chain name ", chainName)
params := GetChainParams(chainName)

View File

@ -31,10 +31,11 @@ func NewPolisRPC(config json.RawMessage, pushHandler func(bchain.NotificationTyp
// Initialize initializes PolisRPC instance.
func (b *PolisRPC) Initialize() error {
chainName, err := b.GetChainInfoAndInitializeMempool(b)
ci, err := b.GetChainInfo()
if err != nil {
return err
}
chainName := ci.Chain
params := GetChainParams(chainName)

View File

@ -4,6 +4,7 @@ import (
"blockbook/bchain"
"blockbook/bchain/coins/btc"
"encoding/json"
"github.com/golang/glog"
)
@ -30,10 +31,11 @@ func NewQtumRPC(config json.RawMessage, pushHandler func(bchain.NotificationType
// Initialize initializes QtumRPC instance.
func (b *QtumRPC) Initialize() error {
chainName, err := b.GetChainInfoAndInitializeMempool(b)
ci, err := b.GetChainInfo()
if err != nil {
return err
}
chainName := ci.Chain
params := GetChainParams(chainName)

View File

@ -31,10 +31,11 @@ func NewVertcoinRPC(config json.RawMessage, pushHandler func(bchain.Notification
// Initialize initializes VertcoinRPC instance.
func (b *VertcoinRPC) Initialize() error {
chainName, err := b.GetChainInfoAndInitializeMempool(b)
ci, err := b.GetChainInfo()
if err != nil {
return err
}
chainName := ci.Chain
glog.Info("Chain name ", chainName)
params := GetChainParams(chainName)

View File

@ -36,10 +36,11 @@ func NewZcoinRPC(config json.RawMessage, pushHandler func(bchain.NotificationTyp
}
func (zc *ZcoinRPC) Initialize() error {
chainName, err := zc.GetChainInfoAndInitializeMempool(zc)
ci, err := zc.GetChainInfo()
if err != nil {
return err
}
chainName := ci.Chain
params := GetChainParams(chainName)

View File

@ -30,10 +30,11 @@ func NewZCashRPC(config json.RawMessage, pushHandler func(bchain.NotificationTyp
// Initialize initializes ZCashRPC instance
func (z *ZCashRPC) Initialize() error {
chainName, err := z.GetChainInfoAndInitializeMempool(z)
ci, err := z.GetChainInfo()
if err != nil {
return err
}
chainName := ci.Chain
params := GetChainParams(chainName)

View File

@ -1,38 +1,28 @@
package bchain
import (
"sync"
"time"
"github.com/golang/glog"
)
type addrIndex struct {
addrDesc string
n int32
}
type txidio struct {
txid string
io []addrIndex
}
// MempoolBitcoinType is mempool handle.
type MempoolBitcoinType struct {
chain BlockChain
mux sync.Mutex
txToInputOutput map[string][]addrIndex
addrDescToTx map[string][]Outpoint
chanTxid chan string
chanAddrIndex chan txidio
onNewTxAddr OnNewTxAddrFunc
BaseMempool
chanTxid chan string
chanAddrIndex chan txidio
AddrDescForOutpoint AddrDescForOutpointFunc
}
// NewMempoolBitcoinType creates new mempool handler.
// For now there is no cleanup of sync routines, the expectation is that the mempool is created only once per process
func NewMempoolBitcoinType(chain BlockChain, workers int, subworkers int) *MempoolBitcoinType {
m := &MempoolBitcoinType{
chain: chain,
BaseMempool: BaseMempool{
chain: chain,
txEntries: make(map[string]txEntry),
addrDescToTx: make(map[string][]Outpoint),
},
chanTxid: make(chan string, 1),
chanAddrIndex: make(chan txidio, 1),
}
@ -61,44 +51,26 @@ func NewMempoolBitcoinType(chain BlockChain, workers int, subworkers int) *Mempo
return m
}
// GetTransactions returns slice of mempool transactions for given address
func (m *MempoolBitcoinType) GetTransactions(address string) ([]Outpoint, error) {
parser := m.chain.GetChainParser()
addrDesc, err := parser.GetAddrDescFromAddress(address)
if err != nil {
return nil, err
}
return m.GetAddrDescTransactions(addrDesc)
}
// GetAddrDescTransactions returns slice of mempool transactions for given address descriptor
func (m *MempoolBitcoinType) GetAddrDescTransactions(addrDesc AddressDescriptor) ([]Outpoint, error) {
m.mux.Lock()
defer m.mux.Unlock()
return append([]Outpoint(nil), m.addrDescToTx[string(addrDesc)]...), nil
}
func (m *MempoolBitcoinType) updateMappings(newTxToInputOutput map[string][]addrIndex, newAddrDescToTx map[string][]Outpoint) {
m.mux.Lock()
defer m.mux.Unlock()
m.txToInputOutput = newTxToInputOutput
m.addrDescToTx = newAddrDescToTx
}
func (m *MempoolBitcoinType) getInputAddress(input Outpoint) *addrIndex {
itx, err := m.chain.GetTransactionForMempool(input.Txid)
if err != nil {
glog.Error("cannot get transaction ", input.Txid, ": ", err)
return nil
var addrDesc AddressDescriptor
if m.AddrDescForOutpoint != nil {
addrDesc = m.AddrDescForOutpoint(input)
}
if int(input.Vout) >= len(itx.Vout) {
glog.Error("Vout len in transaction ", input.Txid, " ", len(itx.Vout), " input.Vout=", input.Vout)
return nil
}
addrDesc, err := m.chain.GetChainParser().GetAddrDescFromVout(&itx.Vout[input.Vout])
if err != nil {
glog.Error("error in addrDesc in ", input.Txid, " ", input.Vout, ": ", err)
return nil
if addrDesc == nil {
itx, err := m.chain.GetTransactionForMempool(input.Txid)
if err != nil {
glog.Error("cannot get transaction ", input.Txid, ": ", err)
return nil
}
if int(input.Vout) >= len(itx.Vout) {
glog.Error("Vout len in transaction ", input.Txid, " ", len(itx.Vout), " input.Vout=", input.Vout)
return nil
}
addrDesc, err = m.chain.GetChainParser().GetAddrDescFromVout(&itx.Vout[input.Vout])
if err != nil {
glog.Error("error in addrDesc in ", input.Txid, " ", input.Vout, ": ", err)
return nil
}
}
return &addrIndex{string(addrDesc), ^input.Vout}
@ -121,8 +93,8 @@ func (m *MempoolBitcoinType) getTxAddrs(txid string, chanInput chan Outpoint, ch
if len(addrDesc) > 0 {
io = append(io, addrIndex{string(addrDesc), int32(output.N)})
}
if m.onNewTxAddr != nil {
m.onNewTxAddr(tx, addrDesc)
if m.OnNewTxAddr != nil {
m.OnNewTxAddr(tx, addrDesc)
}
}
dispatched := 0
@ -159,37 +131,38 @@ func (m *MempoolBitcoinType) getTxAddrs(txid string, chanInput chan Outpoint, ch
// Resync gets mempool transactions and maps outputs to transactions.
// Resync is not reentrant, it should be called from a single thread.
// Read operations (GetTransactions) are safe.
func (m *MempoolBitcoinType) Resync(onNewTxAddr OnNewTxAddrFunc) (int, error) {
func (m *MempoolBitcoinType) Resync() (int, error) {
start := time.Now()
glog.V(1).Info("mempool: resync")
m.onNewTxAddr = onNewTxAddr
txs, err := m.chain.GetMempool()
txs, err := m.chain.GetMempoolTransactions()
if err != nil {
return 0, err
}
glog.V(2).Info("mempool: resync ", len(txs), " txs")
// allocate slightly larger capacity of the maps
newTxToInputOutput := make(map[string][]addrIndex, len(m.txToInputOutput)+5)
newAddrDescToTx := make(map[string][]Outpoint, len(m.addrDescToTx)+5)
dispatched := 0
onNewData := func(txid string, io []addrIndex) {
if len(io) > 0 {
newTxToInputOutput[txid] = io
for _, si := range io {
newAddrDescToTx[si.addrDesc] = append(newAddrDescToTx[si.addrDesc], Outpoint{txid, si.n})
onNewEntry := func(txid string, entry txEntry) {
if len(entry.addrIndexes) > 0 {
m.mux.Lock()
m.txEntries[txid] = entry
for _, si := range entry.addrIndexes {
m.addrDescToTx[si.addrDesc] = append(m.addrDescToTx[si.addrDesc], Outpoint{txid, si.n})
}
m.mux.Unlock()
}
}
txsMap := make(map[string]struct{}, len(txs))
dispatched := 0
txTime := uint32(time.Now().Unix())
// get transaction in parallel using goroutines created in NewUTXOMempool
for _, txid := range txs {
io, exists := m.txToInputOutput[txid]
txsMap[txid] = struct{}{}
_, exists := m.txEntries[txid]
if !exists {
loop:
for {
select {
// store as many processed transactions as possible
case tio := <-m.chanAddrIndex:
onNewData(tio.txid, tio.io)
onNewEntry(tio.txid, txEntry{tio.io, txTime})
dispatched--
// send transaction to be processed
case m.chanTxid <- txid:
@ -197,16 +170,20 @@ func (m *MempoolBitcoinType) Resync(onNewTxAddr OnNewTxAddrFunc) (int, error) {
break loop
}
}
} else {
onNewData(txid, io)
}
}
for i := 0; i < dispatched; i++ {
tio := <-m.chanAddrIndex
onNewData(tio.txid, tio.io)
onNewEntry(tio.txid, txEntry{tio.io, txTime})
}
m.updateMappings(newTxToInputOutput, newAddrDescToTx)
m.onNewTxAddr = nil
glog.Info("mempool: resync finished in ", time.Since(start), ", ", len(m.txToInputOutput), " transactions in mempool")
return len(m.txToInputOutput), nil
for txid, entry := range m.txEntries {
if _, exists := txsMap[txid]; !exists {
m.mux.Lock()
m.removeEntryFromMempool(txid, entry)
m.mux.Unlock()
}
}
glog.Info("mempool: resync finished in ", time.Since(start), ", ", len(m.txEntries), " transactions in mempool")
return len(m.txEntries), nil
}

View File

@ -1,47 +1,34 @@
package bchain
import (
"sync"
"time"
"github.com/golang/glog"
)
const mempoolTimeoutRunPeriod = 10 * time.Minute
// MempoolEthereumType is mempool handle of EthereumType chains
type MempoolEthereumType struct {
chain BlockChain
mux sync.Mutex
txToInputOutput map[string][]addrIndex
addrDescToTx map[string][]Outpoint
BaseMempool
mempoolTimeoutTime time.Duration
queryBackendOnResync bool
nextTimeoutRun time.Time
}
// NewMempoolEthereumType creates new mempool handler.
func NewMempoolEthereumType(chain BlockChain) *MempoolEthereumType {
return &MempoolEthereumType{chain: chain}
}
// GetTransactions returns slice of mempool transactions for given address
func (m *MempoolEthereumType) GetTransactions(address string) ([]Outpoint, error) {
parser := m.chain.GetChainParser()
addrDesc, err := parser.GetAddrDescFromAddress(address)
if err != nil {
return nil, err
func NewMempoolEthereumType(chain BlockChain, mempoolTxTimeoutHours int, queryBackendOnResync bool) *MempoolEthereumType {
mempoolTimeoutTime := time.Duration(mempoolTxTimeoutHours) * time.Hour
return &MempoolEthereumType{
BaseMempool: BaseMempool{
chain: chain,
txEntries: make(map[string]txEntry),
addrDescToTx: make(map[string][]Outpoint),
},
mempoolTimeoutTime: mempoolTimeoutTime,
queryBackendOnResync: queryBackendOnResync,
nextTimeoutRun: time.Now().Add(mempoolTimeoutTime),
}
return m.GetAddrDescTransactions(addrDesc)
}
// GetAddrDescTransactions returns slice of mempool transactions for given address descriptor
func (m *MempoolEthereumType) GetAddrDescTransactions(addrDesc AddressDescriptor) ([]Outpoint, error) {
m.mux.Lock()
defer m.mux.Unlock()
return append([]Outpoint(nil), m.addrDescToTx[string(addrDesc)]...), nil
}
func (m *MempoolEthereumType) updateMappings(newTxToInputOutput map[string][]addrIndex, newAddrDescToTx map[string][]Outpoint) {
m.mux.Lock()
defer m.mux.Unlock()
m.txToInputOutput = newTxToInputOutput
m.addrDescToTx = newAddrDescToTx
}
func appendAddress(io []addrIndex, i int32, a string, parser BlockChainParser) []addrIndex {
@ -56,73 +43,117 @@ func appendAddress(io []addrIndex, i int32, a string, parser BlockChainParser) [
return io
}
// Resync gets mempool transactions and maps outputs to transactions.
// Resync is not reentrant, it should be called from a single thread.
// Read operations (GetTransactions) are safe.
func (m *MempoolEthereumType) Resync(onNewTxAddr OnNewTxAddrFunc) (int, error) {
start := time.Now()
glog.V(1).Info("Mempool: resync")
txs, err := m.chain.GetMempool()
func (m *MempoolEthereumType) createTxEntry(txid string, txTime uint32) (txEntry, bool) {
tx, err := m.chain.GetTransactionForMempool(txid)
if err != nil {
return 0, err
if err != ErrTxNotFound {
glog.Warning("cannot get transaction ", txid, ": ", err)
}
return txEntry{}, false
}
parser := m.chain.GetChainParser()
// allocate slightly larger capacity of the maps
newTxToInputOutput := make(map[string][]addrIndex, len(m.txToInputOutput)+5)
newAddrDescToTx := make(map[string][]Outpoint, len(m.addrDescToTx)+5)
for _, txid := range txs {
io, exists := m.txToInputOutput[txid]
if !exists {
tx, err := m.chain.GetTransactionForMempool(txid)
if err != nil {
if err != ErrTxNotFound {
glog.Warning("cannot get transaction ", txid, ": ", err)
}
continue
}
io = make([]addrIndex, 0, len(tx.Vout)+len(tx.Vin))
for _, output := range tx.Vout {
addrDesc, err := parser.GetAddrDescFromVout(&output)
if err != nil {
if err != ErrAddressMissing {
glog.Error("error in output addrDesc in ", txid, " ", output.N, ": ", err)
}
continue
}
if len(addrDesc) > 0 {
io = append(io, addrIndex{string(addrDesc), int32(output.N)})
}
}
for _, input := range tx.Vin {
for i, a := range input.Addresses {
appendAddress(io, ^int32(i), a, parser)
}
}
t, err := parser.EthereumTypeGetErc20FromTx(tx)
if err != nil {
glog.Error("GetErc20FromTx for tx ", txid, ", ", err)
} else {
for i := range t {
io = appendAddress(io, ^int32(i+1), t[i].From, parser)
io = appendAddress(io, int32(i+1), t[i].To, parser)
}
}
if onNewTxAddr != nil {
sent := make(map[string]struct{})
for _, si := range io {
if _, found := sent[si.addrDesc]; !found {
onNewTxAddr(tx, AddressDescriptor(si.addrDesc))
sent[si.addrDesc] = struct{}{}
}
}
addrIndexes := make([]addrIndex, 0, len(tx.Vout)+len(tx.Vin))
for _, output := range tx.Vout {
addrDesc, err := parser.GetAddrDescFromVout(&output)
if err != nil {
if err != ErrAddressMissing {
glog.Error("error in output addrDesc in ", txid, " ", output.N, ": ", err)
}
continue
}
newTxToInputOutput[txid] = io
for _, si := range io {
newAddrDescToTx[si.addrDesc] = append(newAddrDescToTx[si.addrDesc], Outpoint{txid, si.n})
if len(addrDesc) > 0 {
addrIndexes = append(addrIndexes, addrIndex{string(addrDesc), int32(output.N)})
}
}
m.updateMappings(newTxToInputOutput, newAddrDescToTx)
glog.Info("Mempool: resync finished in ", time.Since(start), ", ", len(m.txToInputOutput), " transactions in mempool")
return len(m.txToInputOutput), nil
for _, input := range tx.Vin {
for i, a := range input.Addresses {
addrIndexes = appendAddress(addrIndexes, ^int32(i), a, parser)
}
}
t, err := parser.EthereumTypeGetErc20FromTx(tx)
if err != nil {
glog.Error("GetErc20FromTx for tx ", txid, ", ", err)
} else {
for i := range t {
addrIndexes = appendAddress(addrIndexes, ^int32(i+1), t[i].From, parser)
addrIndexes = appendAddress(addrIndexes, int32(i+1), t[i].To, parser)
}
}
if m.OnNewTxAddr != nil {
sent := make(map[string]struct{})
for _, si := range addrIndexes {
if _, found := sent[si.addrDesc]; !found {
m.OnNewTxAddr(tx, AddressDescriptor(si.addrDesc))
sent[si.addrDesc] = struct{}{}
}
}
}
return txEntry{addrIndexes: addrIndexes, time: txTime}, true
}
// Resync ethereum type removes timed out transactions and returns number of transactions in mempool.
// Transactions are added/removed by AddTransactionToMempool/RemoveTransactionFromMempool methods
func (m *MempoolEthereumType) Resync() (int, error) {
if m.queryBackendOnResync {
txs, err := m.chain.GetMempoolTransactions()
if err != nil {
return 0, err
}
for _, txid := range txs {
m.AddTransactionToMempool(txid)
}
}
m.mux.Lock()
entries := len(m.txEntries)
now := time.Now()
if m.nextTimeoutRun.Before(now) {
threshold := now.Add(-m.mempoolTimeoutTime)
for txid, entry := range m.txEntries {
if time.Unix(int64(entry.time), 0).Before(threshold) {
m.removeEntryFromMempool(txid, entry)
}
}
removed := entries - len(m.txEntries)
entries = len(m.txEntries)
glog.Info("Mempool: cleanup, removed ", removed, " transactions from mempool")
m.nextTimeoutRun = now.Add(mempoolTimeoutRunPeriod)
}
m.mux.Unlock()
glog.Info("Mempool: resync ", entries, " transactions in mempool")
return entries, nil
}
// AddTransactionToMempool adds transactions to mempool
func (m *MempoolEthereumType) AddTransactionToMempool(txid string) {
m.mux.Lock()
_, exists := m.txEntries[txid]
m.mux.Unlock()
if glog.V(1) {
glog.Info("AddTransactionToMempool ", txid, ", existed ", exists)
}
if !exists {
entry, ok := m.createTxEntry(txid, uint32(time.Now().Unix()))
if !ok {
return
}
m.mux.Lock()
m.txEntries[txid] = entry
for _, si := range entry.addrIndexes {
m.addrDescToTx[si.addrDesc] = append(m.addrDescToTx[si.addrDesc], Outpoint{txid, si.n})
}
m.mux.Unlock()
}
}
// RemoveTransactionFromMempool removes transaction from mempool
func (m *MempoolEthereumType) RemoveTransactionFromMempool(txid string) {
m.mux.Lock()
entry, exists := m.txEntries[txid]
if glog.V(1) {
glog.Info("RemoveTransactionFromMempool ", txid, ", existed ", exists)
}
if exists {
m.removeEntryFromMempool(txid, entry)
}
m.mux.Unlock()
}

View File

@ -185,16 +185,34 @@ type Erc20Transfer struct {
Tokens big.Int
}
// MempoolTxidEntry contains mempool txid with first seen time
type MempoolTxidEntry struct {
Txid string
Time uint32
}
// MempoolTxidEntries is array of MempoolTxidEntry
type MempoolTxidEntries []MempoolTxidEntry
// OnNewBlockFunc is used to send notification about a new block
type OnNewBlockFunc func(hash string, height uint32)
// OnNewTxAddrFunc is used to send notification about a new transaction/address
type OnNewTxAddrFunc func(tx *Tx, desc AddressDescriptor)
// AddrDescForOutpointFunc defines function that returns address descriptorfor given outpoint or nil if outpoint not found
type AddrDescForOutpointFunc func(outpoint Outpoint) AddressDescriptor
// BlockChain defines common interface to block chain daemon
type BlockChain interface {
// life-cycle methods
// initialize the block chain connector
Initialize() error
// create mempool but do not initialize it
CreateMempool(BlockChain) (Mempool, error)
// initialize mempool, create ZeroMQ (or other) subscription
InitializeMempool(AddrDescForOutpointFunc, OnNewTxAddrFunc) error
// shutdown mempool, ZeroMQ and block chain connections
Shutdown(ctx context.Context) error
// chain info
IsTestnet() bool
@ -209,17 +227,13 @@ type BlockChain interface {
GetBlockHeader(hash string) (*BlockHeader, error)
GetBlock(hash string, height uint32) (*Block, error)
GetBlockInfo(hash string) (*BlockInfo, error)
GetMempool() ([]string, error)
GetMempoolTransactions() ([]string, error)
GetTransaction(txid string) (*Tx, error)
GetTransactionForMempool(txid string) (*Tx, error)
GetTransactionSpecific(tx *Tx) (json.RawMessage, error)
EstimateSmartFee(blocks int, conservative bool) (big.Int, error)
EstimateFee(blocks int) (big.Int, error)
SendRawTransaction(tx string) (string, error)
// mempool
ResyncMempool(onNewTxAddr OnNewTxAddrFunc) (int, error)
GetMempoolTransactions(address string) ([]Outpoint, error)
GetMempoolTransactionsForAddrDesc(addrDesc AddressDescriptor) ([]Outpoint, error)
GetMempoolEntry(txid string) (*MempoolEntry, error)
// parser
GetChainParser() BlockChainParser
@ -270,3 +284,12 @@ type BlockChainParser interface {
// EthereumType specific
EthereumTypeGetErc20FromTx(tx *Tx) ([]Erc20Transfer, error)
}
// Mempool defines common interface to mempool
type Mempool interface {
Resync() (int, error)
GetTransactions(address string) ([]Outpoint, error)
GetAddrDescTransactions(addrDesc AddressDescriptor) ([]Outpoint, error)
GetAllEntries() MempoolTxidEntries
GetTransactionTime(txid string) uint32
}

View File

@ -82,6 +82,7 @@ var (
chanSyncMempoolDone = make(chan struct{})
chanStoreInternalStateDone = make(chan struct{})
chain bchain.BlockChain
mempool bchain.Mempool
index *db.RocksDB
txCache *db.TxCache
metrics *common.Metrics
@ -98,29 +99,6 @@ func init() {
glog.CopyStandardLogTo("INFO")
}
func getBlockChainWithRetry(coin string, configfile string, pushHandler func(bchain.NotificationType), metrics *common.Metrics, seconds int) (bchain.BlockChain, error) {
var chain bchain.BlockChain
var err error
timer := time.NewTimer(time.Second)
for i := 0; ; i++ {
if chain, err = coins.NewBlockChain(coin, configfile, pushHandler, metrics); err != nil {
if i < seconds {
glog.Error("rpc: ", err, " Retrying...")
select {
case <-chanOsSignal:
return nil, errors.New("Interrupted")
case <-timer.C:
timer.Reset(time.Second)
continue
}
} else {
return nil, err
}
}
return chain, nil
}
}
func main() {
flag.Parse()
@ -141,34 +119,40 @@ func main() {
if *repair {
if err := db.RepairRocksDB(*dbPath); err != nil {
glog.Fatalf("RepairRocksDB %s: %v", *dbPath, err)
glog.Errorf("RepairRocksDB %s: %v", *dbPath, err)
return
}
return
}
if *blockchain == "" {
glog.Fatal("Missing blockchaincfg configuration parameter")
glog.Error("Missing blockchaincfg configuration parameter")
return
}
coin, coinShortcut, coinLabel, err := coins.GetCoinNameFromConfig(*blockchain)
if err != nil {
glog.Fatal("config: ", err)
glog.Error("config: ", err)
return
}
// gspt.SetProcTitle("blockbook-" + normalizeName(coin))
metrics, err = common.GetMetrics(coin)
if err != nil {
glog.Fatal("metrics: ", err)
glog.Error("metrics: ", err)
return
}
if chain, err = getBlockChainWithRetry(coin, *blockchain, pushSynchronizationHandler, metrics, 60); err != nil {
glog.Fatal("rpc: ", err)
if chain, mempool, err = getBlockChainWithRetry(coin, *blockchain, pushSynchronizationHandler, metrics, 60); err != nil {
glog.Error("rpc: ", err)
return
}
index, err = db.NewRocksDB(*dbPath, *dbCache, *dbMaxOpenFiles, chain.GetChainParser(), metrics)
if err != nil {
glog.Fatal("rocksDB: ", err)
glog.Error("rocksDB: ", err)
return
}
defer index.Close()
@ -198,40 +182,20 @@ func main() {
syncWorker, err = db.NewSyncWorker(index, chain, *syncWorkers, *syncChunk, *blockFrom, *dryRun, chanOsSignal, metrics, internalState)
if err != nil {
glog.Fatalf("NewSyncWorker %v", err)
glog.Errorf("NewSyncWorker %v", err)
return
}
// set the DbState to open at this moment, after all important workers are initialized
internalState.DbState = common.DbStateOpen
err = index.StoreInternalState(internalState)
if err != nil {
glog.Fatal("internalState: ", err)
glog.Error("internalState: ", err)
return
}
if *rollbackHeight >= 0 {
bestHeight, bestHash, err := index.GetBestBlock()
if err != nil {
glog.Error("rollbackHeight: ", err)
return
}
if uint32(*rollbackHeight) > bestHeight {
glog.Infof("nothing to rollback, rollbackHeight %d, bestHeight: %d", *rollbackHeight, bestHeight)
} else {
hashes := []string{bestHash}
for height := bestHeight - 1; height >= uint32(*rollbackHeight); height-- {
hash, err := index.GetBlockHash(height)
if err != nil {
glog.Error("rollbackHeight: ", err)
return
}
hashes = append(hashes, hash)
}
err = syncWorker.DisconnectBlocks(uint32(*rollbackHeight), bestHeight, hashes)
if err != nil {
glog.Error("rollbackHeight: ", err)
return
}
}
performRollback()
return
}
@ -247,45 +211,20 @@ func main() {
var internalServer *server.InternalServer
if *internalBinding != "" {
internalServer, err = server.NewInternalServer(*internalBinding, *certFiles, index, chain, txCache, internalState)
internalServer, err = startInternalServer()
if err != nil {
glog.Error("https: ", err)
glog.Error("internal server: ", err)
return
}
go func() {
err = internalServer.Run()
if err != nil {
if err.Error() == "http: Server closed" {
glog.Info("internal server: closed")
} else {
glog.Error(err)
return
}
}
}()
}
var publicServer *server.PublicServer
if *publicBinding != "" {
// start public server in limited functionality, extend it after sync is finished by calling ConnectFullPublicInterface
publicServer, err = server.NewPublicServer(*publicBinding, *certFiles, index, chain, txCache, *explorerURL, metrics, internalState, *debugMode)
publicServer, err = startPublicServer()
if err != nil {
glog.Error("socketio: ", err)
glog.Error("public server: ", err)
return
}
go func() {
err = publicServer.Run()
if err != nil {
if err.Error() == "http: Server closed" {
glog.Info("public server: closed")
} else {
glog.Error(err)
return
}
}
}()
callbacksOnNewBlock = append(callbacksOnNewBlock, publicServer.OnNewBlock)
callbacksOnNewTxAddr = append(callbacksOnNewTxAddr, publicServer.OnNewTxAddr)
}
if *synchronize {
@ -295,8 +234,18 @@ func main() {
glog.Error("resyncIndex ", err)
return
}
// initialize mempool after the initial sync is complete
var addrDescForOutpoint bchain.AddrDescForOutpointFunc
if chain.GetChainParser().GetChainType() == bchain.ChainBitcoinType {
addrDescForOutpoint = index.AddrDescForOutpoint
}
err = chain.InitializeMempool(addrDescForOutpoint, onNewTxAddr)
if err != nil {
glog.Error("initializeMempool ", err)
return
}
var mempoolCount int
if mempoolCount, err = chain.ResyncMempool(nil); err != nil {
if mempoolCount, err = mempool.Resync(); err != nil {
glog.Error("resyncMempool ", err)
return
}
@ -307,8 +256,10 @@ func main() {
}
go storeInternalStateLoop()
if *publicBinding != "" {
if publicServer != nil {
// start full public interface
callbacksOnNewBlock = append(callbacksOnNewBlock, publicServer.OnNewBlock)
callbacksOnNewTxAddr = append(callbacksOnNewTxAddr, publicServer.OnNewTxAddr)
publicServer.ConnectFullPublicInterface()
}
@ -341,8 +292,97 @@ func main() {
}
}
func getBlockChainWithRetry(coin string, configfile string, pushHandler func(bchain.NotificationType), metrics *common.Metrics, seconds int) (bchain.BlockChain, bchain.Mempool, error) {
var chain bchain.BlockChain
var mempool bchain.Mempool
var err error
timer := time.NewTimer(time.Second)
for i := 0; ; i++ {
if chain, mempool, err = coins.NewBlockChain(coin, configfile, pushHandler, metrics); err != nil {
if i < seconds {
glog.Error("rpc: ", err, " Retrying...")
select {
case <-chanOsSignal:
return nil, nil, errors.New("Interrupted")
case <-timer.C:
timer.Reset(time.Second)
continue
}
} else {
return nil, nil, err
}
}
return chain, mempool, nil
}
}
func startInternalServer() (*server.InternalServer, error) {
internalServer, err := server.NewInternalServer(*internalBinding, *certFiles, index, chain, mempool, txCache, internalState)
if err != nil {
return nil, err
}
go func() {
err = internalServer.Run()
if err != nil {
if err.Error() == "http: Server closed" {
glog.Info("internal server: closed")
} else {
glog.Error(err)
return
}
}
}()
return internalServer, nil
}
func startPublicServer() (*server.PublicServer, error) {
// start public server in limited functionality, extend it after sync is finished by calling ConnectFullPublicInterface
publicServer, err := server.NewPublicServer(*publicBinding, *certFiles, index, chain, mempool, txCache, *explorerURL, metrics, internalState, *debugMode)
if err != nil {
return nil, err
}
go func() {
err = publicServer.Run()
if err != nil {
if err.Error() == "http: Server closed" {
glog.Info("public server: closed")
} else {
glog.Error(err)
return
}
}
}()
return publicServer, err
}
func performRollback() {
bestHeight, bestHash, err := index.GetBestBlock()
if err != nil {
glog.Error("rollbackHeight: ", err)
return
}
if uint32(*rollbackHeight) > bestHeight {
glog.Infof("nothing to rollback, rollbackHeight %d, bestHeight: %d", *rollbackHeight, bestHeight)
} else {
hashes := []string{bestHash}
for height := bestHeight - 1; height >= uint32(*rollbackHeight); height-- {
hash, err := index.GetBlockHash(height)
if err != nil {
glog.Error("rollbackHeight: ", err)
return
}
hashes = append(hashes, hash)
}
err = syncWorker.DisconnectBlocks(uint32(*rollbackHeight), bestHeight, hashes)
if err != nil {
glog.Error("rollbackHeight: ", err)
return
}
}
}
func blockbookAppInfoMetric(db *db.RocksDB, chain bchain.BlockChain, txCache *db.TxCache, is *common.InternalState, metrics *common.Metrics) error {
api, err := api.NewWorker(db, chain, txCache, is)
api, err := api.NewWorker(db, chain, mempool, txCache, is)
if err != nil {
return err
}
@ -442,7 +482,7 @@ func syncMempoolLoop() {
// resync mempool about every minute if there are no chanSyncMempool requests, with debounce 1 second
tickAndDebounce(time.Duration(*resyncMempoolPeriodMs)*time.Millisecond, debounceResyncMempoolMs*time.Millisecond, chanSyncMempool, func() {
internalState.StartedMempoolSync()
if count, err := chain.ResyncMempool(onNewTxAddr); err != nil {
if count, err := mempool.Resync(); err != nil {
glog.Error("syncMempoolLoop ", errors.ErrorStack(err))
} else {
internalState.FinishedMempoolSync(count)

View File

@ -1,9 +1,9 @@
{
"coin": {
"name": "Ethereum Classic",
"shortcut": "ETC",
"label": "Ethereum Classic",
"alias": "ethereum-classic"
"name": "Ethereum Classic",
"shortcut": "ETC",
"label": "Ethereum Classic",
"alias": "ethereum-classic"
},
"ports": {
"backend_rpc": 8037,
@ -41,17 +41,20 @@
"internal_binding_template": ":{{.Ports.BlockbookInternal}}",
"public_binding_template": ":{{.Ports.BlockbookPublic}}",
"explorer_url": "",
"additional_params": "-resyncindexperiod=4441",
"additional_params": "-resyncindexperiod=4441 -resyncmempoolperiod=2011",
"block_chain": {
"parse": true,
"mempool_workers": 8,
"mempool_sub_workers": 2,
"block_addresses_to_keep": 300,
"additional_params": {}
"additional_params": {
"mempoolTxTimeoutHours": 48,
"queryBackendOnMempoolResync": true
}
}
},
"meta": {
"package_maintainer": "Petr Kracik",
"package_maintainer_email": "petr.kracik@satoshilabs.com"
}
}
}

View File

@ -1,9 +1,9 @@
{
"coin": {
"name": "Ethereum",
"shortcut": "ETH",
"label": "Ethereum",
"alias": "ethereum"
"name": "Ethereum",
"shortcut": "ETH",
"label": "Ethereum",
"alias": "ethereum"
},
"ports": {
"backend_rpc": 8036,
@ -49,11 +49,14 @@
"mempool_workers": 8,
"mempool_sub_workers": 2,
"block_addresses_to_keep": 300,
"additional_params": {}
"additional_params": {
"mempoolTxTimeoutHours": 48,
"queryBackendOnMempoolResync": false
}
}
},
"meta": {
"package_maintainer": "Petr Kracik",
"package_maintainer_email": "petr.kracik@satoshilabs.com"
}
}
}

View File

@ -1,9 +1,9 @@
{
"coin": {
"name": "Ethereum Testnet Ropsten",
"shortcut": "tROP",
"label": "Ethereum Ropsten",
"alias": "ethereum_testnet_ropsten"
"name": "Ethereum Testnet Ropsten",
"shortcut": "tROP",
"label": "Ethereum Ropsten",
"alias": "ethereum_testnet_ropsten"
},
"ports": {
"backend_rpc": 18036,
@ -48,11 +48,14 @@
"mempool_workers": 8,
"mempool_sub_workers": 2,
"block_addresses_to_keep": 300,
"additional_params": {}
"additional_params": {
"mempoolTxTimeoutHours": 12,
"queryBackendOnMempoolResync": false
}
}
},
"meta": {
"package_maintainer": "Petr Kracik",
"package_maintainer_email": "petr.kracik@satoshilabs.com"
}
}
}

View File

@ -1,5 +1,5 @@
{
"version": "0.2.1",
"version": "0.2.2",
"backend_install_path": "/opt/coins/nodes",
"backend_data_path": "/opt/coins/data",
"blockbook_install_path": "/opt/coins/blockbook",

View File

@ -757,6 +757,25 @@ func (d *RocksDB) GetTxAddresses(txid string) (*TxAddresses, error) {
return d.getTxAddresses(btxID)
}
// AddrDescForOutpoint defines function that returns address descriptorfor given outpoint or nil if outpoint not found
func (d *RocksDB) AddrDescForOutpoint(outpoint bchain.Outpoint) bchain.AddressDescriptor {
ta, err := d.GetTxAddresses(outpoint.Txid)
if err != nil || ta == nil {
return nil
}
if outpoint.Vout < 0 {
vin := ^outpoint.Vout
if len(ta.Inputs) <= int(vin) {
return nil
}
return ta.Inputs[vin].AddrDesc
}
if len(ta.Outputs) <= int(outpoint.Vout) {
return nil
}
return ta.Outputs[outpoint.Vout].AddrDesc
}
func packTxAddresses(ta *TxAddresses, buf []byte, varBuf []byte) []byte {
buf = buf[:0]
l := packVaruint(uint(ta.Height), varBuf)

View File

@ -63,11 +63,15 @@ func (w *SyncWorker) ResyncIndex(onNewBlock bchain.OnNewBlockFunc, initialSync b
if err == nil {
w.is.FinishedSync(bh)
}
return nil
return err
case errSynced:
// this is not actually error but flag that resync wasn't necessary
w.is.FinishedSyncNoChange()
w.metrics.IndexDBSize.Set(float64(w.db.DatabaseSizeOnDisk()))
if initialSync {
d := time.Since(start)
glog.Info("resync: finished in ", d)
}
return nil
}
@ -113,7 +117,8 @@ func (w *SyncWorker) resyncIndex(onNewBlock bchain.OnNewBlockFunc, initialSync b
}
// if parallel operation is enabled and the number of blocks to be connected is large,
// use parallel routine to load majority of blocks
if w.syncWorkers > 1 {
// use parallel sync only in case of initial sync because it puts the db to inconsistent state
if w.syncWorkers > 1 && initialSync {
remoteBestHeight, err := w.chain.GetBestBlockHeight()
if err != nil {
return err

View File

@ -167,6 +167,10 @@ Response for Ethereum-type coins. There is always only one *vin*, only one *vout
}
```
A note about the `blocktime` field:
- for already mined transaction (`confirmations > 0`), the field `blocktime` contains time of the block
- for transactions in mempool (`confirmations == 0`), the field contains time when the running instance of Blockbook was first time notified about the transaction. This time may be different in different instances of Blockbook.
#### Get transaction specific
Returns transaction data in the exact format as returned by backend, including all coin specific fields:

View File

@ -23,13 +23,14 @@ type InternalServer struct {
txCache *db.TxCache
chain bchain.BlockChain
chainParser bchain.BlockChainParser
mempool bchain.Mempool
is *common.InternalState
api *api.Worker
}
// NewInternalServer creates new internal http interface to blockbook and returns its handle
func NewInternalServer(binding, certFiles string, db *db.RocksDB, chain bchain.BlockChain, txCache *db.TxCache, is *common.InternalState) (*InternalServer, error) {
api, err := api.NewWorker(db, chain, txCache, is)
func NewInternalServer(binding, certFiles string, db *db.RocksDB, chain bchain.BlockChain, mempool bchain.Mempool, txCache *db.TxCache, is *common.InternalState) (*InternalServer, error) {
api, err := api.NewWorker(db, chain, mempool, txCache, is)
if err != nil {
return nil, err
}
@ -47,6 +48,7 @@ func NewInternalServer(binding, certFiles string, db *db.RocksDB, chain bchain.B
txCache: txCache,
chain: chain,
chainParser: chain.GetChainParser(),
mempool: mempool,
is: is,
api: api,
}

View File

@ -26,6 +26,7 @@ import (
const txsOnPage = 25
const blocksOnPage = 50
const mempoolTxsOnPage = 50
const txsInAPI = 1000
const (
@ -45,6 +46,7 @@ type PublicServer struct {
txCache *db.TxCache
chain bchain.BlockChain
chainParser bchain.BlockChainParser
mempool bchain.Mempool
api *api.Worker
explorerURL string
internalExplorer bool
@ -56,19 +58,19 @@ type PublicServer struct {
// NewPublicServer creates new public server http interface to blockbook and returns its handle
// only basic functionality is mapped, to map all functions, call
func NewPublicServer(binding string, certFiles string, db *db.RocksDB, chain bchain.BlockChain, txCache *db.TxCache, explorerURL string, metrics *common.Metrics, is *common.InternalState, debugMode bool) (*PublicServer, error) {
func NewPublicServer(binding string, certFiles string, db *db.RocksDB, chain bchain.BlockChain, mempool bchain.Mempool, txCache *db.TxCache, explorerURL string, metrics *common.Metrics, is *common.InternalState, debugMode bool) (*PublicServer, error) {
api, err := api.NewWorker(db, chain, txCache, is)
api, err := api.NewWorker(db, chain, mempool, txCache, is)
if err != nil {
return nil, err
}
socketio, err := NewSocketIoServer(db, chain, txCache, metrics, is)
socketio, err := NewSocketIoServer(db, chain, mempool, txCache, metrics, is)
if err != nil {
return nil, err
}
websocket, err := NewWebsocketServer(db, chain, txCache, metrics, is)
websocket, err := NewWebsocketServer(db, chain, mempool, txCache, metrics, is)
if err != nil {
return nil, err
}
@ -91,6 +93,7 @@ func NewPublicServer(binding string, certFiles string, db *db.RocksDB, chain bch
txCache: txCache,
chain: chain,
chainParser: chain.GetChainParser(),
mempool: mempool,
explorerURL: explorerURL,
internalExplorer: explorerURL == "",
metrics: metrics,
@ -137,6 +140,7 @@ func (s *PublicServer) ConnectFullPublicInterface() {
serveMux.HandleFunc(path+"block/", s.htmlTemplateHandler(s.explorerBlock))
serveMux.HandleFunc(path+"spending/", s.htmlTemplateHandler(s.explorerSpendingTx))
serveMux.HandleFunc(path+"sendtx", s.htmlTemplateHandler(s.explorerSendTx))
serveMux.HandleFunc(path+"mempool", s.htmlTemplateHandler(s.explorerMempool))
} else {
// redirect to wallet requests for tx and address, possibly to external site
serveMux.HandleFunc(path+"tx/", s.txRedirect)
@ -382,6 +386,7 @@ const (
blocksTpl
blockTpl
sendTransactionTpl
mempoolTpl
tplCount
)
@ -400,6 +405,7 @@ type TemplateData struct {
Blocks *api.Blocks
Block *api.Block
Info *api.SystemInfo
MempoolTxids *api.MempoolTxids
Page int
PrevPage int
NextPage int
@ -475,6 +481,7 @@ func (s *PublicServer) parseTemplates() []*template.Template {
t[blockTpl] = createTemplate("./static/templates/block.html", "./static/templates/txdetail.html", "./static/templates/paging.html", "./static/templates/base.html")
}
t[xpubTpl] = createTemplate("./static/templates/xpub.html", "./static/templates/txdetail.html", "./static/templates/paging.html", "./static/templates/base.html")
t[mempoolTpl] = createTemplate("./static/templates/mempool.html", "./static/templates/paging.html", "./static/templates/base.html")
return t
}
@ -796,6 +803,25 @@ func (s *PublicServer) explorerSendTx(w http.ResponseWriter, r *http.Request) (t
return sendTransactionTpl, data, nil
}
func (s *PublicServer) explorerMempool(w http.ResponseWriter, r *http.Request) (tpl, *TemplateData, error) {
var mempoolTxids *api.MempoolTxids
var err error
s.metrics.ExplorerViews.With(common.Labels{"action": "mempool"}).Inc()
page, ec := strconv.Atoi(r.URL.Query().Get("page"))
if ec != nil {
page = 0
}
mempoolTxids, err = s.api.GetMempool(page, mempoolTxsOnPage)
if err != nil {
return errorTpl, nil, err
}
data := s.newTemplateData()
data.MempoolTxids = mempoolTxids
data.Page = mempoolTxids.Page
data.PagingRange, data.PrevPage, data.NextPage = getPagingRange(mempoolTxids.Page, mempoolTxids.TotalPages)
return mempoolTpl, data, nil
}
func getPagingRange(page int, total int) ([]int, int, int) {
// total==-1 means total is unknown, show only prev/next buttons
if total >= 0 && total < 2 {

View File

@ -85,6 +85,11 @@ func setupPublicHTTPServer(t *testing.T) (*PublicServer, string) {
glog.Fatal("fakechain: ", err)
}
mempool, err := chain.CreateMempool(chain)
if err != nil {
glog.Fatal("mempool: ", err)
}
// caching is switched off because test transactions do not have hex data
txCache, err := db.NewTxCache(d, chain, metrics, is, false)
if err != nil {
@ -92,7 +97,7 @@ func setupPublicHTTPServer(t *testing.T) (*PublicServer, string) {
}
// s.Run is never called, binding can be to any port
s, err := NewPublicServer("localhost:12345", "", d, chain, txCache, "", metrics, is, false)
s, err := NewPublicServer("localhost:12345", "", d, chain, mempool, txCache, "", metrics, is, false)
if err != nil {
t.Fatal(err)
}

View File

@ -26,14 +26,15 @@ type SocketIoServer struct {
txCache *db.TxCache
chain bchain.BlockChain
chainParser bchain.BlockChainParser
mempool bchain.Mempool
metrics *common.Metrics
is *common.InternalState
api *api.Worker
}
// NewSocketIoServer creates new SocketIo interface to blockbook and returns its handle
func NewSocketIoServer(db *db.RocksDB, chain bchain.BlockChain, txCache *db.TxCache, metrics *common.Metrics, is *common.InternalState) (*SocketIoServer, error) {
api, err := api.NewWorker(db, chain, txCache, is)
func NewSocketIoServer(db *db.RocksDB, chain bchain.BlockChain, mempool bchain.Mempool, txCache *db.TxCache, metrics *common.Metrics, is *common.InternalState) (*SocketIoServer, error) {
api, err := api.NewWorker(db, chain, mempool, txCache, is)
if err != nil {
return nil, err
}
@ -64,6 +65,7 @@ func NewSocketIoServer(db *db.RocksDB, chain bchain.BlockChain, txCache *db.TxCa
txCache: txCache,
chain: chain,
chainParser: chain.GetChainParser(),
mempool: mempool,
metrics: metrics,
is: is,
api: api,
@ -224,7 +226,7 @@ func (s *SocketIoServer) getAddressTxids(addr []string, opts *addrOpts) (res res
return res, err
}
} else {
o, err := s.chain.GetMempoolTransactions(address)
o, err := s.mempool.GetTransactions(address)
if err != nil {
return res, err
}
@ -326,13 +328,15 @@ func txToResTx(tx *api.Tx) resTx {
outputs[i] = output
}
var h int
var blocktime int64
if tx.Confirmations == 0 {
h = -1
} else {
h = int(tx.Blockheight)
blocktime = tx.Blocktime
}
return resTx{
BlockTimestamp: tx.Blocktime,
BlockTimestamp: blocktime,
FeeSatoshis: tx.FeesSat.AsInt64(),
Hash: tx.Txid,
Height: h,

View File

@ -59,6 +59,7 @@ type WebsocketServer struct {
txCache *db.TxCache
chain bchain.BlockChain
chainParser bchain.BlockChainParser
mempool bchain.Mempool
metrics *common.Metrics
is *common.InternalState
api *api.Worker
@ -70,8 +71,8 @@ type WebsocketServer struct {
}
// NewWebsocketServer creates new websocket interface to blockbook and returns its handle
func NewWebsocketServer(db *db.RocksDB, chain bchain.BlockChain, txCache *db.TxCache, metrics *common.Metrics, is *common.InternalState) (*WebsocketServer, error) {
api, err := api.NewWorker(db, chain, txCache, is)
func NewWebsocketServer(db *db.RocksDB, chain bchain.BlockChain, mempool bchain.Mempool, txCache *db.TxCache, metrics *common.Metrics, is *common.InternalState) (*WebsocketServer, error) {
api, err := api.NewWorker(db, chain, mempool, txCache, is)
if err != nil {
return nil, err
}
@ -89,6 +90,7 @@ func NewWebsocketServer(db *db.RocksDB, chain bchain.BlockChain, txCache *db.TxC
txCache: txCache,
chain: chain,
chainParser: chain.GetChainParser(),
mempool: mempool,
metrics: metrics,
is: is,
api: api,

View File

@ -268,6 +268,11 @@ table.data-table table.data-table th {
margin: 0;
}
.h-container h5 {
margin-top: 6px;
margin-bottom: 0;
}
.page-link {
color: #428bca;
}

View File

@ -43,6 +43,10 @@
<td>Last Mempool Update</td>
<td class="data">{{formatTime $bb.LastMempoolTime}}</td>
</tr>
<tr>
<td>Transactions in Mempool</td>
<td class="data">{{if .InternalExplorer}}<a href="/mempool">{{$bb.MempoolSize}}</a>{{else}}{{$bb.MempoolSize}}{{end}}</td>
</tr>
<tr>
<td>Size On Disk</td>
<td class="data">{{$bb.DbSize}}</td>

View File

@ -0,0 +1,27 @@
{{define "specific"}}{{$txs := .MempoolTxids.Mempool}}{{$data := .}}
<h1>Mempool Transactions <small class="text-muted">by first seen time</small>
</h1>
<div class="row h-container">
<h5 class="col-md-6 col-sm-12">{{$.MempoolTxids.MempoolSize}} Transactions in mempool</h5>
<nav class="col-md-6 col-sm-12">{{template "paging" $data }}</nav>
</div>
<div class="data-div">
<table class="table table-striped data-table table-hover">
<thead>
<tr>
<th style="width: 70%;">Transaction</th>
<th style="width: 30%;">First Seen Time</span></th>
</tr>
</thead>
<tbody>
{{- range $tx := $txs -}}
<tr>
<td class="ellipsis"><a href="/tx/{{$tx.Txid}}">{{$tx.Txid}}</a></td>
<td>{{formatUnixTime $tx.Time}}</td>
</tr>
{{- end -}}
</tbody>
</table>
</div>
<nav>{{template "paging" $data }}</nav>
{{end}}

View File

@ -4,9 +4,7 @@
<div class="col-xs-7 col-md-8 ellipsis">
<a href="/tx/{{$tx.Txid}}">{{$tx.Txid}}</a>
</div>
{{- if $tx.Confirmations -}}
<div class="col-xs-5 col-md-4 text-muted text-right">mined {{formatUnixTime $tx.Blocktime}}</div>
{{- end -}}
{{- if $tx.Blocktime}}<div class="col-xs-5 col-md-4 text-muted text-right">{{if $tx.Confirmations}}mined{{else}}first seen{{end}} {{formatUnixTime $tx.Blocktime}}</div>{{end -}}
</div>
<div class="row line-mid">
<div class="col-md-5">

View File

@ -5,9 +5,7 @@
<a href="/tx/{{$tx.Txid}}">{{$tx.Txid}}</a>
{{if eq $tx.EthereumSpecific.Status 1}}<span class="text-success"></span>{{end}}{{if eq $tx.EthereumSpecific.Status 0}}<span class="text-danger"></span>{{end}}
</div>
{{- if $tx.Confirmations -}}
<div class="col-xs-5 col-md-4 text-muted text-right">mined {{formatUnixTime $tx.Blocktime}}</div>
{{- end -}}
{{- if $tx.Blocktime}}<div class="col-xs-5 col-md-4 text-muted text-right">{{if $tx.Confirmations}}mined{{else}}first seen{{end}} {{formatUnixTime $tx.Blocktime}}</div>{{end -}}
</div>
<div class="row line-mid">
<div class="col-md-4">

View File

@ -29,8 +29,8 @@
<td>Used XPUB Addresses</td>
<td class="data">{{$addr.TotalTokens}}</td>
</tr>
{{- if or $addr.Tokens $addr.TotalTokens -}}
<tr>
<tr>
{{- if or $addr.Tokens $addr.TotalTokens -}}
<td>{{if $data.NonZeroBalanceTokens}}XPUB Addresses with Balance{{else}}XPUB Addresses{{end}}</td>
<td style="padding: 0;">
<table class="table data-table">
@ -51,15 +51,17 @@
{{- end -}}
{{- if $data.NonZeroBalanceTokens -}}
<tr>
<td colspan="4"><a href="?tokens=used">Show all XPUB addresses</a></td>
<td colspan="4"><a href="?tokens=used" style="float: left; margin-right: 30px;">Show used XPUB addresses</a><a href="?tokens=derived" style="float: left;">Show derived XPUB addresses</a></td>
</tr>
{{- end -}}
</tbody>
</table>
</td>
{{- else -}}
<td></td><td><a href="?tokens=derived" style="float: left;">Show derived XPUB addresses</a></td>
{{- end -}}
</tr>
{{- end -}}
</tbody>
</tbody>
</table>
</div>
<div class="col-md-2">

View File

@ -17,10 +17,18 @@ func NewFakeBlockChain(parser bchain.BlockChainParser) (bchain.BlockChain, error
return &fakeBlockChain{&bchain.BaseChain{Parser: parser}}, nil
}
func (b *fakeBlockChain) CreateMempool(chain bchain.BlockChain) (bchain.Mempool, error) {
return bchain.NewMempoolBitcoinType(chain, 1, 1), nil
}
func (c *fakeBlockChain) Initialize() error {
return nil
}
func (c *fakeBlockChain) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOutpointFunc, onNewTxAddr bchain.OnNewTxAddrFunc) error {
return nil
}
func (c *fakeBlockChain) Shutdown(ctx context.Context) error {
return nil
}
@ -118,10 +126,6 @@ func (c *fakeBlockChain) GetBlockInfo(hash string) (v *bchain.BlockInfo, err err
return nil, bchain.ErrBlockNotFound
}
func (c *fakeBlockChain) GetMempool() (v []string, err error) {
return nil, errors.New("Not implemented")
}
func getTxInBlock(b *bchain.Block, txid string) *bchain.Tx {
for _, tx := range b.Txs {
if tx.Txid == txid {
@ -179,22 +183,12 @@ func (c *fakeBlockChain) SendRawTransaction(tx string) (v string, err error) {
return "", errors.New("Invalid data")
}
func (c *fakeBlockChain) ResyncMempool(onNewTxAddr bchain.OnNewTxAddrFunc) (count int, err error) {
return 0, errors.New("Not implemented")
}
func (c *fakeBlockChain) GetMempoolTransactions(address string) (v []bchain.Outpoint, err error) {
return nil, errors.New("Not implemented")
}
func (c *fakeBlockChain) GetMempoolTransactionsForAddrDesc(addrDesc bchain.AddressDescriptor) (v []bchain.Outpoint, err error) {
return []bchain.Outpoint{}, nil
}
func (c *fakeBlockChain) GetMempoolEntry(txid string) (v *bchain.MempoolEntry, err error) {
return nil, errors.New("Not implemented")
}
// GetChainParser returns parser for the blockchain
func (c *fakeBlockChain) GetChainParser() bchain.BlockChainParser {
return c.Parser
}
// GetMempoolTransactions returns transactions in mempool
func (b *fakeBlockChain) GetMempoolTransactions() ([]string, error) {
return nil, errors.New("Not implemented")
}

View File

@ -5,7 +5,7 @@ package tests
import (
"blockbook/bchain"
"blockbook/bchain/coins"
"blockbook/build/tools"
build "blockbook/build/tools"
"blockbook/tests/rpc"
"blockbook/tests/sync"
"encoding/json"
@ -23,7 +23,7 @@ import (
"github.com/martinboehm/btcutil/chaincfg"
)
type TestFunc func(t *testing.T, coin string, chain bchain.BlockChain, testConfig json.RawMessage)
type TestFunc func(t *testing.T, coin string, chain bchain.BlockChain, mempool bchain.Mempool, testConfig json.RawMessage)
var integrationTests = map[string]TestFunc{
"rpc": rpc.IntegrationTest,
@ -76,7 +76,7 @@ func runTests(t *testing.T, coin string, cfg map[string]json.RawMessage) {
}
defer chaincfg.ResetParams()
bc, err := makeBlockChain(coin)
bc, m, err := makeBlockChain(coin)
if err != nil {
if err == notConnectedError {
t.Fatal(err)
@ -86,44 +86,44 @@ func runTests(t *testing.T, coin string, cfg map[string]json.RawMessage) {
for test, c := range cfg {
if fn, found := integrationTests[test]; found {
t.Run(test, func(t *testing.T) { fn(t, coin, bc, c) })
t.Run(test, func(t *testing.T) { fn(t, coin, bc, m, c) })
} else {
t.Errorf("Test not found: %s", test)
}
}
}
func makeBlockChain(coin string) (bchain.BlockChain, error) {
func makeBlockChain(coin string) (bchain.BlockChain, bchain.Mempool, error) {
c, err := build.LoadConfig("../configs", coin)
if err != nil {
return nil, err
return nil, nil, err
}
outputDir, err := ioutil.TempDir("", "integration_test")
if err != nil {
return nil, err
return nil, nil, err
}
defer os.RemoveAll(outputDir)
err = build.GeneratePackageDefinitions(c, "../build/templates", outputDir)
if err != nil {
return nil, err
return nil, nil, err
}
b, err := ioutil.ReadFile(filepath.Join(outputDir, "blockbook", "blockchaincfg.json"))
if err != nil {
return nil, err
return nil, nil, err
}
var cfg json.RawMessage
err = json.Unmarshal(b, &cfg)
if err != nil {
return nil, err
return nil, nil, err
}
coinName, err := getName(cfg)
if err != nil {
return nil, err
return nil, nil, err
}
return initBlockChain(coinName, cfg)
@ -147,29 +147,39 @@ func getName(raw json.RawMessage) (string, error) {
}
}
func initBlockChain(coinName string, cfg json.RawMessage) (bchain.BlockChain, error) {
func initBlockChain(coinName string, cfg json.RawMessage) (bchain.BlockChain, bchain.Mempool, error) {
factory, found := coins.BlockChainFactories[coinName]
if !found {
return nil, fmt.Errorf("Factory function not found")
return nil, nil, fmt.Errorf("Factory function not found")
}
cli, err := factory(cfg, func(_ bchain.NotificationType) {})
if err != nil {
if isNetError(err) {
return nil, notConnectedError
return nil, nil, notConnectedError
}
return nil, fmt.Errorf("Factory function failed: %s", err)
return nil, nil, fmt.Errorf("Factory function failed: %s", err)
}
err = cli.Initialize()
if err != nil {
if isNetError(err) {
return nil, notConnectedError
return nil, nil, notConnectedError
}
return nil, fmt.Errorf("BlockChain initialization failed: %s", err)
return nil, nil, fmt.Errorf("BlockChain initialization failed: %s", err)
}
return cli, nil
mempool, err := cli.CreateMempool(cli)
if err != nil {
return nil, nil, fmt.Errorf("Mempool creation failed: %s", err)
}
err = cli.InitializeMempool(nil, nil)
if err != nil {
return nil, nil, fmt.Errorf("Mempool initialization failed: %s", err)
}
return cli, mempool, nil
}
func isNetError(err error) bool {

View File

@ -30,6 +30,7 @@ var testMap = map[string]func(t *testing.T, th *TestHandler){
type TestHandler struct {
Chain bchain.BlockChain
Mempool bchain.Mempool
TestData *TestData
}
@ -42,7 +43,7 @@ type TestData struct {
TxDetails map[string]*bchain.Tx `json:"txDetails"`
}
func IntegrationTest(t *testing.T, coin string, chain bchain.BlockChain, testConfig json.RawMessage) {
func IntegrationTest(t *testing.T, coin string, chain bchain.BlockChain, mempool bchain.Mempool, testConfig json.RawMessage) {
tests, err := getTests(testConfig)
if err != nil {
t.Fatalf("Failed loading of test list: %s", err)
@ -54,7 +55,11 @@ func IntegrationTest(t *testing.T, coin string, chain bchain.BlockChain, testCon
t.Fatalf("Failed loading of test data: %s", err)
}
h := TestHandler{Chain: chain, TestData: td}
h := TestHandler{
Chain: chain,
Mempool: mempool,
TestData: td,
}
for _, test := range tests {
if f, found := testMap[test]; found {
@ -195,7 +200,7 @@ func testMempoolSync(t *testing.T, h *TestHandler) {
for i := 0; i < 3; i++ {
txs := getMempool(t, h)
n, err := h.Chain.ResyncMempool(nil)
n, err := h.Mempool.Resync()
if err != nil {
t.Fatal(err)
}
@ -217,7 +222,7 @@ func testMempoolSync(t *testing.T, h *TestHandler) {
for txid, addrs := range txid2addrs {
for _, a := range addrs {
got, err := h.Chain.GetMempoolTransactions(a)
got, err := h.Mempool.GetTransactions(a)
if err != nil {
t.Fatalf("address %q: %s", a, err)
}
@ -337,7 +342,7 @@ func testGetBlockHeader(t *testing.T, h *TestHandler) {
}
func getMempool(t *testing.T, h *TestHandler) []string {
txs, err := h.Chain.GetMempool()
txs, err := h.Chain.GetMempoolTransactions()
if err != nil {
t.Fatal(err)
}

View File

@ -54,7 +54,7 @@ type BlockInfo struct {
TxDetails []*bchain.Tx `json:"txDetails"`
}
func IntegrationTest(t *testing.T, coin string, chain bchain.BlockChain, testConfig json.RawMessage) {
func IntegrationTest(t *testing.T, coin string, chain bchain.BlockChain, mempool bchain.Mempool, testConfig json.RawMessage) {
tests, err := getTests(testConfig)
if err != nil {
t.Fatalf("Failed loading of test list: %s", err)