diff --git a/api/worker.go b/api/worker.go index c61fd9c1..143f5113 100644 --- a/api/worker.go +++ b/api/worker.go @@ -253,32 +253,7 @@ func (w *Worker) GetTransactionFromBchainTx(bchainTx *bchain.Tx, height int, spe if err != nil { glog.Errorf("GetErc20FromTx error %v, %v", err, bchainTx) } - tokens = make([]TokenTransfer, len(ets)) - for i := range ets { - e := &ets[i] - cd, err := w.chainParser.GetAddrDescFromAddress(e.Contract) - if err != nil { - glog.Errorf("GetAddrDescFromAddress error %v, contract %v", err, e.Contract) - continue - } - erc20c, err := w.chain.EthereumTypeGetErc20ContractInfo(cd) - if err != nil { - glog.Errorf("GetErc20ContractInfo error %v, contract %v", err, e.Contract) - } - if erc20c == nil { - erc20c = &bchain.Erc20Contract{Name: e.Contract} - } - tokens[i] = TokenTransfer{ - Type: ERC20TokenType, - Token: e.Contract, - From: e.From, - To: e.To, - Decimals: erc20c.Decimals, - Value: (*Amount)(&e.Tokens), - Name: erc20c.Name, - Symbol: erc20c.Symbol, - } - } + tokens = w.getTokensFromErc20(ets) ethTxData := eth.GetEthereumTxData(bchainTx) // mempool txs do not have fees yet if ethTxData.GasUsed != nil { @@ -332,6 +307,133 @@ func (w *Worker) GetTransactionFromBchainTx(bchainTx *bchain.Tx, height int, spe return r, nil } +// GetTransactionFromMempoolTx converts bchain.MempoolTx to Tx, with limited amount of data +// it is not doing any request to backend or to db +func (w *Worker) GetTransactionFromMempoolTx(mempoolTx *bchain.MempoolTx) (*Tx, error) { + var err error + var valInSat, valOutSat, feesSat big.Int + var pValInSat *big.Int + var tokens []TokenTransfer + var ethSpecific *EthereumSpecific + vins := make([]Vin, len(mempoolTx.Vin)) + rbf := false + for i := range mempoolTx.Vin { + bchainVin := &mempoolTx.Vin[i] + vin := &vins[i] + vin.Txid = bchainVin.Txid + vin.N = i + vin.Vout = bchainVin.Vout + vin.Sequence = int64(bchainVin.Sequence) + // detect explicit Replace-by-Fee transactions as defined by BIP125 + if bchainVin.Sequence < 0xffffffff-1 { + rbf = true + } + vin.Hex = bchainVin.ScriptSig.Hex + vin.Coinbase = bchainVin.Coinbase + if w.chainType == bchain.ChainBitcoinType { + // bchainVin.Txid=="" is coinbase transaction + if bchainVin.Txid != "" { + vin.ValueSat = (*Amount)(&bchainVin.ValueSat) + vin.AddrDesc = bchainVin.AddrDesc + vin.Addresses, vin.IsAddress, err = w.chainParser.GetAddressesFromAddrDesc(vin.AddrDesc) + if vin.ValueSat != nil { + valInSat.Add(&valInSat, (*big.Int)(vin.ValueSat)) + } + } + } else if w.chainType == bchain.ChainEthereumType { + if len(bchainVin.Addresses) > 0 { + vin.AddrDesc, err = w.chainParser.GetAddrDescFromAddress(bchainVin.Addresses[0]) + if err != nil { + glog.Errorf("GetAddrDescFromAddress error %v, tx %v, bchainVin %v", err, mempoolTx.Txid, bchainVin) + } + vin.Addresses = bchainVin.Addresses + vin.IsAddress = true + } + } + } + vouts := make([]Vout, len(mempoolTx.Vout)) + for i := range mempoolTx.Vout { + bchainVout := &mempoolTx.Vout[i] + vout := &vouts[i] + vout.N = i + vout.ValueSat = (*Amount)(&bchainVout.ValueSat) + valOutSat.Add(&valOutSat, &bchainVout.ValueSat) + vout.Hex = bchainVout.ScriptPubKey.Hex + vout.AddrDesc, vout.Addresses, vout.IsAddress, err = w.getAddressesFromVout(bchainVout) + if err != nil { + glog.V(2).Infof("getAddressesFromVout error %v, %v, output %v", err, mempoolTx.Txid, bchainVout.N) + } + } + if w.chainType == bchain.ChainBitcoinType { + // for coinbase transactions valIn is 0 + feesSat.Sub(&valInSat, &valOutSat) + if feesSat.Sign() == -1 { + feesSat.SetUint64(0) + } + pValInSat = &valInSat + } else if w.chainType == bchain.ChainEthereumType { + if len(mempoolTx.Vout) > 0 { + valOutSat = mempoolTx.Vout[0].ValueSat + } + tokens = w.getTokensFromErc20(mempoolTx.Erc20) + ethTxData := eth.GetEthereumTxDataFromSpecificData(mempoolTx.CoinSpecificData) + ethSpecific = &EthereumSpecific{ + GasLimit: ethTxData.GasLimit, + GasPrice: (*Amount)(ethTxData.GasPrice), + GasUsed: ethTxData.GasUsed, + Nonce: ethTxData.Nonce, + Status: ethTxData.Status, + Data: ethTxData.Data, + } + } + r := &Tx{ + Blocktime: mempoolTx.Blocktime, + FeesSat: (*Amount)(&feesSat), + Locktime: mempoolTx.LockTime, + Txid: mempoolTx.Txid, + ValueInSat: (*Amount)(pValInSat), + ValueOutSat: (*Amount)(&valOutSat), + Version: mempoolTx.Version, + Hex: mempoolTx.Hex, + Rbf: rbf, + Vin: vins, + Vout: vouts, + TokenTransfers: tokens, + EthereumSpecific: ethSpecific, + } + return r, nil +} + +func (w *Worker) getTokensFromErc20(erc20 []bchain.Erc20Transfer) []TokenTransfer { + tokens := make([]TokenTransfer, len(erc20)) + for i := range erc20 { + e := &erc20[i] + cd, err := w.chainParser.GetAddrDescFromAddress(e.Contract) + if err != nil { + glog.Errorf("GetAddrDescFromAddress error %v, contract %v", err, e.Contract) + continue + } + erc20c, err := w.chain.EthereumTypeGetErc20ContractInfo(cd) + if err != nil { + glog.Errorf("GetErc20ContractInfo error %v, contract %v", err, e.Contract) + } + if erc20c == nil { + erc20c = &bchain.Erc20Contract{Name: e.Contract} + } + tokens[i] = TokenTransfer{ + Type: ERC20TokenType, + Token: e.Contract, + From: e.From, + To: e.To, + Decimals: erc20c.Decimals, + Value: (*Amount)(&e.Tokens), + Name: erc20c.Name, + Symbol: erc20c.Symbol, + } + } + return tokens +} + func (w *Worker) getAddressTxids(addrDesc bchain.AddressDescriptor, mempool bool, filter *AddressFilter, maxResults int) ([]string, error) { var err error txids := make([]string, 0, 4) diff --git a/bchain/basemempool.go b/bchain/basemempool.go index ae585bcc..79fe3b18 100644 --- a/bchain/basemempool.go +++ b/bchain/basemempool.go @@ -3,6 +3,7 @@ package bchain import ( "sort" "sync" + "time" ) type addrIndex struct { @@ -27,6 +28,7 @@ type BaseMempool struct { txEntries map[string]txEntry addrDescToTx map[string][]Outpoint OnNewTxAddr OnNewTxAddrFunc + OnNewTx OnNewTxFunc } // GetTransactions returns slice of mempool transactions for given address @@ -113,3 +115,22 @@ func (m *BaseMempool) GetTransactionTime(txid string) uint32 { } return e.time } + +func (m *BaseMempool) txToMempoolTx(tx *Tx) *MempoolTx { + mtx := MempoolTx{ + Hex: tx.Hex, + Blocktime: time.Now().Unix(), + LockTime: tx.LockTime, + Txid: tx.Txid, + Version: tx.Version, + Vout: tx.Vout, + CoinSpecificData: tx.CoinSpecificData, + } + mtx.Vin = make([]MempoolVin, len(tx.Vin)) + for i, vin := range tx.Vin { + mtx.Vin[i] = MempoolVin{ + Vin: vin, + } + } + return &mtx +} diff --git a/bchain/coins/blockchain.go b/bchain/coins/blockchain.go index fd0307e5..06be7605 100644 --- a/bchain/coins/blockchain.go +++ b/bchain/coins/blockchain.go @@ -186,8 +186,8 @@ func (c *blockChainWithMetrics) CreateMempool(chain bchain.BlockChain) (bchain.M return c.b.CreateMempool(chain) } -func (c *blockChainWithMetrics) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOutpointFunc, onNewTxAddr bchain.OnNewTxAddrFunc) error { - return c.b.InitializeMempool(addrDescForOutpoint, onNewTxAddr) +func (c *blockChainWithMetrics) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOutpointFunc, onNewTxAddr bchain.OnNewTxAddrFunc, onNewTx bchain.OnNewTxFunc) error { + return c.b.InitializeMempool(addrDescForOutpoint, onNewTxAddr, onNewTx) } func (c *blockChainWithMetrics) Shutdown(ctx context.Context) error { diff --git a/bchain/coins/btc/bitcoinrpc.go b/bchain/coins/btc/bitcoinrpc.go index 04446601..6aba5c45 100644 --- a/bchain/coins/btc/bitcoinrpc.go +++ b/bchain/coins/btc/bitcoinrpc.go @@ -155,12 +155,13 @@ func (b *BitcoinRPC) CreateMempool(chain bchain.BlockChain) (bchain.Mempool, err } // InitializeMempool creates ZeroMQ subscription and sets AddrDescForOutpointFunc to the Mempool -func (b *BitcoinRPC) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOutpointFunc, onNewTxAddr bchain.OnNewTxAddrFunc) error { +func (b *BitcoinRPC) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOutpointFunc, onNewTxAddr bchain.OnNewTxAddrFunc, onNewTx bchain.OnNewTxFunc) error { if b.Mempool == nil { return errors.New("Mempool not created") } b.Mempool.AddrDescForOutpoint = addrDescForOutpoint b.Mempool.OnNewTxAddr = onNewTxAddr + b.Mempool.OnNewTx = onNewTx if b.mq == nil { mq, err := bchain.NewMQ(b.ChainConfig.MessageQueueBinding, b.pushHandler) if err != nil { diff --git a/bchain/coins/eth/ethparser.go b/bchain/coins/eth/ethparser.go index a9911f4f..b564ed14 100644 --- a/bchain/coins/eth/ethparser.go +++ b/bchain/coins/eth/ethparser.go @@ -480,8 +480,13 @@ type EthereumTxData struct { // GetEthereumTxData returns EthereumTxData from bchain.Tx func GetEthereumTxData(tx *bchain.Tx) *EthereumTxData { + return GetEthereumTxDataFromSpecificData(tx.CoinSpecificData) +} + +// GetEthereumTxDataFromSpecificData returns EthereumTxData from coinSpecificData +func GetEthereumTxDataFromSpecificData(coinSpecificData interface{}) *EthereumTxData { etd := EthereumTxData{Status: txStatusPending} - csd, ok := tx.CoinSpecificData.(completeTransaction) + csd, ok := coinSpecificData.(completeTransaction) if ok { if csd.Tx != nil { etd.Nonce, _ = hexutil.DecodeUint64(csd.Tx.AccountNonce) diff --git a/bchain/coins/eth/ethrpc.go b/bchain/coins/eth/ethrpc.go index 21878ddd..7d5c3e6a 100644 --- a/bchain/coins/eth/ethrpc.go +++ b/bchain/coins/eth/ethrpc.go @@ -178,7 +178,7 @@ func (b *EthereumRPC) CreateMempool(chain bchain.BlockChain) (bchain.Mempool, er } // InitializeMempool creates subscriptions to newHeads and newPendingTransactions -func (b *EthereumRPC) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOutpointFunc, onNewTxAddr bchain.OnNewTxAddrFunc) error { +func (b *EthereumRPC) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOutpointFunc, onNewTxAddr bchain.OnNewTxAddrFunc, onNewTx bchain.OnNewTxFunc) error { if b.Mempool == nil { return errors.New("Mempool not created") } @@ -193,6 +193,7 @@ func (b *EthereumRPC) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOu } b.Mempool.OnNewTxAddr = onNewTxAddr + b.Mempool.OnNewTx = onNewTx if err = b.subscribeEvents(); err != nil { return err diff --git a/bchain/mempool_bitcoin_type.go b/bchain/mempool_bitcoin_type.go index 68882886..009073dd 100644 --- a/bchain/mempool_bitcoin_type.go +++ b/bchain/mempool_bitcoin_type.go @@ -1,11 +1,17 @@ package bchain import ( + "math/big" "time" "github.com/golang/glog" ) +type chanInputPayload struct { + tx *MempoolTx + index int +} + // MempoolBitcoinType is mempool handle. type MempoolBitcoinType struct { BaseMempool @@ -28,12 +34,12 @@ func NewMempoolBitcoinType(chain BlockChain, workers int, subworkers int) *Mempo } for i := 0; i < workers; i++ { go func(i int) { - chanInput := make(chan Outpoint, 1) + chanInput := make(chan chanInputPayload, 1) chanResult := make(chan *addrIndex, 1) for j := 0; j < subworkers; j++ { go func(j int) { - for input := range chanInput { - ai := m.getInputAddress(input) + for payload := range chanInput { + ai := m.getInputAddress(&payload) chanResult <- ai } }(j) @@ -51,38 +57,44 @@ func NewMempoolBitcoinType(chain BlockChain, workers int, subworkers int) *Mempo return m } -func (m *MempoolBitcoinType) getInputAddress(input Outpoint) *addrIndex { +func (m *MempoolBitcoinType) getInputAddress(payload *chanInputPayload) *addrIndex { var addrDesc AddressDescriptor + var value *big.Int + vin := &payload.tx.Vin[payload.index] if m.AddrDescForOutpoint != nil { - addrDesc = m.AddrDescForOutpoint(input) + addrDesc, value = m.AddrDescForOutpoint(Outpoint{vin.Txid, int32(vin.Vout)}) } if addrDesc == nil { - itx, err := m.chain.GetTransactionForMempool(input.Txid) + itx, err := m.chain.GetTransactionForMempool(vin.Txid) if err != nil { - glog.Error("cannot get transaction ", input.Txid, ": ", err) + glog.Error("cannot get transaction ", vin.Txid, ": ", err) return nil } - if int(input.Vout) >= len(itx.Vout) { - glog.Error("Vout len in transaction ", input.Txid, " ", len(itx.Vout), " input.Vout=", input.Vout) + if int(vin.Vout) >= len(itx.Vout) { + glog.Error("Vout len in transaction ", vin.Txid, " ", len(itx.Vout), " input.Vout=", vin.Vout) return nil } - addrDesc, err = m.chain.GetChainParser().GetAddrDescFromVout(&itx.Vout[input.Vout]) + addrDesc, err = m.chain.GetChainParser().GetAddrDescFromVout(&itx.Vout[vin.Vout]) if err != nil { - glog.Error("error in addrDesc in ", input.Txid, " ", input.Vout, ": ", err) + glog.Error("error in addrDesc in ", vin.Txid, " ", vin.Vout, ": ", err) return nil } + value = &itx.Vout[vin.Vout].ValueSat } - return &addrIndex{string(addrDesc), ^input.Vout} + vin.AddrDesc = addrDesc + vin.ValueSat = *value + return &addrIndex{string(addrDesc), ^int32(vin.Vout)} } -func (m *MempoolBitcoinType) getTxAddrs(txid string, chanInput chan Outpoint, chanResult chan *addrIndex) ([]addrIndex, bool) { +func (m *MempoolBitcoinType) getTxAddrs(txid string, chanInput chan chanInputPayload, chanResult chan *addrIndex) ([]addrIndex, bool) { tx, err := m.chain.GetTransactionForMempool(txid) if err != nil { glog.Error("cannot get transaction ", txid, ": ", err) return nil, false } glog.V(2).Info("mempool: gettxaddrs ", txid, ", ", len(tx.Vin), " inputs") + mtx := m.txToMempoolTx(tx) io := make([]addrIndex, 0, len(tx.Vout)+len(tx.Vin)) for _, output := range tx.Vout { addrDesc, err := m.chain.GetChainParser().GetAddrDescFromVout(&output) @@ -98,11 +110,12 @@ func (m *MempoolBitcoinType) getTxAddrs(txid string, chanInput chan Outpoint, ch } } dispatched := 0 - for _, input := range tx.Vin { + for i := range tx.Vin { + input := &tx.Vin[i] if input.Coinbase != "" { continue } - o := Outpoint{input.Txid, int32(input.Vout)} + payload := chanInputPayload{mtx, i} loop: for { select { @@ -113,7 +126,7 @@ func (m *MempoolBitcoinType) getTxAddrs(txid string, chanInput chan Outpoint, ch } dispatched-- // send input to be processed - case chanInput <- o: + case chanInput <- payload: dispatched++ break loop } @@ -125,6 +138,9 @@ func (m *MempoolBitcoinType) getTxAddrs(txid string, chanInput chan Outpoint, ch io = append(io, *ai) } } + if m.OnNewTx != nil { + m.OnNewTx(mtx) + } return io, true } diff --git a/bchain/mempool_ethereum_type.go b/bchain/mempool_ethereum_type.go index 33e44e72..91fdeeac 100644 --- a/bchain/mempool_ethereum_type.go +++ b/bchain/mempool_ethereum_type.go @@ -31,16 +31,18 @@ func NewMempoolEthereumType(chain BlockChain, mempoolTxTimeoutHours int, queryBa } } -func appendAddress(io []addrIndex, i int32, a string, parser BlockChainParser) []addrIndex { +func appendAddress(io []addrIndex, i int32, a string, parser BlockChainParser) ([]addrIndex, AddressDescriptor) { + var addrDesc AddressDescriptor + var err error if len(a) > 0 { - addrDesc, err := parser.GetAddrDescFromAddress(a) + addrDesc, err = parser.GetAddrDescFromAddress(a) if err != nil { glog.Error("error in input addrDesc in ", a, ": ", err) - return io + return io, nil } io = append(io, addrIndex{string(addrDesc), i}) } - return io + return io, addrDesc } func (m *MempoolEthereumType) createTxEntry(txid string, txTime uint32) (txEntry, bool) { @@ -51,9 +53,10 @@ func (m *MempoolEthereumType) createTxEntry(txid string, txTime uint32) (txEntry } return txEntry{}, false } + mtx := m.txToMempoolTx(tx) parser := m.chain.GetChainParser() - addrIndexes := make([]addrIndex, 0, len(tx.Vout)+len(tx.Vin)) - for _, output := range tx.Vout { + addrIndexes := make([]addrIndex, 0, len(mtx.Vout)+len(mtx.Vin)) + for _, output := range mtx.Vout { addrDesc, err := parser.GetAddrDescFromVout(&output) if err != nil { if err != ErrAddressMissing { @@ -65,18 +68,20 @@ func (m *MempoolEthereumType) createTxEntry(txid string, txTime uint32) (txEntry addrIndexes = append(addrIndexes, addrIndex{string(addrDesc), int32(output.N)}) } } - for _, input := range tx.Vin { + for j := range mtx.Vin { + input := &mtx.Vin[j] for i, a := range input.Addresses { - addrIndexes = appendAddress(addrIndexes, ^int32(i), a, parser) + addrIndexes, input.AddrDesc = appendAddress(addrIndexes, ^int32(i), a, parser) } } t, err := parser.EthereumTypeGetErc20FromTx(tx) if err != nil { glog.Error("GetErc20FromTx for tx ", txid, ", ", err) } else { + mtx.Erc20 = t for i := range t { - addrIndexes = appendAddress(addrIndexes, ^int32(i+1), t[i].From, parser) - addrIndexes = appendAddress(addrIndexes, int32(i+1), t[i].To, parser) + addrIndexes, _ = appendAddress(addrIndexes, ^int32(i+1), t[i].From, parser) + addrIndexes, _ = appendAddress(addrIndexes, int32(i+1), t[i].To, parser) } } if m.OnNewTxAddr != nil { @@ -88,6 +93,9 @@ func (m *MempoolEthereumType) createTxEntry(txid string, txTime uint32) (txEntry } } } + if m.OnNewTx != nil { + m.OnNewTx(mtx) + } return txEntry{addrIndexes: addrIndexes, time: txTime}, true } diff --git a/bchain/types.go b/bchain/types.go index 0df7f506..6f536153 100644 --- a/bchain/types.go +++ b/bchain/types.go @@ -49,7 +49,7 @@ type ScriptSig struct { Hex string `json:"hex"` } -// Vin contains data about tx output +// Vin contains data about tx input type Vin struct { Coinbase string `json:"coinbase"` Txid string `json:"txid"` @@ -92,6 +92,27 @@ type Tx struct { CoinSpecificData interface{} `json:"-"` } +// MempoolVin contains data about tx input +type MempoolVin struct { + Vin + AddrDesc AddressDescriptor `json:"-"` + ValueSat big.Int +} + +// MempoolTx is blockchain transaction in mempool +// optimized for onNewTx notification +type MempoolTx struct { + Hex string `json:"hex"` + Txid string `json:"txid"` + Version int32 `json:"version"` + LockTime uint32 `json:"locktime"` + Vin []MempoolVin `json:"vin"` + Vout []Vout `json:"vout"` + Blocktime int64 `json:"blocktime,omitempty"` + Erc20 []Erc20Transfer `json:"-"` + CoinSpecificData interface{} `json:"-"` +} + // Block is block header and list of transactions type Block struct { BlockHeader @@ -211,8 +232,11 @@ type OnNewBlockFunc func(hash string, height uint32) // OnNewTxAddrFunc is used to send notification about a new transaction/address type OnNewTxAddrFunc func(tx *Tx, desc AddressDescriptor) -// AddrDescForOutpointFunc defines function that returns address descriptorfor given outpoint or nil if outpoint not found -type AddrDescForOutpointFunc func(outpoint Outpoint) AddressDescriptor +// OnNewTxFunc is used to send notification about a new transaction/address +type OnNewTxFunc func(tx *MempoolTx) + +// AddrDescForOutpointFunc returns address descriptor and value for given outpoint or nil if outpoint not found +type AddrDescForOutpointFunc func(outpoint Outpoint) (AddressDescriptor, *big.Int) // BlockChain defines common interface to block chain daemon type BlockChain interface { @@ -222,7 +246,7 @@ type BlockChain interface { // create mempool but do not initialize it CreateMempool(BlockChain) (Mempool, error) // initialize mempool, create ZeroMQ (or other) subscription - InitializeMempool(AddrDescForOutpointFunc, OnNewTxAddrFunc) error + InitializeMempool(AddrDescForOutpointFunc, OnNewTxAddrFunc, OnNewTxFunc) error // shutdown mempool, ZeroMQ and block chain connections Shutdown(ctx context.Context) error // chain info diff --git a/blockbook.go b/blockbook.go index 3c222ca6..85d41701 100644 --- a/blockbook.go +++ b/blockbook.go @@ -100,6 +100,7 @@ var ( internalState *common.InternalState callbacksOnNewBlock []bchain.OnNewBlockFunc callbacksOnNewTxAddr []bchain.OnNewTxAddrFunc + callbacksOnNewTx []bchain.OnNewTxFunc callbacksOnNewFiatRatesTicker []fiat.OnNewFiatRatesTicker chanOsSignal chan os.Signal inShutdown int32 @@ -298,7 +299,7 @@ func mainWithExitCode() int { if chain.GetChainParser().GetChainType() == bchain.ChainBitcoinType { addrDescForOutpoint = index.AddrDescForOutpoint } - err = chain.InitializeMempool(addrDescForOutpoint, onNewTxAddr) + err = chain.InitializeMempool(addrDescForOutpoint, onNewTxAddr, onNewTx) if err != nil { glog.Error("initializeMempool ", err) return exitCodeFatal @@ -319,6 +320,7 @@ func mainWithExitCode() int { // start full public interface callbacksOnNewBlock = append(callbacksOnNewBlock, publicServer.OnNewBlock) callbacksOnNewTxAddr = append(callbacksOnNewTxAddr, publicServer.OnNewTxAddr) + callbacksOnNewTx = append(callbacksOnNewTx, publicServer.OnNewTx) callbacksOnNewFiatRatesTicker = append(callbacksOnNewFiatRatesTicker, publicServer.OnNewFiatRatesTicker) publicServer.ConnectFullPublicInterface() } @@ -620,6 +622,12 @@ func onNewTxAddr(tx *bchain.Tx, desc bchain.AddressDescriptor) { } } +func onNewTx(tx *bchain.MempoolTx) { + for _, c := range callbacksOnNewTx { + c(tx) + } +} + func pushSynchronizationHandler(nt bchain.NotificationType) { glog.V(1).Info("MQ: notification ", nt) if atomic.LoadInt32(&inShutdown) != 0 { diff --git a/db/rocksdb.go b/db/rocksdb.go index f7696a84..84e716a1 100644 --- a/db/rocksdb.go +++ b/db/rocksdb.go @@ -1016,23 +1016,23 @@ func (d *RocksDB) GetTxAddresses(txid string) (*TxAddresses, error) { return d.getTxAddresses(btxID) } -// AddrDescForOutpoint defines function that returns address descriptorfor given outpoint or nil if outpoint not found -func (d *RocksDB) AddrDescForOutpoint(outpoint bchain.Outpoint) bchain.AddressDescriptor { +// AddrDescForOutpoint is a function that returns address descriptor and value for given outpoint or nil if outpoint not found +func (d *RocksDB) AddrDescForOutpoint(outpoint bchain.Outpoint) (bchain.AddressDescriptor, *big.Int) { ta, err := d.GetTxAddresses(outpoint.Txid) if err != nil || ta == nil { - return nil + return nil, nil } if outpoint.Vout < 0 { vin := ^outpoint.Vout if len(ta.Inputs) <= int(vin) { - return nil + return nil, nil } - return ta.Inputs[vin].AddrDesc + return ta.Inputs[vin].AddrDesc, &ta.Inputs[vin].ValueSat } if len(ta.Outputs) <= int(outpoint.Vout) { - return nil + return nil, nil } - return ta.Outputs[outpoint.Vout].AddrDesc + return ta.Outputs[outpoint.Vout].AddrDesc, &ta.Outputs[outpoint.Vout].ValueSat } func packTxAddresses(ta *TxAddresses, buf []byte, varBuf []byte) []byte { diff --git a/server/public.go b/server/public.go index 7edc6320..95754991 100644 --- a/server/public.go +++ b/server/public.go @@ -219,10 +219,14 @@ func (s *PublicServer) OnNewFiatRatesTicker(ticker *db.CurrencyRatesTicker) { s.websocket.OnNewFiatRatesTicker(ticker) } -// OnNewTxAddr notifies users subscribed to bitcoind/addresstxid about new block +// OnNewTxAddr notifies users subscribed to notification about new tx func (s *PublicServer) OnNewTxAddr(tx *bchain.Tx, desc bchain.AddressDescriptor) { s.socketio.OnNewTxAddr(tx.Txid, desc) - s.websocket.OnNewTxAddr(tx, desc) +} + +// OnNewTx notifies users subscribed to notification about new tx +func (s *PublicServer) OnNewTx(tx *bchain.MempoolTx) { + s.websocket.OnNewTx(tx) } func (s *PublicServer) txRedirect(w http.ResponseWriter, r *http.Request) { diff --git a/server/websocket.go b/server/websocket.go index 436c61af..db019c18 100644 --- a/server/websocket.go +++ b/server/websocket.go @@ -757,47 +757,91 @@ func (s *WebsocketServer) OnNewBlock(hash string, height uint32) { glog.Info("broadcasting new block ", height, " ", hash, " to ", len(s.newBlockSubscriptions), " channels") } -// OnNewTxAddr is a callback that broadcasts info about a tx affecting subscribed address -func (s *WebsocketServer) OnNewTxAddr(tx *bchain.Tx, addrDesc bchain.AddressDescriptor) { - // check if there is any subscription but release the lock immediately, GetTransactionFromBchainTx may take some time - s.addressSubscriptionsLock.Lock() - as, ok := s.addressSubscriptions[string(addrDesc)] - lenAs := len(as) - s.addressSubscriptionsLock.Unlock() - if ok && lenAs > 0 { - addr, _, err := s.chainParser.GetAddressesFromAddrDesc(addrDesc) - if err != nil { - glog.Error("GetAddressesFromAddrDesc error ", err, " for ", addrDesc) - return +func (s *WebsocketServer) sendOnNewTxAddr(stringAddressDescriptor string, tx *api.Tx) { + addrDesc := bchain.AddressDescriptor(stringAddressDescriptor) + addr, _, err := s.chainParser.GetAddressesFromAddrDesc(addrDesc) + if err != nil { + glog.Error("GetAddressesFromAddrDesc error ", err, " for ", addrDesc) + return + } + if len(addr) == 1 { + data := struct { + Address string `json:"address"` + Tx *api.Tx `json:"tx"` + }{ + Address: addr[0], + Tx: tx, } - if len(addr) == 1 { - atx, err := s.api.GetTransactionFromBchainTx(tx, 0, false, false) - if err != nil { - glog.Error("GetTransactionFromBchainTx error ", err, " for ", tx.Txid) - return - } - data := struct { - Address string `json:"address"` - Tx *api.Tx `json:"tx"` - }{ - Address: addr[0], - Tx: atx, - } - // get the list of subscriptions again, this time keep the lock - s.addressSubscriptionsLock.Lock() - defer s.addressSubscriptionsLock.Unlock() - as, ok = s.addressSubscriptions[string(addrDesc)] - if ok { - for c, id := range as { - if c.IsAlive() { - c.out <- &websocketRes{ - ID: id, - Data: &data, - } + // get the list of subscriptions again, this time keep the lock + s.addressSubscriptionsLock.Lock() + defer s.addressSubscriptionsLock.Unlock() + as, ok := s.addressSubscriptions[stringAddressDescriptor] + if ok { + for c, id := range as { + if c.IsAlive() { + c.out <- &websocketRes{ + ID: id, + Data: &data, } } - glog.Info("broadcasting new tx ", tx.Txid, " for addr ", addr[0], " to ", len(as), " channels") } + glog.Info("broadcasting new tx ", tx.Txid, ", addr ", addr[0], " to ", len(as), " channels") + } + } +} + +// OnNewTx is a callback that broadcasts info about a tx affecting subscribed address +func (s *WebsocketServer) OnNewTx(tx *bchain.MempoolTx) { + // check if there is any subscription in inputs, outputs and erc20 + // release the lock immediately, GetTransactionFromMempoolTx is potentially slow + subscribed := make(map[string]struct{}) + s.addressSubscriptionsLock.Lock() + for i := range tx.Vin { + sad := string(tx.Vin[i].AddrDesc) + if len(sad) > 0 { + as, ok := s.addressSubscriptions[sad] + if ok && len(as) > 0 { + subscribed[sad] = struct{}{} + } + } + } + for i := range tx.Vout { + addrDesc, err := s.chainParser.GetAddrDescFromVout(&tx.Vout[i]) + if err == nil && len(addrDesc) > 0 { + sad := string(addrDesc) + as, ok := s.addressSubscriptions[sad] + if ok && len(as) > 0 { + subscribed[sad] = struct{}{} + } + } + } + for i := range tx.Erc20 { + addrDesc, err := s.chainParser.GetAddrDescFromAddress(tx.Erc20[i].From) + if err == nil && len(addrDesc) > 0 { + sad := string(addrDesc) + as, ok := s.addressSubscriptions[sad] + if ok && len(as) > 0 { + subscribed[sad] = struct{}{} + } + } + addrDesc, err = s.chainParser.GetAddrDescFromAddress(tx.Erc20[i].To) + if err == nil && len(addrDesc) > 0 { + sad := string(addrDesc) + as, ok := s.addressSubscriptions[sad] + if ok && len(as) > 0 { + subscribed[sad] = struct{}{} + } + } + } + s.addressSubscriptionsLock.Unlock() + if len(subscribed) > 0 { + atx, err := s.api.GetTransactionFromMempoolTx(tx) + if err != nil { + glog.Error("GetTransactionFromMempoolTx error ", err, " for ", tx.Txid) + return + } + for stringAddressDescriptor := range subscribed { + s.sendOnNewTxAddr(stringAddressDescriptor, atx) } } } diff --git a/tests/dbtestdata/fakechain.go b/tests/dbtestdata/fakechain.go index 4a104345..bf19dfb9 100644 --- a/tests/dbtestdata/fakechain.go +++ b/tests/dbtestdata/fakechain.go @@ -26,7 +26,7 @@ func (c *fakeBlockChain) Initialize() error { return nil } -func (c *fakeBlockChain) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOutpointFunc, onNewTxAddr bchain.OnNewTxAddrFunc) error { +func (c *fakeBlockChain) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOutpointFunc, onNewTxAddr bchain.OnNewTxAddrFunc, onNewTx bchain.OnNewTxFunc) error { return nil } diff --git a/tests/integration.go b/tests/integration.go index 24f65ad1..7b98add2 100644 --- a/tests/integration.go +++ b/tests/integration.go @@ -182,7 +182,7 @@ func initBlockChain(coinName string, cfg json.RawMessage) (bchain.BlockChain, bc return nil, nil, fmt.Errorf("Mempool creation failed: %s", err) } - err = chain.InitializeMempool(nil, nil) + err = chain.InitializeMempool(nil, nil, nil) if err != nil { return nil, nil, fmt.Errorf("Mempool initialization failed: %s", err) }