Improve locking and add panic handlers to websocket functionality
parent
214d0144ef
commit
3d9954bf79
20
blockbook.go
20
blockbook.go
|
@ -545,12 +545,22 @@ func syncIndexLoop() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func onNewBlockHash(hash string, height uint32) {
|
func onNewBlockHash(hash string, height uint32) {
|
||||||
|
defer func() {
|
||||||
|
if r := recover(); r != nil {
|
||||||
|
glog.Error("onNewBlockHash recovered from panic: ", r)
|
||||||
|
}
|
||||||
|
}()
|
||||||
for _, c := range callbacksOnNewBlock {
|
for _, c := range callbacksOnNewBlock {
|
||||||
c(hash, height)
|
c(hash, height)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func onNewFiatRatesTicker(ticker *db.CurrencyRatesTicker) {
|
func onNewFiatRatesTicker(ticker *db.CurrencyRatesTicker) {
|
||||||
|
defer func() {
|
||||||
|
if r := recover(); r != nil {
|
||||||
|
glog.Error("onNewFiatRatesTicker recovered from panic: ", r)
|
||||||
|
}
|
||||||
|
}()
|
||||||
for _, c := range callbacksOnNewFiatRatesTicker {
|
for _, c := range callbacksOnNewFiatRatesTicker {
|
||||||
c(ticker)
|
c(ticker)
|
||||||
}
|
}
|
||||||
|
@ -617,12 +627,22 @@ func storeInternalStateLoop() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func onNewTxAddr(tx *bchain.Tx, desc bchain.AddressDescriptor) {
|
func onNewTxAddr(tx *bchain.Tx, desc bchain.AddressDescriptor) {
|
||||||
|
defer func() {
|
||||||
|
if r := recover(); r != nil {
|
||||||
|
glog.Error("onNewTxAddr recovered from panic: ", r)
|
||||||
|
}
|
||||||
|
}()
|
||||||
for _, c := range callbacksOnNewTxAddr {
|
for _, c := range callbacksOnNewTxAddr {
|
||||||
c(tx, desc)
|
c(tx, desc)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func onNewTx(tx *bchain.MempoolTx) {
|
func onNewTx(tx *bchain.MempoolTx) {
|
||||||
|
defer func() {
|
||||||
|
if r := recover(); r != nil {
|
||||||
|
glog.Error("onNewTx recovered from panic: ", r)
|
||||||
|
}
|
||||||
|
}()
|
||||||
for _, c := range callbacksOnNewTx {
|
for _, c := range callbacksOnNewTx {
|
||||||
c(tx)
|
c(tx)
|
||||||
}
|
}
|
||||||
|
|
|
@ -143,6 +143,12 @@ func (s *WebsocketServer) GetHandler() http.Handler {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *WebsocketServer) closeChannel(c *websocketChannel) {
|
func (s *WebsocketServer) closeChannel(c *websocketChannel) {
|
||||||
|
if c.CloseOut() {
|
||||||
|
s.onDisconnect(c)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *websocketChannel) CloseOut() bool {
|
||||||
c.aliveLock.Lock()
|
c.aliveLock.Lock()
|
||||||
defer c.aliveLock.Unlock()
|
defer c.aliveLock.Unlock()
|
||||||
if c.alive {
|
if c.alive {
|
||||||
|
@ -153,14 +159,24 @@ func (s *WebsocketServer) closeChannel(c *websocketChannel) {
|
||||||
for len(c.out) > 0 {
|
for len(c.out) > 0 {
|
||||||
<-c.out
|
<-c.out
|
||||||
}
|
}
|
||||||
s.onDisconnect(c)
|
return true
|
||||||
}
|
}
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *websocketChannel) IsAlive() bool {
|
func (c *websocketChannel) DataOut(data *websocketRes) {
|
||||||
c.aliveLock.Lock()
|
c.aliveLock.Lock()
|
||||||
defer c.aliveLock.Unlock()
|
defer c.aliveLock.Unlock()
|
||||||
return c.alive
|
if c.alive {
|
||||||
|
if len(c.out) < outChannelSize-1 {
|
||||||
|
c.out <- data
|
||||||
|
} else {
|
||||||
|
glog.Warning("Channel ", c.id, " overflow, closing")
|
||||||
|
// close the connection but do not call CloseOut - would call duplicate c.aliveLock.Lock
|
||||||
|
// CloseOut will be called because the closed connection will cause break in the inputLoop
|
||||||
|
c.conn.Close()
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *WebsocketServer) inputLoop(c *websocketChannel) {
|
func (s *WebsocketServer) inputLoop(c *websocketChannel) {
|
||||||
|
@ -204,11 +220,18 @@ func (s *WebsocketServer) inputLoop(c *websocketChannel) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *WebsocketServer) outputLoop(c *websocketChannel) {
|
func (s *WebsocketServer) outputLoop(c *websocketChannel) {
|
||||||
|
defer func() {
|
||||||
|
if r := recover(); r != nil {
|
||||||
|
glog.Error("recovered from panic: ", r, ", ", c.id)
|
||||||
|
s.closeChannel(c)
|
||||||
|
}
|
||||||
|
}()
|
||||||
for m := range c.out {
|
for m := range c.out {
|
||||||
err := c.conn.WriteJSON(m)
|
err := c.conn.WriteJSON(m)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Error("Error sending message to ", c.id, ", ", err)
|
glog.Error("Error sending message to ", c.id, ", ", err)
|
||||||
s.closeChannel(c)
|
s.closeChannel(c)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -383,18 +406,6 @@ var requestHandlers = map[string]func(*WebsocketServer, *websocketChannel, *webs
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
func sendResponse(c *websocketChannel, req *websocketReq, data interface{}) {
|
|
||||||
defer func() {
|
|
||||||
if r := recover(); r != nil {
|
|
||||||
glog.Error("Client ", c.id, ", onRequest ", req.Method, " recovered from panic: ", r)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
c.out <- &websocketRes{
|
|
||||||
ID: req.ID,
|
|
||||||
Data: data,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *WebsocketServer) onRequest(c *websocketChannel, req *websocketReq) {
|
func (s *WebsocketServer) onRequest(c *websocketChannel, req *websocketReq) {
|
||||||
var err error
|
var err error
|
||||||
var data interface{}
|
var data interface{}
|
||||||
|
@ -408,7 +419,10 @@ func (s *WebsocketServer) onRequest(c *websocketChannel, req *websocketReq) {
|
||||||
}
|
}
|
||||||
// nil data means no response
|
// nil data means no response
|
||||||
if data != nil {
|
if data != nil {
|
||||||
sendResponse(c, req, data)
|
c.DataOut(&websocketRes{
|
||||||
|
ID: req.ID,
|
||||||
|
Data: data,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
t := time.Now()
|
t := time.Now()
|
||||||
|
@ -429,7 +443,7 @@ func (s *WebsocketServer) onRequest(c *websocketChannel, req *websocketReq) {
|
||||||
data = e
|
data = e
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
glog.Warning("Client ", c.id, " onMessage ", req.Method, ": unknown method, data ", string(req.Params))
|
glog.V(1).Info("Client ", c.id, " onMessage ", req.Method, ": unknown method, data ", string(req.Params))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -665,11 +679,25 @@ func (s *WebsocketServer) unmarshalAddresses(params []byte) ([]bchain.AddressDes
|
||||||
return rv, nil
|
return rv, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// unsubscribe addresses without addressSubscriptionsLock - can be called only from subscribeAddresses and unsubscribeAddresses
|
||||||
|
func (s *WebsocketServer) doUnsubscribeAddresses(c *websocketChannel) {
|
||||||
|
for ads, sa := range s.addressSubscriptions {
|
||||||
|
for sc := range sa {
|
||||||
|
if sc == c {
|
||||||
|
delete(sa, c)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(sa) == 0 {
|
||||||
|
delete(s.addressSubscriptions, ads)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (s *WebsocketServer) subscribeAddresses(c *websocketChannel, addrDesc []bchain.AddressDescriptor, req *websocketReq) (res interface{}, err error) {
|
func (s *WebsocketServer) subscribeAddresses(c *websocketChannel, addrDesc []bchain.AddressDescriptor, req *websocketReq) (res interface{}, err error) {
|
||||||
// unsubscribe all previous subscriptions
|
|
||||||
s.unsubscribeAddresses(c)
|
|
||||||
s.addressSubscriptionsLock.Lock()
|
s.addressSubscriptionsLock.Lock()
|
||||||
defer s.addressSubscriptionsLock.Unlock()
|
defer s.addressSubscriptionsLock.Unlock()
|
||||||
|
// unsubscribe all previous subscriptions
|
||||||
|
s.doUnsubscribeAddresses(c)
|
||||||
for i := range addrDesc {
|
for i := range addrDesc {
|
||||||
ads := string(addrDesc[i])
|
ads := string(addrDesc[i])
|
||||||
as, ok := s.addressSubscriptions[ads]
|
as, ok := s.addressSubscriptions[ads]
|
||||||
|
@ -686,26 +714,30 @@ func (s *WebsocketServer) subscribeAddresses(c *websocketChannel, addrDesc []bch
|
||||||
func (s *WebsocketServer) unsubscribeAddresses(c *websocketChannel) (res interface{}, err error) {
|
func (s *WebsocketServer) unsubscribeAddresses(c *websocketChannel) (res interface{}, err error) {
|
||||||
s.addressSubscriptionsLock.Lock()
|
s.addressSubscriptionsLock.Lock()
|
||||||
defer s.addressSubscriptionsLock.Unlock()
|
defer s.addressSubscriptionsLock.Unlock()
|
||||||
for ads, sa := range s.addressSubscriptions {
|
s.doUnsubscribeAddresses(c)
|
||||||
|
return &subscriptionResponse{false}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// unsubscribe fiat rates without fiatRatesSubscriptionsLock - can be called only from subscribeFiatRates and unsubscribeFiatRates
|
||||||
|
func (s *WebsocketServer) doUnsubscribeFiatRates(c *websocketChannel) {
|
||||||
|
for fr, sa := range s.fiatRatesSubscriptions {
|
||||||
for sc := range sa {
|
for sc := range sa {
|
||||||
if sc == c {
|
if sc == c {
|
||||||
delete(sa, c)
|
delete(sa, c)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if len(sa) == 0 {
|
if len(sa) == 0 {
|
||||||
delete(s.addressSubscriptions, ads)
|
delete(s.fiatRatesSubscriptions, fr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return &subscriptionResponse{false}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// subscribeFiatRates subscribes all FiatRates subscriptions by this channel
|
// subscribeFiatRates subscribes all FiatRates subscriptions by this channel
|
||||||
func (s *WebsocketServer) subscribeFiatRates(c *websocketChannel, currency string, req *websocketReq) (res interface{}, err error) {
|
func (s *WebsocketServer) subscribeFiatRates(c *websocketChannel, currency string, req *websocketReq) (res interface{}, err error) {
|
||||||
// unsubscribe all previous subscriptions
|
|
||||||
s.unsubscribeFiatRates(c)
|
|
||||||
s.fiatRatesSubscriptionsLock.Lock()
|
s.fiatRatesSubscriptionsLock.Lock()
|
||||||
defer s.fiatRatesSubscriptionsLock.Unlock()
|
defer s.fiatRatesSubscriptionsLock.Unlock()
|
||||||
|
// unsubscribe all previous subscriptions
|
||||||
|
s.doUnsubscribeFiatRates(c)
|
||||||
if currency == "" {
|
if currency == "" {
|
||||||
currency = allFiatRates
|
currency = allFiatRates
|
||||||
}
|
}
|
||||||
|
@ -722,16 +754,7 @@ func (s *WebsocketServer) subscribeFiatRates(c *websocketChannel, currency strin
|
||||||
func (s *WebsocketServer) unsubscribeFiatRates(c *websocketChannel) (res interface{}, err error) {
|
func (s *WebsocketServer) unsubscribeFiatRates(c *websocketChannel) (res interface{}, err error) {
|
||||||
s.fiatRatesSubscriptionsLock.Lock()
|
s.fiatRatesSubscriptionsLock.Lock()
|
||||||
defer s.fiatRatesSubscriptionsLock.Unlock()
|
defer s.fiatRatesSubscriptionsLock.Unlock()
|
||||||
for fr, sa := range s.fiatRatesSubscriptions {
|
s.doUnsubscribeFiatRates(c)
|
||||||
for sc := range sa {
|
|
||||||
if sc == c {
|
|
||||||
delete(sa, c)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if len(sa) == 0 {
|
|
||||||
delete(s.fiatRatesSubscriptions, fr)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return &subscriptionResponse{false}, nil
|
return &subscriptionResponse{false}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -747,12 +770,10 @@ func (s *WebsocketServer) OnNewBlock(hash string, height uint32) {
|
||||||
Hash: hash,
|
Hash: hash,
|
||||||
}
|
}
|
||||||
for c, id := range s.newBlockSubscriptions {
|
for c, id := range s.newBlockSubscriptions {
|
||||||
if c.IsAlive() {
|
c.DataOut(&websocketRes{
|
||||||
c.out <- &websocketRes{
|
ID: id,
|
||||||
ID: id,
|
Data: &data,
|
||||||
Data: &data,
|
})
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
glog.Info("broadcasting new block ", height, " ", hash, " to ", len(s.newBlockSubscriptions), " channels")
|
glog.Info("broadcasting new block ", height, " ", hash, " to ", len(s.newBlockSubscriptions), " channels")
|
||||||
}
|
}
|
||||||
|
@ -772,30 +793,26 @@ func (s *WebsocketServer) sendOnNewTxAddr(stringAddressDescriptor string, tx *ap
|
||||||
Address: addr[0],
|
Address: addr[0],
|
||||||
Tx: tx,
|
Tx: tx,
|
||||||
}
|
}
|
||||||
// get the list of subscriptions again, this time keep the lock
|
|
||||||
s.addressSubscriptionsLock.Lock()
|
s.addressSubscriptionsLock.Lock()
|
||||||
defer s.addressSubscriptionsLock.Unlock()
|
defer s.addressSubscriptionsLock.Unlock()
|
||||||
as, ok := s.addressSubscriptions[stringAddressDescriptor]
|
as, ok := s.addressSubscriptions[stringAddressDescriptor]
|
||||||
if ok {
|
if ok {
|
||||||
for c, id := range as {
|
for c, id := range as {
|
||||||
if c.IsAlive() {
|
c.DataOut(&websocketRes{
|
||||||
c.out <- &websocketRes{
|
ID: id,
|
||||||
ID: id,
|
Data: &data,
|
||||||
Data: &data,
|
})
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
glog.Info("broadcasting new tx ", tx.Txid, ", addr ", addr[0], " to ", len(as), " channels")
|
glog.Info("broadcasting new tx ", tx.Txid, ", addr ", addr[0], " to ", len(as), " channels")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// OnNewTx is a callback that broadcasts info about a tx affecting subscribed address
|
func (s *WebsocketServer) getNewTxSubscriptions(tx *bchain.MempoolTx) map[string]struct{} {
|
||||||
func (s *WebsocketServer) OnNewTx(tx *bchain.MempoolTx) {
|
|
||||||
// check if there is any subscription in inputs, outputs and erc20
|
// check if there is any subscription in inputs, outputs and erc20
|
||||||
// release the lock immediately, GetTransactionFromMempoolTx is potentially slow
|
|
||||||
subscribed := make(map[string]struct{})
|
|
||||||
s.addressSubscriptionsLock.Lock()
|
s.addressSubscriptionsLock.Lock()
|
||||||
|
defer s.addressSubscriptionsLock.Unlock()
|
||||||
|
subscribed := make(map[string]struct{})
|
||||||
for i := range tx.Vin {
|
for i := range tx.Vin {
|
||||||
sad := string(tx.Vin[i].AddrDesc)
|
sad := string(tx.Vin[i].AddrDesc)
|
||||||
if len(sad) > 0 {
|
if len(sad) > 0 {
|
||||||
|
@ -833,7 +850,12 @@ func (s *WebsocketServer) OnNewTx(tx *bchain.MempoolTx) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
s.addressSubscriptionsLock.Unlock()
|
return subscribed
|
||||||
|
}
|
||||||
|
|
||||||
|
// OnNewTx is a callback that broadcasts info about a tx affecting subscribed address
|
||||||
|
func (s *WebsocketServer) OnNewTx(tx *bchain.MempoolTx) {
|
||||||
|
subscribed := s.getNewTxSubscriptions(tx)
|
||||||
if len(subscribed) > 0 {
|
if len(subscribed) > 0 {
|
||||||
atx, err := s.api.GetTransactionFromMempoolTx(tx)
|
atx, err := s.api.GetTransactionFromMempoolTx(tx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -847,8 +869,6 @@ func (s *WebsocketServer) OnNewTx(tx *bchain.MempoolTx) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *WebsocketServer) broadcastTicker(currency string, rates map[string]float64) {
|
func (s *WebsocketServer) broadcastTicker(currency string, rates map[string]float64) {
|
||||||
s.fiatRatesSubscriptionsLock.Lock()
|
|
||||||
defer s.fiatRatesSubscriptionsLock.Unlock()
|
|
||||||
as, ok := s.fiatRatesSubscriptions[currency]
|
as, ok := s.fiatRatesSubscriptions[currency]
|
||||||
if ok && len(as) > 0 {
|
if ok && len(as) > 0 {
|
||||||
data := struct {
|
data := struct {
|
||||||
|
@ -856,24 +876,20 @@ func (s *WebsocketServer) broadcastTicker(currency string, rates map[string]floa
|
||||||
}{
|
}{
|
||||||
Rates: rates,
|
Rates: rates,
|
||||||
}
|
}
|
||||||
// get the list of subscriptions again, this time keep the lock
|
for c, id := range as {
|
||||||
as, ok = s.fiatRatesSubscriptions[currency]
|
c.DataOut(&websocketRes{
|
||||||
if ok {
|
ID: id,
|
||||||
for c, id := range as {
|
Data: &data,
|
||||||
if c.IsAlive() {
|
})
|
||||||
c.out <- &websocketRes{
|
|
||||||
ID: id,
|
|
||||||
Data: &data,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
glog.Info("broadcasting new rates for currency ", currency, " to ", len(as), " channels")
|
|
||||||
}
|
}
|
||||||
|
glog.Info("broadcasting new rates for currency ", currency, " to ", len(as), " channels")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// OnNewFiatRatesTicker is a callback that broadcasts info about fiat rates affecting subscribed currency
|
// OnNewFiatRatesTicker is a callback that broadcasts info about fiat rates affecting subscribed currency
|
||||||
func (s *WebsocketServer) OnNewFiatRatesTicker(ticker *db.CurrencyRatesTicker) {
|
func (s *WebsocketServer) OnNewFiatRatesTicker(ticker *db.CurrencyRatesTicker) {
|
||||||
|
s.fiatRatesSubscriptionsLock.Lock()
|
||||||
|
defer s.fiatRatesSubscriptionsLock.Unlock()
|
||||||
for currency, rate := range ticker.Rates {
|
for currency, rate := range ticker.Rates {
|
||||||
s.broadcastTicker(currency, map[string]float64{currency: rate})
|
s.broadcastTicker(currency, map[string]float64{currency: rate})
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue