Notify socket.io subscribers about new block

indexv1
Martin Boehm 2018-02-22 13:01:35 +01:00
parent 5cfe48d619
commit 0c456fe245
3 changed files with 72 additions and 21 deletions

View File

@ -64,13 +64,15 @@ var (
)
var (
chanSyncIndex = make(chan struct{})
chanSyncMempool = make(chan struct{})
chanSyncIndexDone = make(chan struct{})
chanSyncMempoolDone = make(chan struct{})
chain *bchain.BitcoinRPC
mempool *bchain.Mempool
index *db.RocksDB
chanSyncIndex = make(chan struct{})
chanSyncMempool = make(chan struct{})
chanSyncIndexDone = make(chan struct{})
chanSyncMempoolDone = make(chan struct{})
chain *bchain.BitcoinRPC
mempool *bchain.Mempool
index *db.RocksDB
callbackOnNewIndexHash []func(hash string)
callbackOnNewTxAddr []func(txid string, addr string)
)
func main() {
@ -130,12 +132,9 @@ func main() {
}
if *synchronize {
if err := resyncIndex(true); err != nil {
if err := resyncIndex(true, nil); err != nil {
glog.Fatal("resyncIndex ", err)
}
go syncIndexLoop()
go syncMempoolLoop()
chanSyncMempool <- struct{}{}
}
var httpServer *server.HTTPServer
@ -172,6 +171,15 @@ func main() {
}
}
}()
callbackOnNewIndexHash = append(callbackOnNewIndexHash, socketIoServer.OnNewBlockHash)
}
if *synchronize {
// start the synchronization loops after the server interfaces are started
go syncIndexLoop()
go syncMempoolLoop()
// sync mempool immediately
chanSyncMempool <- struct{}{}
}
var mq *bchain.MQ
@ -214,7 +222,7 @@ func main() {
}
}
if httpServer != nil || mq != nil {
if httpServer != nil || socketIoServer != nil || mq != nil {
waitForSignalAndShutdown(httpServer, socketIoServer, mq, 5*time.Second)
}
@ -254,13 +262,19 @@ func syncIndexLoop() {
glog.Info("syncIndexLoop starting")
// resync index about every 15 minutes if there are no chanSyncIndex requests, with debounce 1 second
tickAndDebounce(resyncIndexPeriodMs*time.Millisecond, debounceResyncIndexMs*time.Millisecond, chanSyncIndex, func() {
if err := resyncIndex(false); err != nil {
if err := resyncIndex(false, onNewBlockHash); err != nil {
glog.Error("syncIndexLoop", err)
}
})
glog.Info("syncIndexLoop stopped")
}
func onNewBlockHash(hash string) {
for _, c := range callbackOnNewIndexHash {
c(hash)
}
}
func syncMempoolLoop() {
defer close(chanSyncMempoolDone)
glog.Info("syncMempoolLoop starting")
@ -320,7 +334,7 @@ func printResult(txid string, vout uint32, isOutput bool) error {
return nil
}
func resyncIndex(bulk bool) error {
func resyncIndex(bulk bool, onNewBlock func(hash string)) error {
remote, err := chain.GetBestBlockHash()
if err != nil {
return err
@ -375,7 +389,7 @@ func resyncIndex(bulk bool) error {
if err != nil {
return err
}
return resyncIndex(false)
return resyncIndex(false, onNewBlock)
}
}
@ -420,16 +434,14 @@ func resyncIndex(bulk bool) error {
}
// after parallel load finish the sync using standard way,
// new blocks may have been created in the meantime
return resyncIndex(false)
return resyncIndex(false, onNewBlock)
}
}
return connectBlocks(hash)
return connectBlocks(hash, onNewBlock)
}
func connectBlocks(
hash string,
) error {
func connectBlocks(hash string, onNewBlock func(hash string)) error {
bch := make(chan blockResult, 8)
done := make(chan struct{})
defer close(done)
@ -446,6 +458,9 @@ func connectBlocks(
if err != nil {
return err
}
if onNewBlock != nil {
onNewBlock(res.block.Hash)
}
}
if lastRes.block != nil {

View File

@ -634,7 +634,42 @@ func (s *SocketIoServer) sendTransaction(tx string) (res resultSendTransaction,
return
}
// onSubscribe expects two event subscriptions based on the req parameter (including the doublequotes):
// "bitcoind/hashblock"
// "bitcoind/addresstxid",["2MzTmvPJLZaLzD9XdN3jMtQA5NexC3rAPww","2NAZRJKr63tSdcTxTN3WaE9ZNDyXy6PgGuv"]
func (s *SocketIoServer) onSubscribe(c *gosocketio.Channel, req []byte) interface{} {
glog.Info(c.Id(), " onSubscribe ", string(req))
r := string(req)
glog.Info(c.Id(), " onSubscribe ", r)
var sc string
i := strings.Index(r, "\",[")
if i > 0 {
var addrs []string
sc = r[1:i]
if sc != "bitcoind/addresstxid" {
glog.Error(c.Id(), " onSubscribe ", sc, ": invalid data")
return nil
}
err := json.Unmarshal([]byte(r[i+2:]), &addrs)
if err != nil {
glog.Error(c.Id(), " onSubscribe ", sc, ": ", err)
return nil
}
for _, a := range addrs {
c.Join(sc + "-" + a)
}
} else {
sc = r[1 : len(r)-1]
if sc != "bitcoind/hashblock" {
glog.Error(c.Id(), " onSubscribe ", sc, ": invalid data")
return nil
}
c.Join(sc)
}
return nil
}
// OnNewBlockHash notifies users subscribed to bitcoind/hashblock about new block
func (s *SocketIoServer) OnNewBlockHash(hash string) {
glog.Info("broadcasting new block hash ", hash)
s.server.BroadcastTo("bitcoind/hashblock", "bitcoind/hashblock", hash)
}

View File

@ -188,6 +188,7 @@
socket.on("bitcoind/hashblock", function (result) {
console.log('on bitcoind/hashblock');
console.log(result);
document.getElementById('subscribeHashBlockResult').innerText += JSON.stringify(result).replace(/,/g, ", ") + "\n";
});
}