Use txids returned from pendingTransactions subscriptionfor mempool

ethereum
Martin Boehm 2018-12-19 12:03:19 +01:00
parent 9288a12f1d
commit e12641ae7d
1 changed files with 61 additions and 32 deletions

View File

@ -41,20 +41,22 @@ type Configuration struct {
// EthereumRPC is an interface to JSON-RPC eth service.
type EthereumRPC struct {
*bchain.BaseChain
client *ethclient.Client
rpc *rpc.Client
timeout time.Duration
Parser *EthereumParser
Mempool *bchain.MempoolEthereumType
bestHeaderMu sync.Mutex
bestHeader *ethtypes.Header
bestHeaderTime time.Time
chanNewBlock chan *ethtypes.Header
newBlockSubscription *rpc.ClientSubscription
chanNewTx chan ethcommon.Hash
newTxSubscription *rpc.ClientSubscription
ChainConfig *Configuration
isETC bool
client *ethclient.Client
rpc *rpc.Client
timeout time.Duration
Parser *EthereumParser
Mempool *bchain.MempoolEthereumType
bestHeaderLock sync.Mutex
bestHeader *ethtypes.Header
bestHeaderTime time.Time
chanNewBlock chan *ethtypes.Header
newBlockSubscription *rpc.ClientSubscription
chanNewTx chan ethcommon.Hash
newTxSubscription *rpc.ClientSubscription
pendingTransactions map[string]struct{}
pendingTransactionsLock sync.Mutex
ChainConfig *Configuration
isETC bool
}
// NewEthereumRPC returns new EthRPC instance.
@ -76,10 +78,11 @@ func NewEthereumRPC(config json.RawMessage, pushHandler func(bchain.Notification
ec := ethclient.NewClient(rc)
s := &EthereumRPC{
BaseChain: &bchain.BaseChain{},
client: ec,
rpc: rc,
ChainConfig: &c,
BaseChain: &bchain.BaseChain{},
client: ec,
rpc: rc,
ChainConfig: &c,
pendingTransactions: make(map[string]struct{}),
}
// always create parser
@ -100,10 +103,10 @@ func NewEthereumRPC(config json.RawMessage, pushHandler func(bchain.Notification
}
glog.V(2).Info("rpc: new block header ", h.Number)
// update best header to the new header
s.bestHeaderMu.Lock()
s.bestHeaderLock.Lock()
s.bestHeader = h
s.bestHeaderTime = time.Now()
s.bestHeaderMu.Unlock()
s.bestHeaderLock.Unlock()
// notify blockbook
pushHandler(bchain.NotificationNewBlock)
}
@ -118,9 +121,13 @@ func NewEthereumRPC(config json.RawMessage, pushHandler func(bchain.Notification
if !ok {
break
}
hex := t.Hex()
if glog.V(2) {
glog.Info("rpc: new tx ", t.Hex())
glog.Info("rpc: new tx ", hex)
}
s.pendingTransactionsLock.Lock()
s.pendingTransactions[hex] = struct{}{}
s.pendingTransactionsLock.Unlock()
pushHandler(bchain.NotificationNewTx)
}
}()
@ -296,8 +303,8 @@ func (b *EthereumRPC) GetChainInfo() (*bchain.ChainInfo, error) {
}
func (b *EthereumRPC) getBestHeader() (*ethtypes.Header, error) {
b.bestHeaderMu.Lock()
defer b.bestHeaderMu.Unlock()
b.bestHeaderLock.Lock()
defer b.bestHeaderLock.Unlock()
// ETC does not have newBlocks subscription, bestHeader must be updated very often (each 1 second)
if b.isETC {
if b.bestHeaderTime.Add(1 * time.Second).Before(time.Now()) {
@ -467,12 +474,16 @@ func (b *EthereumRPC) GetBlock(hash string, height uint32) (*bchain.Block, error
return nil, err
}
btxs := make([]bchain.Tx, len(body.Transactions))
for i, tx := range body.Transactions {
btx, err := b.Parser.ethTxToTx(&tx, &rpcReceipt{Logs: logs[tx.Hash]}, bbh.Time, uint32(bbh.Confirmations))
for i := range body.Transactions {
tx := &body.Transactions[i]
btx, err := b.Parser.ethTxToTx(tx, &rpcReceipt{Logs: logs[tx.Hash]}, bbh.Time, uint32(bbh.Confirmations))
if err != nil {
return nil, errors.Annotatef(err, "hash %v, height %v, txid %v", hash, height, tx.Hash)
}
btxs[i] = *btx
b.pendingTransactionsLock.Lock()
delete(b.pendingTransactions, tx.Hash)
b.pendingTransactionsLock.Unlock()
}
bbk := bchain.Block{
BlockHeader: *bbh,
@ -507,7 +518,14 @@ func (b *EthereumRPC) GetBlockInfo(hash string) (*bchain.BlockInfo, error) {
// GetTransactionForMempool returns a transaction by the transaction ID.
// It could be optimized for mempool, i.e. without block time and confirmations
func (b *EthereumRPC) GetTransactionForMempool(txid string) (*bchain.Tx, error) {
return b.GetTransaction(txid)
tx, err := b.GetTransaction(txid)
// it there is an error getting the tx or the tx is confirmed, remove it from pending transactions
if err != nil || (tx != nil && tx.Confirmations > 0) {
b.pendingTransactionsLock.Lock()
delete(b.pendingTransactions, txid)
b.pendingTransactionsLock.Unlock()
}
return tx, err
}
// GetTransaction returns a transaction by the transaction ID.
@ -613,14 +631,25 @@ func (b *EthereumRPC) GetMempool() ([]string, error) {
if err != nil {
return nil, err
}
if len(raw) == 0 {
return nil, bchain.ErrBlockNotFound
}
var body rpcBlockTxids
if err := json.Unmarshal(raw, &body); err != nil {
return nil, err
if len(raw) > 0 {
if err := json.Unmarshal(raw, &body); err != nil {
return nil, err
}
}
return body.Transactions, nil
b.pendingTransactionsLock.Lock()
// join transactions returned by getBlockRaw with pendingTransactions from subscription
for _, txid := range body.Transactions {
b.pendingTransactions[txid] = struct{}{}
}
txids := make([]string, len(b.pendingTransactions))
i := 0
for txid := range b.pendingTransactions {
txids[i] = txid
i++
}
b.pendingTransactionsLock.Unlock()
return txids, nil
}
// EstimateFee returns fee estimation