Add option to compute fee statistics for chosen blocks

pull/187/head^2
Martin Boehm 2019-04-28 11:54:12 +02:00
parent 228d40e7a5
commit 984618a0ed
5 changed files with 113 additions and 37 deletions

View File

@ -8,7 +8,10 @@ import (
"bytes"
"encoding/json"
"fmt"
"math"
"math/big"
"os"
"sort"
"strconv"
"time"
@ -596,7 +599,7 @@ func (w *Worker) getEthereumTypeAddressBalances(addrDesc bchain.AddressDescripto
return ba, tokens, ci, n, nonContractTxs, totalResults, nil
}
func (w *Worker) txFromTxid(txid string, bestheight uint32, option AccountDetails) (*Tx, error) {
func (w *Worker) txFromTxid(txid string, bestheight uint32, option AccountDetails, blockInfo *db.BlockInfo) (*Tx, error) {
var tx *Tx
var err error
// only ChainBitcoinType supports TxHistoryLight
@ -610,16 +613,18 @@ func (w *Worker) txFromTxid(txid string, bestheight uint32, option AccountDetail
// as fallback, provide empty TxAddresses to return at least something
ta = &db.TxAddresses{}
}
bi, err := w.db.GetBlockInfo(ta.Height)
if err != nil {
return nil, errors.Annotatef(err, "GetBlockInfo %v", ta.Height)
if blockInfo == nil {
blockInfo, err = w.db.GetBlockInfo(ta.Height)
if err != nil {
return nil, errors.Annotatef(err, "GetBlockInfo %v", ta.Height)
}
if blockInfo == nil {
glog.Warning("DB inconsistency: block height ", ta.Height, ": not found in db")
// provide empty BlockInfo to return the rest of tx data
blockInfo = &db.BlockInfo{}
}
}
if bi == nil {
glog.Warning("DB inconsistency: block height ", ta.Height, ": not found in db")
// provide empty BlockInfo to return the rest of tx data
bi = &db.BlockInfo{}
}
tx = w.txFromTxAddress(txid, ta, bi, bestheight)
tx = w.txFromTxAddress(txid, ta, blockInfo, bestheight)
} else {
tx, err = w.GetTransaction(txid, false, true)
if err != nil {
@ -750,7 +755,7 @@ func (w *Worker) GetAddress(address string, page int, txsOnPage int, option Acco
if option == AccountDetailsTxidHistory {
txids = append(txids, txid)
} else {
tx, err := w.txFromTxid(txid, bestheight, option)
tx, err := w.txFromTxid(txid, bestheight, option, nil)
if err != nil {
return nil, err
}
@ -991,22 +996,9 @@ func (w *Worker) GetBlock(bid string, page int, txsOnPage int) (*Block, error) {
txs := make([]*Tx, to-from)
txi := 0
for i := from; i < to; i++ {
txid := bi.Txids[i]
if w.chainType == bchain.ChainBitcoinType {
ta, err := w.db.GetTxAddresses(txid)
if err != nil {
return nil, errors.Annotatef(err, "GetTxAddresses %v", txid)
}
if ta == nil {
glog.Warning("DB inconsistency: tx ", txid, ": not found in txAddresses")
continue
}
txs[txi] = w.txFromTxAddress(txid, ta, dbi, bestheight)
} else {
txs[txi], err = w.GetTransaction(txid, false, false)
if err != nil {
return nil, err
}
txs[txi], err = w.txFromTxid(bi.Txids[i], bestheight, AccountDetailsTxHistoryLight, dbi)
if err != nil {
return nil, err
}
txi++
}
@ -1041,6 +1033,66 @@ func (w *Worker) GetBlock(bid string, page int, txsOnPage int) (*Block, error) {
}, nil
}
// ComputeFeeStats computes fee distribution in defined blocks and logs them to log
func (w *Worker) ComputeFeeStats(blockFrom, blockTo int, stopCompute chan os.Signal) error {
bestheight, _, err := w.db.GetBestBlock()
if err != nil {
return errors.Annotatef(err, "GetBestBlock")
}
for block := blockFrom; block <= blockTo; block++ {
hash, err := w.db.GetBlockHash(uint32(block))
if err != nil {
return err
}
bi, err := w.chain.GetBlockInfo(hash)
if err != nil {
return err
}
// process only blocks with enough transactions
if len(bi.Txids) > 20 {
dbi := &db.BlockInfo{
Hash: bi.Hash,
Height: bi.Height,
Time: bi.Time,
}
txids := bi.Txids
if w.chainType == bchain.ChainBitcoinType {
// skip the coinbase transaction
txids = txids[1:]
}
fees := make([]int64, len(txids))
sum := int64(0)
for i, txid := range txids {
select {
case <-stopCompute:
glog.Info("ComputeFeeStats interrupted at height ", block)
return db.ErrOperationInterrupted
default:
tx, err := w.txFromTxid(txid, bestheight, AccountDetailsTxHistoryLight, dbi)
if err != nil {
return err
}
fee := tx.FeesSat.AsInt64()
fees[i] = fee
sum += fee
}
}
sort.Slice(fees, func(i, j int) bool { return fees[i] < fees[j] })
step := float64(len(fees)) / 10
percentils := ""
for i := float64(0); i < float64(len(fees)+1); i += step {
ii := int(math.Round(i))
if ii >= len(fees) {
ii = len(fees) - 1
}
percentils += "," + strconv.FormatInt(fees[ii], 10)
}
glog.Info(block, ",", time.Unix(bi.Time, 0).Format(time.RFC3339), ",", len(bi.Txids), ",", sum, ",", float64(sum)/float64(len(bi.Txids)), percentils)
}
}
return nil
}
// GetSystemInfo returns information about system
func (w *Worker) GetSystemInfo(internal bool) (*SystemInfo, error) {
start := time.Now()

View File

@ -495,7 +495,7 @@ func (w *Worker) GetXpubAddress(xpub string, page int, txsOnPage int, option Acc
if option == AccountDetailsTxidHistory {
txids = append(txids, xpubTxid.txid)
} else {
tx, err := w.txFromTxid(xpubTxid.txid, bestheight, option)
tx, err := w.txFromTxid(xpubTxid.txid, bestheight, option, nil)
if err != nil {
return nil, err
}

View File

@ -68,8 +68,9 @@ var (
noTxCache = flag.Bool("notxcache", false, "disable tx cache")
computeColumnStats = flag.Bool("computedbstats", false, "compute column stats and exit")
dbStatsPeriodHours = flag.Int("dbstatsperiod", 24, "period of db stats collection in hours, 0 disables stats collection")
computeColumnStats = flag.Bool("computedbstats", false, "compute column stats and exit")
computeFeeStatsFlag = flag.Bool("computedfeestats", false, "compute fee stats for blocks in blockheight-blockuntil range and exit")
dbStatsPeriodHours = flag.Int("dbstatsperiod", 24, "period of db stats collection in hours, 0 disables stats collection")
// resync index at least each resyncIndexPeriodMs (could be more often if invoked by message from ZeroMQ)
resyncIndexPeriodMs = flag.Int("resyncindexperiod", 935093, "resync index period in milliseconds")
@ -179,6 +180,16 @@ func mainWithExitCode() int {
glog.Warning("internalState: database was left in open state, possibly previous ungraceful shutdown")
}
if *computeFeeStatsFlag {
internalState.DbState = common.DbStateOpen
err = computeFeeStats(chanOsSignal, *blockFrom, *blockUntil, index, chain, txCache, internalState, metrics)
if err != nil && err != db.ErrOperationInterrupted {
glog.Error("computeFeeStats: ", err)
return exitCodeFatal
}
return exitCodeOK
}
if *computeColumnStats {
internalState.DbState = common.DbStateOpen
err = index.ComputeInternalStateColumnStats(chanOsSignal)
@ -244,7 +255,7 @@ func mainWithExitCode() int {
internalState.SyncMode = true
internalState.InitialSync = true
if err := syncWorker.ResyncIndex(nil, true); err != nil {
if err != db.ErrSyncInterrupted {
if err != db.ErrOperationInterrupted {
glog.Error("resyncIndex ", err)
return exitCodeFatal
}
@ -288,7 +299,7 @@ func mainWithExitCode() int {
if !*synchronize {
if err = syncWorker.ConnectBlocksParallel(height, until); err != nil {
if err != db.ErrSyncInterrupted {
if err != db.ErrOperationInterrupted {
glog.Error("connectBlocksParallel ", err)
return exitCodeFatal
}
@ -613,3 +624,16 @@ func normalizeName(s string) string {
s = strings.Replace(s, " ", "-", -1)
return s
}
// computeFeeStats computes fee distribution in defined blocks
func computeFeeStats(stopCompute chan os.Signal, blockFrom, blockTo int, db *db.RocksDB, chain bchain.BlockChain, txCache *db.TxCache, is *common.InternalState, metrics *common.Metrics) error {
start := time.Now()
glog.Info("computeFeeStats start")
api, err := api.NewWorker(db, chain, mempool, txCache, is)
if err != nil {
return err
}
err = api.ComputeFeeStats(blockFrom, blockTo, stopCompute)
glog.Info("computeFeeStats finished in ", time.Since(start))
return err
}

View File

@ -45,8 +45,8 @@ func NewSyncWorker(db *RocksDB, chain bchain.BlockChain, syncWorkers, syncChunk
var errSynced = errors.New("synced")
// ErrSyncInterrupted is returned when synchronization is interrupted by OS signal
var ErrSyncInterrupted = errors.New("ErrSyncInterrupted")
// ErrOperationInterrupted is returned when operation is interrupted by OS signal
var ErrOperationInterrupted = errors.New("ErrOperationInterrupted")
// ResyncIndex synchronizes index to the top of the blockchain
// onNewBlock is called when new block is connected, but not in initial parallel sync
@ -206,7 +206,7 @@ func (w *SyncWorker) connectBlocks(onNewBlock bchain.OnNewBlockFunc, initialSync
select {
case <-w.chanOsSignal:
glog.Info("connectBlocks interrupted at height ", lastRes.block.Height)
return ErrSyncInterrupted
return ErrOperationInterrupted
case res := <-bch:
if res == empty {
break ConnectLoop
@ -330,7 +330,7 @@ ConnectLoop:
select {
case <-w.chanOsSignal:
glog.Info("connectBlocksParallel interrupted at height ", h)
err = ErrSyncInterrupted
err = ErrOperationInterrupted
// signal all workers to terminate their loops (error loops are interrupted below)
close(terminating)
break ConnectLoop

View File

@ -25,7 +25,7 @@ func testConnectBlocks(t *testing.T, h *TestHandler) {
close(ch)
}
}, true)
if err != nil && err != db.ErrSyncInterrupted {
if err != nil && err != db.ErrOperationInterrupted {
t.Fatal(err)
}