Rename subscribeAnyAddresses to subscribeMempool

subscribeMempool
Martin Boehm 2020-05-22 00:14:51 +02:00
parent d2580674fa
commit 2c9abdce42
3 changed files with 121 additions and 88 deletions

View File

@ -1422,16 +1422,16 @@ func websocketTestsBitcoinType(t *testing.T, ts *httptest.Server) {
want: `{"id":"36","data":[{"time":1521514800,"txs":1,"received":"1","sent":"0","sentToSelf":"0","rates":{"eur":1301,"usd":2001}}]}`, want: `{"id":"36","data":[{"time":1521514800,"txs":1,"received":"1","sent":"0","sentToSelf":"0","rates":{"eur":1301,"usd":2001}}]}`,
}, },
{ {
name: "websocket subscribeAnyAddresses", name: "websocket subscribeMempool",
req: websocketReq{ req: websocketReq{
Method: "subscribeAnyAddresses", Method: "subscribeMempool",
}, },
want: `{"id":"37","data":{"subscribed":true}}`, want: `{"id":"37","data":{"subscribed":true}}`,
}, },
{ {
name: "websocket unsubscribeAnyAddresses", name: "websocket unsubscribeMempool",
req: websocketReq{ req: websocketReq{
Method: "unsubscribeAnyAddresses", Method: "unsubscribeMempool",
}, },
want: `{"id":"38","data":{"subscribed":false}}`, want: `{"id":"38","data":{"subscribed":false}}`,
}, },

View File

