Compare commits
3 Commits
deepcrayon
...
subscribeM
Author | SHA1 | Date |
---|---|---|
Martin Boehm | 2c9abdce42 | |
Martin Boehm | d2580674fa | |
Catenocrypt | aa4c6bd4f4 |
|
@ -1421,6 +1421,20 @@ func websocketTestsBitcoinType(t *testing.T, ts *httptest.Server) {
|
|||
},
|
||||
want: `{"id":"36","data":[{"time":1521514800,"txs":1,"received":"1","sent":"0","sentToSelf":"0","rates":{"eur":1301,"usd":2001}}]}`,
|
||||
},
|
||||
{
|
||||
name: "websocket subscribeMempool",
|
||||
req: websocketReq{
|
||||
Method: "subscribeMempool",
|
||||
},
|
||||
want: `{"id":"37","data":{"subscribed":true}}`,
|
||||
},
|
||||
{
|
||||
name: "websocket unsubscribeMempool",
|
||||
req: websocketReq{
|
||||
Method: "unsubscribeMempool",
|
||||
},
|
||||
want: `{"id":"38","data":{"subscribed":false}}`,
|
||||
},
|
||||
}
|
||||
|
||||
// send all requests at once
|
||||
|
|
|
@ -72,6 +72,8 @@ type WebsocketServer struct {
|
|||
newBlockSubscriptionsLock sync.Mutex
|
||||
addressSubscriptions map[string]map[*websocketChannel]string
|
||||
addressSubscriptionsLock sync.Mutex
|
||||
mempoolSubscriptions map[*websocketChannel]string
|
||||
mempoolSubscriptionsLock sync.Mutex
|
||||
fiatRatesSubscriptions map[string]map[*websocketChannel]string
|
||||
fiatRatesSubscriptionsLock sync.Mutex
|
||||
}
|
||||
|
@ -103,6 +105,7 @@ func NewWebsocketServer(db *db.RocksDB, chain bchain.BlockChain, mempool bchain.
|
|||
block0hash: b0,
|
||||
newBlockSubscriptions: make(map[*websocketChannel]string),
|
||||
addressSubscriptions: make(map[string]map[*websocketChannel]string),
|
||||
mempoolSubscriptions: make(map[*websocketChannel]string),
|
||||
fiatRatesSubscriptions: make(map[string]map[*websocketChannel]string),
|
||||
}
|
||||
return s, nil
|
||||
|
@ -333,6 +336,12 @@ var requestHandlers = map[string]func(*WebsocketServer, *websocketChannel, *webs
|
|||
"unsubscribeAddresses": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) {
|
||||
return s.unsubscribeAddresses(c)
|
||||
},
|
||||
"subscribeMempool": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) {
|
||||
return s.subscribeMempool(c, req)
|
||||
},
|
||||
"unsubscribeMempool": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) {
|
||||
return s.unsubscribeMempool(c)
|
||||
},
|
||||
"subscribeFiatRates": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) {
|
||||
r := struct {
|
||||
Currency string `json:"currency"`
|
||||
|
@ -697,6 +706,20 @@ func (s *WebsocketServer) unsubscribeAddresses(c *websocketChannel) (res interfa
|
|||
return &subscriptionResponse{false}, nil
|
||||
}
|
||||
|
||||
func (s *WebsocketServer) subscribeMempool(c *websocketChannel, req *websocketReq) (res interface{}, err error) {
|
||||
s.mempoolSubscriptionsLock.Lock()
|
||||
defer s.mempoolSubscriptionsLock.Unlock()
|
||||
s.mempoolSubscriptions[c] = req.ID
|
||||
return &subscriptionResponse{true}, nil
|
||||
}
|
||||
|
||||
func (s *WebsocketServer) unsubscribeMempool(c *websocketChannel) (res interface{}, err error) {
|
||||
s.mempoolSubscriptionsLock.Lock()
|
||||
defer s.mempoolSubscriptionsLock.Unlock()
|
||||
delete(s.mempoolSubscriptions, c)
|
||||
return &subscriptionResponse{false}, nil
|
||||
}
|
||||
|
||||
// subscribeFiatRates subscribes all FiatRates subscriptions by this channel
|
||||
func (s *WebsocketServer) subscribeFiatRates(c *websocketChannel, currency string, req *websocketReq) (res interface{}, err error) {
|
||||
// unsubscribe all previous subscriptions
|
||||
|
@ -755,49 +778,108 @@ func (s *WebsocketServer) OnNewBlock(hash string, height uint32) {
|
|||
glog.Info("broadcasting new block ", height, " ", hash, " to ", len(s.newBlockSubscriptions), " channels")
|
||||
}
|
||||
|
||||
// // OnNewTxAddr is a callback that broadcasts info about a tx affecting subscribed address
|
||||
// func (s *WebsocketServer) OnNewTxAddr(tx *bchain.Tx, addrDesc bchain.AddressDescriptor) {
|
||||
// // check if there is any subscription but release the lock immediately, GetTransactionFromBchainTx may take some time
|
||||
// s.addressSubscriptionsLock.Lock()
|
||||
// as, ok := s.addressSubscriptions[string(addrDesc)]
|
||||
// lenAs := len(as)
|
||||
// s.addressSubscriptionsLock.Unlock()
|
||||
// if ok && lenAs > 0 {
|
||||
// addr, _, err := s.chainParser.GetAddressesFromAddrDesc(addrDesc)
|
||||
// if err != nil {
|
||||
// glog.Error("GetAddressesFromAddrDesc error ", err, " for ", addrDesc)
|
||||
// return
|
||||
// }
|
||||
// if len(addr) == 1 {
|
||||
// atx, err := s.api.GetTransactionFromBchainTx(tx, 0, false, false)
|
||||
// if err != nil {
|
||||
// glog.Error("GetTransactionFromBchainTx error ", err, " for ", tx.Txid)
|
||||
// return
|
||||
// }
|
||||
// data := struct {
|
||||
// Address string `json:"address"`
|
||||
// Tx *api.Tx `json:"tx"`
|
||||
// }{
|
||||
// Address: addr[0],
|
||||
// Tx: atx,
|
||||
// }
|
||||
// // get the list of subscriptions again, this time keep the lock
|
||||
// s.addressSubscriptionsLock.Lock()
|
||||
// defer s.addressSubscriptionsLock.Unlock()
|
||||
// as, ok = s.addressSubscriptions[string(addrDesc)]
|
||||
// if ok {
|
||||
// for c, id := range as {
|
||||
// if c.IsAlive() {
|
||||
// c.out <- &websocketRes{
|
||||
// ID: id,
|
||||
// Data: &data,
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// glog.Info("broadcasting new tx ", tx.Txid, " for addr ", addr[0], " to ", len(as), " channels")
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
||||
// OnNewTxAddr is a callback that broadcasts info about a tx affecting subscribed address
|
||||
func (s *WebsocketServer) OnNewTxAddr(tx *bchain.Tx, addrDesc bchain.AddressDescriptor) {
|
||||
// check if there is any subscription but release the lock immediately, GetTransactionFromBchainTx may take some time
|
||||
// check if there is any subscription, to this address or to any address
|
||||
subs := make(map[*websocketChannel]string)
|
||||
// check if there is any subscription for this address (release lock immediately)
|
||||
s.addressSubscriptionsLock.Lock()
|
||||
as, ok := s.addressSubscriptions[string(addrDesc)]
|
||||
lenAs := len(as)
|
||||
s.addressSubscriptionsLock.Unlock()
|
||||
if ok && lenAs > 0 {
|
||||
addr, _, err := s.chainParser.GetAddressesFromAddrDesc(addrDesc)
|
||||
if err != nil {
|
||||
glog.Error("GetAddressesFromAddrDesc error ", err, " for ", addrDesc)
|
||||
return
|
||||
if ok && len(as) > 0 {
|
||||
for c, id := range as {
|
||||
subs[c] = id
|
||||
}
|
||||
if len(addr) == 1 {
|
||||
atx, err := s.api.GetTransactionFromBchainTx(tx, 0, false, false)
|
||||
if err != nil {
|
||||
glog.Error("GetTransactionFromBchainTx error ", err, " for ", tx.Txid)
|
||||
return
|
||||
}
|
||||
data := struct {
|
||||
Address string `json:"address"`
|
||||
Tx *api.Tx `json:"tx"`
|
||||
}{
|
||||
Address: addr[0],
|
||||
Tx: atx,
|
||||
}
|
||||
// get the list of subscriptions again, this time keep the lock
|
||||
s.addressSubscriptionsLock.Lock()
|
||||
defer s.addressSubscriptionsLock.Unlock()
|
||||
as, ok = s.addressSubscriptions[string(addrDesc)]
|
||||
if ok {
|
||||
for c, id := range as {
|
||||
if c.IsAlive() {
|
||||
c.out <- &websocketRes{
|
||||
ID: id,
|
||||
Data: &data,
|
||||
}
|
||||
}
|
||||
}
|
||||
glog.Info("broadcasting new tx ", tx.Txid, " for addr ", addr[0], " to ", len(as), " channels")
|
||||
}
|
||||
s.addressSubscriptionsLock.Unlock()
|
||||
// check is there is any subscription for any address
|
||||
s.mempoolSubscriptionsLock.Lock()
|
||||
for c, id := range s.mempoolSubscriptions {
|
||||
subs[c] = id
|
||||
}
|
||||
s.mempoolSubscriptionsLock.Unlock()
|
||||
|
||||
if len(subs) == 0 {
|
||||
return
|
||||
}
|
||||
addr, _, err := s.chainParser.GetAddressesFromAddrDesc(addrDesc)
|
||||
if err != nil {
|
||||
glog.Error("GetAddressesFromAddrDesc error ", err, " for ", addrDesc)
|
||||
return
|
||||
}
|
||||
if len(addr) != 1 {
|
||||
return
|
||||
}
|
||||
atx, err := s.api.GetTransactionFromBchainTx(tx, 0, false, false)
|
||||
if err != nil {
|
||||
glog.Error("GetTransactionFromBchainTx error ", err, " for ", tx.Txid)
|
||||
return
|
||||
}
|
||||
data := struct {
|
||||
Address string `json:"address"`
|
||||
Tx *api.Tx `json:"tx"`
|
||||
}{
|
||||
Address: addr[0],
|
||||
Tx: atx,
|
||||
}
|
||||
// get the list of subscriptions again, keep locks
|
||||
s.addressSubscriptionsLock.Lock()
|
||||
defer s.addressSubscriptionsLock.Unlock()
|
||||
s.mempoolSubscriptionsLock.Lock()
|
||||
defer s.mempoolSubscriptionsLock.Unlock()
|
||||
for c, id := range subs {
|
||||
if c.IsAlive() {
|
||||
c.out <- &websocketRes{
|
||||
ID: id,
|
||||
Data: &data,
|
||||
}
|
||||
}
|
||||
}
|
||||
glog.Info("broadcasting new tx ", tx.Txid, " for addr ", addr[0], " to ", len(subs), " channels")
|
||||
}
|
||||
|
||||
func (s *WebsocketServer) broadcastTicker(currency string, rates map[string]float64) {
|
||||
|
|
|
@ -56,6 +56,7 @@
|
|||
subscriptions = {};
|
||||
subscribeNewBlockId = "";
|
||||
subscribeAddressesId = "";
|
||||
subscribeMempoolId = "";
|
||||
if (server.startsWith("http")) {
|
||||
server = server.replace("http", "ws");
|
||||
}
|
||||
|
@ -299,6 +300,33 @@
|
|||
});
|
||||
}
|
||||
|
||||
function subscribeMempool() {
|
||||
const method = 'subscribeMempool';
|
||||
if (subscribeMempoolId) {
|
||||
delete subscriptions[subscribeMempoolId];
|
||||
subscribeMempoolId = "";
|
||||
}
|
||||
const params = {
|
||||
};
|
||||
subscribeMempoolId = subscribe(method, params, function (result) {
|
||||
document.getElementById('subscribeMempoolResult').innerText += JSON.stringify(result).replace(/,/g, ", ") + "\n";
|
||||
});
|
||||
document.getElementById('subscribeMempoolIds').innerText = subscribeMempoolId;
|
||||
document.getElementById('unsubscribeMempoolButton').setAttribute("style", "display: inherit;");
|
||||
}
|
||||
|
||||
function unsubscribeMempool() {
|
||||
const method = 'unsubscribeMempool';
|
||||
const params = {
|
||||
};
|
||||
unsubscribe(method, subscribeMempoolId, params, function (result) {
|
||||
subscribeMempoolId = "";
|
||||
document.getElementById('subscribeMempoolResult').innerText += JSON.stringify(result).replace(/,/g, ", ") + "\n";
|
||||
document.getElementById('subscribeMempoolIds').innerText = "";
|
||||
document.getElementById('unsubscribeMempoolButton').setAttribute("style", "display: none;");
|
||||
});
|
||||
}
|
||||
|
||||
function getFiatRatesForTimestamps() {
|
||||
const method = 'getFiatRatesForTimestamps';
|
||||
var timestamps = document.getElementById('getFiatRatesForTimestampsList').value.split(",");
|
||||
|
@ -602,6 +630,20 @@
|
|||
<div class="row">
|
||||
<div class="col" id="subscribeAddressesResult"></div>
|
||||
</div>
|
||||
<div class="row">
|
||||
<div class="col">
|
||||
<input class="btn btn-secondary" type="button" value="subscribe mempool" onclick="subscribeMempool()">
|
||||
</div>
|
||||
<div class="col">
|
||||
<span id="subscribeMempoolIds"></span>
|
||||
</div>
|
||||
<div class="col">
|
||||
<input class="btn btn-secondary" id="unsubscribeMempoolButton" style="display: none;" type="button" value="unsubscribe" onclick="unsubscribeMempool()">
|
||||
</div>
|
||||
</div>
|
||||
<div class="row">
|
||||
<div class="col" id="subscribeMempoolResult"></div>
|
||||
</div>
|
||||
<div class="row">
|
||||
<div class="col-3">
|
||||
<input class="btn btn-secondary" type="button" value="subscribe new fiat rates" onclick="subscribeNewFiatRatesTicker()">
|
||||
|
|
Loading…
Reference in New Issue