From 984618a0edac7bdea680b9094d671efb9b3474b9 Mon Sep 17 00:00:00 2001 From: Martin Boehm Date: Sun, 28 Apr 2019 11:54:12 +0200 Subject: [PATCH] Add option to compute fee statistics for chosen blocks --- api/worker.go | 106 +++++++++++++++++++++++++++--------- api/xpub.go | 2 +- blockbook.go | 32 +++++++++-- db/sync.go | 8 +-- tests/sync/connectblocks.go | 2 +- 5 files changed, 113 insertions(+), 37 deletions(-) diff --git a/api/worker.go b/api/worker.go index 642a90ec..7cb6e732 100644 --- a/api/worker.go +++ b/api/worker.go @@ -8,7 +8,10 @@ import ( "bytes" "encoding/json" "fmt" + "math" "math/big" + "os" + "sort" "strconv" "time" @@ -596,7 +599,7 @@ func (w *Worker) getEthereumTypeAddressBalances(addrDesc bchain.AddressDescripto return ba, tokens, ci, n, nonContractTxs, totalResults, nil } -func (w *Worker) txFromTxid(txid string, bestheight uint32, option AccountDetails) (*Tx, error) { +func (w *Worker) txFromTxid(txid string, bestheight uint32, option AccountDetails, blockInfo *db.BlockInfo) (*Tx, error) { var tx *Tx var err error // only ChainBitcoinType supports TxHistoryLight @@ -610,16 +613,18 @@ func (w *Worker) txFromTxid(txid string, bestheight uint32, option AccountDetail // as fallback, provide empty TxAddresses to return at least something ta = &db.TxAddresses{} } - bi, err := w.db.GetBlockInfo(ta.Height) - if err != nil { - return nil, errors.Annotatef(err, "GetBlockInfo %v", ta.Height) + if blockInfo == nil { + blockInfo, err = w.db.GetBlockInfo(ta.Height) + if err != nil { + return nil, errors.Annotatef(err, "GetBlockInfo %v", ta.Height) + } + if blockInfo == nil { + glog.Warning("DB inconsistency: block height ", ta.Height, ": not found in db") + // provide empty BlockInfo to return the rest of tx data + blockInfo = &db.BlockInfo{} + } } - if bi == nil { - glog.Warning("DB inconsistency: block height ", ta.Height, ": not found in db") - // provide empty BlockInfo to return the rest of tx data - bi = &db.BlockInfo{} - } - tx = w.txFromTxAddress(txid, ta, bi, bestheight) + tx = w.txFromTxAddress(txid, ta, blockInfo, bestheight) } else { tx, err = w.GetTransaction(txid, false, true) if err != nil { @@ -750,7 +755,7 @@ func (w *Worker) GetAddress(address string, page int, txsOnPage int, option Acco if option == AccountDetailsTxidHistory { txids = append(txids, txid) } else { - tx, err := w.txFromTxid(txid, bestheight, option) + tx, err := w.txFromTxid(txid, bestheight, option, nil) if err != nil { return nil, err } @@ -991,22 +996,9 @@ func (w *Worker) GetBlock(bid string, page int, txsOnPage int) (*Block, error) { txs := make([]*Tx, to-from) txi := 0 for i := from; i < to; i++ { - txid := bi.Txids[i] - if w.chainType == bchain.ChainBitcoinType { - ta, err := w.db.GetTxAddresses(txid) - if err != nil { - return nil, errors.Annotatef(err, "GetTxAddresses %v", txid) - } - if ta == nil { - glog.Warning("DB inconsistency: tx ", txid, ": not found in txAddresses") - continue - } - txs[txi] = w.txFromTxAddress(txid, ta, dbi, bestheight) - } else { - txs[txi], err = w.GetTransaction(txid, false, false) - if err != nil { - return nil, err - } + txs[txi], err = w.txFromTxid(bi.Txids[i], bestheight, AccountDetailsTxHistoryLight, dbi) + if err != nil { + return nil, err } txi++ } @@ -1041,6 +1033,66 @@ func (w *Worker) GetBlock(bid string, page int, txsOnPage int) (*Block, error) { }, nil } +// ComputeFeeStats computes fee distribution in defined blocks and logs them to log +func (w *Worker) ComputeFeeStats(blockFrom, blockTo int, stopCompute chan os.Signal) error { + bestheight, _, err := w.db.GetBestBlock() + if err != nil { + return errors.Annotatef(err, "GetBestBlock") + } + for block := blockFrom; block <= blockTo; block++ { + hash, err := w.db.GetBlockHash(uint32(block)) + if err != nil { + return err + } + bi, err := w.chain.GetBlockInfo(hash) + if err != nil { + return err + } + // process only blocks with enough transactions + if len(bi.Txids) > 20 { + dbi := &db.BlockInfo{ + Hash: bi.Hash, + Height: bi.Height, + Time: bi.Time, + } + txids := bi.Txids + if w.chainType == bchain.ChainBitcoinType { + // skip the coinbase transaction + txids = txids[1:] + } + fees := make([]int64, len(txids)) + sum := int64(0) + for i, txid := range txids { + select { + case <-stopCompute: + glog.Info("ComputeFeeStats interrupted at height ", block) + return db.ErrOperationInterrupted + default: + tx, err := w.txFromTxid(txid, bestheight, AccountDetailsTxHistoryLight, dbi) + if err != nil { + return err + } + fee := tx.FeesSat.AsInt64() + fees[i] = fee + sum += fee + } + } + sort.Slice(fees, func(i, j int) bool { return fees[i] < fees[j] }) + step := float64(len(fees)) / 10 + percentils := "" + for i := float64(0); i < float64(len(fees)+1); i += step { + ii := int(math.Round(i)) + if ii >= len(fees) { + ii = len(fees) - 1 + } + percentils += "," + strconv.FormatInt(fees[ii], 10) + } + glog.Info(block, ",", time.Unix(bi.Time, 0).Format(time.RFC3339), ",", len(bi.Txids), ",", sum, ",", float64(sum)/float64(len(bi.Txids)), percentils) + } + } + return nil +} + // GetSystemInfo returns information about system func (w *Worker) GetSystemInfo(internal bool) (*SystemInfo, error) { start := time.Now() diff --git a/api/xpub.go b/api/xpub.go index 664a12ff..80f41085 100644 --- a/api/xpub.go +++ b/api/xpub.go @@ -495,7 +495,7 @@ func (w *Worker) GetXpubAddress(xpub string, page int, txsOnPage int, option Acc if option == AccountDetailsTxidHistory { txids = append(txids, xpubTxid.txid) } else { - tx, err := w.txFromTxid(xpubTxid.txid, bestheight, option) + tx, err := w.txFromTxid(xpubTxid.txid, bestheight, option, nil) if err != nil { return nil, err } diff --git a/blockbook.go b/blockbook.go index da8a067a..fd2ebf55 100644 --- a/blockbook.go +++ b/blockbook.go @@ -68,8 +68,9 @@ var ( noTxCache = flag.Bool("notxcache", false, "disable tx cache") - computeColumnStats = flag.Bool("computedbstats", false, "compute column stats and exit") - dbStatsPeriodHours = flag.Int("dbstatsperiod", 24, "period of db stats collection in hours, 0 disables stats collection") + computeColumnStats = flag.Bool("computedbstats", false, "compute column stats and exit") + computeFeeStatsFlag = flag.Bool("computedfeestats", false, "compute fee stats for blocks in blockheight-blockuntil range and exit") + dbStatsPeriodHours = flag.Int("dbstatsperiod", 24, "period of db stats collection in hours, 0 disables stats collection") // resync index at least each resyncIndexPeriodMs (could be more often if invoked by message from ZeroMQ) resyncIndexPeriodMs = flag.Int("resyncindexperiod", 935093, "resync index period in milliseconds") @@ -179,6 +180,16 @@ func mainWithExitCode() int { glog.Warning("internalState: database was left in open state, possibly previous ungraceful shutdown") } + if *computeFeeStatsFlag { + internalState.DbState = common.DbStateOpen + err = computeFeeStats(chanOsSignal, *blockFrom, *blockUntil, index, chain, txCache, internalState, metrics) + if err != nil && err != db.ErrOperationInterrupted { + glog.Error("computeFeeStats: ", err) + return exitCodeFatal + } + return exitCodeOK + } + if *computeColumnStats { internalState.DbState = common.DbStateOpen err = index.ComputeInternalStateColumnStats(chanOsSignal) @@ -244,7 +255,7 @@ func mainWithExitCode() int { internalState.SyncMode = true internalState.InitialSync = true if err := syncWorker.ResyncIndex(nil, true); err != nil { - if err != db.ErrSyncInterrupted { + if err != db.ErrOperationInterrupted { glog.Error("resyncIndex ", err) return exitCodeFatal } @@ -288,7 +299,7 @@ func mainWithExitCode() int { if !*synchronize { if err = syncWorker.ConnectBlocksParallel(height, until); err != nil { - if err != db.ErrSyncInterrupted { + if err != db.ErrOperationInterrupted { glog.Error("connectBlocksParallel ", err) return exitCodeFatal } @@ -613,3 +624,16 @@ func normalizeName(s string) string { s = strings.Replace(s, " ", "-", -1) return s } + +// computeFeeStats computes fee distribution in defined blocks +func computeFeeStats(stopCompute chan os.Signal, blockFrom, blockTo int, db *db.RocksDB, chain bchain.BlockChain, txCache *db.TxCache, is *common.InternalState, metrics *common.Metrics) error { + start := time.Now() + glog.Info("computeFeeStats start") + api, err := api.NewWorker(db, chain, mempool, txCache, is) + if err != nil { + return err + } + err = api.ComputeFeeStats(blockFrom, blockTo, stopCompute) + glog.Info("computeFeeStats finished in ", time.Since(start)) + return err +} diff --git a/db/sync.go b/db/sync.go index 7d2a2d9f..c5415c72 100644 --- a/db/sync.go +++ b/db/sync.go @@ -45,8 +45,8 @@ func NewSyncWorker(db *RocksDB, chain bchain.BlockChain, syncWorkers, syncChunk var errSynced = errors.New("synced") -// ErrSyncInterrupted is returned when synchronization is interrupted by OS signal -var ErrSyncInterrupted = errors.New("ErrSyncInterrupted") +// ErrOperationInterrupted is returned when operation is interrupted by OS signal +var ErrOperationInterrupted = errors.New("ErrOperationInterrupted") // ResyncIndex synchronizes index to the top of the blockchain // onNewBlock is called when new block is connected, but not in initial parallel sync @@ -206,7 +206,7 @@ func (w *SyncWorker) connectBlocks(onNewBlock bchain.OnNewBlockFunc, initialSync select { case <-w.chanOsSignal: glog.Info("connectBlocks interrupted at height ", lastRes.block.Height) - return ErrSyncInterrupted + return ErrOperationInterrupted case res := <-bch: if res == empty { break ConnectLoop @@ -330,7 +330,7 @@ ConnectLoop: select { case <-w.chanOsSignal: glog.Info("connectBlocksParallel interrupted at height ", h) - err = ErrSyncInterrupted + err = ErrOperationInterrupted // signal all workers to terminate their loops (error loops are interrupted below) close(terminating) break ConnectLoop diff --git a/tests/sync/connectblocks.go b/tests/sync/connectblocks.go index 0dffdf20..1785dc1a 100644 --- a/tests/sync/connectblocks.go +++ b/tests/sync/connectblocks.go @@ -25,7 +25,7 @@ func testConnectBlocks(t *testing.T, h *TestHandler) { close(ch) } }, true) - if err != nil && err != db.ErrSyncInterrupted { + if err != nil && err != db.ErrOperationInterrupted { t.Fatal(err) }