diff --git a/blockbook.go b/blockbook.go index bc135177..9635ab0e 100644 --- a/blockbook.go +++ b/blockbook.go @@ -64,13 +64,15 @@ var ( ) var ( - chanSyncIndex = make(chan struct{}) - chanSyncMempool = make(chan struct{}) - chanSyncIndexDone = make(chan struct{}) - chanSyncMempoolDone = make(chan struct{}) - chain *bchain.BitcoinRPC - mempool *bchain.Mempool - index *db.RocksDB + chanSyncIndex = make(chan struct{}) + chanSyncMempool = make(chan struct{}) + chanSyncIndexDone = make(chan struct{}) + chanSyncMempoolDone = make(chan struct{}) + chain *bchain.BitcoinRPC + mempool *bchain.Mempool + index *db.RocksDB + callbackOnNewIndexHash []func(hash string) + callbackOnNewTxAddr []func(txid string, addr string) ) func main() { @@ -130,12 +132,9 @@ func main() { } if *synchronize { - if err := resyncIndex(true); err != nil { + if err := resyncIndex(true, nil); err != nil { glog.Fatal("resyncIndex ", err) } - go syncIndexLoop() - go syncMempoolLoop() - chanSyncMempool <- struct{}{} } var httpServer *server.HTTPServer @@ -172,6 +171,15 @@ func main() { } } }() + callbackOnNewIndexHash = append(callbackOnNewIndexHash, socketIoServer.OnNewBlockHash) + } + + if *synchronize { + // start the synchronization loops after the server interfaces are started + go syncIndexLoop() + go syncMempoolLoop() + // sync mempool immediately + chanSyncMempool <- struct{}{} } var mq *bchain.MQ @@ -214,7 +222,7 @@ func main() { } } - if httpServer != nil || mq != nil { + if httpServer != nil || socketIoServer != nil || mq != nil { waitForSignalAndShutdown(httpServer, socketIoServer, mq, 5*time.Second) } @@ -254,13 +262,19 @@ func syncIndexLoop() { glog.Info("syncIndexLoop starting") // resync index about every 15 minutes if there are no chanSyncIndex requests, with debounce 1 second tickAndDebounce(resyncIndexPeriodMs*time.Millisecond, debounceResyncIndexMs*time.Millisecond, chanSyncIndex, func() { - if err := resyncIndex(false); err != nil { + if err := resyncIndex(false, onNewBlockHash); err != nil { glog.Error("syncIndexLoop", err) } }) glog.Info("syncIndexLoop stopped") } +func onNewBlockHash(hash string) { + for _, c := range callbackOnNewIndexHash { + c(hash) + } +} + func syncMempoolLoop() { defer close(chanSyncMempoolDone) glog.Info("syncMempoolLoop starting") @@ -320,7 +334,7 @@ func printResult(txid string, vout uint32, isOutput bool) error { return nil } -func resyncIndex(bulk bool) error { +func resyncIndex(bulk bool, onNewBlock func(hash string)) error { remote, err := chain.GetBestBlockHash() if err != nil { return err @@ -375,7 +389,7 @@ func resyncIndex(bulk bool) error { if err != nil { return err } - return resyncIndex(false) + return resyncIndex(false, onNewBlock) } } @@ -420,16 +434,14 @@ func resyncIndex(bulk bool) error { } // after parallel load finish the sync using standard way, // new blocks may have been created in the meantime - return resyncIndex(false) + return resyncIndex(false, onNewBlock) } } - return connectBlocks(hash) + return connectBlocks(hash, onNewBlock) } -func connectBlocks( - hash string, -) error { +func connectBlocks(hash string, onNewBlock func(hash string)) error { bch := make(chan blockResult, 8) done := make(chan struct{}) defer close(done) @@ -446,6 +458,9 @@ func connectBlocks( if err != nil { return err } + if onNewBlock != nil { + onNewBlock(res.block.Hash) + } } if lastRes.block != nil { diff --git a/server/socketio.go b/server/socketio.go index 4827bf4a..a71f8961 100644 --- a/server/socketio.go +++ b/server/socketio.go @@ -634,7 +634,42 @@ func (s *SocketIoServer) sendTransaction(tx string) (res resultSendTransaction, return } +// onSubscribe expects two event subscriptions based on the req parameter (including the doublequotes): +// "bitcoind/hashblock" +// "bitcoind/addresstxid",["2MzTmvPJLZaLzD9XdN3jMtQA5NexC3rAPww","2NAZRJKr63tSdcTxTN3WaE9ZNDyXy6PgGuv"] func (s *SocketIoServer) onSubscribe(c *gosocketio.Channel, req []byte) interface{} { - glog.Info(c.Id(), " onSubscribe ", string(req)) + r := string(req) + glog.Info(c.Id(), " onSubscribe ", r) + var sc string + i := strings.Index(r, "\",[") + if i > 0 { + var addrs []string + sc = r[1:i] + if sc != "bitcoind/addresstxid" { + glog.Error(c.Id(), " onSubscribe ", sc, ": invalid data") + return nil + } + err := json.Unmarshal([]byte(r[i+2:]), &addrs) + if err != nil { + glog.Error(c.Id(), " onSubscribe ", sc, ": ", err) + return nil + } + for _, a := range addrs { + c.Join(sc + "-" + a) + } + } else { + sc = r[1 : len(r)-1] + if sc != "bitcoind/hashblock" { + glog.Error(c.Id(), " onSubscribe ", sc, ": invalid data") + return nil + } + c.Join(sc) + } return nil } + +// OnNewBlockHash notifies users subscribed to bitcoind/hashblock about new block +func (s *SocketIoServer) OnNewBlockHash(hash string) { + glog.Info("broadcasting new block hash ", hash) + s.server.BroadcastTo("bitcoind/hashblock", "bitcoind/hashblock", hash) +} diff --git a/server/static/test.html b/server/static/test.html index 02921168..574de0c1 100644 --- a/server/static/test.html +++ b/server/static/test.html @@ -188,6 +188,7 @@ socket.on("bitcoind/hashblock", function (result) { console.log('on bitcoind/hashblock'); console.log(result); + document.getElementById('subscribeHashBlockResult').innerText += JSON.stringify(result).replace(/,/g, ", ") + "\n"; }); }