Websocket new transaction (#574)

master
kaladin 2021-03-21 14:55:25 -06:00 committed by GitHub
parent 78c8a9d499
commit 1f6cddd4ab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 150 additions and 40 deletions

View File

@ -73,6 +73,8 @@ var (
noTxCache = flag.Bool("notxcache", false, "disable tx cache")
enableSubNewTx = flag.Bool("enablesubnewtx", false, "enable support for subscribing to all new transactions")
computeColumnStats = flag.Bool("computedbstats", false, "compute column stats and exit")
computeFeeStatsFlag = flag.Bool("computefeestats", false, "compute fee stats for blocks in blockheight-blockuntil range and exit")
dbStatsPeriodHours = flag.Int("dbstatsperiod", 24, "period of db stats collection in hours, 0 disables stats collection")
@ -405,7 +407,7 @@ func startInternalServer() (*server.InternalServer, error) {
func startPublicServer() (*server.PublicServer, error) {
// start public server in limited functionality, extend it after sync is finished by calling ConnectFullPublicInterface
publicServer, err := server.NewPublicServer(*publicBinding, *certFiles, index, chain, mempool, txCache, *explorerURL, metrics, internalState, *debugMode)
publicServer, err := server.NewPublicServer(*publicBinding, *certFiles, index, chain, mempool, txCache, *explorerURL, metrics, internalState, *debugMode, *enableSubNewTx)
if err != nil {
return nil, err
}

View File

@ -764,12 +764,15 @@ The websocket interface provides the following requests:
The client can subscribe to the following events:
- `subscribeNewBlock` - new block added to blockchain
- `subscribeAddresses` - new transaction for given address (list of addresses)
- `subscribeFiatRates` - new currency rate ticker
- `subscribeNewBlock` - new block added to blockchain
- `subscribeNewTransaction` - new transaction added to blockchain (all addresses)
- `subscribeAddresses` - new transaction for given address (list of addresses)
- `subscribeFiatRates` - new currency rate ticker
There can be always only one subscription of given event per connection, i.e. new list of addresses replaces previous list of addresses.
The subscribeNewTransaction event is not enabled by default. To enable support, blockbook must be run with the `-enablesubnewtx` flag.
_Note: If there is reorg on the backend (blockchain), you will get a new block hash with the same or even smaller height if the reorg is deeper_
Websocket communication format
@ -791,5 +794,3 @@ Example for subscribing to an address (or multiple addresses)
}
}
```

View File

@ -58,7 +58,7 @@ type PublicServer struct {
// NewPublicServer creates new public server http interface to blockbook and returns its handle
// only basic functionality is mapped, to map all functions, call
func NewPublicServer(binding string, certFiles string, db *db.RocksDB, chain bchain.BlockChain, mempool bchain.Mempool, txCache *db.TxCache, explorerURL string, metrics *common.Metrics, is *common.InternalState, debugMode bool) (*PublicServer, error) {
func NewPublicServer(binding string, certFiles string, db *db.RocksDB, chain bchain.BlockChain, mempool bchain.Mempool, txCache *db.TxCache, explorerURL string, metrics *common.Metrics, is *common.InternalState, debugMode bool, enableSubNewTx bool) (*PublicServer, error) {
api, err := api.NewWorker(db, chain, mempool, txCache, is)
if err != nil {
@ -70,7 +70,7 @@ func NewPublicServer(binding string, certFiles string, db *db.RocksDB, chain bch
return nil, err
}
websocket, err := NewWebsocketServer(db, chain, mempool, txCache, metrics, is)
websocket, err := NewWebsocketServer(db, chain, mempool, txCache, metrics, is, enableSubNewTx)
if err != nil {
return nil, err
}

View File

@ -109,7 +109,7 @@ func setupPublicHTTPServer(t *testing.T) (*PublicServer, string) {
}
// s.Run is never called, binding can be to any port
s, err := NewPublicServer("localhost:12345", "", d, chain, mempool, txCache, "", metrics, is, false)
s, err := NewPublicServer("localhost:12345", "", d, chain, mempool, txCache, "", metrics, is, false, false)
if err != nil {
t.Fatal(err)
}
@ -1421,6 +1421,20 @@ func websocketTestsBitcoinType(t *testing.T, ts *httptest.Server) {
},
want: `{"id":"36","data":[{"time":1521514800,"txs":1,"received":"1","sent":"0","sentToSelf":"0","rates":{"eur":1301,"usd":2001}}]}`,
},
{
name: "websocket subscribeNewTransaction",
req: websocketReq{
Method: "subscribeNewTransaction",
},
want: `{"id":"37","data":{"subscribed":false,"message":"subscribeNewTransaction not enabled, use -enablesubnewtx flag to enable."}}`,
},
{
name: "websocket unsubscribeNewTransaction",
req: websocketReq{
Method: "unsubscribeNewTransaction",
},
want: `{"id":"38","data":{"subscribed":false,"message":"unsubscribeNewTransaction not enabled, use -enablesubnewtx flag to enable."}}`,
},
}
// send all requests at once

View File

@ -57,27 +57,30 @@ type websocketChannel struct {
// WebsocketServer is a handle to websocket server
type WebsocketServer struct {
socket *websocket.Conn
upgrader *websocket.Upgrader
db *db.RocksDB
txCache *db.TxCache
chain bchain.BlockChain
chainParser bchain.BlockChainParser
mempool bchain.Mempool
metrics *common.Metrics
is *common.InternalState
api *api.Worker
block0hash string
newBlockSubscriptions map[*websocketChannel]string
newBlockSubscriptionsLock sync.Mutex
addressSubscriptions map[string]map[*websocketChannel]string
addressSubscriptionsLock sync.Mutex
fiatRatesSubscriptions map[string]map[*websocketChannel]string
fiatRatesSubscriptionsLock sync.Mutex
socket *websocket.Conn
upgrader *websocket.Upgrader
db *db.RocksDB
txCache *db.TxCache
chain bchain.BlockChain
chainParser bchain.BlockChainParser
mempool bchain.Mempool
metrics *common.Metrics
is *common.InternalState
api *api.Worker
block0hash string
newBlockSubscriptions map[*websocketChannel]string
newBlockSubscriptionsLock sync.Mutex
newTransactionEnabled bool
newTransactionSubscriptions map[*websocketChannel]string
newTransactionSubscriptionsLock sync.Mutex
addressSubscriptions map[string]map[*websocketChannel]string
addressSubscriptionsLock sync.Mutex
fiatRatesSubscriptions map[string]map[*websocketChannel]string
fiatRatesSubscriptionsLock sync.Mutex
}
// NewWebsocketServer creates new websocket interface to blockbook and returns its handle
func NewWebsocketServer(db *db.RocksDB, chain bchain.BlockChain, mempool bchain.Mempool, txCache *db.TxCache, metrics *common.Metrics, is *common.InternalState) (*WebsocketServer, error) {
func NewWebsocketServer(db *db.RocksDB, chain bchain.BlockChain, mempool bchain.Mempool, txCache *db.TxCache, metrics *common.Metrics, is *common.InternalState, enableSubNewTx bool) (*WebsocketServer, error) {
api, err := api.NewWorker(db, chain, mempool, txCache, is)
if err != nil {
return nil, err
@ -92,18 +95,20 @@ func NewWebsocketServer(db *db.RocksDB, chain bchain.BlockChain, mempool bchain.
WriteBufferSize: 1024 * 32,
CheckOrigin: checkOrigin,
},
db: db,
txCache: txCache,
chain: chain,
chainParser: chain.GetChainParser(),
mempool: mempool,
metrics: metrics,
is: is,
api: api,
block0hash: b0,
newBlockSubscriptions: make(map[*websocketChannel]string),
addressSubscriptions: make(map[string]map[*websocketChannel]string),
fiatRatesSubscriptions: make(map[string]map[*websocketChannel]string),
db: db,
txCache: txCache,
chain: chain,
chainParser: chain.GetChainParser(),
mempool: mempool,
metrics: metrics,
is: is,
api: api,
block0hash: b0,
newBlockSubscriptions: make(map[*websocketChannel]string),
newTransactionEnabled: enableSubNewTx,
newTransactionSubscriptions: make(map[*websocketChannel]string),
addressSubscriptions: make(map[string]map[*websocketChannel]string),
fiatRatesSubscriptions: make(map[string]map[*websocketChannel]string),
}
return s, nil
}
@ -243,6 +248,7 @@ func (s *WebsocketServer) onConnect(c *websocketChannel) {
func (s *WebsocketServer) onDisconnect(c *websocketChannel) {
s.unsubscribeNewBlock(c)
s.unsubscribeNewTransaction(c)
s.unsubscribeAddresses(c)
s.unsubscribeFiatRates(c)
glog.Info("Client disconnected ", c.id, ", ", c.ip)
@ -346,6 +352,12 @@ var requestHandlers = map[string]func(*WebsocketServer, *websocketChannel, *webs
"unsubscribeNewBlock": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) {
return s.unsubscribeNewBlock(c)
},
"subscribeNewTransaction": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) {
return s.subscribeNewTransaction(c, req)
},
"unsubscribeNewTransaction": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) {
return s.unsubscribeNewTransaction(c)
},
"subscribeAddresses": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) {
ad, err := s.unmarshalAddresses(req.Params)
if err == nil {
@ -657,6 +669,10 @@ func (s *WebsocketServer) sendTransaction(tx string) (res resultSendTransaction,
type subscriptionResponse struct {
Subscribed bool `json:"subscribed"`
}
type subscriptionResponseMessage struct {
Subscribed bool `json:"subscribed"`
Message string `json:"message"`
}
func (s *WebsocketServer) subscribeNewBlock(c *websocketChannel, req *websocketReq) (res interface{}, err error) {
s.newBlockSubscriptionsLock.Lock()
@ -672,6 +688,26 @@ func (s *WebsocketServer) unsubscribeNewBlock(c *websocketChannel) (res interfac
return &subscriptionResponse{false}, nil
}
func (s *WebsocketServer) subscribeNewTransaction(c *websocketChannel, req *websocketReq) (res interface{}, err error) {
s.newTransactionSubscriptionsLock.Lock()
defer s.newTransactionSubscriptionsLock.Unlock()
if !s.newTransactionEnabled {
return &subscriptionResponseMessage{false, "subscribeNewTransaction not enabled, use -enablesubnewtx flag to enable."}, nil
}
s.newTransactionSubscriptions[c] = req.ID
return &subscriptionResponse{true}, nil
}
func (s *WebsocketServer) unsubscribeNewTransaction(c *websocketChannel) (res interface{}, err error) {
s.newTransactionSubscriptionsLock.Lock()
defer s.newTransactionSubscriptionsLock.Unlock()
if !s.newTransactionEnabled {
return &subscriptionResponseMessage{false, "unsubscribeNewTransaction not enabled, use -enablesubnewtx flag to enable."}, nil
}
delete(s.newTransactionSubscriptions, c)
return &subscriptionResponse{false}, nil
}
func (s *WebsocketServer) unmarshalAddresses(params []byte) ([]bchain.AddressDescriptor, error) {
r := struct {
Addresses []string `json:"addresses"`
@ -790,6 +826,18 @@ func (s *WebsocketServer) OnNewBlock(hash string, height uint32) {
glog.Info("broadcasting new block ", height, " ", hash, " to ", len(s.newBlockSubscriptions), " channels")
}
func (s *WebsocketServer) sendOnNewTx(tx *api.Tx) {
s.newTransactionSubscriptionsLock.Lock()
defer s.newTransactionSubscriptionsLock.Unlock()
for c, id := range s.newTransactionSubscriptions {
c.DataOut(&websocketRes{
ID: id,
Data: &tx,
})
}
glog.Info("broadcasting new tx ", tx.Txid, " to ", len(s.newTransactionSubscriptions), " channels")
}
func (s *WebsocketServer) sendOnNewTxAddr(stringAddressDescriptor string, tx *api.Tx) {
addrDesc := bchain.AddressDescriptor(stringAddressDescriptor)
addr, _, err := s.chainParser.GetAddressesFromAddrDesc(addrDesc)
@ -868,12 +916,15 @@ func (s *WebsocketServer) getNewTxSubscriptions(tx *bchain.MempoolTx) map[string
// OnNewTx is a callback that broadcasts info about a tx affecting subscribed address
func (s *WebsocketServer) OnNewTx(tx *bchain.MempoolTx) {
subscribed := s.getNewTxSubscriptions(tx)
if len(subscribed) > 0 {
if len(s.newTransactionSubscriptions) > 0 || len(subscribed) > 0 {
atx, err := s.api.GetTransactionFromMempoolTx(tx)
if err != nil {
glog.Error("GetTransactionFromMempoolTx error ", err, " for ", tx.Txid)
return
}
s.sendOnNewTx(atx)
for stringAddressDescriptor := range subscribed {
s.sendOnNewTxAddr(stringAddressDescriptor, atx)
}

View File

@ -55,6 +55,7 @@
pendingMessages = {};
subscriptions = {};
subscribeNewBlockId = "";
subscribeNewTransactionId = "";
subscribeAddressesId = "";
if (server.startsWith("http")) {
server = server.replace("http", "ws");
@ -269,6 +270,33 @@
});
}
function subscribeNewTransaction() {
const method = 'subscribeNewTransaction';
const params = {
};
if (subscribeNewTransactionId) {
delete subscriptions[subscribeNewTransactionId];
subscribeNewTransactionId = "";
}
subscribeNewTransactionId = subscribe(method, params, function (result) {
document.getElementById('subscribeNewTransactionResult').innerText += JSON.stringify(result).replace(/,/g, ", ") + "\n";
});
document.getElementById('subscribeNewTransactionId').innerText = subscribeNewTransactionId;
document.getElementById('unsubscribeNewTransactionButton').setAttribute("style", "display: inherit;");
}
function unsubscribeNewTransaction() {
const method = 'unsubscribeNewTransaction';
const params = {
};
unsubscribe(method, subscribeNewTransactionId, params, function (result) {
subscribeNewTransactionId = "";
document.getElementById('subscribeNewTransactionResult').innerText += JSON.stringify(result).replace(/,/g, ", ") + "\n";
document.getElementById('subscribeNewTransactionId').innerText = "";
document.getElementById('unsubscribeNewTransactionButton').setAttribute("style", "display: none;");
});
}
function subscribeAddresses() {
const method = 'subscribeAddresses';
var addresses = document.getElementById('subscribeAddressesName').value.split(",");
@ -585,6 +613,20 @@
<div class="row">
<div class="col" id="subscribeNewBlockResult"></div>
</div>
<div class="row">
<div class="col">
<input class="btn btn-secondary" type="button" value="subscribe new transaction" onclick="subscribeNewTransaction()">
</div>
<div class="col-4">
<span id="subscribeNewTransactionId"></span>
</div>
<div class="col">
<input class="btn btn-secondary" id="unsubscribeNewTransactionButton" style="display: none;" type="button" value="unsubscribe" onclick="unsubscribeNewTransaction()">
</div>
</div>
<div class="row">
<div class="col" id="subscribeNewTransactionResult"></div>
</div>
<div class="row">
<div class="col">
<input class="btn btn-secondary" type="button" value="subscribe address" onclick="subscribeAddresses()">