Add general newTransaction subscription for all mempool transactions

pull/574/head
kevin 2021-03-08 17:20:24 -07:00
parent 8f3106d009
commit 0d47b69b3c
1 changed files with 70 additions and 30 deletions

View File

@ -57,23 +57,25 @@ type websocketChannel struct {
// WebsocketServer is a handle to websocket server
type WebsocketServer struct {
socket *websocket.Conn
upgrader *websocket.Upgrader
db *db.RocksDB
txCache *db.TxCache
chain bchain.BlockChain
chainParser bchain.BlockChainParser
mempool bchain.Mempool
metrics *common.Metrics
is *common.InternalState
api *api.Worker
block0hash string
newBlockSubscriptions map[*websocketChannel]string
newBlockSubscriptionsLock sync.Mutex
addressSubscriptions map[string]map[*websocketChannel]string
addressSubscriptionsLock sync.Mutex
fiatRatesSubscriptions map[string]map[*websocketChannel]string
fiatRatesSubscriptionsLock sync.Mutex
socket *websocket.Conn
upgrader *websocket.Upgrader
db *db.RocksDB
txCache *db.TxCache
chain bchain.BlockChain
chainParser bchain.BlockChainParser
mempool bchain.Mempool
metrics *common.Metrics
is *common.InternalState
api *api.Worker
block0hash string
newBlockSubscriptions map[*websocketChannel]string
newBlockSubscriptionsLock sync.Mutex
newTransactionSubscriptions map[*websocketChannel]string
newTransactionSubscriptionsLock sync.Mutex
addressSubscriptions map[string]map[*websocketChannel]string
addressSubscriptionsLock sync.Mutex
fiatRatesSubscriptions map[string]map[*websocketChannel]string
fiatRatesSubscriptionsLock sync.Mutex
}
// NewWebsocketServer creates new websocket interface to blockbook and returns its handle
@ -92,18 +94,19 @@ func NewWebsocketServer(db *db.RocksDB, chain bchain.BlockChain, mempool bchain.
WriteBufferSize: 1024 * 32,
CheckOrigin: checkOrigin,
},
db: db,
txCache: txCache,
chain: chain,
chainParser: chain.GetChainParser(),
mempool: mempool,
metrics: metrics,
is: is,
api: api,
block0hash: b0,
newBlockSubscriptions: make(map[*websocketChannel]string),
addressSubscriptions: make(map[string]map[*websocketChannel]string),
fiatRatesSubscriptions: make(map[string]map[*websocketChannel]string),
db: db,
txCache: txCache,
chain: chain,
chainParser: chain.GetChainParser(),
mempool: mempool,
metrics: metrics,
is: is,
api: api,
block0hash: b0,
newBlockSubscriptions: make(map[*websocketChannel]string),
newTransactionSubscriptions: make(map[*websocketChannel]string),
addressSubscriptions: make(map[string]map[*websocketChannel]string),
fiatRatesSubscriptions: make(map[string]map[*websocketChannel]string),
}
return s, nil
}
@ -346,6 +349,12 @@ var requestHandlers = map[string]func(*WebsocketServer, *websocketChannel, *webs
"unsubscribeNewBlock": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) {
return s.unsubscribeNewBlock(c)
},
"subscribeNewTransaction": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) {
return s.subscribeNewTransaction(c, req)
},
"unsubscribeNewTransaction": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) {
return s.unsubscribeNewTransaction(c)
},
"subscribeAddresses": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) {
ad, err := s.unmarshalAddresses(req.Params)
if err == nil {
@ -672,6 +681,22 @@ func (s *WebsocketServer) unsubscribeNewBlock(c *websocketChannel) (res interfac
return &subscriptionResponse{false}, nil
}
func (s *WebsocketServer) subscribeNewTransaction(c *websocketChannel, req *websocketReq) (res interface{}, err error) {
s.newTransactionSubscriptionsLock.Lock()
defer s.newTransactionSubscriptionsLock.Unlock()
glog.Infof("subscribeNewTransaction: %+v\n%v\n", c, req.ID)
s.newTransactionSubscriptions[c] = req.ID
return &subscriptionResponse{true}, nil
}
func (s *WebsocketServer) unsubscribeNewTransaction(c *websocketChannel) (res interface{}, err error) {
s.newTransactionSubscriptionsLock.Lock()
defer s.newTransactionSubscriptionsLock.Unlock()
glog.Infof("unsubscribeNewTransaction: %+v\n", c)
delete(s.newTransactionSubscriptions, c)
return &subscriptionResponse{false}, nil
}
func (s *WebsocketServer) unmarshalAddresses(params []byte) ([]bchain.AddressDescriptor, error) {
r := struct {
Addresses []string `json:"addresses"`
@ -790,6 +815,18 @@ func (s *WebsocketServer) OnNewBlock(hash string, height uint32) {
glog.Info("broadcasting new block ", height, " ", hash, " to ", len(s.newBlockSubscriptions), " channels")
}
func (s *WebsocketServer) sendOnNewTx(tx *api.Tx) {
s.newTransactionSubscriptionsLock.Lock()
defer s.newTransactionSubscriptionsLock.Unlock()
for c, id := range s.newTransactionSubscriptions {
c.DataOut(&websocketRes{
ID: id,
Data: &tx,
})
}
glog.Info("broadcasting new tx ", tx.Txid, " to ", len(s.newTransactionSubscriptions), " channels")
}
func (s *WebsocketServer) sendOnNewTxAddr(stringAddressDescriptor string, tx *api.Tx) {
addrDesc := bchain.AddressDescriptor(stringAddressDescriptor)
addr, _, err := s.chainParser.GetAddressesFromAddrDesc(addrDesc)
@ -868,12 +905,15 @@ func (s *WebsocketServer) getNewTxSubscriptions(tx *bchain.MempoolTx) map[string
// OnNewTx is a callback that broadcasts info about a tx affecting subscribed address
func (s *WebsocketServer) OnNewTx(tx *bchain.MempoolTx) {
subscribed := s.getNewTxSubscriptions(tx)
if len(subscribed) > 0 {
if len(s.newTransactionSubscriptions) > 0 || len(subscribed) > 0 {
atx, err := s.api.GetTransactionFromMempoolTx(tx)
if err != nil {
glog.Error("GetTransactionFromMempoolTx error ", err, " for ", tx.Txid)
return
}
s.sendOnNewTx(atx)
for stringAddressDescriptor := range subscribed {
s.sendOnNewTxAddr(stringAddressDescriptor, atx)
}