Notify socket.io subscribers about new block

indexv1
Martin Boehm 2018-02-22 13:32:06 +01:00
parent 0c456fe245
commit 26c726c771
4 changed files with 31 additions and 14 deletions

View File

@ -71,7 +71,7 @@ func (m *Mempool) updateMappings(newTxToInputOutput map[string]inputOutput, newS
// Resync gets mempool transactions and maps output scripts to transactions.
// Resync is not reentrant, it should be called from a single thread.
// Read operations (GetTransactions) are safe.
func (m *Mempool) Resync() error {
func (m *Mempool) Resync(onNewTxAddr func(txid string, addr string)) error {
start := time.Now()
glog.V(1).Info("Mempool: resync")
txs, err := m.chain.GetMempool()
@ -95,6 +95,9 @@ func (m *Mempool) Resync() error {
if outputScript != "" {
io.outputScripts = append(io.outputScripts, scriptIndex{outputScript, output.N})
}
if onNewTxAddr != nil && len(output.ScriptPubKey.Addresses) == 1 {
onNewTxAddr(tx.Txid, output.ScriptPubKey.Addresses[0])
}
}
io.inputs = make([]outpoint, 0, len(tx.Vin))
for _, input := range tx.Vin {

View File

@ -64,15 +64,15 @@ var (
)
var (
chanSyncIndex = make(chan struct{})
chanSyncMempool = make(chan struct{})
chanSyncIndexDone = make(chan struct{})
chanSyncMempoolDone = make(chan struct{})
chain *bchain.BitcoinRPC
mempool *bchain.Mempool
index *db.RocksDB
callbackOnNewIndexHash []func(hash string)
callbackOnNewTxAddr []func(txid string, addr string)
chanSyncIndex = make(chan struct{})
chanSyncMempool = make(chan struct{})
chanSyncIndexDone = make(chan struct{})
chanSyncMempoolDone = make(chan struct{})
chain *bchain.BitcoinRPC
mempool *bchain.Mempool
index *db.RocksDB
callbacksOnNewBlockHash []func(hash string)
callbacksOnNewTxAddr []func(txid string, addr string)
)
func main() {
@ -171,7 +171,8 @@ func main() {
}
}
}()
callbackOnNewIndexHash = append(callbackOnNewIndexHash, socketIoServer.OnNewBlockHash)
callbacksOnNewBlockHash = append(callbacksOnNewBlockHash, socketIoServer.OnNewBlockHash)
callbacksOnNewTxAddr = append(callbacksOnNewTxAddr, socketIoServer.OnNewTxAddr)
}
if *synchronize {
@ -270,7 +271,7 @@ func syncIndexLoop() {
}
func onNewBlockHash(hash string) {
for _, c := range callbackOnNewIndexHash {
for _, c := range callbacksOnNewBlockHash {
c(hash)
}
}
@ -280,13 +281,19 @@ func syncMempoolLoop() {
glog.Info("syncMempoolLoop starting")
// resync mempool about every minute if there are no chanSyncMempool requests, with debounce 1 second
tickAndDebounce(resyncMempoolPeriodMs*time.Millisecond, debounceResyncMempoolMs*time.Millisecond, chanSyncMempool, func() {
if err := mempool.Resync(); err != nil {
if err := mempool.Resync(onNewTxAddr); err != nil {
glog.Error("syncMempoolLoop", err)
}
})
glog.Info("syncMempoolLoop stopped")
}
func onNewTxAddr(txid string, addr string) {
for _, c := range callbacksOnNewTxAddr {
c(txid, addr)
}
}
func mqHandler(m *bchain.MQMessage) {
body := hex.EncodeToString(m.Body)
glog.V(1).Infof("MQ: %s-%d %s", m.Topic, m.Sequence, body)

View File

@ -655,7 +655,7 @@ func (s *SocketIoServer) onSubscribe(c *gosocketio.Channel, req []byte) interfac
return nil
}
for _, a := range addrs {
c.Join(sc + "-" + a)
c.Join("bitcoind/addresstxid-" + a)
}
} else {
sc = r[1 : len(r)-1]
@ -673,3 +673,9 @@ func (s *SocketIoServer) OnNewBlockHash(hash string) {
glog.Info("broadcasting new block hash ", hash)
s.server.BroadcastTo("bitcoind/hashblock", "bitcoind/hashblock", hash)
}
// OnNewTxAddr notifies users subscribed to bitcoind/addresstxid about new block
func (s *SocketIoServer) OnNewTxAddr(txid string, addr string) {
glog.Info("broadcasting new txid ", txid, " for addr ", addr)
s.server.BroadcastTo("bitcoind/addresstxid-"+addr, "bitcoind/addresstxid", map[string]string{"address": addr, "txid": txid})
}

View File

@ -201,6 +201,7 @@
socket.on("bitcoind/addresstxid", function (result) {
console.log('on bitcoind/addresstxid');
console.log(result);
document.getElementById('subscribeAddressTxidResult').innerText += JSON.stringify(result).replace(/,/g, ", ") + "\n";
});
}
</script>