@ -57,25 +57,25 @@ type websocketChannel struct {
// WebsocketServer is a handle to websocket server // WebsocketServer is a handle to websocket server
type WebsocketServer struct { type WebsocketServer struct {
socket *websocket.Conn socket *websocket.Conn
upgrader *websocket.Upgrader upgrader *websocket.Upgrader
db *db.RocksDB db *db.RocksDB
txCache *db.TxCache txCache *db.TxCache
chain bchain.BlockChain chain bchain.BlockChain
chainParser bchain.BlockChainParser chainParser bchain.BlockChainParser
mempool bchain.Mempool mempool bchain.Mempool
metrics *common.Metrics metrics *common.Metrics
is *common.InternalState is *common.InternalState
api *api.Worker api *api.Worker
block0hash string block0hash string
newBlockSubscriptions map[*websocketChannel]string newBlockSubscriptions map[*websocketChannel]string
newBlockSubscriptionsLock sync.Mutex newBlockSubscriptionsLock sync.Mutex
addressSubscriptions map[string]map[*websocketChannel]string addressSubscriptions map[string]map[*websocketChannel]string
addressSubscriptionsLock sync.Mutex addressSubscriptionsLock sync.Mutex
anyAddressSubscriptions map[*websocketChannel]string mempoolSubscriptions map[*websocketChannel]string
anyAddressSubscriptionsLock sync.Mutex mempoolSubscriptionsLock sync.Mutex
fiatRatesSubscriptions map[string]map[*websocketChannel]string fiatRatesSubscriptions map[string]map[*websocketChannel]string
fiatRatesSubscriptionsLock sync.Mutex fiatRatesSubscriptionsLock sync.Mutex
} }
// NewWebsocketServer creates new websocket interface to blockbook and returns its handle // NewWebsocketServer creates new websocket interface to blockbook and returns its handle
@ -94,19 +94,19 @@ func NewWebsocketServer(db *db.RocksDB, chain bchain.BlockChain, mempool bchain.
WriteBufferSize: 1024 * 32, WriteBufferSize: 1024 * 32,
CheckOrigin: checkOrigin, CheckOrigin: checkOrigin,
}, },
db: db, db: db,
txCache: txCache, txCache: txCache,
chain: chain, chain: chain,
chainParser: chain.GetChainParser(), chainParser: chain.GetChainParser(),
mempool: mempool, mempool: mempool,
metrics: metrics, metrics: metrics,
is: is, is: is,
api: api, api: api,
block0hash: b0, block0hash: b0,
newBlockSubscriptions: make(map[*websocketChannel]string), newBlockSubscriptions: make(map[*websocketChannel]string),
addressSubscriptions: make(map[string]map[*websocketChannel]string), addressSubscriptions: make(map[string]map[*websocketChannel]string),
anyAddressSubscriptions: make(map[*websocketChannel]string), mempoolSubscriptions: make(map[*websocketChannel]string),
fiatRatesSubscriptions: make(map[string]map[*websocketChannel]string), fiatRatesSubscriptions: make(map[string]map[*websocketChannel]string),
} }
return s, nil return s, nil
} }
@ -336,11 +336,11 @@ var requestHandlers = map[string]func(*WebsocketServer, *websocketChannel, *webs
"unsubscribeAddresses": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) { "unsubscribeAddresses": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) {
return s.unsubscribeAddresses(c) return s.unsubscribeAddresses(c)
}, },
"subscribeAnyAddresses": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) { "subscribeMempool": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) {
return s.subscribeAnyAddresses(c, req) return s.subscribeMempool(c, req)
}, },
"unsubscribeAnyAddresses": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) { "unsubscribeMempool": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) {
return s.unsubscribeAnyAddresses(c) return s.unsubscribeMempool(c)
}, },
"subscribeFiatRates": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) { "subscribeFiatRates": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) {
r := struct { r := struct {
@ -706,23 +706,17 @@ func (s *WebsocketServer) unsubscribeAddresses(c *websocketChannel) (res interfa
return &subscriptionResponse{false}, nil return &subscriptionResponse{false}, nil
} }
func (s *WebsocketServer) subscribeAnyAddresses(c *websocketChannel, req *websocketReq) (res interface{}, err error) { func (s *WebsocketServer) subscribeMempool(c *websocketChannel, req *websocketReq) (res interface{}, err error) {
// unsubscribe first s.mempoolSubscriptionsLock.Lock()
s.unsubscribeAnyAddresses(c) defer s.mempoolSubscriptionsLock.Unlock()
s.anyAddressSubscriptionsLock.Lock() s.mempoolSubscriptions[c] = req.ID
defer s.anyAddressSubscriptionsLock.Unlock()
s.anyAddressSubscriptions[c] = req.ID
return &subscriptionResponse{true}, nil return &subscriptionResponse{true}, nil
} }
func (s *WebsocketServer) unsubscribeAnyAddresses(c *websocketChannel) (res interface{}, err error) { func (s *WebsocketServer) unsubscribeMempool(c *websocketChannel) (res interface{}, err error) {
s.anyAddressSubscriptionsLock.Lock() s.mempoolSubscriptionsLock.Lock()
defer s.anyAddressSubscriptionsLock.Unlock() defer s.mempoolSubscriptionsLock.Unlock()
for sc := range s.anyAddressSubscriptions { delete(s.mempoolSubscriptions, c)
if sc == c {
delete(s.anyAddressSubscriptions, c)
}
}
return &subscriptionResponse{false}, nil return &subscriptionResponse{false}, nil
} }
@ -784,6 +778,51 @@ func (s *WebsocketServer) OnNewBlock(hash string, height uint32) {
glog.Info("broadcasting new block ", height, " ", hash, " to ", len(s.newBlockSubscriptions), " channels") glog.Info("broadcasting new block ", height, " ", hash, " to ", len(s.newBlockSubscriptions), " channels")
} }
// // OnNewTxAddr is a callback that broadcasts info about a tx affecting subscribed address
// func (s *WebsocketServer) OnNewTxAddr(tx *bchain.Tx, addrDesc bchain.AddressDescriptor) {
// // check if there is any subscription but release the lock immediately, GetTransactionFromBchainTx may take some time
// s.addressSubscriptionsLock.Lock()
// as, ok := s.addressSubscriptions[string(addrDesc)]
// lenAs := len(as)
// s.addressSubscriptionsLock.Unlock()
// if ok && lenAs > 0 {
// addr, _, err := s.chainParser.GetAddressesFromAddrDesc(addrDesc)
// if err != nil {
// glog.Error("GetAddressesFromAddrDesc error ", err, " for ", addrDesc)
// return
// }
// if len(addr) == 1 {
// atx, err := s.api.GetTransactionFromBchainTx(tx, 0, false, false)
// if err != nil {
// glog.Error("GetTransactionFromBchainTx error ", err, " for ", tx.Txid)
// return
// }
// data := struct {
// Address string `json:"address"`
// Tx *api.Tx `json:"tx"`
// }{
// Address: addr[0],
// Tx: atx,
// }
// // get the list of subscriptions again, this time keep the lock
// s.addressSubscriptionsLock.Lock()
// defer s.addressSubscriptionsLock.Unlock()
// as, ok = s.addressSubscriptions[string(addrDesc)]
// if ok {
// for c, id := range as {
// if c.IsAlive() {
// c.out <- &websocketRes{
// ID: id,
// Data: &data,
// }
// }
// }
// glog.Info("broadcasting new tx ", tx.Txid, " for addr ", addr[0], " to ", len(as), " channels")
// }
// }
// }
// }
// OnNewTxAddr is a callback that broadcasts info about a tx affecting subscribed address // OnNewTxAddr is a callback that broadcasts info about a tx affecting subscribed address
func (s *WebsocketServer) OnNewTxAddr(tx *bchain.Tx, addrDesc bchain.AddressDescriptor) { func (s *WebsocketServer) OnNewTxAddr(tx *bchain.Tx, addrDesc bchain.AddressDescriptor) {
// check if there is any subscription, to this address or to any address // check if there is any subscription, to this address or to any address
@ -798,11 +837,11 @@ func (s *WebsocketServer) OnNewTxAddr(tx *bchain.Tx, addrDesc bchain.AddressDesc
} }
s.addressSubscriptionsLock.Unlock() s.addressSubscriptionsLock.Unlock()
// check is there is any subscription for any address // check is there is any subscription for any address
s.anyAddressSubscriptionsLock.Lock() s.mempoolSubscriptionsLock.Lock()
for c, id := range s.anyAddressSubscriptions { for c, id := range s.mempoolSubscriptions {
subs[c] = id subs[c] = id
} }
s.anyAddressSubscriptionsLock.Unlock() s.mempoolSubscriptionsLock.Unlock()
if len(subs) == 0 { if len(subs) == 0 {
return return
@ -830,8 +869,8 @@ func (s *WebsocketServer) OnNewTxAddr(tx *bchain.Tx, addrDesc bchain.AddressDesc
// get the list of subscriptions again, keep locks // get the list of subscriptions again, keep locks
s.addressSubscriptionsLock.Lock() s.addressSubscriptionsLock.Lock()
defer s.addressSubscriptionsLock.Unlock() defer s.addressSubscriptionsLock.Unlock()
s.anyAddressSubscriptionsLock.Lock() s.mempoolSubscriptionsLock.Lock()
defer s.anyAddressSubscriptionsLock.Unlock() defer s.mempoolSubscriptionsLock.Unlock()
for c, id := range subs { for c, id := range subs {
if c.IsAlive() { if c.IsAlive() {
c.out <- &websocketRes{ c.out <- &websocketRes{

View File

@ -56,7 +56,7 @@
subscriptions = {}; subscriptions = {};
subscribeNewBlockId = ""; subscribeNewBlockId = "";
subscribeAddressesId = ""; subscribeAddressesId = "";
subscribeAnyAddressesId = ""; subscribeMempoolId = "";
if (server.startsWith("http")) { if (server.startsWith("http")) {
server = server.replace("http", "ws"); server = server.replace("http", "ws");
} }
@ -300,33 +300,30 @@
}); });
} }
function subscribeAnyAddresses() { function subscribeMempool() {
const method = 'subscribeAnyAddresses'; const method = 'subscribeMempool';
var addresses = document.getElementById('subscribeAnyAddressesName').value.split(","); if (subscribeMempoolId) {
addresses = addresses.map(s => s.trim()); delete subscriptions[subscribeMempoolId];
const params = { subscribeMempoolId = "";
addresses
};
if (subscribeAnyAddressesId) {
delete subscriptions[subscribeAnyAddressesId];
subscribeAnyAddressesId = "";
} }
subscribeAnyAddressesId = subscribe(method, params, function (result) { const params = {
document.getElementById('subscribeAnyAddressesResult').innerText += JSON.stringify(result).replace(/,/g, ", ") + "\n"; };
subscribeMempoolId = subscribe(method, params, function (result) {
document.getElementById('subscribeMempoolResult').innerText += JSON.stringify(result).replace(/,/g, ", ") + "\n";
}); });
document.getElementById('subscribeAnyAddressesIds').innerText = subscribeAnyAddressesId; document.getElementById('subscribeMempoolIds').innerText = subscribeMempoolId;
document.getElementById('unsubscribeAnyAddressesButton').setAttribute("style", "display: inherit;"); document.getElementById('unsubscribeMempoolButton').setAttribute("style", "display: inherit;");
} }
function unsubscribeAnyAddresses() { function unsubscribeMempool() {
const method = 'unsubscribeAnyAddresses'; const method = 'unsubscribeMempool';
const params = { const params = {
}; };
unsubscribe(method, subscribeAnyAddressesId, params, function (result) { unsubscribe(method, subscribeMempoolId, params, function (result) {
subscribeAnyAddressesId = ""; subscribeMempoolId = "";
document.getElementById('subscribeAnyAddressesResult').innerText += JSON.stringify(result).replace(/,/g, ", ") + "\n"; document.getElementById('subscribeMempoolResult').innerText += JSON.stringify(result).replace(/,/g, ", ") + "\n";
document.getElementById('subscribeAnyAddressesIds').innerText = ""; document.getElementById('subscribeMempoolIds').innerText = "";
document.getElementById('unsubscribeAnyAddressesButton').setAttribute("style", "display: none;"); document.getElementById('unsubscribeMempoolButton').setAttribute("style", "display: none;");
}); });
} }
@ -635,20 +632,17 @@
</div> </div>
<div class="row"> <div class="row">
<div class="col"> <div class="col">
<input class="btn btn-secondary" type="button" value="subscribe address" onclick="subscribeAnyAddresses()"> <input class="btn btn-secondary" type="button" value="subscribe mempool" onclick="subscribeMempool()">
</div>
<div class="col-8">
<input type="text" class="form-control" id="subscribeAnyAddressesName" value="0xba98d6a5ac827632e3457de7512d211e4ff7e8bd,0x73d0385f4d8e00c5e6504c6030f47bf6212736a8">
</div> </div>
<div class="col"> <div class="col">
<span id="subscribeAnyAddressesIds"></span> <span id="subscribeMempoolIds"></span>
</div> </div>
<div class="col"> <div class="col">
<input class="btn btn-secondary" id="unsubscribeAnyAddressesButton" style="display: none;" type="button" value="unsubscribe" onclick="unsubscribeAnyAddresses()"> <input class="btn btn-secondary" id="unsubscribeMempoolButton" style="display: none;" type="button" value="unsubscribe" onclick="unsubscribeMempool()">
</div> </div>
</div> </div>
<div class="row"> <div class="row">
<div class="col" id="subscribeAnyAddressesResult"></div> <div class="col" id="subscribeMempoolResult"></div>
</div> </div>
<div class="row"> <div class="row">
<div class="col-3"> <div class="col-3">