Prepare notification of block height in OnNewBlock

pull/53/head
Martin Boehm 2018-09-11 13:37:12 +02:00
parent 6dcf3fd45d
commit 8bdf4b0ae3
9 changed files with 26 additions and 20 deletions

View File

@ -191,7 +191,7 @@ func (c *blockChainWithMetrics) SendRawTransaction(tx string) (v string, err err
return c.b.SendRawTransaction(tx) return c.b.SendRawTransaction(tx)
} }
func (c *blockChainWithMetrics) ResyncMempool(onNewTxAddr func(txid string, addr string, isOutput bool)) (count int, err error) { func (c *blockChainWithMetrics) ResyncMempool(onNewTxAddr bchain.OnNewTxAddrFunc) (count int, err error) {
defer func(s time.Time) { c.observeRPCLatency("ResyncMempool", s, err) }(time.Now()) defer func(s time.Time) { c.observeRPCLatency("ResyncMempool", s, err) }(time.Now())
count, err = c.b.ResyncMempool(onNewTxAddr) count, err = c.b.ResyncMempool(onNewTxAddr)
if err == nil { if err == nil {

View File

@ -607,7 +607,7 @@ func (b *BitcoinRPC) GetTransaction(txid string) (*bchain.Tx, error) {
// ResyncMempool gets mempool transactions and maps output scripts to transactions. // ResyncMempool gets mempool transactions and maps output scripts to transactions.
// ResyncMempool is not reentrant, it should be called from a single thread. // ResyncMempool is not reentrant, it should be called from a single thread.
// It returns number of transactions in mempool // It returns number of transactions in mempool
func (b *BitcoinRPC) ResyncMempool(onNewTxAddr func(txid string, addr string, isOutput bool)) (int, error) { func (b *BitcoinRPC) ResyncMempool(onNewTxAddr bchain.OnNewTxAddrFunc) (int, error) {
return b.Mempool.Resync(onNewTxAddr) return b.Mempool.Resync(onNewTxAddr)
} }

View File

@ -530,7 +530,7 @@ func (b *EthereumRPC) SendRawTransaction(hex string) (string, error) {
return result, nil return result, nil
} }
func (b *EthereumRPC) ResyncMempool(onNewTxAddr func(txid string, addr string, isOutput bool)) (int, error) { func (b *EthereumRPC) ResyncMempool(onNewTxAddr bchain.OnNewTxAddrFunc) (int, error) {
return b.Mempool.Resync(onNewTxAddr) return b.Mempool.Resync(onNewTxAddr)
} }

View File

@ -47,7 +47,7 @@ func (m *NonUTXOMempool) updateMappings(newTxToInputOutput map[string][]addrInde
// Resync gets mempool transactions and maps outputs to transactions. // Resync gets mempool transactions and maps outputs to transactions.
// Resync is not reentrant, it should be called from a single thread. // Resync is not reentrant, it should be called from a single thread.
// Read operations (GetTransactions) are safe. // Read operations (GetTransactions) are safe.
func (m *NonUTXOMempool) Resync(onNewTxAddr func(txid string, addr string, isOutput bool)) (int, error) { func (m *NonUTXOMempool) Resync(onNewTxAddr OnNewTxAddrFunc) (int, error) {
start := time.Now() start := time.Now()
glog.V(1).Info("Mempool: resync") glog.V(1).Info("Mempool: resync")
txs, err := m.chain.GetMempool() txs, err := m.chain.GetMempool()

View File

@ -31,7 +31,7 @@ type UTXOMempool struct {
addrIDToTx map[string][]outpoint addrIDToTx map[string][]outpoint
chanTxid chan string chanTxid chan string
chanAddrIndex chan txidio chanAddrIndex chan txidio
onNewTxAddr func(txid string, addr string, isOutput bool) onNewTxAddr OnNewTxAddrFunc
} }
// NewUTXOMempool creates new mempool handler. // NewUTXOMempool creates new mempool handler.
@ -166,7 +166,7 @@ func (m *UTXOMempool) getTxAddrs(txid string, chanInput chan outpoint, chanResul
// Resync gets mempool transactions and maps outputs to transactions. // Resync gets mempool transactions and maps outputs to transactions.
// Resync is not reentrant, it should be called from a single thread. // Resync is not reentrant, it should be called from a single thread.
// Read operations (GetTransactions) are safe. // Read operations (GetTransactions) are safe.
func (m *UTXOMempool) Resync(onNewTxAddr func(txid string, addr string, isOutput bool)) (int, error) { func (m *UTXOMempool) Resync(onNewTxAddr OnNewTxAddrFunc) (int, error) {
start := time.Now() start := time.Now()
glog.V(1).Info("mempool: resync") glog.V(1).Info("mempool: resync")
m.onNewTxAddr = onNewTxAddr m.onNewTxAddr = onNewTxAddr

View File

@ -112,6 +112,12 @@ func (e *RPCError) Error() string {
return fmt.Sprintf("%d: %s", e.Code, e.Message) return fmt.Sprintf("%d: %s", e.Code, e.Message)
} }
// OnNewBlockFunc is used to send notification about a new block
type OnNewBlockFunc func(hash string, height uint32)
// OnNewTxAddrFunc is used to send notification about a new transaction/address
type OnNewTxAddrFunc func(txid string, addr string, isOutput bool)
// BlockChain defines common interface to block chain daemon // BlockChain defines common interface to block chain daemon
type BlockChain interface { type BlockChain interface {
// life-cycle methods // life-cycle methods
@ -136,7 +142,7 @@ type BlockChain interface {
EstimateFee(blocks int) (float64, error) EstimateFee(blocks int) (float64, error)
SendRawTransaction(tx string) (string, error) SendRawTransaction(tx string) (string, error)
// mempool // mempool
ResyncMempool(onNewTxAddr func(txid string, addr string, isOutput bool)) (int, error) ResyncMempool(onNewTxAddr OnNewTxAddrFunc) (int, error)
GetMempoolTransactions(address string) ([]string, error) GetMempoolTransactions(address string) ([]string, error)
GetMempoolEntry(txid string) (*MempoolEntry, error) GetMempoolEntry(txid string) (*MempoolEntry, error)
// parser // parser

View File

@ -85,8 +85,8 @@ var (
txCache *db.TxCache txCache *db.TxCache
syncWorker *db.SyncWorker syncWorker *db.SyncWorker
internalState *common.InternalState internalState *common.InternalState
callbacksOnNewBlockHash []func(hash string) callbacksOnNewBlock []bchain.OnNewBlockFunc
callbacksOnNewTxAddr []func(txid string, addr string, isOutput bool) callbacksOnNewTxAddr []bchain.OnNewTxAddrFunc
chanOsSignal chan os.Signal chanOsSignal chan os.Signal
inShutdown int32 inShutdown int32
) )
@ -281,7 +281,7 @@ func main() {
} }
} }
}() }()
callbacksOnNewBlockHash = append(callbacksOnNewBlockHash, publicServer.OnNewBlockHash) callbacksOnNewBlock = append(callbacksOnNewBlock, publicServer.OnNewBlock)
callbacksOnNewTxAddr = append(callbacksOnNewTxAddr, publicServer.OnNewTxAddr) callbacksOnNewTxAddr = append(callbacksOnNewTxAddr, publicServer.OnNewTxAddr)
} }
@ -392,9 +392,9 @@ func syncIndexLoop() {
glog.Info("syncIndexLoop stopped") glog.Info("syncIndexLoop stopped")
} }
func onNewBlockHash(hash string) { func onNewBlockHash(hash string, height uint32) {
for _, c := range callbacksOnNewBlockHash { for _, c := range callbacksOnNewBlock {
c(hash) c(hash, height)
} }
} }

View File

@ -47,7 +47,7 @@ var errSynced = errors.New("synced")
// ResyncIndex synchronizes index to the top of the blockchain // ResyncIndex synchronizes index to the top of the blockchain
// onNewBlock is called when new block is connected, but not in initial parallel sync // onNewBlock is called when new block is connected, but not in initial parallel sync
func (w *SyncWorker) ResyncIndex(onNewBlock func(hash string)) error { func (w *SyncWorker) ResyncIndex(onNewBlock bchain.OnNewBlockFunc) error {
start := time.Now() start := time.Now()
w.is.StartedSync() w.is.StartedSync()
@ -75,7 +75,7 @@ func (w *SyncWorker) ResyncIndex(onNewBlock func(hash string)) error {
return err return err
} }
func (w *SyncWorker) resyncIndex(onNewBlock func(hash string)) error { func (w *SyncWorker) resyncIndex(onNewBlock bchain.OnNewBlockFunc) error {
remoteBestHash, err := w.chain.GetBestBlockHash() remoteBestHash, err := w.chain.GetBestBlockHash()
if err != nil { if err != nil {
return err return err
@ -135,7 +135,7 @@ func (w *SyncWorker) resyncIndex(onNewBlock func(hash string)) error {
return w.connectBlocks(onNewBlock) return w.connectBlocks(onNewBlock)
} }
func (w *SyncWorker) handleFork(localBestHeight uint32, localBestHash string, onNewBlock func(hash string)) error { func (w *SyncWorker) handleFork(localBestHeight uint32, localBestHash string, onNewBlock bchain.OnNewBlockFunc) error {
// find forked blocks, disconnect them and then synchronize again // find forked blocks, disconnect them and then synchronize again
var height uint32 var height uint32
hashes := []string{localBestHash} hashes := []string{localBestHash}
@ -163,7 +163,7 @@ func (w *SyncWorker) handleFork(localBestHeight uint32, localBestHash string, on
return w.resyncIndex(onNewBlock) return w.resyncIndex(onNewBlock)
} }
func (w *SyncWorker) connectBlocks(onNewBlock func(hash string)) error { func (w *SyncWorker) connectBlocks(onNewBlock bchain.OnNewBlockFunc) error {
bch := make(chan blockResult, 8) bch := make(chan blockResult, 8)
done := make(chan struct{}) done := make(chan struct{})
defer close(done) defer close(done)
@ -181,7 +181,7 @@ func (w *SyncWorker) connectBlocks(onNewBlock func(hash string)) error {
return err return err
} }
if onNewBlock != nil { if onNewBlock != nil {
onNewBlock(res.block.Hash) onNewBlock(res.block.Hash, res.block.Height)
} }
if res.block.Height > 0 && res.block.Height%1000 == 0 { if res.block.Height > 0 && res.block.Height%1000 == 0 {
glog.Info("connected block ", res.block.Height, " ", res.block.Hash) glog.Info("connected block ", res.block.Height, " ", res.block.Hash)

View File

@ -139,8 +139,8 @@ func (s *PublicServer) Shutdown(ctx context.Context) error {
return s.https.Shutdown(ctx) return s.https.Shutdown(ctx)
} }
// OnNewBlockHash notifies users subscribed to bitcoind/hashblock about new block // OnNewBlock notifies users subscribed to bitcoind/hashblock about new block
func (s *PublicServer) OnNewBlockHash(hash string) { func (s *PublicServer) OnNewBlock(hash string, height uint32) {
s.socketio.OnNewBlockHash(hash) s.socketio.OnNewBlockHash(hash)
} }