Store mempool size in internal state

pull/7/head
Martin Boehm 2018-06-01 13:22:56 +02:00
parent c0519ec9c2
commit 17ed8f7d5d
8 changed files with 18 additions and 14 deletions

View File

@ -151,7 +151,7 @@ func (c *blockChainWithMetrics) SendRawTransaction(tx string) (v string, err err
return c.b.SendRawTransaction(tx)
}
func (c *blockChainWithMetrics) ResyncMempool(onNewTxAddr func(txid string, addr string)) (err error) {
func (c *blockChainWithMetrics) ResyncMempool(onNewTxAddr func(txid string, addr string)) (count int, err error) {
defer func(s time.Time) { c.observeRPCLatency("ResyncMempool", s, err) }(time.Now())
return c.b.ResyncMempool(onNewTxAddr)
}

View File

@ -598,7 +598,8 @@ func (b *BitcoinRPC) GetTransaction(txid string) (*bchain.Tx, error) {
// ResyncMempool gets mempool transactions and maps output scripts to transactions.
// ResyncMempool is not reentrant, it should be called from a single thread.
func (b *BitcoinRPC) ResyncMempool(onNewTxAddr func(txid string, addr string)) error {
// It returns number of transactions in mempool
func (b *BitcoinRPC) ResyncMempool(onNewTxAddr func(txid string, addr string)) (int, error) {
return b.Mempool.Resync(onNewTxAddr)
}

View File

@ -496,7 +496,7 @@ func (b *EthereumRPC) SendRawTransaction(tx string) (string, error) {
return "", errors.New("SendRawTransaction: not implemented")
}
func (b *EthereumRPC) ResyncMempool(onNewTxAddr func(txid string, addr string)) error {
func (b *EthereumRPC) ResyncMempool(onNewTxAddr func(txid string, addr string)) (int, error) {
return b.Mempool.Resync(onNewTxAddr)
}

View File

@ -47,12 +47,12 @@ func (m *NonUTXOMempool) updateMappings(newTxToInputOutput map[string][]addrInde
// Resync gets mempool transactions and maps outputs to transactions.
// Resync is not reentrant, it should be called from a single thread.
// Read operations (GetTransactions) are safe.
func (m *NonUTXOMempool) Resync(onNewTxAddr func(txid string, addr string)) error {
func (m *NonUTXOMempool) Resync(onNewTxAddr func(txid string, addr string)) (int, error) {
start := time.Now()
glog.V(1).Info("Mempool: resync")
txs, err := m.chain.GetMempool()
if err != nil {
return err
return 0, err
}
parser := m.chain.GetChainParser()
// allocate slightly larger capacity of the maps
@ -102,5 +102,5 @@ func (m *NonUTXOMempool) Resync(onNewTxAddr func(txid string, addr string)) erro
}
m.updateMappings(newTxToInputOutput, newAddrIDToTx)
glog.Info("Mempool: resync finished in ", time.Since(start), ", ", len(m.txToInputOutput), " transactions in mempool")
return nil
return len(m.txToInputOutput), nil
}

View File

@ -166,13 +166,13 @@ func (m *UTXOMempool) getTxAddrs(txid string, chanInput chan outpoint, chanResul
// Resync gets mempool transactions and maps outputs to transactions.
// Resync is not reentrant, it should be called from a single thread.
// Read operations (GetTransactions) are safe.
func (m *UTXOMempool) Resync(onNewTxAddr func(txid string, addr string)) error {
func (m *UTXOMempool) Resync(onNewTxAddr func(txid string, addr string)) (int, error) {
start := time.Now()
glog.V(1).Info("mempool: resync")
m.onNewTxAddr = onNewTxAddr
txs, err := m.chain.GetMempool()
if err != nil {
return err
return 0, err
}
glog.V(2).Info("mempool: resync ", len(txs), " txs")
// allocate slightly larger capacity of the maps
@ -215,5 +215,5 @@ func (m *UTXOMempool) Resync(onNewTxAddr func(txid string, addr string)) error {
m.updateMappings(newTxToInputOutput, newAddrIDToTx)
m.onNewTxAddr = nil
glog.Info("mempool: resync finished in ", time.Since(start), ", ", len(m.txToInputOutput), " transactions in mempool")
return nil
return len(m.txToInputOutput), nil
}

View File

@ -135,7 +135,7 @@ type BlockChain interface {
EstimateFee(blocks int) (float64, error)
SendRawTransaction(tx string) (string, error)
// mempool
ResyncMempool(onNewTxAddr func(txid string, addr string)) error
ResyncMempool(onNewTxAddr func(txid string, addr string)) (int, error)
GetMempoolTransactions(address string) ([]string, error)
GetMempoolEntry(txid string) (*MempoolEntry, error)
// parser

View File

@ -238,7 +238,7 @@ func main() {
glog.Error("resyncIndex ", err)
return
}
if err = chain.ResyncMempool(nil); err != nil {
if _, err = chain.ResyncMempool(nil); err != nil {
glog.Error("resyncMempool ", err)
return
}
@ -385,10 +385,11 @@ func syncMempoolLoop() {
// resync mempool about every minute if there are no chanSyncMempool requests, with debounce 1 second
tickAndDebounce(resyncMempoolPeriodMs*time.Millisecond, debounceResyncMempoolMs*time.Millisecond, chanSyncMempool, func() {
internalState.StartedMempoolSync()
if err := chain.ResyncMempool(onNewTxAddr); err != nil {
if count, err := chain.ResyncMempool(onNewTxAddr); err != nil {
glog.Error("syncMempoolLoop ", errors.ErrorStack(err))
} else {
internalState.FinishedMempoolSync()
internalState.FinishedMempoolSync(count)
}
})
glog.Info("syncMempoolLoop stopped")

View File

@ -38,6 +38,7 @@ type InternalState struct {
LastSync time.Time `json:"lastSync"`
IsMempoolSynchronized bool `json:"isMempoolSynchronized"`
MempoolSize int `json:"mempoolSize"`
LastMempoolSync time.Time `json:"lastMempoolSync"`
DbColumns []InternalStateColumn `json:"dbColumns"`
@ -81,10 +82,11 @@ func (is *InternalState) StartedMempoolSync() {
}
// FinishedMempoolSync marks end of mempool synchronization
func (is *InternalState) FinishedMempoolSync() {
func (is *InternalState) FinishedMempoolSync(mempoolSize int) {
is.mux.Lock()
defer is.mux.Unlock()
is.IsMempoolSynchronized = true
is.MempoolSize = mempoolSize
is.LastMempoolSync = time.Now()
}