From 9b2d408ee49b37cbb8db338253aee2057cdb5098 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petr=20Krac=C3=ADk?= Date: Wed, 5 Sep 2018 12:17:07 +0200 Subject: [PATCH 01/12] ETH Ropsten added syncmode full (#50) --- configs/coins/ethereum_testnet_ropsten.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/configs/coins/ethereum_testnet_ropsten.json b/configs/coins/ethereum_testnet_ropsten.json index a236e16d..6887b9f6 100644 --- a/configs/coins/ethereum_testnet_ropsten.json +++ b/configs/coins/ethereum_testnet_ropsten.json @@ -26,7 +26,7 @@ "verification_source": "https://gethstore.blob.core.windows.net/builds/geth-linux-amd64-1.8.10-eae63c51.tar.gz.asc", "extract_command": "tar -C backend --strip 1 -xf", "exclude_files": [], - "exec_command_template": "/bin/sh -c '{{.Env.BackendInstallPath}}/{{.Coin.Alias}}/geth --testnet --ipcdisable --cache 1024 --nat none --datadir {{.Env.BackendDataPath}}/{{.Coin.Alias}}/backend --port 48336 --ws --wsaddr 0.0.0.0 --wsport {{.Ports.BackendRPC}} --wsorigins \"*\" 2>>{{.Env.BackendDataPath}}/{{.Coin.Alias}}/backend/{{.Coin.Alias}}.log'", + "exec_command_template": "/bin/sh -c '{{.Env.BackendInstallPath}}/{{.Coin.Alias}}/geth --testnet --syncmode full --ipcdisable --cache 1024 --nat none --datadir {{.Env.BackendDataPath}}/{{.Coin.Alias}}/backend --port 48336 --ws --wsaddr 0.0.0.0 --wsport {{.Ports.BackendRPC}} --wsorigins \"*\" 2>>{{.Env.BackendDataPath}}/{{.Coin.Alias}}/backend/{{.Coin.Alias}}.log'", "logrotate_files_template": "{{.Env.BackendDataPath}}/{{.Coin.Alias}}/backend/{{.Coin.Alias}}.log", "postinst_script_template": "", "service_type": "simple", From b8dede857ce7af52f3fe8c4d696ecb3184247f65 Mon Sep 17 00:00:00 2001 From: Martin Boehm Date: Wed, 5 Sep 2018 15:50:27 +0200 Subject: [PATCH 02/12] Add experimental Ethereum Classic blockbook implementation --- bchain/coins/blockchain.go | 1 + 1 file changed, 1 insertion(+) diff --git a/bchain/coins/blockchain.go b/bchain/coins/blockchain.go index 7a7a7241..eb09af3c 100644 --- a/bchain/coins/blockchain.go +++ b/bchain/coins/blockchain.go @@ -34,6 +34,7 @@ func init() { blockChainFactories["Zcash"] = zec.NewZCashRPC blockChainFactories["Zcash Testnet"] = zec.NewZCashRPC blockChainFactories["Ethereum"] = eth.NewEthereumRPC + blockChainFactories["Ethereum Classic"] = eth.NewEthereumRPC blockChainFactories["Ethereum Testnet Ropsten"] = eth.NewEthereumRPC blockChainFactories["Bcash"] = bch.NewBCashRPC blockChainFactories["Bcash Testnet"] = bch.NewBCashRPC From b1317789dec44de9a4b7d692515e5560eb2b6da5 Mon Sep 17 00:00:00 2001 From: Martin Boehm Date: Wed, 5 Sep 2018 16:27:34 +0200 Subject: [PATCH 03/12] Handle unsupported newHeads subscription in Ethereum Classic --- bchain/coins/eth/ethrpc.go | 34 +++++++++++++++-------------- configs/coins/ethereum-classic.json | 2 +- 2 files changed, 19 insertions(+), 17 deletions(-) diff --git a/bchain/coins/eth/ethrpc.go b/bchain/coins/eth/ethrpc.go index 1285c354..6a8f5240 100644 --- a/bchain/coins/eth/ethrpc.go +++ b/bchain/coins/eth/ethrpc.go @@ -41,9 +41,7 @@ type EthereumRPC struct { client *ethclient.Client rpc *rpc.Client timeout time.Duration - rpcURL string Parser *EthereumParser - CoinName string Testnet bool Network string Mempool *bchain.NonUTXOMempool @@ -143,21 +141,25 @@ func (b *EthereumRPC) Initialize() error { } glog.Info("rpc: block chain ", b.Network) - // subscriptions - if err = b.subscribe(func() (*rpc.ClientSubscription, error) { - // invalidate the previous subscription - it is either the first one or there was an error - b.newBlockSubscription = nil - ctx, cancel := context.WithTimeout(context.Background(), b.timeout) - defer cancel() - sub, err := b.rpc.EthSubscribe(ctx, b.chanNewBlock, "newHeads") - if err != nil { - return nil, errors.Annotatef(err, "EthSubscribe newHeads") + if b.ChainConfig.CoinName == "Ethereum Classic" { + glog.Info(b.ChainConfig.CoinName, " does not support subscription to newHeads") + } else { + // subscriptions + if err = b.subscribe(func() (*rpc.ClientSubscription, error) { + // invalidate the previous subscription - it is either the first one or there was an error + b.newBlockSubscription = nil + ctx, cancel := context.WithTimeout(context.Background(), b.timeout) + defer cancel() + sub, err := b.rpc.EthSubscribe(ctx, b.chanNewBlock, "newHeads") + if err != nil { + return nil, errors.Annotatef(err, "EthSubscribe newHeads") + } + b.newBlockSubscription = sub + glog.Info("Subscribed to newHeads") + return sub, nil + }); err != nil { + return err } - b.newBlockSubscription = sub - glog.Info("Subscribed to newHeads") - return sub, nil - }); err != nil { - return err } if err = b.subscribe(func() (*rpc.ClientSubscription, error) { // invalidate the previous subscription - it is either the first one or there was an error diff --git a/configs/coins/ethereum-classic.json b/configs/coins/ethereum-classic.json index e800da46..f7e49b56 100644 --- a/configs/coins/ethereum-classic.json +++ b/configs/coins/ethereum-classic.json @@ -40,7 +40,7 @@ "internal_binding_template": ":{{.Ports.BlockbookInternal}}", "public_binding_template": ":{{.Ports.BlockbookPublic}}", "explorer_url": "https://gastracker.io/", - "additional_params": "", + "additional_params": "-resyncindexperiod=4441", "block_chain": { "parse": true, "mempool_workers": 8, From 239274ec9b33e8bfe906f134edc67a60ccc11468 Mon Sep 17 00:00:00 2001 From: Martin Boehm Date: Wed, 5 Sep 2018 17:55:56 +0200 Subject: [PATCH 04/12] Experimental implementation of Ethereum send transaction --- bchain/coins/eth/ethrpc.go | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/bchain/coins/eth/ethrpc.go b/bchain/coins/eth/ethrpc.go index 6a8f5240..ada7b1e9 100644 --- a/bchain/coins/eth/ethrpc.go +++ b/bchain/coins/eth/ethrpc.go @@ -502,8 +502,28 @@ func (b *EthereumRPC) EstimateSmartFee(blocks int, conservative bool) (float64, } // SendRawTransaction sends raw transaction. -func (b *EthereumRPC) SendRawTransaction(tx string) (string, error) { - return "", errors.New("SendRawTransaction: not implemented") +func (b *EthereumRPC) SendRawTransaction(hex string) (string, error) { + ctx, cancel := context.WithTimeout(context.Background(), b.timeout) + defer cancel() + glog.Info("SendRawTransaction hex ", hex) + var raw json.RawMessage + err := b.rpc.CallContext(ctx, &raw, "eth_sendRawTransaction", hex) + if err != nil { + return "", err + } else if len(raw) == 0 { + return "", errors.New("SendRawTransaction: failed") + } + type rpcSendResult struct { + Result string `json:"result"` + } + var r rpcSendResult + if err := json.Unmarshal(raw, &r); err != nil { + return "", errors.Annotatef(err, "raw result %v", raw) + } + if r.Result == "" { + return "", errors.New("SendRawTransaction: failed, empty result") + } + return r.Result, nil } func (b *EthereumRPC) ResyncMempool(onNewTxAddr func(txid string, addr string)) (int, error) { From ccfc3502058d59d8ccc20a5379444805db885d3c Mon Sep 17 00:00:00 2001 From: Martin Boehm Date: Wed, 5 Sep 2018 17:58:20 +0200 Subject: [PATCH 05/12] Notify on new input address in nonutxo mempool --- bchain/mempool_nonutxo.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/bchain/mempool_nonutxo.go b/bchain/mempool_nonutxo.go index 8ccf018c..d46956aa 100644 --- a/bchain/mempool_nonutxo.go +++ b/bchain/mempool_nonutxo.go @@ -91,6 +91,9 @@ func (m *NonUTXOMempool) Resync(onNewTxAddr func(txid string, addr string)) (int continue } io = append(io, addrIndex{string(addrID), int32(^i)}) + if onNewTxAddr != nil { + onNewTxAddr(tx.Txid, a) + } } } } From 2fd84dd97bcf72a78b13f2e2d2d80d2b67b6c3ef Mon Sep 17 00:00:00 2001 From: Martin Boehm Date: Wed, 5 Sep 2018 18:57:12 +0200 Subject: [PATCH 06/12] Fix handling of result in ethereum SendRawTransaction --- bchain/coins/eth/ethrpc.go | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/bchain/coins/eth/ethrpc.go b/bchain/coins/eth/ethrpc.go index ada7b1e9..91f1fd82 100644 --- a/bchain/coins/eth/ethrpc.go +++ b/bchain/coins/eth/ethrpc.go @@ -513,17 +513,14 @@ func (b *EthereumRPC) SendRawTransaction(hex string) (string, error) { } else if len(raw) == 0 { return "", errors.New("SendRawTransaction: failed") } - type rpcSendResult struct { - Result string `json:"result"` - } - var r rpcSendResult - if err := json.Unmarshal(raw, &r); err != nil { + var result string + if err := json.Unmarshal(raw, &result); err != nil { return "", errors.Annotatef(err, "raw result %v", raw) } - if r.Result == "" { + if result == "" { return "", errors.New("SendRawTransaction: failed, empty result") } - return r.Result, nil + return result, nil } func (b *EthereumRPC) ResyncMempool(onNewTxAddr func(txid string, addr string)) (int, error) { From a6690a0f1962eddcedf563051eb5e33c50a8d9fc Mon Sep 17 00:00:00 2001 From: Martin Boehm Date: Wed, 5 Sep 2018 19:41:52 +0200 Subject: [PATCH 07/12] Remove unnecessary debug log in ethereum SendRawTransaction --- bchain/coins/eth/ethrpc.go | 1 - 1 file changed, 1 deletion(-) diff --git a/bchain/coins/eth/ethrpc.go b/bchain/coins/eth/ethrpc.go index 91f1fd82..d1a7bb56 100644 --- a/bchain/coins/eth/ethrpc.go +++ b/bchain/coins/eth/ethrpc.go @@ -505,7 +505,6 @@ func (b *EthereumRPC) EstimateSmartFee(blocks int, conservative bool) (float64, func (b *EthereumRPC) SendRawTransaction(hex string) (string, error) { ctx, cancel := context.WithTimeout(context.Background(), b.timeout) defer cancel() - glog.Info("SendRawTransaction hex ", hex) var raw json.RawMessage err := b.rpc.CallContext(ctx, &raw, "eth_sendRawTransaction", hex) if err != nil { From 0abdf3691b8730e18a235a4cccf02702fff57c87 Mon Sep 17 00:00:00 2001 From: Martin Boehm Date: Mon, 10 Sep 2018 11:12:33 +0200 Subject: [PATCH 08/12] Bump go-ethereum version to 1.8.15 --- Gopkg.lock | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index 8618ddd9..3dd541e8 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -69,9 +69,9 @@ [[projects]] name = "github.com/ethereum/go-ethereum" - packages = [".","common","common/hexutil","common/math","core/types","crypto","crypto/secp256k1","crypto/sha3","ethclient","ethdb","log","metrics","params","rlp","rpc","trie"] - revision = "329ac18ef617d0238f71637bffe78f028b0f13f7" - version = "v1.8.3" + packages = [".","common","common/hexutil","common/math","core/types","crypto","crypto/secp256k1","crypto/sha3","ethclient","ethdb","log","metrics","p2p/netutil","params","rlp","rpc","trie"] + revision = "89451f7c382ad2185987ee369f16416f89c28a7d" + version = "v1.8.15" [[projects]] name = "github.com/go-stack/stack" From 80b28ed8485bf0f64c895ef8c05bf981ae674f50 Mon Sep 17 00:00:00 2001 From: Martin Boehm Date: Mon, 10 Sep 2018 13:31:23 +0200 Subject: [PATCH 09/12] Bump ethereum backend to version 1.8.15 --- configs/coins/ethereum.json | 6 +++--- configs/coins/ethereum_testnet_ropsten.json | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/configs/coins/ethereum.json b/configs/coins/ethereum.json index fcc63d3d..5423c5ed 100644 --- a/configs/coins/ethereum.json +++ b/configs/coins/ethereum.json @@ -21,10 +21,10 @@ "package_name": "backend-ethereum", "package_revision": "satoshilabs-1", "system_user": "ethereum", - "version": "1.8.10-eae63c51", - "binary_url": "https://gethstore.blob.core.windows.net/builds/geth-linux-amd64-1.8.10-eae63c51.tar.gz", + "version": "1.8.15-89451f7c", + "binary_url": "https://gethstore.blob.core.windows.net/builds/geth-linux-amd64-1.8.15-89451f7c.tar.gz", "verification_type": "gpg", - "verification_source": "https://gethstore.blob.core.windows.net/builds/geth-linux-amd64-1.8.10-eae63c51.tar.gz.asc", + "verification_source": "https://gethstore.blob.core.windows.net/builds/geth-linux-amd64-1.8.15-89451f7c.tar.gz.asc", "extract_command": "tar -C backend --strip 1 -xf", "exclude_files": [], "exec_command_template": "/bin/sh -c '{{.Env.BackendInstallPath}}/{{.Coin.Alias}}/geth --ipcdisable --syncmode full --cache 1024 --nat none --datadir {{.Env.BackendDataPath}}/{{.Coin.Alias}}/backend --port 38336 --ws --wsaddr 0.0.0.0 --wsport {{.Ports.BackendRPC}} --wsorigins \"*\" --rpc --rpcport 8136 -rpcaddr 0.0.0.0 --rpccorsdomain \"*\" --rpcvhosts \"*\" 2>>{{.Env.BackendDataPath}}/{{.Coin.Alias}}/backend/{{.Coin.Alias}}.log'", diff --git a/configs/coins/ethereum_testnet_ropsten.json b/configs/coins/ethereum_testnet_ropsten.json index 6887b9f6..42fe3fd0 100644 --- a/configs/coins/ethereum_testnet_ropsten.json +++ b/configs/coins/ethereum_testnet_ropsten.json @@ -20,10 +20,10 @@ "package_name": "backend-ethereum-testnet-ropsten", "package_revision": "satoshilabs-1", "system_user": "ethereum", - "version": "1.8.10-eae63c51", - "binary_url": "https://gethstore.blob.core.windows.net/builds/geth-linux-amd64-1.8.10-eae63c51.tar.gz", + "version": "1.8.15-89451f7c", + "binary_url": "https://gethstore.blob.core.windows.net/builds/geth-linux-amd64-1.8.15-89451f7c.tar.gz", "verification_type": "gpg", - "verification_source": "https://gethstore.blob.core.windows.net/builds/geth-linux-amd64-1.8.10-eae63c51.tar.gz.asc", + "verification_source": "https://gethstore.blob.core.windows.net/builds/geth-linux-amd64-1.8.15-89451f7c.tar.gz.asc", "extract_command": "tar -C backend --strip 1 -xf", "exclude_files": [], "exec_command_template": "/bin/sh -c '{{.Env.BackendInstallPath}}/{{.Coin.Alias}}/geth --testnet --syncmode full --ipcdisable --cache 1024 --nat none --datadir {{.Env.BackendDataPath}}/{{.Coin.Alias}}/backend --port 48336 --ws --wsaddr 0.0.0.0 --wsport {{.Ports.BackendRPC}} --wsorigins \"*\" 2>>{{.Env.BackendDataPath}}/{{.Coin.Alias}}/backend/{{.Coin.Alias}}.log'", From f829d21eebcfa0912144ad63450f648a1c04ef46 Mon Sep 17 00:00:00 2001 From: Martin Boehm Date: Mon, 10 Sep 2018 23:23:41 +0200 Subject: [PATCH 10/12] Handle Ethereum Classic transactions --- bchain/coins/eth/ethrpc.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/bchain/coins/eth/ethrpc.go b/bchain/coins/eth/ethrpc.go index d1a7bb56..76c603d9 100644 --- a/bchain/coins/eth/ethrpc.go +++ b/bchain/coins/eth/ethrpc.go @@ -52,6 +52,7 @@ type EthereumRPC struct { chanNewTx chan ethcommon.Hash newTxSubscription *rpc.ClientSubscription ChainConfig *Configuration + isETC bool } // NewEthereumRPC returns new EthRPC instance. @@ -78,6 +79,9 @@ func NewEthereumRPC(config json.RawMessage, pushHandler func(bchain.Notification s.Parser = NewEthereumParser() s.timeout = time.Duration(c.RPCTimeout) * time.Second + // detect ethereum classic + s.isETC = s.ChainConfig.CoinName == "Ethereum Classic" + // new blocks notifications handling // the subscription is done in Initialize s.chanNewBlock = make(chan *ethtypes.Header) @@ -141,7 +145,7 @@ func (b *EthereumRPC) Initialize() error { } glog.Info("rpc: block chain ", b.Network) - if b.ChainConfig.CoinName == "Ethereum Classic" { + if b.isETC { glog.Info(b.ChainConfig.CoinName, " does not support subscription to newHeads") } else { // subscriptions @@ -427,7 +431,11 @@ func (b *EthereumRPC) GetTransaction(txid string) (*bchain.Tx, error) { } else if tx == nil { return nil, ethereum.NotFound } else if tx.R == "" { - return nil, errors.Annotatef(fmt.Errorf("server returned transaction without signature"), "txid %v", txid) + if !b.isETC { + return nil, errors.Annotatef(fmt.Errorf("server returned transaction without signature"), "txid %v", txid) + } else { + glog.Warning("server returned transaction without signature, txid ", txid) + } } var btx *bchain.Tx if tx.BlockNumber == "" { From 6dcf3fd45d603a61383ea984e8b2b4e6f16c00d1 Mon Sep 17 00:00:00 2001 From: Martin Boehm Date: Tue, 11 Sep 2018 13:19:56 +0200 Subject: [PATCH 11/12] Send the side of address (input/output) for a new tx notification --- bchain/coins/blockchain.go | 2 +- bchain/coins/btc/bitcoinrpc.go | 2 +- bchain/coins/eth/ethrpc.go | 2 +- bchain/mempool_nonutxo.go | 6 +++--- bchain/mempool_utxo.go | 6 +++--- bchain/types.go | 2 +- blockbook.go | 6 +++--- server/public.go | 4 ++-- server/socketio.go | 8 ++++++-- 9 files changed, 21 insertions(+), 17 deletions(-) diff --git a/bchain/coins/blockchain.go b/bchain/coins/blockchain.go index eb09af3c..d2406f22 100644 --- a/bchain/coins/blockchain.go +++ b/bchain/coins/blockchain.go @@ -191,7 +191,7 @@ func (c *blockChainWithMetrics) SendRawTransaction(tx string) (v string, err err return c.b.SendRawTransaction(tx) } -func (c *blockChainWithMetrics) ResyncMempool(onNewTxAddr func(txid string, addr string)) (count int, err error) { +func (c *blockChainWithMetrics) ResyncMempool(onNewTxAddr func(txid string, addr string, isOutput bool)) (count int, err error) { defer func(s time.Time) { c.observeRPCLatency("ResyncMempool", s, err) }(time.Now()) count, err = c.b.ResyncMempool(onNewTxAddr) if err == nil { diff --git a/bchain/coins/btc/bitcoinrpc.go b/bchain/coins/btc/bitcoinrpc.go index e0176d16..59d2a46d 100644 --- a/bchain/coins/btc/bitcoinrpc.go +++ b/bchain/coins/btc/bitcoinrpc.go @@ -607,7 +607,7 @@ func (b *BitcoinRPC) GetTransaction(txid string) (*bchain.Tx, error) { // ResyncMempool gets mempool transactions and maps output scripts to transactions. // ResyncMempool is not reentrant, it should be called from a single thread. // It returns number of transactions in mempool -func (b *BitcoinRPC) ResyncMempool(onNewTxAddr func(txid string, addr string)) (int, error) { +func (b *BitcoinRPC) ResyncMempool(onNewTxAddr func(txid string, addr string, isOutput bool)) (int, error) { return b.Mempool.Resync(onNewTxAddr) } diff --git a/bchain/coins/eth/ethrpc.go b/bchain/coins/eth/ethrpc.go index 76c603d9..43fb0a96 100644 --- a/bchain/coins/eth/ethrpc.go +++ b/bchain/coins/eth/ethrpc.go @@ -530,7 +530,7 @@ func (b *EthereumRPC) SendRawTransaction(hex string) (string, error) { return result, nil } -func (b *EthereumRPC) ResyncMempool(onNewTxAddr func(txid string, addr string)) (int, error) { +func (b *EthereumRPC) ResyncMempool(onNewTxAddr func(txid string, addr string, isOutput bool)) (int, error) { return b.Mempool.Resync(onNewTxAddr) } diff --git a/bchain/mempool_nonutxo.go b/bchain/mempool_nonutxo.go index d46956aa..d32a8917 100644 --- a/bchain/mempool_nonutxo.go +++ b/bchain/mempool_nonutxo.go @@ -47,7 +47,7 @@ func (m *NonUTXOMempool) updateMappings(newTxToInputOutput map[string][]addrInde // Resync gets mempool transactions and maps outputs to transactions. // Resync is not reentrant, it should be called from a single thread. // Read operations (GetTransactions) are safe. -func (m *NonUTXOMempool) Resync(onNewTxAddr func(txid string, addr string)) (int, error) { +func (m *NonUTXOMempool) Resync(onNewTxAddr func(txid string, addr string, isOutput bool)) (int, error) { start := time.Now() glog.V(1).Info("Mempool: resync") txs, err := m.chain.GetMempool() @@ -79,7 +79,7 @@ func (m *NonUTXOMempool) Resync(onNewTxAddr func(txid string, addr string)) (int io = append(io, addrIndex{string(addrID), int32(output.N)}) } if onNewTxAddr != nil && len(output.ScriptPubKey.Addresses) == 1 { - onNewTxAddr(tx.Txid, output.ScriptPubKey.Addresses[0]) + onNewTxAddr(tx.Txid, output.ScriptPubKey.Addresses[0], true) } } for _, input := range tx.Vin { @@ -92,7 +92,7 @@ func (m *NonUTXOMempool) Resync(onNewTxAddr func(txid string, addr string)) (int } io = append(io, addrIndex{string(addrID), int32(^i)}) if onNewTxAddr != nil { - onNewTxAddr(tx.Txid, a) + onNewTxAddr(tx.Txid, a, false) } } } diff --git a/bchain/mempool_utxo.go b/bchain/mempool_utxo.go index 1963bc19..32eb87af 100644 --- a/bchain/mempool_utxo.go +++ b/bchain/mempool_utxo.go @@ -31,7 +31,7 @@ type UTXOMempool struct { addrIDToTx map[string][]outpoint chanTxid chan string chanAddrIndex chan txidio - onNewTxAddr func(txid string, addr string) + onNewTxAddr func(txid string, addr string, isOutput bool) } // NewUTXOMempool creates new mempool handler. @@ -129,7 +129,7 @@ func (m *UTXOMempool) getTxAddrs(txid string, chanInput chan outpoint, chanResul io = append(io, addrIndex{string(addrID), int32(output.N)}) } if m.onNewTxAddr != nil && len(output.ScriptPubKey.Addresses) == 1 { - m.onNewTxAddr(tx.Txid, output.ScriptPubKey.Addresses[0]) + m.onNewTxAddr(tx.Txid, output.ScriptPubKey.Addresses[0], true) } } dispatched := 0 @@ -166,7 +166,7 @@ func (m *UTXOMempool) getTxAddrs(txid string, chanInput chan outpoint, chanResul // Resync gets mempool transactions and maps outputs to transactions. // Resync is not reentrant, it should be called from a single thread. // Read operations (GetTransactions) are safe. -func (m *UTXOMempool) Resync(onNewTxAddr func(txid string, addr string)) (int, error) { +func (m *UTXOMempool) Resync(onNewTxAddr func(txid string, addr string, isOutput bool)) (int, error) { start := time.Now() glog.V(1).Info("mempool: resync") m.onNewTxAddr = onNewTxAddr diff --git a/bchain/types.go b/bchain/types.go index e2aa7276..6cba260f 100644 --- a/bchain/types.go +++ b/bchain/types.go @@ -136,7 +136,7 @@ type BlockChain interface { EstimateFee(blocks int) (float64, error) SendRawTransaction(tx string) (string, error) // mempool - ResyncMempool(onNewTxAddr func(txid string, addr string)) (int, error) + ResyncMempool(onNewTxAddr func(txid string, addr string, isOutput bool)) (int, error) GetMempoolTransactions(address string) ([]string, error) GetMempoolEntry(txid string) (*MempoolEntry, error) // parser diff --git a/blockbook.go b/blockbook.go index cea2c5b6..6111e564 100644 --- a/blockbook.go +++ b/blockbook.go @@ -86,7 +86,7 @@ var ( syncWorker *db.SyncWorker internalState *common.InternalState callbacksOnNewBlockHash []func(hash string) - callbacksOnNewTxAddr []func(txid string, addr string) + callbacksOnNewTxAddr []func(txid string, addr string, isOutput bool) chanOsSignal chan os.Signal inShutdown int32 ) @@ -444,9 +444,9 @@ func storeInternalStateLoop() { glog.Info("storeInternalStateLoop stopped") } -func onNewTxAddr(txid string, addr string) { +func onNewTxAddr(txid string, addr string, isOutput bool) { for _, c := range callbacksOnNewTxAddr { - c(txid, addr) + c(txid, addr, isOutput) } } diff --git a/server/public.go b/server/public.go index 54555325..ebaa3beb 100644 --- a/server/public.go +++ b/server/public.go @@ -145,8 +145,8 @@ func (s *PublicServer) OnNewBlockHash(hash string) { } // OnNewTxAddr notifies users subscribed to bitcoind/addresstxid about new block -func (s *PublicServer) OnNewTxAddr(txid string, addr string) { - s.socketio.OnNewTxAddr(txid, addr) +func (s *PublicServer) OnNewTxAddr(txid string, addr string, isOutput bool) { + s.socketio.OnNewTxAddr(txid, addr, isOutput) } func splitBinding(binding string) (addr string, path string) { diff --git a/server/socketio.go b/server/socketio.go index 46c0a563..241df197 100644 --- a/server/socketio.go +++ b/server/socketio.go @@ -694,8 +694,12 @@ func (s *SocketIoServer) OnNewBlockHash(hash string) { } // OnNewTxAddr notifies users subscribed to bitcoind/addresstxid about new block -func (s *SocketIoServer) OnNewTxAddr(txid string, addr string) { - c := s.server.BroadcastTo("bitcoind/addresstxid-"+addr, "bitcoind/addresstxid", map[string]string{"address": addr, "txid": txid}) +func (s *SocketIoServer) OnNewTxAddr(txid string, addr string, isOutput bool) { + data := map[string]interface{}{"address": addr, "txid": txid} + if !isOutput { + data["input"] = true + } + c := s.server.BroadcastTo("bitcoind/addresstxid-"+addr, "bitcoind/addresstxid", data) if c > 0 { glog.Info("broadcasting new txid ", txid, " for addr ", addr, " to ", c, " channels") } From 8bdf4b0ae308f6083e16d8a9d8228522c7788df6 Mon Sep 17 00:00:00 2001 From: Martin Boehm Date: Tue, 11 Sep 2018 13:37:12 +0200 Subject: [PATCH 12/12] Prepare notification of block height in OnNewBlock --- bchain/coins/blockchain.go | 2 +- bchain/coins/btc/bitcoinrpc.go | 2 +- bchain/coins/eth/ethrpc.go | 2 +- bchain/mempool_nonutxo.go | 2 +- bchain/mempool_utxo.go | 4 ++-- bchain/types.go | 8 +++++++- blockbook.go | 12 ++++++------ db/sync.go | 10 +++++----- server/public.go | 4 ++-- 9 files changed, 26 insertions(+), 20 deletions(-) diff --git a/bchain/coins/blockchain.go b/bchain/coins/blockchain.go index d2406f22..6469966e 100644 --- a/bchain/coins/blockchain.go +++ b/bchain/coins/blockchain.go @@ -191,7 +191,7 @@ func (c *blockChainWithMetrics) SendRawTransaction(tx string) (v string, err err return c.b.SendRawTransaction(tx) } -func (c *blockChainWithMetrics) ResyncMempool(onNewTxAddr func(txid string, addr string, isOutput bool)) (count int, err error) { +func (c *blockChainWithMetrics) ResyncMempool(onNewTxAddr bchain.OnNewTxAddrFunc) (count int, err error) { defer func(s time.Time) { c.observeRPCLatency("ResyncMempool", s, err) }(time.Now()) count, err = c.b.ResyncMempool(onNewTxAddr) if err == nil { diff --git a/bchain/coins/btc/bitcoinrpc.go b/bchain/coins/btc/bitcoinrpc.go index 59d2a46d..e0211b28 100644 --- a/bchain/coins/btc/bitcoinrpc.go +++ b/bchain/coins/btc/bitcoinrpc.go @@ -607,7 +607,7 @@ func (b *BitcoinRPC) GetTransaction(txid string) (*bchain.Tx, error) { // ResyncMempool gets mempool transactions and maps output scripts to transactions. // ResyncMempool is not reentrant, it should be called from a single thread. // It returns number of transactions in mempool -func (b *BitcoinRPC) ResyncMempool(onNewTxAddr func(txid string, addr string, isOutput bool)) (int, error) { +func (b *BitcoinRPC) ResyncMempool(onNewTxAddr bchain.OnNewTxAddrFunc) (int, error) { return b.Mempool.Resync(onNewTxAddr) } diff --git a/bchain/coins/eth/ethrpc.go b/bchain/coins/eth/ethrpc.go index 43fb0a96..ec3790a2 100644 --- a/bchain/coins/eth/ethrpc.go +++ b/bchain/coins/eth/ethrpc.go @@ -530,7 +530,7 @@ func (b *EthereumRPC) SendRawTransaction(hex string) (string, error) { return result, nil } -func (b *EthereumRPC) ResyncMempool(onNewTxAddr func(txid string, addr string, isOutput bool)) (int, error) { +func (b *EthereumRPC) ResyncMempool(onNewTxAddr bchain.OnNewTxAddrFunc) (int, error) { return b.Mempool.Resync(onNewTxAddr) } diff --git a/bchain/mempool_nonutxo.go b/bchain/mempool_nonutxo.go index d32a8917..23f950c6 100644 --- a/bchain/mempool_nonutxo.go +++ b/bchain/mempool_nonutxo.go @@ -47,7 +47,7 @@ func (m *NonUTXOMempool) updateMappings(newTxToInputOutput map[string][]addrInde // Resync gets mempool transactions and maps outputs to transactions. // Resync is not reentrant, it should be called from a single thread. // Read operations (GetTransactions) are safe. -func (m *NonUTXOMempool) Resync(onNewTxAddr func(txid string, addr string, isOutput bool)) (int, error) { +func (m *NonUTXOMempool) Resync(onNewTxAddr OnNewTxAddrFunc) (int, error) { start := time.Now() glog.V(1).Info("Mempool: resync") txs, err := m.chain.GetMempool() diff --git a/bchain/mempool_utxo.go b/bchain/mempool_utxo.go index 32eb87af..5483ca8e 100644 --- a/bchain/mempool_utxo.go +++ b/bchain/mempool_utxo.go @@ -31,7 +31,7 @@ type UTXOMempool struct { addrIDToTx map[string][]outpoint chanTxid chan string chanAddrIndex chan txidio - onNewTxAddr func(txid string, addr string, isOutput bool) + onNewTxAddr OnNewTxAddrFunc } // NewUTXOMempool creates new mempool handler. @@ -166,7 +166,7 @@ func (m *UTXOMempool) getTxAddrs(txid string, chanInput chan outpoint, chanResul // Resync gets mempool transactions and maps outputs to transactions. // Resync is not reentrant, it should be called from a single thread. // Read operations (GetTransactions) are safe. -func (m *UTXOMempool) Resync(onNewTxAddr func(txid string, addr string, isOutput bool)) (int, error) { +func (m *UTXOMempool) Resync(onNewTxAddr OnNewTxAddrFunc) (int, error) { start := time.Now() glog.V(1).Info("mempool: resync") m.onNewTxAddr = onNewTxAddr diff --git a/bchain/types.go b/bchain/types.go index 6cba260f..c2d350bd 100644 --- a/bchain/types.go +++ b/bchain/types.go @@ -112,6 +112,12 @@ func (e *RPCError) Error() string { return fmt.Sprintf("%d: %s", e.Code, e.Message) } +// OnNewBlockFunc is used to send notification about a new block +type OnNewBlockFunc func(hash string, height uint32) + +// OnNewTxAddrFunc is used to send notification about a new transaction/address +type OnNewTxAddrFunc func(txid string, addr string, isOutput bool) + // BlockChain defines common interface to block chain daemon type BlockChain interface { // life-cycle methods @@ -136,7 +142,7 @@ type BlockChain interface { EstimateFee(blocks int) (float64, error) SendRawTransaction(tx string) (string, error) // mempool - ResyncMempool(onNewTxAddr func(txid string, addr string, isOutput bool)) (int, error) + ResyncMempool(onNewTxAddr OnNewTxAddrFunc) (int, error) GetMempoolTransactions(address string) ([]string, error) GetMempoolEntry(txid string) (*MempoolEntry, error) // parser diff --git a/blockbook.go b/blockbook.go index 6111e564..1de45946 100644 --- a/blockbook.go +++ b/blockbook.go @@ -85,8 +85,8 @@ var ( txCache *db.TxCache syncWorker *db.SyncWorker internalState *common.InternalState - callbacksOnNewBlockHash []func(hash string) - callbacksOnNewTxAddr []func(txid string, addr string, isOutput bool) + callbacksOnNewBlock []bchain.OnNewBlockFunc + callbacksOnNewTxAddr []bchain.OnNewTxAddrFunc chanOsSignal chan os.Signal inShutdown int32 ) @@ -281,7 +281,7 @@ func main() { } } }() - callbacksOnNewBlockHash = append(callbacksOnNewBlockHash, publicServer.OnNewBlockHash) + callbacksOnNewBlock = append(callbacksOnNewBlock, publicServer.OnNewBlock) callbacksOnNewTxAddr = append(callbacksOnNewTxAddr, publicServer.OnNewTxAddr) } @@ -392,9 +392,9 @@ func syncIndexLoop() { glog.Info("syncIndexLoop stopped") } -func onNewBlockHash(hash string) { - for _, c := range callbacksOnNewBlockHash { - c(hash) +func onNewBlockHash(hash string, height uint32) { + for _, c := range callbacksOnNewBlock { + c(hash, height) } } diff --git a/db/sync.go b/db/sync.go index a664e94f..6c2e281e 100644 --- a/db/sync.go +++ b/db/sync.go @@ -47,7 +47,7 @@ var errSynced = errors.New("synced") // ResyncIndex synchronizes index to the top of the blockchain // onNewBlock is called when new block is connected, but not in initial parallel sync -func (w *SyncWorker) ResyncIndex(onNewBlock func(hash string)) error { +func (w *SyncWorker) ResyncIndex(onNewBlock bchain.OnNewBlockFunc) error { start := time.Now() w.is.StartedSync() @@ -75,7 +75,7 @@ func (w *SyncWorker) ResyncIndex(onNewBlock func(hash string)) error { return err } -func (w *SyncWorker) resyncIndex(onNewBlock func(hash string)) error { +func (w *SyncWorker) resyncIndex(onNewBlock bchain.OnNewBlockFunc) error { remoteBestHash, err := w.chain.GetBestBlockHash() if err != nil { return err @@ -135,7 +135,7 @@ func (w *SyncWorker) resyncIndex(onNewBlock func(hash string)) error { return w.connectBlocks(onNewBlock) } -func (w *SyncWorker) handleFork(localBestHeight uint32, localBestHash string, onNewBlock func(hash string)) error { +func (w *SyncWorker) handleFork(localBestHeight uint32, localBestHash string, onNewBlock bchain.OnNewBlockFunc) error { // find forked blocks, disconnect them and then synchronize again var height uint32 hashes := []string{localBestHash} @@ -163,7 +163,7 @@ func (w *SyncWorker) handleFork(localBestHeight uint32, localBestHash string, on return w.resyncIndex(onNewBlock) } -func (w *SyncWorker) connectBlocks(onNewBlock func(hash string)) error { +func (w *SyncWorker) connectBlocks(onNewBlock bchain.OnNewBlockFunc) error { bch := make(chan blockResult, 8) done := make(chan struct{}) defer close(done) @@ -181,7 +181,7 @@ func (w *SyncWorker) connectBlocks(onNewBlock func(hash string)) error { return err } if onNewBlock != nil { - onNewBlock(res.block.Hash) + onNewBlock(res.block.Hash, res.block.Height) } if res.block.Height > 0 && res.block.Height%1000 == 0 { glog.Info("connected block ", res.block.Height, " ", res.block.Hash) diff --git a/server/public.go b/server/public.go index ebaa3beb..d208a3de 100644 --- a/server/public.go +++ b/server/public.go @@ -139,8 +139,8 @@ func (s *PublicServer) Shutdown(ctx context.Context) error { return s.https.Shutdown(ctx) } -// OnNewBlockHash notifies users subscribed to bitcoind/hashblock about new block -func (s *PublicServer) OnNewBlockHash(hash string) { +// OnNewBlock notifies users subscribed to bitcoind/hashblock about new block +func (s *PublicServer) OnNewBlock(hash string, height uint32) { s.socketio.OnNewBlockHash(hash) }