Merge branch 'master' into v0.0.7

pull/56/head
Martin Boehm 2018-09-11 13:44:36 +02:00
commit 503ff4389a
14 changed files with 99 additions and 59 deletions

6
Gopkg.lock generated
View File

@ -69,9 +69,9 @@
[[projects]]
name = "github.com/ethereum/go-ethereum"
packages = [".","common","common/hexutil","common/math","core/types","crypto","crypto/secp256k1","crypto/sha3","ethclient","ethdb","log","metrics","params","rlp","rpc","trie"]
revision = "329ac18ef617d0238f71637bffe78f028b0f13f7"
version = "v1.8.3"
packages = [".","common","common/hexutil","common/math","core/types","crypto","crypto/secp256k1","crypto/sha3","ethclient","ethdb","log","metrics","p2p/netutil","params","rlp","rpc","trie"]
revision = "89451f7c382ad2185987ee369f16416f89c28a7d"
version = "v1.8.15"
[[projects]]
name = "github.com/go-stack/stack"

View File

@ -35,6 +35,7 @@ func init() {
blockChainFactories["Zcash"] = zec.NewZCashRPC
blockChainFactories["Zcash Testnet"] = zec.NewZCashRPC
blockChainFactories["Ethereum"] = eth.NewEthereumRPC
blockChainFactories["Ethereum Classic"] = eth.NewEthereumRPC
blockChainFactories["Ethereum Testnet Ropsten"] = eth.NewEthereumRPC
blockChainFactories["Bcash"] = bch.NewBCashRPC
blockChainFactories["Bcash Testnet"] = bch.NewBCashRPC
@ -191,7 +192,7 @@ func (c *blockChainWithMetrics) SendRawTransaction(tx string) (v string, err err
return c.b.SendRawTransaction(tx)
}
func (c *blockChainWithMetrics) ResyncMempool(onNewTxAddr func(txid string, addr string)) (count int, err error) {
func (c *blockChainWithMetrics) ResyncMempool(onNewTxAddr bchain.OnNewTxAddrFunc) (count int, err error) {
defer func(s time.Time) { c.observeRPCLatency("ResyncMempool", s, err) }(time.Now())
count, err = c.b.ResyncMempool(onNewTxAddr)
if err == nil {

View File

@ -613,7 +613,7 @@ func (b *BitcoinRPC) GetTransaction(txid string) (*bchain.Tx, error) {
// ResyncMempool gets mempool transactions and maps output scripts to transactions.
// ResyncMempool is not reentrant, it should be called from a single thread.
// It returns number of transactions in mempool
func (b *BitcoinRPC) ResyncMempool(onNewTxAddr func(txid string, addr string)) (int, error) {
func (b *BitcoinRPC) ResyncMempool(onNewTxAddr bchain.OnNewTxAddrFunc) (int, error) {
return b.Mempool.Resync(onNewTxAddr)
}

View File

@ -41,9 +41,7 @@ type EthereumRPC struct {
client *ethclient.Client
rpc *rpc.Client
timeout time.Duration
rpcURL string
Parser *EthereumParser
CoinName string
Testnet bool
Network string
Mempool *bchain.NonUTXOMempool
@ -54,6 +52,7 @@ type EthereumRPC struct {
chanNewTx chan ethcommon.Hash
newTxSubscription *rpc.ClientSubscription
ChainConfig *Configuration
isETC bool
}
// NewEthereumRPC returns new EthRPC instance.
@ -80,6 +79,9 @@ func NewEthereumRPC(config json.RawMessage, pushHandler func(bchain.Notification
s.Parser = NewEthereumParser()
s.timeout = time.Duration(c.RPCTimeout) * time.Second
// detect ethereum classic
s.isETC = s.ChainConfig.CoinName == "Ethereum Classic"
// new blocks notifications handling
// the subscription is done in Initialize
s.chanNewBlock = make(chan *ethtypes.Header)
@ -143,21 +145,25 @@ func (b *EthereumRPC) Initialize() error {
}
glog.Info("rpc: block chain ", b.Network)
// subscriptions
if err = b.subscribe(func() (*rpc.ClientSubscription, error) {
// invalidate the previous subscription - it is either the first one or there was an error
b.newBlockSubscription = nil
ctx, cancel := context.WithTimeout(context.Background(), b.timeout)
defer cancel()
sub, err := b.rpc.EthSubscribe(ctx, b.chanNewBlock, "newHeads")
if err != nil {
return nil, errors.Annotatef(err, "EthSubscribe newHeads")
if b.isETC {
glog.Info(b.ChainConfig.CoinName, " does not support subscription to newHeads")
} else {
// subscriptions
if err = b.subscribe(func() (*rpc.ClientSubscription, error) {
// invalidate the previous subscription - it is either the first one or there was an error
b.newBlockSubscription = nil
ctx, cancel := context.WithTimeout(context.Background(), b.timeout)
defer cancel()
sub, err := b.rpc.EthSubscribe(ctx, b.chanNewBlock, "newHeads")
if err != nil {
return nil, errors.Annotatef(err, "EthSubscribe newHeads")
}
b.newBlockSubscription = sub
glog.Info("Subscribed to newHeads")
return sub, nil
}); err != nil {
return err
}
b.newBlockSubscription = sub
glog.Info("Subscribed to newHeads")
return sub, nil
}); err != nil {
return err
}
if err = b.subscribe(func() (*rpc.ClientSubscription, error) {
// invalidate the previous subscription - it is either the first one or there was an error
@ -427,7 +433,11 @@ func (b *EthereumRPC) GetTransaction(txid string) (*bchain.Tx, error) {
} else if tx == nil {
return nil, ethereum.NotFound
} else if tx.R == "" {
return nil, errors.Annotatef(fmt.Errorf("server returned transaction without signature"), "txid %v", txid)
if !b.isETC {
return nil, errors.Annotatef(fmt.Errorf("server returned transaction without signature"), "txid %v", txid)
} else {
glog.Warning("server returned transaction without signature, txid ", txid)
}
}
var btx *bchain.Tx
if tx.BlockNumber == "" {
@ -503,12 +513,28 @@ func (b *EthereumRPC) EstimateSmartFee(blocks int, conservative bool) (big.Int,
return r, nil
}
// SendRawTransaction sends raw transaction
func (b *EthereumRPC) SendRawTransaction(tx string) (string, error) {
return "", errors.New("SendRawTransaction: not implemented")
// SendRawTransaction sends raw transaction.
func (b *EthereumRPC) SendRawTransaction(hex string) (string, error) {
ctx, cancel := context.WithTimeout(context.Background(), b.timeout)
defer cancel()
var raw json.RawMessage
err := b.rpc.CallContext(ctx, &raw, "eth_sendRawTransaction", hex)
if err != nil {
return "", err
} else if len(raw) == 0 {
return "", errors.New("SendRawTransaction: failed")
}
var result string
if err := json.Unmarshal(raw, &result); err != nil {
return "", errors.Annotatef(err, "raw result %v", raw)
}
if result == "" {
return "", errors.New("SendRawTransaction: failed, empty result")
}
return result, nil
}
func (b *EthereumRPC) ResyncMempool(onNewTxAddr func(txid string, addr string)) (int, error) {
func (b *EthereumRPC) ResyncMempool(onNewTxAddr bchain.OnNewTxAddrFunc) (int, error) {
return b.Mempool.Resync(onNewTxAddr)
}

View File

@ -52,7 +52,7 @@ func (m *NonUTXOMempool) updateMappings(newTxToInputOutput map[string][]addrInde
// Resync gets mempool transactions and maps outputs to transactions.
// Resync is not reentrant, it should be called from a single thread.
// Read operations (GetTransactions) are safe.
func (m *NonUTXOMempool) Resync(onNewTxAddr func(txid string, addr string)) (int, error) {
func (m *NonUTXOMempool) Resync(onNewTxAddr OnNewTxAddrFunc) (int, error) {
start := time.Now()
glog.V(1).Info("Mempool: resync")
txs, err := m.chain.GetMempool()
@ -84,7 +84,7 @@ func (m *NonUTXOMempool) Resync(onNewTxAddr func(txid string, addr string)) (int
io = append(io, addrIndex{string(addrDesc), int32(output.N)})
}
if onNewTxAddr != nil && len(output.ScriptPubKey.Addresses) == 1 {
onNewTxAddr(tx.Txid, output.ScriptPubKey.Addresses[0])
onNewTxAddr(tx.Txid, output.ScriptPubKey.Addresses[0], true)
}
}
for _, input := range tx.Vin {
@ -96,6 +96,9 @@ func (m *NonUTXOMempool) Resync(onNewTxAddr func(txid string, addr string)) (int
continue
}
io = append(io, addrIndex{string(addrDesc), int32(^i)})
if onNewTxAddr != nil {
onNewTxAddr(tx.Txid, a, false)
}
}
}
}

View File

@ -31,7 +31,7 @@ type UTXOMempool struct {
addrDescToTx map[string][]outpoint
chanTxid chan string
chanAddrIndex chan txidio
onNewTxAddr func(txid string, addr string)
onNewTxAddr OnNewTxAddrFunc
}
// NewUTXOMempool creates new mempool handler.
@ -133,7 +133,7 @@ func (m *UTXOMempool) getTxAddrs(txid string, chanInput chan outpoint, chanResul
io = append(io, addrIndex{string(addrDesc), int32(output.N)})
}
if m.onNewTxAddr != nil && len(output.ScriptPubKey.Addresses) == 1 {
m.onNewTxAddr(tx.Txid, output.ScriptPubKey.Addresses[0])
m.onNewTxAddr(tx.Txid, output.ScriptPubKey.Addresses[0], true)
}
}
dispatched := 0
@ -170,7 +170,7 @@ func (m *UTXOMempool) getTxAddrs(txid string, chanInput chan outpoint, chanResul
// Resync gets mempool transactions and maps outputs to transactions.
// Resync is not reentrant, it should be called from a single thread.
// Read operations (GetTransactions) are safe.
func (m *UTXOMempool) Resync(onNewTxAddr func(txid string, addr string)) (int, error) {
func (m *UTXOMempool) Resync(onNewTxAddr OnNewTxAddrFunc) (int, error) {
start := time.Now()
glog.V(1).Info("mempool: resync")
m.onNewTxAddr = onNewTxAddr

View File

@ -119,6 +119,12 @@ func (ad AddressDescriptor) String() string {
return "ad:" + hex.EncodeToString(ad)
}
// OnNewBlockFunc is used to send notification about a new block
type OnNewBlockFunc func(hash string, height uint32)
// OnNewTxAddrFunc is used to send notification about a new transaction/address
type OnNewTxAddrFunc func(txid string, addr string, isOutput bool)
// BlockChain defines common interface to block chain daemon
type BlockChain interface {
// life-cycle methods
@ -143,7 +149,7 @@ type BlockChain interface {
EstimateFee(blocks int) (big.Int, error)
SendRawTransaction(tx string) (string, error)
// mempool
ResyncMempool(onNewTxAddr func(txid string, addr string)) (int, error)
ResyncMempool(onNewTxAddr OnNewTxAddrFunc) (int, error)
GetMempoolTransactions(address string) ([]string, error)
GetMempoolTransactionsForAddrDesc(addrDesc AddressDescriptor) ([]string, error)
GetMempoolEntry(txid string) (*MempoolEntry, error)

View File

@ -86,8 +86,8 @@ var (
txCache *db.TxCache
syncWorker *db.SyncWorker
internalState *common.InternalState
callbacksOnNewBlockHash []func(hash string)
callbacksOnNewTxAddr []func(txid string, addr string)
callbacksOnNewBlock []bchain.OnNewBlockFunc
callbacksOnNewTxAddr []bchain.OnNewTxAddrFunc
chanOsSignal chan os.Signal
inShutdown int32
)
@ -288,7 +288,7 @@ func main() {
}
}
}()
callbacksOnNewBlockHash = append(callbacksOnNewBlockHash, publicServer.OnNewBlockHash)
callbacksOnNewBlock = append(callbacksOnNewBlock, publicServer.OnNewBlock)
callbacksOnNewTxAddr = append(callbacksOnNewTxAddr, publicServer.OnNewTxAddr)
}
@ -399,9 +399,9 @@ func syncIndexLoop() {
glog.Info("syncIndexLoop stopped")
}
func onNewBlockHash(hash string) {
for _, c := range callbacksOnNewBlockHash {
c(hash)
func onNewBlockHash(hash string, height uint32) {
for _, c := range callbacksOnNewBlock {
c(hash, height)
}
}
@ -457,9 +457,9 @@ func storeInternalStateLoop() {
glog.Info("storeInternalStateLoop stopped")
}
func onNewTxAddr(txid string, addr string) {
func onNewTxAddr(txid string, addr string, isOutput bool) {
for _, c := range callbacksOnNewTxAddr {
c(txid, addr)
c(txid, addr, isOutput)
}
}

View File

@ -40,7 +40,7 @@
"internal_binding_template": ":{{.Ports.BlockbookInternal}}",
"public_binding_template": ":{{.Ports.BlockbookPublic}}",
"explorer_url": "https://gastracker.io/",
"additional_params": "",
"additional_params": "-resyncindexperiod=4441",
"block_chain": {
"parse": true,
"mempool_workers": 8,

View File

@ -21,10 +21,10 @@
"package_name": "backend-ethereum",
"package_revision": "satoshilabs-1",
"system_user": "ethereum",
"version": "1.8.10-eae63c51",
"binary_url": "https://gethstore.blob.core.windows.net/builds/geth-linux-amd64-1.8.10-eae63c51.tar.gz",
"version": "1.8.15-89451f7c",
"binary_url": "https://gethstore.blob.core.windows.net/builds/geth-linux-amd64-1.8.15-89451f7c.tar.gz",
"verification_type": "gpg",
"verification_source": "https://gethstore.blob.core.windows.net/builds/geth-linux-amd64-1.8.10-eae63c51.tar.gz.asc",
"verification_source": "https://gethstore.blob.core.windows.net/builds/geth-linux-amd64-1.8.15-89451f7c.tar.gz.asc",
"extract_command": "tar -C backend --strip 1 -xf",
"exclude_files": [],
"exec_command_template": "/bin/sh -c '{{.Env.BackendInstallPath}}/{{.Coin.Alias}}/geth --ipcdisable --syncmode full --cache 1024 --nat none --datadir {{.Env.BackendDataPath}}/{{.Coin.Alias}}/backend --port 38336 --ws --wsaddr 0.0.0.0 --wsport {{.Ports.BackendRPC}} --wsorigins \"*\" --rpc --rpcport 8136 -rpcaddr 0.0.0.0 --rpccorsdomain \"*\" --rpcvhosts \"*\" 2>>{{.Env.BackendDataPath}}/{{.Coin.Alias}}/backend/{{.Coin.Alias}}.log'",

View File

@ -20,13 +20,13 @@
"package_name": "backend-ethereum-testnet-ropsten",
"package_revision": "satoshilabs-1",
"system_user": "ethereum",
"version": "1.8.10-eae63c51",
"binary_url": "https://gethstore.blob.core.windows.net/builds/geth-linux-amd64-1.8.10-eae63c51.tar.gz",
"version": "1.8.15-89451f7c",
"binary_url": "https://gethstore.blob.core.windows.net/builds/geth-linux-amd64-1.8.15-89451f7c.tar.gz",
"verification_type": "gpg",
"verification_source": "https://gethstore.blob.core.windows.net/builds/geth-linux-amd64-1.8.10-eae63c51.tar.gz.asc",
"verification_source": "https://gethstore.blob.core.windows.net/builds/geth-linux-amd64-1.8.15-89451f7c.tar.gz.asc",
"extract_command": "tar -C backend --strip 1 -xf",
"exclude_files": [],
"exec_command_template": "/bin/sh -c '{{.Env.BackendInstallPath}}/{{.Coin.Alias}}/geth --testnet --ipcdisable --cache 1024 --nat none --datadir {{.Env.BackendDataPath}}/{{.Coin.Alias}}/backend --port 48336 --ws --wsaddr 0.0.0.0 --wsport {{.Ports.BackendRPC}} --wsorigins \"*\" 2>>{{.Env.BackendDataPath}}/{{.Coin.Alias}}/backend/{{.Coin.Alias}}.log'",
"exec_command_template": "/bin/sh -c '{{.Env.BackendInstallPath}}/{{.Coin.Alias}}/geth --testnet --syncmode full --ipcdisable --cache 1024 --nat none --datadir {{.Env.BackendDataPath}}/{{.Coin.Alias}}/backend --port 48336 --ws --wsaddr 0.0.0.0 --wsport {{.Ports.BackendRPC}} --wsorigins \"*\" 2>>{{.Env.BackendDataPath}}/{{.Coin.Alias}}/backend/{{.Coin.Alias}}.log'",
"logrotate_files_template": "{{.Env.BackendDataPath}}/{{.Coin.Alias}}/backend/{{.Coin.Alias}}.log",
"postinst_script_template": "",
"service_type": "simple",

View File

@ -47,7 +47,7 @@ var errSynced = errors.New("synced")
// ResyncIndex synchronizes index to the top of the blockchain
// onNewBlock is called when new block is connected, but not in initial parallel sync
func (w *SyncWorker) ResyncIndex(onNewBlock func(hash string)) error {
func (w *SyncWorker) ResyncIndex(onNewBlock bchain.OnNewBlockFunc) error {
start := time.Now()
w.is.StartedSync()
@ -75,7 +75,7 @@ func (w *SyncWorker) ResyncIndex(onNewBlock func(hash string)) error {
return err
}
func (w *SyncWorker) resyncIndex(onNewBlock func(hash string)) error {
func (w *SyncWorker) resyncIndex(onNewBlock bchain.OnNewBlockFunc) error {
remoteBestHash, err := w.chain.GetBestBlockHash()
if err != nil {
return err
@ -135,7 +135,7 @@ func (w *SyncWorker) resyncIndex(onNewBlock func(hash string)) error {
return w.connectBlocks(onNewBlock)
}
func (w *SyncWorker) handleFork(localBestHeight uint32, localBestHash string, onNewBlock func(hash string)) error {
func (w *SyncWorker) handleFork(localBestHeight uint32, localBestHash string, onNewBlock bchain.OnNewBlockFunc) error {
// find forked blocks, disconnect them and then synchronize again
var height uint32
hashes := []string{localBestHash}
@ -163,7 +163,7 @@ func (w *SyncWorker) handleFork(localBestHeight uint32, localBestHash string, on
return w.resyncIndex(onNewBlock)
}
func (w *SyncWorker) connectBlocks(onNewBlock func(hash string)) error {
func (w *SyncWorker) connectBlocks(onNewBlock bchain.OnNewBlockFunc) error {
bch := make(chan blockResult, 8)
done := make(chan struct{})
defer close(done)
@ -181,7 +181,7 @@ func (w *SyncWorker) connectBlocks(onNewBlock func(hash string)) error {
return err
}
if onNewBlock != nil {
onNewBlock(res.block.Hash)
onNewBlock(res.block.Hash, res.block.Height)
}
if res.block.Height > 0 && res.block.Height%1000 == 0 {
glog.Info("connected block ", res.block.Height, " ", res.block.Hash)

View File

@ -124,14 +124,14 @@ func (s *PublicServer) Shutdown(ctx context.Context) error {
return s.https.Shutdown(ctx)
}
// OnNewBlockHash notifies users subscribed to bitcoind/hashblock about new block
func (s *PublicServer) OnNewBlockHash(hash string) {
// OnNewBlock notifies users subscribed to bitcoind/hashblock about new block
func (s *PublicServer) OnNewBlock(hash string, height uint32) {
s.socketio.OnNewBlockHash(hash)
}
// OnNewTxAddr notifies users subscribed to bitcoind/addresstxid about new block
func (s *PublicServer) OnNewTxAddr(txid string, addr string) {
s.socketio.OnNewTxAddr(txid, addr)
func (s *PublicServer) OnNewTxAddr(txid string, addr string, isOutput bool) {
s.socketio.OnNewTxAddr(txid, addr, isOutput)
}
func (s *PublicServer) txRedirect(w http.ResponseWriter, r *http.Request) {

View File

@ -748,8 +748,12 @@ func (s *SocketIoServer) OnNewBlockHash(hash string) {
}
// OnNewTxAddr notifies users subscribed to bitcoind/addresstxid about new block
func (s *SocketIoServer) OnNewTxAddr(txid string, addr string) {
c := s.server.BroadcastTo("bitcoind/addresstxid-"+addr, "bitcoind/addresstxid", map[string]string{"address": addr, "txid": txid})
func (s *SocketIoServer) OnNewTxAddr(txid string, addr string, isOutput bool) {
data := map[string]interface{}{"address": addr, "txid": txid}
if !isOutput {
data["input"] = true
}
c := s.server.BroadcastTo("bitcoind/addresstxid-"+addr, "bitcoind/addresstxid", data)
if c > 0 {
glog.Info("broadcasting new txid ", txid, " for addr ", addr, " to ", c, " channels")
}