Send websocket notification on new tx for input addresses

pull/436/head
Martin Boehm 2020-05-24 17:58:29 +02:00
parent bc001ce3a3
commit 0a3ea6e225
15 changed files with 344 additions and 110 deletions

View File

@ -253,32 +253,7 @@ func (w *Worker) GetTransactionFromBchainTx(bchainTx *bchain.Tx, height int, spe
if err != nil {
glog.Errorf("GetErc20FromTx error %v, %v", err, bchainTx)
}
tokens = make([]TokenTransfer, len(ets))
for i := range ets {
e := &ets[i]
cd, err := w.chainParser.GetAddrDescFromAddress(e.Contract)
if err != nil {
glog.Errorf("GetAddrDescFromAddress error %v, contract %v", err, e.Contract)
continue
}
erc20c, err := w.chain.EthereumTypeGetErc20ContractInfo(cd)
if err != nil {
glog.Errorf("GetErc20ContractInfo error %v, contract %v", err, e.Contract)
}
if erc20c == nil {
erc20c = &bchain.Erc20Contract{Name: e.Contract}
}
tokens[i] = TokenTransfer{
Type: ERC20TokenType,
Token: e.Contract,
From: e.From,
To: e.To,
Decimals: erc20c.Decimals,
Value: (*Amount)(&e.Tokens),
Name: erc20c.Name,
Symbol: erc20c.Symbol,
}
}
tokens = w.getTokensFromErc20(ets)
ethTxData := eth.GetEthereumTxData(bchainTx)
// mempool txs do not have fees yet
if ethTxData.GasUsed != nil {
@ -332,6 +307,133 @@ func (w *Worker) GetTransactionFromBchainTx(bchainTx *bchain.Tx, height int, spe
return r, nil
}
// GetTransactionFromMempoolTx converts bchain.MempoolTx to Tx, with limited amount of data
// it is not doing any request to backend or to db
func (w *Worker) GetTransactionFromMempoolTx(mempoolTx *bchain.MempoolTx) (*Tx, error) {
var err error
var valInSat, valOutSat, feesSat big.Int
var pValInSat *big.Int
var tokens []TokenTransfer
var ethSpecific *EthereumSpecific
vins := make([]Vin, len(mempoolTx.Vin))
rbf := false
for i := range mempoolTx.Vin {
bchainVin := &mempoolTx.Vin[i]
vin := &vins[i]
vin.Txid = bchainVin.Txid
vin.N = i
vin.Vout = bchainVin.Vout
vin.Sequence = int64(bchainVin.Sequence)
// detect explicit Replace-by-Fee transactions as defined by BIP125
if bchainVin.Sequence < 0xffffffff-1 {
rbf = true
}
vin.Hex = bchainVin.ScriptSig.Hex
vin.Coinbase = bchainVin.Coinbase
if w.chainType == bchain.ChainBitcoinType {
// bchainVin.Txid=="" is coinbase transaction
if bchainVin.Txid != "" {
vin.ValueSat = (*Amount)(&bchainVin.ValueSat)
vin.AddrDesc = bchainVin.AddrDesc
vin.Addresses, vin.IsAddress, err = w.chainParser.GetAddressesFromAddrDesc(vin.AddrDesc)
if vin.ValueSat != nil {
valInSat.Add(&valInSat, (*big.Int)(vin.ValueSat))
}
}
} else if w.chainType == bchain.ChainEthereumType {
if len(bchainVin.Addresses) > 0 {
vin.AddrDesc, err = w.chainParser.GetAddrDescFromAddress(bchainVin.Addresses[0])
if err != nil {
glog.Errorf("GetAddrDescFromAddress error %v, tx %v, bchainVin %v", err, mempoolTx.Txid, bchainVin)
}
vin.Addresses = bchainVin.Addresses
vin.IsAddress = true
}
}
}
vouts := make([]Vout, len(mempoolTx.Vout))
for i := range mempoolTx.Vout {
bchainVout := &mempoolTx.Vout[i]
vout := &vouts[i]
vout.N = i
vout.ValueSat = (*Amount)(&bchainVout.ValueSat)
valOutSat.Add(&valOutSat, &bchainVout.ValueSat)
vout.Hex = bchainVout.ScriptPubKey.Hex
vout.AddrDesc, vout.Addresses, vout.IsAddress, err = w.getAddressesFromVout(bchainVout)
if err != nil {
glog.V(2).Infof("getAddressesFromVout error %v, %v, output %v", err, mempoolTx.Txid, bchainVout.N)
}
}
if w.chainType == bchain.ChainBitcoinType {
// for coinbase transactions valIn is 0
feesSat.Sub(&valInSat, &valOutSat)
if feesSat.Sign() == -1 {
feesSat.SetUint64(0)
}
pValInSat = &valInSat
} else if w.chainType == bchain.ChainEthereumType {
if len(mempoolTx.Vout) > 0 {
valOutSat = mempoolTx.Vout[0].ValueSat
}
tokens = w.getTokensFromErc20(mempoolTx.Erc20)
ethTxData := eth.GetEthereumTxDataFromSpecificData(mempoolTx.CoinSpecificData)
ethSpecific = &EthereumSpecific{
GasLimit: ethTxData.GasLimit,
GasPrice: (*Amount)(ethTxData.GasPrice),
GasUsed: ethTxData.GasUsed,
Nonce: ethTxData.Nonce,
Status: ethTxData.Status,
Data: ethTxData.Data,
}
}
r := &Tx{
Blocktime: mempoolTx.Blocktime,
FeesSat: (*Amount)(&feesSat),
Locktime: mempoolTx.LockTime,
Txid: mempoolTx.Txid,
ValueInSat: (*Amount)(pValInSat),
ValueOutSat: (*Amount)(&valOutSat),
Version: mempoolTx.Version,
Hex: mempoolTx.Hex,
Rbf: rbf,
Vin: vins,
Vout: vouts,
TokenTransfers: tokens,
EthereumSpecific: ethSpecific,
}
return r, nil
}
func (w *Worker) getTokensFromErc20(erc20 []bchain.Erc20Transfer) []TokenTransfer {
tokens := make([]TokenTransfer, len(erc20))
for i := range erc20 {
e := &erc20[i]
cd, err := w.chainParser.GetAddrDescFromAddress(e.Contract)
if err != nil {
glog.Errorf("GetAddrDescFromAddress error %v, contract %v", err, e.Contract)
continue
}
erc20c, err := w.chain.EthereumTypeGetErc20ContractInfo(cd)
if err != nil {
glog.Errorf("GetErc20ContractInfo error %v, contract %v", err, e.Contract)
}
if erc20c == nil {
erc20c = &bchain.Erc20Contract{Name: e.Contract}
}
tokens[i] = TokenTransfer{
Type: ERC20TokenType,
Token: e.Contract,
From: e.From,
To: e.To,
Decimals: erc20c.Decimals,
Value: (*Amount)(&e.Tokens),
Name: erc20c.Name,
Symbol: erc20c.Symbol,
}
}
return tokens
}
func (w *Worker) getAddressTxids(addrDesc bchain.AddressDescriptor, mempool bool, filter *AddressFilter, maxResults int) ([]string, error) {
var err error
txids := make([]string, 0, 4)

View File

@ -3,6 +3,7 @@ package bchain
import (
"sort"
"sync"
"time"
)
type addrIndex struct {
@ -27,6 +28,7 @@ type BaseMempool struct {
txEntries map[string]txEntry
addrDescToTx map[string][]Outpoint
OnNewTxAddr OnNewTxAddrFunc
OnNewTx OnNewTxFunc
}
// GetTransactions returns slice of mempool transactions for given address
@ -113,3 +115,22 @@ func (m *BaseMempool) GetTransactionTime(txid string) uint32 {
}
return e.time
}
func (m *BaseMempool) txToMempoolTx(tx *Tx) *MempoolTx {
mtx := MempoolTx{
Hex: tx.Hex,
Blocktime: time.Now().Unix(),
LockTime: tx.LockTime,
Txid: tx.Txid,
Version: tx.Version,
Vout: tx.Vout,
CoinSpecificData: tx.CoinSpecificData,
}
mtx.Vin = make([]MempoolVin, len(tx.Vin))
for i, vin := range tx.Vin {
mtx.Vin[i] = MempoolVin{
Vin: vin,
}
}
return &mtx
}

View File

@ -186,8 +186,8 @@ func (c *blockChainWithMetrics) CreateMempool(chain bchain.BlockChain) (bchain.M
return c.b.CreateMempool(chain)
}
func (c *blockChainWithMetrics) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOutpointFunc, onNewTxAddr bchain.OnNewTxAddrFunc) error {
return c.b.InitializeMempool(addrDescForOutpoint, onNewTxAddr)
func (c *blockChainWithMetrics) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOutpointFunc, onNewTxAddr bchain.OnNewTxAddrFunc, onNewTx bchain.OnNewTxFunc) error {
return c.b.InitializeMempool(addrDescForOutpoint, onNewTxAddr, onNewTx)
}
func (c *blockChainWithMetrics) Shutdown(ctx context.Context) error {

View File

@ -155,12 +155,13 @@ func (b *BitcoinRPC) CreateMempool(chain bchain.BlockChain) (bchain.Mempool, err
}
// InitializeMempool creates ZeroMQ subscription and sets AddrDescForOutpointFunc to the Mempool
func (b *BitcoinRPC) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOutpointFunc, onNewTxAddr bchain.OnNewTxAddrFunc) error {
func (b *BitcoinRPC) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOutpointFunc, onNewTxAddr bchain.OnNewTxAddrFunc, onNewTx bchain.OnNewTxFunc) error {
if b.Mempool == nil {
return errors.New("Mempool not created")
}
b.Mempool.AddrDescForOutpoint = addrDescForOutpoint
b.Mempool.OnNewTxAddr = onNewTxAddr
b.Mempool.OnNewTx = onNewTx
if b.mq == nil {
mq, err := bchain.NewMQ(b.ChainConfig.MessageQueueBinding, b.pushHandler)
if err != nil {

View File

@ -480,8 +480,13 @@ type EthereumTxData struct {
// GetEthereumTxData returns EthereumTxData from bchain.Tx
func GetEthereumTxData(tx *bchain.Tx) *EthereumTxData {
return GetEthereumTxDataFromSpecificData(tx.CoinSpecificData)
}
// GetEthereumTxDataFromSpecificData returns EthereumTxData from coinSpecificData
func GetEthereumTxDataFromSpecificData(coinSpecificData interface{}) *EthereumTxData {
etd := EthereumTxData{Status: txStatusPending}
csd, ok := tx.CoinSpecificData.(completeTransaction)
csd, ok := coinSpecificData.(completeTransaction)
if ok {
if csd.Tx != nil {
etd.Nonce, _ = hexutil.DecodeUint64(csd.Tx.AccountNonce)

View File

@ -178,7 +178,7 @@ func (b *EthereumRPC) CreateMempool(chain bchain.BlockChain) (bchain.Mempool, er
}
// InitializeMempool creates subscriptions to newHeads and newPendingTransactions
func (b *EthereumRPC) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOutpointFunc, onNewTxAddr bchain.OnNewTxAddrFunc) error {
func (b *EthereumRPC) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOutpointFunc, onNewTxAddr bchain.OnNewTxAddrFunc, onNewTx bchain.OnNewTxFunc) error {
if b.Mempool == nil {
return errors.New("Mempool not created")
}
@ -193,6 +193,7 @@ func (b *EthereumRPC) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOu
}
b.Mempool.OnNewTxAddr = onNewTxAddr
b.Mempool.OnNewTx = onNewTx
if err = b.subscribeEvents(); err != nil {
return err

View File

@ -1,11 +1,17 @@
package bchain
import (
"math/big"
"time"
"github.com/golang/glog"
)
type chanInputPayload struct {
tx *MempoolTx
index int
}
// MempoolBitcoinType is mempool handle.
type MempoolBitcoinType struct {
BaseMempool
@ -28,12 +34,12 @@ func NewMempoolBitcoinType(chain BlockChain, workers int, subworkers int) *Mempo
}
for i := 0; i < workers; i++ {
go func(i int) {
chanInput := make(chan Outpoint, 1)
chanInput := make(chan chanInputPayload, 1)
chanResult := make(chan *addrIndex, 1)
for j := 0; j < subworkers; j++ {
go func(j int) {
for input := range chanInput {
ai := m.getInputAddress(input)
for payload := range chanInput {
ai := m.getInputAddress(&payload)
chanResult <- ai
}
}(j)
@ -51,38 +57,44 @@ func NewMempoolBitcoinType(chain BlockChain, workers int, subworkers int) *Mempo
return m
}
func (m *MempoolBitcoinType) getInputAddress(input Outpoint) *addrIndex {
func (m *MempoolBitcoinType) getInputAddress(payload *chanInputPayload) *addrIndex {
var addrDesc AddressDescriptor
var value *big.Int
vin := &payload.tx.Vin[payload.index]
if m.AddrDescForOutpoint != nil {
addrDesc = m.AddrDescForOutpoint(input)
addrDesc, value = m.AddrDescForOutpoint(Outpoint{vin.Txid, int32(vin.Vout)})
}
if addrDesc == nil {
itx, err := m.chain.GetTransactionForMempool(input.Txid)
itx, err := m.chain.GetTransactionForMempool(vin.Txid)
if err != nil {
glog.Error("cannot get transaction ", input.Txid, ": ", err)
glog.Error("cannot get transaction ", vin.Txid, ": ", err)
return nil
}
if int(input.Vout) >= len(itx.Vout) {
glog.Error("Vout len in transaction ", input.Txid, " ", len(itx.Vout), " input.Vout=", input.Vout)
if int(vin.Vout) >= len(itx.Vout) {
glog.Error("Vout len in transaction ", vin.Txid, " ", len(itx.Vout), " input.Vout=", vin.Vout)
return nil
}
addrDesc, err = m.chain.GetChainParser().GetAddrDescFromVout(&itx.Vout[input.Vout])
addrDesc, err = m.chain.GetChainParser().GetAddrDescFromVout(&itx.Vout[vin.Vout])
if err != nil {
glog.Error("error in addrDesc in ", input.Txid, " ", input.Vout, ": ", err)
glog.Error("error in addrDesc in ", vin.Txid, " ", vin.Vout, ": ", err)
return nil
}
value = &itx.Vout[vin.Vout].ValueSat
}
return &addrIndex{string(addrDesc), ^input.Vout}
vin.AddrDesc = addrDesc
vin.ValueSat = *value
return &addrIndex{string(addrDesc), ^int32(vin.Vout)}
}
func (m *MempoolBitcoinType) getTxAddrs(txid string, chanInput chan Outpoint, chanResult chan *addrIndex) ([]addrIndex, bool) {
func (m *MempoolBitcoinType) getTxAddrs(txid string, chanInput chan chanInputPayload, chanResult chan *addrIndex) ([]addrIndex, bool) {
tx, err := m.chain.GetTransactionForMempool(txid)
if err != nil {
glog.Error("cannot get transaction ", txid, ": ", err)
return nil, false
}
glog.V(2).Info("mempool: gettxaddrs ", txid, ", ", len(tx.Vin), " inputs")
mtx := m.txToMempoolTx(tx)
io := make([]addrIndex, 0, len(tx.Vout)+len(tx.Vin))
for _, output := range tx.Vout {
addrDesc, err := m.chain.GetChainParser().GetAddrDescFromVout(&output)
@ -98,11 +110,12 @@ func (m *MempoolBitcoinType) getTxAddrs(txid string, chanInput chan Outpoint, ch
}
}
dispatched := 0
for _, input := range tx.Vin {
for i := range tx.Vin {
input := &tx.Vin[i]
if input.Coinbase != "" {
continue
}
o := Outpoint{input.Txid, int32(input.Vout)}
payload := chanInputPayload{mtx, i}
loop:
for {
select {
@ -113,7 +126,7 @@ func (m *MempoolBitcoinType) getTxAddrs(txid string, chanInput chan Outpoint, ch
}
dispatched--
// send input to be processed
case chanInput <- o:
case chanInput <- payload:
dispatched++
break loop
}
@ -125,6 +138,9 @@ func (m *MempoolBitcoinType) getTxAddrs(txid string, chanInput chan Outpoint, ch
io = append(io, *ai)
}
}
if m.OnNewTx != nil {
m.OnNewTx(mtx)
}
return io, true
}

View File

@ -31,16 +31,18 @@ func NewMempoolEthereumType(chain BlockChain, mempoolTxTimeoutHours int, queryBa
}
}
func appendAddress(io []addrIndex, i int32, a string, parser BlockChainParser) []addrIndex {
func appendAddress(io []addrIndex, i int32, a string, parser BlockChainParser) ([]addrIndex, AddressDescriptor) {
var addrDesc AddressDescriptor
var err error
if len(a) > 0 {
addrDesc, err := parser.GetAddrDescFromAddress(a)
addrDesc, err = parser.GetAddrDescFromAddress(a)
if err != nil {
glog.Error("error in input addrDesc in ", a, ": ", err)
return io
return io, nil
}
io = append(io, addrIndex{string(addrDesc), i})
}
return io
return io, addrDesc
}
func (m *MempoolEthereumType) createTxEntry(txid string, txTime uint32) (txEntry, bool) {
@ -51,9 +53,10 @@ func (m *MempoolEthereumType) createTxEntry(txid string, txTime uint32) (txEntry
}
return txEntry{}, false
}
mtx := m.txToMempoolTx(tx)
parser := m.chain.GetChainParser()
addrIndexes := make([]addrIndex, 0, len(tx.Vout)+len(tx.Vin))
for _, output := range tx.Vout {
addrIndexes := make([]addrIndex, 0, len(mtx.Vout)+len(mtx.Vin))
for _, output := range mtx.Vout {
addrDesc, err := parser.GetAddrDescFromVout(&output)
if err != nil {
if err != ErrAddressMissing {
@ -65,18 +68,20 @@ func (m *MempoolEthereumType) createTxEntry(txid string, txTime uint32) (txEntry
addrIndexes = append(addrIndexes, addrIndex{string(addrDesc), int32(output.N)})
}
}
for _, input := range tx.Vin {
for j := range mtx.Vin {
input := &mtx.Vin[j]
for i, a := range input.Addresses {
addrIndexes = appendAddress(addrIndexes, ^int32(i), a, parser)
addrIndexes, input.AddrDesc = appendAddress(addrIndexes, ^int32(i), a, parser)
}
}
t, err := parser.EthereumTypeGetErc20FromTx(tx)
if err != nil {
glog.Error("GetErc20FromTx for tx ", txid, ", ", err)
} else {
mtx.Erc20 = t
for i := range t {
addrIndexes = appendAddress(addrIndexes, ^int32(i+1), t[i].From, parser)
addrIndexes = appendAddress(addrIndexes, int32(i+1), t[i].To, parser)
addrIndexes, _ = appendAddress(addrIndexes, ^int32(i+1), t[i].From, parser)
addrIndexes, _ = appendAddress(addrIndexes, int32(i+1), t[i].To, parser)
}
}
if m.OnNewTxAddr != nil {
@ -88,6 +93,9 @@ func (m *MempoolEthereumType) createTxEntry(txid string, txTime uint32) (txEntry
}
}
}
if m.OnNewTx != nil {
m.OnNewTx(mtx)
}
return txEntry{addrIndexes: addrIndexes, time: txTime}, true
}

View File

@ -49,7 +49,7 @@ type ScriptSig struct {
Hex string `json:"hex"`
}
// Vin contains data about tx output
// Vin contains data about tx input
type Vin struct {
Coinbase string `json:"coinbase"`
Txid string `json:"txid"`
@ -92,6 +92,27 @@ type Tx struct {
CoinSpecificData interface{} `json:"-"`
}
// MempoolVin contains data about tx input
type MempoolVin struct {
Vin
AddrDesc AddressDescriptor `json:"-"`
ValueSat big.Int
}
// MempoolTx is blockchain transaction in mempool
// optimized for onNewTx notification
type MempoolTx struct {
Hex string `json:"hex"`
Txid string `json:"txid"`
Version int32 `json:"version"`
LockTime uint32 `json:"locktime"`
Vin []MempoolVin `json:"vin"`
Vout []Vout `json:"vout"`
Blocktime int64 `json:"blocktime,omitempty"`
Erc20 []Erc20Transfer `json:"-"`
CoinSpecificData interface{} `json:"-"`
}
// Block is block header and list of transactions
type Block struct {
BlockHeader
@ -211,8 +232,11 @@ type OnNewBlockFunc func(hash string, height uint32)
// OnNewTxAddrFunc is used to send notification about a new transaction/address
type OnNewTxAddrFunc func(tx *Tx, desc AddressDescriptor)
// AddrDescForOutpointFunc defines function that returns address descriptorfor given outpoint or nil if outpoint not found
type AddrDescForOutpointFunc func(outpoint Outpoint) AddressDescriptor
// OnNewTxFunc is used to send notification about a new transaction/address
type OnNewTxFunc func(tx *MempoolTx)
// AddrDescForOutpointFunc returns address descriptor and value for given outpoint or nil if outpoint not found
type AddrDescForOutpointFunc func(outpoint Outpoint) (AddressDescriptor, *big.Int)
// BlockChain defines common interface to block chain daemon
type BlockChain interface {
@ -222,7 +246,7 @@ type BlockChain interface {
// create mempool but do not initialize it
CreateMempool(BlockChain) (Mempool, error)
// initialize mempool, create ZeroMQ (or other) subscription
InitializeMempool(AddrDescForOutpointFunc, OnNewTxAddrFunc) error
InitializeMempool(AddrDescForOutpointFunc, OnNewTxAddrFunc, OnNewTxFunc) error
// shutdown mempool, ZeroMQ and block chain connections
Shutdown(ctx context.Context) error
// chain info

View File

@ -100,6 +100,7 @@ var (
internalState *common.InternalState
callbacksOnNewBlock []bchain.OnNewBlockFunc
callbacksOnNewTxAddr []bchain.OnNewTxAddrFunc
callbacksOnNewTx []bchain.OnNewTxFunc
callbacksOnNewFiatRatesTicker []fiat.OnNewFiatRatesTicker
chanOsSignal chan os.Signal
inShutdown int32
@ -298,7 +299,7 @@ func mainWithExitCode() int {
if chain.GetChainParser().GetChainType() == bchain.ChainBitcoinType {
addrDescForOutpoint = index.AddrDescForOutpoint
}
err = chain.InitializeMempool(addrDescForOutpoint, onNewTxAddr)
err = chain.InitializeMempool(addrDescForOutpoint, onNewTxAddr, onNewTx)
if err != nil {
glog.Error("initializeMempool ", err)
return exitCodeFatal
@ -319,6 +320,7 @@ func mainWithExitCode() int {
// start full public interface
callbacksOnNewBlock = append(callbacksOnNewBlock, publicServer.OnNewBlock)
callbacksOnNewTxAddr = append(callbacksOnNewTxAddr, publicServer.OnNewTxAddr)
callbacksOnNewTx = append(callbacksOnNewTx, publicServer.OnNewTx)
callbacksOnNewFiatRatesTicker = append(callbacksOnNewFiatRatesTicker, publicServer.OnNewFiatRatesTicker)
publicServer.ConnectFullPublicInterface()
}
@ -620,6 +622,12 @@ func onNewTxAddr(tx *bchain.Tx, desc bchain.AddressDescriptor) {
}
}
func onNewTx(tx *bchain.MempoolTx) {
for _, c := range callbacksOnNewTx {
c(tx)
}
}
func pushSynchronizationHandler(nt bchain.NotificationType) {
glog.V(1).Info("MQ: notification ", nt)
if atomic.LoadInt32(&inShutdown) != 0 {

View File

@ -1016,23 +1016,23 @@ func (d *RocksDB) GetTxAddresses(txid string) (*TxAddresses, error) {
return d.getTxAddresses(btxID)
}
// AddrDescForOutpoint defines function that returns address descriptorfor given outpoint or nil if outpoint not found
func (d *RocksDB) AddrDescForOutpoint(outpoint bchain.Outpoint) bchain.AddressDescriptor {
// AddrDescForOutpoint is a function that returns address descriptor and value for given outpoint or nil if outpoint not found
func (d *RocksDB) AddrDescForOutpoint(outpoint bchain.Outpoint) (bchain.AddressDescriptor, *big.Int) {
ta, err := d.GetTxAddresses(outpoint.Txid)
if err != nil || ta == nil {
return nil
return nil, nil
}
if outpoint.Vout < 0 {
vin := ^outpoint.Vout
if len(ta.Inputs) <= int(vin) {
return nil
return nil, nil
}
return ta.Inputs[vin].AddrDesc
return ta.Inputs[vin].AddrDesc, &ta.Inputs[vin].ValueSat
}
if len(ta.Outputs) <= int(outpoint.Vout) {
return nil
return nil, nil
}
return ta.Outputs[outpoint.Vout].AddrDesc
return ta.Outputs[outpoint.Vout].AddrDesc, &ta.Outputs[outpoint.Vout].ValueSat
}
func packTxAddresses(ta *TxAddresses, buf []byte, varBuf []byte) []byte {

View File

@ -219,10 +219,14 @@ func (s *PublicServer) OnNewFiatRatesTicker(ticker *db.CurrencyRatesTicker) {
s.websocket.OnNewFiatRatesTicker(ticker)
}
// OnNewTxAddr notifies users subscribed to bitcoind/addresstxid about new block
// OnNewTxAddr notifies users subscribed to notification about new tx
func (s *PublicServer) OnNewTxAddr(tx *bchain.Tx, desc bchain.AddressDescriptor) {
s.socketio.OnNewTxAddr(tx.Txid, desc)
s.websocket.OnNewTxAddr(tx, desc)
}
// OnNewTx notifies users subscribed to notification about new tx
func (s *PublicServer) OnNewTx(tx *bchain.MempoolTx) {
s.websocket.OnNewTx(tx)
}
func (s *PublicServer) txRedirect(w http.ResponseWriter, r *http.Request) {

View File

@ -757,47 +757,91 @@ func (s *WebsocketServer) OnNewBlock(hash string, height uint32) {
glog.Info("broadcasting new block ", height, " ", hash, " to ", len(s.newBlockSubscriptions), " channels")
}
// OnNewTxAddr is a callback that broadcasts info about a tx affecting subscribed address
func (s *WebsocketServer) OnNewTxAddr(tx *bchain.Tx, addrDesc bchain.AddressDescriptor) {
// check if there is any subscription but release the lock immediately, GetTransactionFromBchainTx may take some time
s.addressSubscriptionsLock.Lock()
as, ok := s.addressSubscriptions[string(addrDesc)]
lenAs := len(as)
s.addressSubscriptionsLock.Unlock()
if ok && lenAs > 0 {
addr, _, err := s.chainParser.GetAddressesFromAddrDesc(addrDesc)
if err != nil {
glog.Error("GetAddressesFromAddrDesc error ", err, " for ", addrDesc)
return
func (s *WebsocketServer) sendOnNewTxAddr(stringAddressDescriptor string, tx *api.Tx) {
addrDesc := bchain.AddressDescriptor(stringAddressDescriptor)
addr, _, err := s.chainParser.GetAddressesFromAddrDesc(addrDesc)
if err != nil {
glog.Error("GetAddressesFromAddrDesc error ", err, " for ", addrDesc)
return
}
if len(addr) == 1 {
data := struct {
Address string `json:"address"`
Tx *api.Tx `json:"tx"`
}{
Address: addr[0],
Tx: tx,
}
if len(addr) == 1 {
atx, err := s.api.GetTransactionFromBchainTx(tx, 0, false, false)
if err != nil {
glog.Error("GetTransactionFromBchainTx error ", err, " for ", tx.Txid)
return
}
data := struct {
Address string `json:"address"`
Tx *api.Tx `json:"tx"`
}{
Address: addr[0],
Tx: atx,
}
// get the list of subscriptions again, this time keep the lock
s.addressSubscriptionsLock.Lock()
defer s.addressSubscriptionsLock.Unlock()
as, ok = s.addressSubscriptions[string(addrDesc)]
if ok {
for c, id := range as {
if c.IsAlive() {
c.out <- &websocketRes{
ID: id,
Data: &data,
}
// get the list of subscriptions again, this time keep the lock
s.addressSubscriptionsLock.Lock()
defer s.addressSubscriptionsLock.Unlock()
as, ok := s.addressSubscriptions[stringAddressDescriptor]
if ok {
for c, id := range as {
if c.IsAlive() {
c.out <- &websocketRes{
ID: id,
Data: &data,
}
}
glog.Info("broadcasting new tx ", tx.Txid, " for addr ", addr[0], " to ", len(as), " channels")
}
glog.Info("broadcasting new tx ", tx.Txid, ", addr ", addr[0], " to ", len(as), " channels")
}
}
}
// OnNewTx is a callback that broadcasts info about a tx affecting subscribed address
func (s *WebsocketServer) OnNewTx(tx *bchain.MempoolTx) {
// check if there is any subscription in inputs, outputs and erc20
// release the lock immediately, GetTransactionFromMempoolTx is potentially slow
subscribed := make(map[string]struct{})
s.addressSubscriptionsLock.Lock()
for i := range tx.Vin {
sad := string(tx.Vin[i].AddrDesc)
if len(sad) > 0 {
as, ok := s.addressSubscriptions[sad]
if ok && len(as) > 0 {
subscribed[sad] = struct{}{}
}
}
}
for i := range tx.Vout {
addrDesc, err := s.chainParser.GetAddrDescFromVout(&tx.Vout[i])
if err == nil && len(addrDesc) > 0 {
sad := string(addrDesc)
as, ok := s.addressSubscriptions[sad]
if ok && len(as) > 0 {
subscribed[sad] = struct{}{}
}
}
}
for i := range tx.Erc20 {
addrDesc, err := s.chainParser.GetAddrDescFromAddress(tx.Erc20[i].From)
if err == nil && len(addrDesc) > 0 {
sad := string(addrDesc)
as, ok := s.addressSubscriptions[sad]
if ok && len(as) > 0 {
subscribed[sad] = struct{}{}
}
}
addrDesc, err = s.chainParser.GetAddrDescFromAddress(tx.Erc20[i].To)
if err == nil && len(addrDesc) > 0 {
sad := string(addrDesc)
as, ok := s.addressSubscriptions[sad]
if ok && len(as) > 0 {
subscribed[sad] = struct{}{}
}
}
}
s.addressSubscriptionsLock.Unlock()
if len(subscribed) > 0 {
atx, err := s.api.GetTransactionFromMempoolTx(tx)
if err != nil {
glog.Error("GetTransactionFromMempoolTx error ", err, " for ", tx.Txid)
return
}
for stringAddressDescriptor := range subscribed {
s.sendOnNewTxAddr(stringAddressDescriptor, atx)
}
}
}

View File

@ -26,7 +26,7 @@ func (c *fakeBlockChain) Initialize() error {
return nil
}
func (c *fakeBlockChain) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOutpointFunc, onNewTxAddr bchain.OnNewTxAddrFunc) error {
func (c *fakeBlockChain) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOutpointFunc, onNewTxAddr bchain.OnNewTxAddrFunc, onNewTx bchain.OnNewTxFunc) error {
return nil
}

View File

@ -182,7 +182,7 @@ func initBlockChain(coinName string, cfg json.RawMessage) (bchain.BlockChain, bc
return nil, nil, fmt.Errorf("Mempool creation failed: %s", err)
}
err = chain.InitializeMempool(nil, nil)
err = chain.InitializeMempool(nil, nil, nil)
if err != nil {
return nil, nil, fmt.Errorf("Mempool initialization failed: %s", err)
}