From bab500d3f86672f9d849db04083a3b8edd4cbe51 Mon Sep 17 00:00:00 2001 From: Martin Boehm Date: Wed, 19 Dec 2018 10:06:25 +0100 Subject: [PATCH] Notify on mempool erc20 transfer transaction --- api/worker.go | 2 +- bchain/baseparser.go | 5 ++++ bchain/coins/eth/erc20.go | 20 +++++---------- bchain/coins/eth/erc20_test.go | 7 +++--- bchain/coins/eth/ethparser.go | 6 ++--- bchain/mempool_bitcoin_type.go | 2 +- bchain/mempool_ethereum_type.go | 43 +++++++++++++++++++++++---------- bchain/types.go | 12 ++++++++- blockbook.go | 4 +-- db/rocksdb_ethereumtype.go | 2 +- server/public.go | 6 ++--- server/socketio.go | 5 +--- server/websocket.go | 4 +-- static/test-websocket.html | 4 +-- 14 files changed, 71 insertions(+), 51 deletions(-) diff --git a/api/worker.go b/api/worker.go index 7b63b315..6a16660b 100644 --- a/api/worker.go +++ b/api/worker.go @@ -218,7 +218,7 @@ func (w *Worker) GetTransactionFromBchainTx(bchainTx *bchain.Tx, height uint32, } pValInSat = &valInSat } else if w.chainType == bchain.ChainEthereumType { - ets, err := eth.GetErc20FromTx(bchainTx) + ets, err := w.chainParser.EthereumTypeGetErc20FromTx(bchainTx) if err != nil { glog.Errorf("GetErc20FromTx error %v, %v", err, bchainTx) } diff --git a/bchain/baseparser.go b/bchain/baseparser.go index 57f4617d..3060b3c2 100644 --- a/bchain/baseparser.go +++ b/bchain/baseparser.go @@ -255,3 +255,8 @@ func (p *BaseParser) UnpackTx(buf []byte) (*Tx, uint32, error) { } return &tx, pt.Height, nil } + +// EthereumTypeGetErc20FromTx is unsupported +func (p *BaseParser) EthereumTypeGetErc20FromTx(tx *Tx) ([]Erc20Transfer, error) { + return nil, errors.New("Not supported") +} diff --git a/bchain/coins/eth/erc20.go b/bchain/coins/eth/erc20.go index 3786d3a6..217a0d60 100644 --- a/bchain/coins/eth/erc20.go +++ b/bchain/coins/eth/erc20.go @@ -37,14 +37,6 @@ const erc20SymbolSignature = "0x95d89b41" const erc20DecimalsSignature = "0x313ce567" const erc20BalanceOf = "0x70a08231" -// Erc20Transfer contains a single ERC20 token transfer -type Erc20Transfer struct { - Contract string - From string - To string - Tokens big.Int -} - var cachedContracts = make(map[string]*bchain.Erc20Contract) var cachedContractsMux sync.Mutex @@ -63,8 +55,8 @@ func addressFromPaddedHex(s string) (string, error) { return a.String(), nil } -func erc20GetTransfersFromLog(logs []*rpcLog) ([]Erc20Transfer, error) { - var r []Erc20Transfer +func erc20GetTransfersFromLog(logs []*rpcLog) ([]bchain.Erc20Transfer, error) { + var r []bchain.Erc20Transfer for _, l := range logs { if len(l.Topics) == 3 && l.Topics[0] == erc20TransferEventSignature { var t big.Int @@ -80,7 +72,7 @@ func erc20GetTransfersFromLog(logs []*rpcLog) ([]Erc20Transfer, error) { if err != nil { return nil, err } - r = append(r, Erc20Transfer{ + r = append(r, bchain.Erc20Transfer{ Contract: strings.ToLower(l.Address), From: strings.ToLower(from), To: strings.ToLower(to), @@ -91,8 +83,8 @@ func erc20GetTransfersFromLog(logs []*rpcLog) ([]Erc20Transfer, error) { return r, nil } -func erc20GetTransfersFromTx(tx *rpcTransaction) ([]Erc20Transfer, error) { - var r []Erc20Transfer +func erc20GetTransfersFromTx(tx *rpcTransaction) ([]bchain.Erc20Transfer, error) { + var r []bchain.Erc20Transfer if len(tx.Payload) == 128+len(erc20TransferMethodSignature) && strings.HasPrefix(tx.Payload, erc20TransferMethodSignature) { to, err := addressFromPaddedHex(tx.Payload[len(erc20TransferMethodSignature) : 64+len(erc20TransferMethodSignature)]) if err != nil { @@ -103,7 +95,7 @@ func erc20GetTransfersFromTx(tx *rpcTransaction) ([]Erc20Transfer, error) { if !ok { return nil, errors.New("Data is not a number") } - r = append(r, Erc20Transfer{ + r = append(r, bchain.Erc20Transfer{ Contract: strings.ToLower(tx.To), From: strings.ToLower(tx.From), To: strings.ToLower(to), diff --git a/bchain/coins/eth/erc20_test.go b/bchain/coins/eth/erc20_test.go index 215f9a5f..d9e23586 100644 --- a/bchain/coins/eth/erc20_test.go +++ b/bchain/coins/eth/erc20_test.go @@ -3,6 +3,7 @@ package eth import ( + "blockbook/bchain" "blockbook/tests/dbtestdata" fmt "fmt" "math/big" @@ -146,17 +147,17 @@ func TestErc20_erc20GetTransfersFromTx(t *testing.T) { tests := []struct { name string args *rpcTransaction - want []Erc20Transfer + want []bchain.Erc20Transfer }{ { name: "0", args: (b.Txs[0].CoinSpecificData.(completeTransaction)).Tx, - want: []Erc20Transfer{}, + want: []bchain.Erc20Transfer{}, }, { name: "1", args: (b.Txs[1].CoinSpecificData.(completeTransaction)).Tx, - want: []Erc20Transfer{ + want: []bchain.Erc20Transfer{ { Contract: "0x4af4114f73d1c1c903ac9e0361b379d1291808a2", From: "0x20cd153de35d469ba46127a0c8f18626b59a256a", diff --git a/bchain/coins/eth/ethparser.go b/bchain/coins/eth/ethparser.go index 6ed8f3e9..1118dc93 100644 --- a/bchain/coins/eth/ethparser.go +++ b/bchain/coins/eth/ethparser.go @@ -398,9 +398,9 @@ func GetHeightFromTx(tx *bchain.Tx) (uint32, error) { return uint32(n), nil } -// GetErc20FromTx returns Erc20 data from bchain.Tx -func GetErc20FromTx(tx *bchain.Tx) ([]Erc20Transfer, error) { - var r []Erc20Transfer +// EthereumTypeGetErc20FromTx returns Erc20 data from bchain.Tx +func (p *EthereumParser) EthereumTypeGetErc20FromTx(tx *bchain.Tx) ([]bchain.Erc20Transfer, error) { + var r []bchain.Erc20Transfer var err error csd, ok := tx.CoinSpecificData.(completeTransaction) if ok { diff --git a/bchain/mempool_bitcoin_type.go b/bchain/mempool_bitcoin_type.go index e6ccf868..db12ffd4 100644 --- a/bchain/mempool_bitcoin_type.go +++ b/bchain/mempool_bitcoin_type.go @@ -122,7 +122,7 @@ func (m *MempoolBitcoinType) getTxAddrs(txid string, chanInput chan Outpoint, ch io = append(io, addrIndex{string(addrDesc), int32(output.N)}) } if m.onNewTxAddr != nil { - m.onNewTxAddr(tx, addrDesc, true) + m.onNewTxAddr(tx, addrDesc) } } dispatched := 0 diff --git a/bchain/mempool_ethereum_type.go b/bchain/mempool_ethereum_type.go index ec5493d0..769d9812 100644 --- a/bchain/mempool_ethereum_type.go +++ b/bchain/mempool_ethereum_type.go @@ -44,6 +44,18 @@ func (m *MempoolEthereumType) updateMappings(newTxToInputOutput map[string][]add m.addrDescToTx = newAddrDescToTx } +func appendAddress(io []addrIndex, i int32, a string, parser BlockChainParser) []addrIndex { + if len(a) > 0 { + addrDesc, err := parser.GetAddrDescFromAddress(a) + if err != nil { + glog.Error("error in input addrDesc in ", a, ": ", err) + return io + } + io = append(io, addrIndex{string(addrDesc), i}) + } + return io +} + // Resync gets mempool transactions and maps outputs to transactions. // Resync is not reentrant, it should be called from a single thread. // Read operations (GetTransactions) are safe. @@ -78,22 +90,27 @@ func (m *MempoolEthereumType) Resync(onNewTxAddr OnNewTxAddrFunc) (int, error) { if len(addrDesc) > 0 { io = append(io, addrIndex{string(addrDesc), int32(output.N)}) } - if onNewTxAddr != nil { - onNewTxAddr(tx, addrDesc, true) - } } for _, input := range tx.Vin { for i, a := range input.Addresses { - if len(a) > 0 { - addrDesc, err := parser.GetAddrDescFromAddress(a) - if err != nil { - glog.Error("error in input addrDesc in ", txid, " ", a, ": ", err) - continue - } - io = append(io, addrIndex{string(addrDesc), int32(^i)}) - if onNewTxAddr != nil { - onNewTxAddr(tx, addrDesc, false) - } + appendAddress(io, ^int32(i), a, parser) + } + } + t, err := parser.EthereumTypeGetErc20FromTx(tx) + if err != nil { + glog.Error("GetErc20FromTx for tx ", txid, ", ", err) + } else { + for i := range t { + io = appendAddress(io, ^int32(i+1), t[i].From, parser) + io = appendAddress(io, int32(i+1), t[i].To, parser) + } + } + if onNewTxAddr != nil { + sent := make(map[string]struct{}) + for _, si := range io { + if _, found := sent[si.addrDesc]; !found { + onNewTxAddr(tx, AddressDescriptor(si.addrDesc)) + sent[si.addrDesc] = struct{}{} } } } diff --git a/bchain/types.go b/bchain/types.go index be6496a2..293d4e58 100644 --- a/bchain/types.go +++ b/bchain/types.go @@ -175,11 +175,19 @@ type Erc20Contract struct { Decimals int `json:"decimals"` } +// Erc20Transfer contains a single ERC20 token transfer +type Erc20Transfer struct { + Contract string + From string + To string + Tokens big.Int +} + // OnNewBlockFunc is used to send notification about a new block type OnNewBlockFunc func(hash string, height uint32) // OnNewTxAddrFunc is used to send notification about a new transaction/address -type OnNewTxAddrFunc func(tx *Tx, desc AddressDescriptor, isOutput bool) +type OnNewTxAddrFunc func(tx *Tx, desc AddressDescriptor) // BlockChain defines common interface to block chain daemon type BlockChain interface { @@ -252,4 +260,6 @@ type BlockChainParser interface { PackBlockHash(hash string) ([]byte, error) UnpackBlockHash(buf []byte) (string, error) ParseBlock(b []byte) (*Block, error) + // EthereumType specific + EthereumTypeGetErc20FromTx(tx *Tx) ([]Erc20Transfer, error) } diff --git a/blockbook.go b/blockbook.go index ffc7cd28..a939cf3a 100644 --- a/blockbook.go +++ b/blockbook.go @@ -498,9 +498,9 @@ func storeInternalStateLoop() { glog.Info("storeInternalStateLoop stopped") } -func onNewTxAddr(tx *bchain.Tx, desc bchain.AddressDescriptor, isOutput bool) { +func onNewTxAddr(tx *bchain.Tx, desc bchain.AddressDescriptor) { for _, c := range callbacksOnNewTxAddr { - c(tx, desc, isOutput) + c(tx, desc) } } diff --git a/db/rocksdb_ethereumtype.go b/db/rocksdb_ethereumtype.go index 2f9d5018..ad6a96e4 100644 --- a/db/rocksdb_ethereumtype.go +++ b/db/rocksdb_ethereumtype.go @@ -174,7 +174,7 @@ func (d *RocksDB) processAddressesEthereumType(block *bchain.Block, addresses ma blockTx.from = addrDesc } // store erc20 transfers - erc20, err := eth.GetErc20FromTx(&tx) + erc20, err := d.chainParser.EthereumTypeGetErc20FromTx(&tx) if err != nil { glog.Warningf("rocksdb: GetErc20FromTx %v - height %d, tx %v", err, block.Height, tx.Txid) } diff --git a/server/public.go b/server/public.go index 9302eb54..363b57e1 100644 --- a/server/public.go +++ b/server/public.go @@ -201,9 +201,9 @@ func (s *PublicServer) OnNewBlock(hash string, height uint32) { } // OnNewTxAddr notifies users subscribed to bitcoind/addresstxid about new block -func (s *PublicServer) OnNewTxAddr(tx *bchain.Tx, desc bchain.AddressDescriptor, isOutput bool) { - s.socketio.OnNewTxAddr(tx.Txid, desc, isOutput) - s.websocket.OnNewTxAddr(tx, desc, isOutput) +func (s *PublicServer) OnNewTxAddr(tx *bchain.Tx, desc bchain.AddressDescriptor) { + s.socketio.OnNewTxAddr(tx.Txid, desc) + s.websocket.OnNewTxAddr(tx, desc) } func (s *PublicServer) txRedirect(w http.ResponseWriter, r *http.Request) { diff --git a/server/socketio.go b/server/socketio.go index 837c6c54..e24cb1f0 100644 --- a/server/socketio.go +++ b/server/socketio.go @@ -722,15 +722,12 @@ func (s *SocketIoServer) OnNewBlockHash(hash string) { } // OnNewTxAddr notifies users subscribed to bitcoind/addresstxid about new block -func (s *SocketIoServer) OnNewTxAddr(txid string, desc bchain.AddressDescriptor, isOutput bool) { +func (s *SocketIoServer) OnNewTxAddr(txid string, desc bchain.AddressDescriptor) { addr, searchable, err := s.chainParser.GetAddressesFromAddrDesc(desc) if err != nil { glog.Error("GetAddressesFromAddrDesc error ", err, " for descriptor ", desc) } else if searchable && len(addr) == 1 { data := map[string]interface{}{"address": addr[0], "txid": txid} - if !isOutput { - data["input"] = true - } c := s.server.BroadcastTo("bitcoind/addresstxid-"+string(desc), "bitcoind/addresstxid", data) if c > 0 { glog.Info("broadcasting new txid ", txid, " for addr ", addr[0], " to ", c, " channels") diff --git a/server/websocket.go b/server/websocket.go index 2807d648..afda0c16 100644 --- a/server/websocket.go +++ b/server/websocket.go @@ -531,7 +531,7 @@ func (s *WebsocketServer) OnNewBlock(hash string, height uint32) { } // OnNewTxAddr is a callback that broadcasts info about a tx affecting subscribed address -func (s *WebsocketServer) OnNewTxAddr(tx *bchain.Tx, addrDesc bchain.AddressDescriptor, isOutput bool) { +func (s *WebsocketServer) OnNewTxAddr(tx *bchain.Tx, addrDesc bchain.AddressDescriptor) { // check if there is any subscription but release the lock immediately, GetTransactionFromBchainTx may take some time s.addressSubscriptionsLock.Lock() as, ok := s.addressSubscriptions[string(addrDesc)] @@ -550,11 +550,9 @@ func (s *WebsocketServer) OnNewTxAddr(tx *bchain.Tx, addrDesc bchain.AddressDesc } data := struct { Address string `json:"address"` - Input bool `json:"input"` Tx *api.Tx `json:"tx"` }{ Address: addr[0], - Input: !isOutput, Tx: atx, } // get the list of subscriptions again, this time keep the lock diff --git a/static/test-websocket.html b/static/test-websocket.html index 0a367f6c..5af7a7f8 100644 --- a/static/test-websocket.html +++ b/static/test-websocket.html @@ -257,7 +257,7 @@
- +
- +