diff --git a/blockbook.go b/blockbook.go index 9188c2c7..77a89f1d 100644 --- a/blockbook.go +++ b/blockbook.go @@ -73,6 +73,8 @@ var ( noTxCache = flag.Bool("notxcache", false, "disable tx cache") + enableSubNewTx = flag.Bool("enablesubnewtx", false, "enable support for subscribing to all new transactions") + computeColumnStats = flag.Bool("computedbstats", false, "compute column stats and exit") computeFeeStatsFlag = flag.Bool("computefeestats", false, "compute fee stats for blocks in blockheight-blockuntil range and exit") dbStatsPeriodHours = flag.Int("dbstatsperiod", 24, "period of db stats collection in hours, 0 disables stats collection") @@ -405,7 +407,7 @@ func startInternalServer() (*server.InternalServer, error) { func startPublicServer() (*server.PublicServer, error) { // start public server in limited functionality, extend it after sync is finished by calling ConnectFullPublicInterface - publicServer, err := server.NewPublicServer(*publicBinding, *certFiles, index, chain, mempool, txCache, *explorerURL, metrics, internalState, *debugMode) + publicServer, err := server.NewPublicServer(*publicBinding, *certFiles, index, chain, mempool, txCache, *explorerURL, metrics, internalState, *debugMode, *enableSubNewTx) if err != nil { return nil, err } diff --git a/docs/api.md b/docs/api.md index dd38d6f5..00ed2730 100644 --- a/docs/api.md +++ b/docs/api.md @@ -764,12 +764,15 @@ The websocket interface provides the following requests: The client can subscribe to the following events: -- `subscribeNewBlock` - new block added to blockchain -- `subscribeAddresses` - new transaction for given address (list of addresses) -- `subscribeFiatRates` - new currency rate ticker +- `subscribeNewBlock` - new block added to blockchain +- `subscribeNewTransaction` - new transaction added to blockchain (all addresses) +- `subscribeAddresses` - new transaction for given address (list of addresses) +- `subscribeFiatRates` - new currency rate ticker There can be always only one subscription of given event per connection, i.e. new list of addresses replaces previous list of addresses. +The subscribeNewTransaction event is not enabled by default. To enable support, blockbook must be run with the `-enablesubnewtx` flag. + _Note: If there is reorg on the backend (blockchain), you will get a new block hash with the same or even smaller height if the reorg is deeper_ Websocket communication format @@ -791,5 +794,3 @@ Example for subscribing to an address (or multiple addresses) } } ``` - - diff --git a/server/public.go b/server/public.go index 1a9d1dde..a92fc1f1 100644 --- a/server/public.go +++ b/server/public.go @@ -58,7 +58,7 @@ type PublicServer struct { // NewPublicServer creates new public server http interface to blockbook and returns its handle // only basic functionality is mapped, to map all functions, call -func NewPublicServer(binding string, certFiles string, db *db.RocksDB, chain bchain.BlockChain, mempool bchain.Mempool, txCache *db.TxCache, explorerURL string, metrics *common.Metrics, is *common.InternalState, debugMode bool) (*PublicServer, error) { +func NewPublicServer(binding string, certFiles string, db *db.RocksDB, chain bchain.BlockChain, mempool bchain.Mempool, txCache *db.TxCache, explorerURL string, metrics *common.Metrics, is *common.InternalState, debugMode bool, enableSubNewTx bool) (*PublicServer, error) { api, err := api.NewWorker(db, chain, mempool, txCache, is) if err != nil { @@ -70,7 +70,7 @@ func NewPublicServer(binding string, certFiles string, db *db.RocksDB, chain bch return nil, err } - websocket, err := NewWebsocketServer(db, chain, mempool, txCache, metrics, is) + websocket, err := NewWebsocketServer(db, chain, mempool, txCache, metrics, is, enableSubNewTx) if err != nil { return nil, err } diff --git a/server/public_test.go b/server/public_test.go index fd839750..161ca66f 100644 --- a/server/public_test.go +++ b/server/public_test.go @@ -109,7 +109,7 @@ func setupPublicHTTPServer(t *testing.T) (*PublicServer, string) { } // s.Run is never called, binding can be to any port - s, err := NewPublicServer("localhost:12345", "", d, chain, mempool, txCache, "", metrics, is, false) + s, err := NewPublicServer("localhost:12345", "", d, chain, mempool, txCache, "", metrics, is, false, false) if err != nil { t.Fatal(err) } @@ -1421,6 +1421,20 @@ func websocketTestsBitcoinType(t *testing.T, ts *httptest.Server) { }, want: `{"id":"36","data":[{"time":1521514800,"txs":1,"received":"1","sent":"0","sentToSelf":"0","rates":{"eur":1301,"usd":2001}}]}`, }, + { + name: "websocket subscribeNewTransaction", + req: websocketReq{ + Method: "subscribeNewTransaction", + }, + want: `{"id":"37","data":{"subscribed":false,"message":"subscribeNewTransaction not enabled, use -enablesubnewtx flag to enable."}}`, + }, + { + name: "websocket unsubscribeNewTransaction", + req: websocketReq{ + Method: "unsubscribeNewTransaction", + }, + want: `{"id":"38","data":{"subscribed":false,"message":"unsubscribeNewTransaction not enabled, use -enablesubnewtx flag to enable."}}`, + }, } // send all requests at once diff --git a/server/websocket.go b/server/websocket.go index 3386450a..48b45c80 100644 --- a/server/websocket.go +++ b/server/websocket.go @@ -57,27 +57,30 @@ type websocketChannel struct { // WebsocketServer is a handle to websocket server type WebsocketServer struct { - socket *websocket.Conn - upgrader *websocket.Upgrader - db *db.RocksDB - txCache *db.TxCache - chain bchain.BlockChain - chainParser bchain.BlockChainParser - mempool bchain.Mempool - metrics *common.Metrics - is *common.InternalState - api *api.Worker - block0hash string - newBlockSubscriptions map[*websocketChannel]string - newBlockSubscriptionsLock sync.Mutex - addressSubscriptions map[string]map[*websocketChannel]string - addressSubscriptionsLock sync.Mutex - fiatRatesSubscriptions map[string]map[*websocketChannel]string - fiatRatesSubscriptionsLock sync.Mutex + socket *websocket.Conn + upgrader *websocket.Upgrader + db *db.RocksDB + txCache *db.TxCache + chain bchain.BlockChain + chainParser bchain.BlockChainParser + mempool bchain.Mempool + metrics *common.Metrics + is *common.InternalState + api *api.Worker + block0hash string + newBlockSubscriptions map[*websocketChannel]string + newBlockSubscriptionsLock sync.Mutex + newTransactionEnabled bool + newTransactionSubscriptions map[*websocketChannel]string + newTransactionSubscriptionsLock sync.Mutex + addressSubscriptions map[string]map[*websocketChannel]string + addressSubscriptionsLock sync.Mutex + fiatRatesSubscriptions map[string]map[*websocketChannel]string + fiatRatesSubscriptionsLock sync.Mutex } // NewWebsocketServer creates new websocket interface to blockbook and returns its handle -func NewWebsocketServer(db *db.RocksDB, chain bchain.BlockChain, mempool bchain.Mempool, txCache *db.TxCache, metrics *common.Metrics, is *common.InternalState) (*WebsocketServer, error) { +func NewWebsocketServer(db *db.RocksDB, chain bchain.BlockChain, mempool bchain.Mempool, txCache *db.TxCache, metrics *common.Metrics, is *common.InternalState, enableSubNewTx bool) (*WebsocketServer, error) { api, err := api.NewWorker(db, chain, mempool, txCache, is) if err != nil { return nil, err @@ -92,18 +95,20 @@ func NewWebsocketServer(db *db.RocksDB, chain bchain.BlockChain, mempool bchain. WriteBufferSize: 1024 * 32, CheckOrigin: checkOrigin, }, - db: db, - txCache: txCache, - chain: chain, - chainParser: chain.GetChainParser(), - mempool: mempool, - metrics: metrics, - is: is, - api: api, - block0hash: b0, - newBlockSubscriptions: make(map[*websocketChannel]string), - addressSubscriptions: make(map[string]map[*websocketChannel]string), - fiatRatesSubscriptions: make(map[string]map[*websocketChannel]string), + db: db, + txCache: txCache, + chain: chain, + chainParser: chain.GetChainParser(), + mempool: mempool, + metrics: metrics, + is: is, + api: api, + block0hash: b0, + newBlockSubscriptions: make(map[*websocketChannel]string), + newTransactionEnabled: enableSubNewTx, + newTransactionSubscriptions: make(map[*websocketChannel]string), + addressSubscriptions: make(map[string]map[*websocketChannel]string), + fiatRatesSubscriptions: make(map[string]map[*websocketChannel]string), } return s, nil } @@ -243,6 +248,7 @@ func (s *WebsocketServer) onConnect(c *websocketChannel) { func (s *WebsocketServer) onDisconnect(c *websocketChannel) { s.unsubscribeNewBlock(c) + s.unsubscribeNewTransaction(c) s.unsubscribeAddresses(c) s.unsubscribeFiatRates(c) glog.Info("Client disconnected ", c.id, ", ", c.ip) @@ -346,6 +352,12 @@ var requestHandlers = map[string]func(*WebsocketServer, *websocketChannel, *webs "unsubscribeNewBlock": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) { return s.unsubscribeNewBlock(c) }, + "subscribeNewTransaction": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) { + return s.subscribeNewTransaction(c, req) + }, + "unsubscribeNewTransaction": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) { + return s.unsubscribeNewTransaction(c) + }, "subscribeAddresses": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) { ad, err := s.unmarshalAddresses(req.Params) if err == nil { @@ -657,6 +669,10 @@ func (s *WebsocketServer) sendTransaction(tx string) (res resultSendTransaction, type subscriptionResponse struct { Subscribed bool `json:"subscribed"` } +type subscriptionResponseMessage struct { + Subscribed bool `json:"subscribed"` + Message string `json:"message"` +} func (s *WebsocketServer) subscribeNewBlock(c *websocketChannel, req *websocketReq) (res interface{}, err error) { s.newBlockSubscriptionsLock.Lock() @@ -672,6 +688,26 @@ func (s *WebsocketServer) unsubscribeNewBlock(c *websocketChannel) (res interfac return &subscriptionResponse{false}, nil } +func (s *WebsocketServer) subscribeNewTransaction(c *websocketChannel, req *websocketReq) (res interface{}, err error) { + s.newTransactionSubscriptionsLock.Lock() + defer s.newTransactionSubscriptionsLock.Unlock() + if !s.newTransactionEnabled { + return &subscriptionResponseMessage{false, "subscribeNewTransaction not enabled, use -enablesubnewtx flag to enable."}, nil + } + s.newTransactionSubscriptions[c] = req.ID + return &subscriptionResponse{true}, nil +} + +func (s *WebsocketServer) unsubscribeNewTransaction(c *websocketChannel) (res interface{}, err error) { + s.newTransactionSubscriptionsLock.Lock() + defer s.newTransactionSubscriptionsLock.Unlock() + if !s.newTransactionEnabled { + return &subscriptionResponseMessage{false, "unsubscribeNewTransaction not enabled, use -enablesubnewtx flag to enable."}, nil + } + delete(s.newTransactionSubscriptions, c) + return &subscriptionResponse{false}, nil +} + func (s *WebsocketServer) unmarshalAddresses(params []byte) ([]bchain.AddressDescriptor, error) { r := struct { Addresses []string `json:"addresses"` @@ -790,6 +826,18 @@ func (s *WebsocketServer) OnNewBlock(hash string, height uint32) { glog.Info("broadcasting new block ", height, " ", hash, " to ", len(s.newBlockSubscriptions), " channels") } +func (s *WebsocketServer) sendOnNewTx(tx *api.Tx) { + s.newTransactionSubscriptionsLock.Lock() + defer s.newTransactionSubscriptionsLock.Unlock() + for c, id := range s.newTransactionSubscriptions { + c.DataOut(&websocketRes{ + ID: id, + Data: &tx, + }) + } + glog.Info("broadcasting new tx ", tx.Txid, " to ", len(s.newTransactionSubscriptions), " channels") +} + func (s *WebsocketServer) sendOnNewTxAddr(stringAddressDescriptor string, tx *api.Tx) { addrDesc := bchain.AddressDescriptor(stringAddressDescriptor) addr, _, err := s.chainParser.GetAddressesFromAddrDesc(addrDesc) @@ -868,12 +916,15 @@ func (s *WebsocketServer) getNewTxSubscriptions(tx *bchain.MempoolTx) map[string // OnNewTx is a callback that broadcasts info about a tx affecting subscribed address func (s *WebsocketServer) OnNewTx(tx *bchain.MempoolTx) { subscribed := s.getNewTxSubscriptions(tx) - if len(subscribed) > 0 { + if len(s.newTransactionSubscriptions) > 0 || len(subscribed) > 0 { atx, err := s.api.GetTransactionFromMempoolTx(tx) if err != nil { glog.Error("GetTransactionFromMempoolTx error ", err, " for ", tx.Txid) return } + + s.sendOnNewTx(atx) + for stringAddressDescriptor := range subscribed { s.sendOnNewTxAddr(stringAddressDescriptor, atx) } diff --git a/static/test-websocket.html b/static/test-websocket.html index 05fabb48..ec7180b8 100644 --- a/static/test-websocket.html +++ b/static/test-websocket.html @@ -55,6 +55,7 @@ pendingMessages = {}; subscriptions = {}; subscribeNewBlockId = ""; + subscribeNewTransactionId = ""; subscribeAddressesId = ""; if (server.startsWith("http")) { server = server.replace("http", "ws"); @@ -269,6 +270,33 @@ }); } + function subscribeNewTransaction() { + const method = 'subscribeNewTransaction'; + const params = { + }; + if (subscribeNewTransactionId) { + delete subscriptions[subscribeNewTransactionId]; + subscribeNewTransactionId = ""; + } + subscribeNewTransactionId = subscribe(method, params, function (result) { + document.getElementById('subscribeNewTransactionResult').innerText += JSON.stringify(result).replace(/,/g, ", ") + "\n"; + }); + document.getElementById('subscribeNewTransactionId').innerText = subscribeNewTransactionId; + document.getElementById('unsubscribeNewTransactionButton').setAttribute("style", "display: inherit;"); + } + + function unsubscribeNewTransaction() { + const method = 'unsubscribeNewTransaction'; + const params = { + }; + unsubscribe(method, subscribeNewTransactionId, params, function (result) { + subscribeNewTransactionId = ""; + document.getElementById('subscribeNewTransactionResult').innerText += JSON.stringify(result).replace(/,/g, ", ") + "\n"; + document.getElementById('subscribeNewTransactionId').innerText = ""; + document.getElementById('unsubscribeNewTransactionButton').setAttribute("style", "display: none;"); + }); + } + function subscribeAddresses() { const method = 'subscribeAddresses'; var addresses = document.getElementById('subscribeAddressesName').value.split(","); @@ -585,6 +613,20 @@
+
+
+ +
+
+ +
+
+ +
+
+
+
+