Add filter from-to to balance history

balanceHistory
Martin Boehm 2019-11-19 08:46:47 +01:00
parent da714b5299
commit bf3d822b87
10 changed files with 188 additions and 36 deletions

@@ -302,7 +302,6 @@ type BalanceHistory struct {
Txs int `json:"txs"`
ReceivedSat *Amount `json:"received"`
SentSat *Amount `json:"sent"`
BalanceSat *Amount `json:"balance"`
}
// Blocks is list of blocks with paging information

@@ -802,17 +802,26 @@ func (w *Worker) GetAddress(address string, page int, txsOnPage int, option Acco
}
// GetBalanceHistory returns history of balance for given address
func (w *Worker) GetBalanceHistory(address string) ([]BalanceHistory, error) {
var bh []BalanceHistory
var b BalanceHistory
var bi *db.BlockInfo
var balance big.Int
func (w *Worker) GetBalanceHistory(address string, fromTime, toTime time.Time) ([]BalanceHistory, error) {
var bhs []BalanceHistory
bh := BalanceHistory{
SentSat: &Amount{},
ReceivedSat: &Amount{},
}
start := time.Now()
addrDesc, _, err := w.getAddrDescAndNormalizeAddress(address)
if err != nil {
return nil, err
}
txs, err := w.getAddressTxids(addrDesc, false, &AddressFilter{Vout: AddressFilterVoutOff}, maxInt)
fromHeight := uint32(0)
toHeight := maxInt
if !fromTime.IsZero() {
fromHeight = w.is.GetBlockHeightOfTime(uint32(fromTime.Unix()))
}
if !toTime.IsZero() {
toHeight = int(w.is.GetBlockHeightOfTime(uint32(toTime.Unix())))
}
txs, err := w.getAddressTxids(addrDesc, false, &AddressFilter{Vout: AddressFilterVoutOff, FromHeight: fromHeight}, toHeight)
if err != nil {
return nil, err
}
@@ -826,19 +835,13 @@ func (w *Worker) GetBalanceHistory(address string) ([]BalanceHistory, error) {
continue
}
counted := false
if bi == nil || bi.Height != ta.Height {
bi, err = w.db.GetBlockInfo(ta.Height)
if err != nil {
return nil, err
time := int64(w.is.GetBlockTime(ta.Height))
hour := time - time%3600
if bh.Time != hour {
if bh.Txs != 0 {
bhs = append(bhs, bh)
}
}
hour := bi.Time - bi.Time%3600
if b.Time != hour {
if b.Txs != 0 {
b.BalanceSat = (*Amount)(new(big.Int).Set(&balance))
bh = append(bh, b)
}
b = BalanceHistory{
bh = BalanceHistory{
Time: hour,
SentSat: &Amount{},
ReceivedSat: &Amount{},
@@ -847,32 +850,29 @@ func (w *Worker) GetBalanceHistory(address string) ([]BalanceHistory, error) {
for i := range ta.Inputs {
tai := &ta.Inputs[i]
if bytes.Equal(addrDesc, tai.AddrDesc) {
(*big.Int)(b.SentSat).Add((*big.Int)(b.SentSat), &tai.ValueSat)
balance.Sub(&balance, &tai.ValueSat)
(*big.Int)(bh.SentSat).Add((*big.Int)(bh.SentSat), &tai.ValueSat)
if !counted {
counted = true
b.Txs++
bh.Txs++
}
}
}
for i := range ta.Outputs {
tao := &ta.Outputs[i]
if bytes.Equal(addrDesc, tao.AddrDesc) {
(*big.Int)(b.ReceivedSat).Add((*big.Int)(b.ReceivedSat), &tao.ValueSat)
balance.Add(&balance, &tao.ValueSat)
(*big.Int)(bh.ReceivedSat).Add((*big.Int)(bh.ReceivedSat), &tao.ValueSat)
if !counted {
counted = true
b.Txs++
bh.Txs++
}
}
}
}
if b.Txs != 0 {
b.BalanceSat = (*Amount)(&balance)
bh = append(bh, b)
if bh.Txs != 0 {
bhs = append(bhs, bh)
}
glog.Info("GetBalanceHistory ", address, ", count ", len(bh), " finished in ", time.Since(start))
return bh, nil
glog.Info("GetBalanceHistory ", address, ", count ", len(bhs), " finished in ", time.Since(start))
return bhs, nil
}
func (w *Worker) waitForBackendSync() {
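
For reference, a minimal standalone sketch (not Blockbook code; hourBucket is a hypothetical helper) of the hour bucketing used by GetBalanceHistory above, where block times are grouped with t - t%3600 and a zero from/to time means no bound:

package main

import (
    "fmt"
    "time"
)

// hourBucket groups a block timestamp into the hour bucket used by the
// balance history aggregation above.
func hourBucket(blockTime int64) int64 {
    return blockTime - blockTime%3600
}

func main() {
    from, _ := time.Parse("2006-01-02", "2019-11-01")
    to := time.Time{} // zero value, i.e. no upper bound, as checked with IsZero above
    fmt.Println("from unix:", from.Unix(), "to is zero:", to.IsZero())
    fmt.Println("bucket of 1521514921:", hourBucket(1521514921)) // 1521514800
}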

@@ -2,6 +2,7 @@ package common
import (
"encoding/json"
"sort"
"sync"
"time"
)
@@ -45,6 +46,7 @@ type InternalState struct {
IsSynchronized bool `json:"isSynchronized"`
BestHeight uint32 `json:"bestHeight"`
LastSync time.Time `json:"lastSync"`
BlockTimes []uint32 `json:"-"`
IsMempoolSynchronized bool `json:"isMempoolSynchronized"`
MempoolSize int `json:"mempoolSize"`
@@ -164,6 +166,54 @@ func (is *InternalState) DBSizeTotal() int64 {
return total
}
// GetBlockTime returns block time if block found or 0
func (is *InternalState) GetBlockTime(height uint32) uint32 {
is.mux.Lock()
defer is.mux.Unlock()
if int(height) < len(is.BlockTimes) {
return is.BlockTimes[height]
}
return 0
}
// AppendBlockTime appends block time to BlockTimes
func (is *InternalState) AppendBlockTime(time uint32) {
is.mux.Lock()
defer is.mux.Unlock()
is.BlockTimes = append(is.BlockTimes, time)
}
// RemoveLastBlockTimes removes last times from BlockTimes
func (is *InternalState) RemoveLastBlockTimes(count int) {
is.mux.Lock()
defer is.mux.Unlock()
if len(is.BlockTimes) < count {
count = len(is.BlockTimes)
}
is.BlockTimes = is.BlockTimes[:len(is.BlockTimes)-count]
}
// GetBlockHeightOfTime returns block height of the first block with time greater or equal to the given time or MaxUint32 if no such block
func (is *InternalState) GetBlockHeightOfTime(time uint32) uint32 {
is.mux.Lock()
defer is.mux.Unlock()
height := sort.Search(len(is.BlockTimes), func(i int) bool { return time <= is.BlockTimes[i] })
if height == len(is.BlockTimes) {
return ^uint32(0)
}
// as the block times can sometimes be out of order, step back up to 20 blocks and scan forward for the first block with time greater than or equal to the given time
max, height := height, height-20
if height < 0 {
height = 0
}
for ; height <= max; height++ {
if time <= is.BlockTimes[height] {
break
}
}
return uint32(height)
}
// Pack marshals internal state to json
func (is *InternalState) Pack() ([]byte, error) {
is.mux.Lock()
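
A standalone illustration of the search above (heightOfTime is a hypothetical stand-in for GetBlockHeightOfTime operating on a plain slice): binary search for the first block time at or after the requested time, then step back up to 20 blocks because block timestamps are not strictly increasing:

package main

import (
    "fmt"
    "sort"
)

// heightOfTime mirrors the logic of GetBlockHeightOfTime above: binary
// search for the first block time >= t, then step back up to 20 blocks
// to tolerate out-of-order timestamps.
func heightOfTime(blockTimes []uint32, t uint32) uint32 {
    h := sort.Search(len(blockTimes), func(i int) bool { return t <= blockTimes[i] })
    if h == len(blockTimes) {
        return ^uint32(0) // no block at or after t
    }
    max, h := h, h-20
    if h < 0 {
        h = 0
    }
    for ; h <= max; h++ {
        if t <= blockTimes[h] {
            break
        }
    }
    return uint32(h)
}

func main() {
    // height 1 has a timestamp higher than heights 2 and 3
    times := []uint32{1000, 3000, 2000, 2500, 4000}
    // the binary search lands at height 3, the backtracking pass returns height 1
    fmt.Println(heightOfTime(times, 2400)) // 1
}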

@@ -381,6 +381,12 @@ func (b *BulkConnect) Close() error {
return err
}
}
var err error
b.d.is.BlockTimes, err = b.d.loadBlockTimes()
if err != nil {
return err
}
if err := b.d.SetInconsistentState(false); err != nil {
return err
}

@@ -348,8 +348,11 @@ func (d *RocksDB) ConnectBlock(block *bchain.Block) error {
if err := d.storeAddresses(wb, block.Height, addresses); err != nil {
return err
}
return d.db.Write(d.wo, wb)
if err := d.db.Write(d.wo, wb); err != nil {
return err
}
d.is.AppendBlockTime(uint32(block.Time))
return nil
}
// Addresses index
@@ -1334,6 +1337,7 @@ func (d *RocksDB) DisconnectBlockRangeBitcoinType(lower uint32, higher uint32) e
}
err := d.db.Write(d.wo, wb)
if err == nil {
d.is.RemoveLastBlockTimes(int(higher-lower) + 1)
glog.Infof("rocksdb: blocks %d-%d disconnected", lower, higher)
}
return err
@@ -1448,6 +1452,32 @@ func (d *RocksDB) internalDeleteTx(wb *gorocksdb.WriteBatch, key []byte) {
// internal state
const internalStateKey = "internalState"
func (d *RocksDB) loadBlockTimes() ([]uint32, error) {
var times []uint32
it := d.db.NewIteratorCF(d.ro, d.cfh[cfHeight])
defer it.Close()
counter := uint32(0)
time := uint32(0)
for it.SeekToFirst(); it.Valid(); it.Next() {
height := unpackUint(it.Key().Data())
if height > counter {
glog.Warning("gap in cfHeight: expecting ", counter, ", got ", height)
for ; counter < height; counter++ {
times = append(times, time)
}
}
counter++
info, err := d.unpackBlockInfo(it.Value().Data())
if err != nil {
return nil, err
}
time = uint32(info.Time)
times = append(times, time)
}
glog.Info("loaded ", len(times), " block times")
return times, nil
}
// LoadInternalState loads from db internal state or initializes a new one if not yet stored
func (d *RocksDB) LoadInternalState(rpcCoin string) (*common.InternalState, error) {
val, err := d.db.GetCF(d.ro, d.cfh[cfDefault], []byte(internalStateKey))
@@ -1493,6 +1523,10 @@ func (d *RocksDB) LoadInternalState(rpcCoin string) (*common.InternalState, erro
}
}
is.DbColumns = nc
is.BlockTimes, err = d.loadBlockTimes()
if err != nil {
return nil, err
}
// after load, reset the synchronization data
is.IsSynchronized = false
is.IsMempoolSynchronized = false
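
A standalone sketch of the gap handling in loadBlockTimes above (fillTimes and its slice inputs are hypothetical; the real code iterates the cfHeight column family): indices of the result must equal block heights, so a missing height repeats the last known time:

package main

import "fmt"

// fillTimes mimics the gap handling in loadBlockTimes above: heights lists
// the block heights present in the height column, blockTimes their times;
// the returned slice is indexed by height, repeating the last known time
// over any gap.
func fillTimes(heights []uint32, blockTimes []uint32) []uint32 {
    var times []uint32
    counter := uint32(0)
    last := uint32(0)
    for k, height := range heights {
        for ; counter < height; counter++ {
            times = append(times, last) // fill the gap with the previous time
        }
        counter++
        last = blockTimes[k]
        times = append(times, last)
    }
    return times
}

func main() {
    // heights 0, 1 and 4 are present, 2 and 3 are missing
    fmt.Println(fillTimes([]uint32{0, 1, 4}, []uint32{100, 200, 500}))
    // [100 200 200 200 500]
}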

@@ -455,6 +455,7 @@ func (d *RocksDB) DisconnectBlockRangeEthereumType(lower uint32, higher uint32)
d.storeAddressContracts(wb, contracts)
err := d.db.Write(d.wo, wb)
if err == nil {
d.is.RemoveLastBlockTimes(int(higher-lower) + 1)
glog.Infof("rocksdb: blocks %d-%d disconnected", lower, higher)
}
return err

@@ -167,6 +167,10 @@ func TestRocksDB_Index_EthereumType(t *testing.T) {
})
defer closeAndDestroyRocksDB(t, d)
if len(d.is.BlockTimes) != 0 {
t.Fatal("Expecting is.BlockTimes 0, got ", len(d.is.BlockTimes))
}
// connect 1st block
block1 := dbtestdata.GetTestEthereumTypeBlock1(d.chainParser)
if err := d.ConnectBlock(block1); err != nil {
@@ -174,6 +178,10 @@ func TestRocksDB_Index_EthereumType(t *testing.T) {
}
verifyAfterEthereumTypeBlock1(t, d, false)
if len(d.is.BlockTimes) != 1 {
t.Fatal("Expecting is.BlockTimes 1, got ", len(d.is.BlockTimes))
}
// connect 2nd block
block2 := dbtestdata.GetTestEthereumTypeBlock2(d.chainParser)
if err := d.ConnectBlock(block2); err != nil {
@@ -181,6 +189,10 @@ func TestRocksDB_Index_EthereumType(t *testing.T) {
}
verifyAfterEthereumTypeBlock2(t, d)
if len(d.is.BlockTimes) != 2 {
t.Fatal("Expecting is.BlockTimes 2, got ", len(d.is.BlockTimes))
}
// get transactions for various addresses / low-high ranges
verifyGetTransactions(t, d, "0x"+dbtestdata.EthAddr55, 0, 10000000, []txidIndex{
{"0x" + dbtestdata.EthTxidB2T2, ^2},
@@ -275,10 +287,18 @@ func TestRocksDB_Index_EthereumType(t *testing.T) {
}
}
if len(d.is.BlockTimes) != 1 {
t.Fatal("Expecting is.BlockTimes 1, got ", len(d.is.BlockTimes))
}
// connect block again and verify the state of db
if err := d.ConnectBlock(block2); err != nil {
t.Fatal(err)
}
verifyAfterEthereumTypeBlock2(t, d)
if len(d.is.BlockTimes) != 2 {
t.Fatal("Expecting is.BlockTimes 2, got ", len(d.is.BlockTimes))
}
}

@@ -532,6 +532,10 @@ func TestRocksDB_Index_BitcoinType(t *testing.T) {
})
defer closeAndDestroyRocksDB(t, d)
if len(d.is.BlockTimes) != 0 {
t.Fatal("Expecting is.BlockTimes 0, got ", len(d.is.BlockTimes))
}
// connect 1st block - will log warnings about missing UTXO transactions in txAddresses column
block1 := dbtestdata.GetTestBitcoinTypeBlock1(d.chainParser)
if err := d.ConnectBlock(block1); err != nil {
@@ -539,6 +543,10 @@ func TestRocksDB_Index_BitcoinType(t *testing.T) {
}
verifyAfterBitcoinTypeBlock1(t, d, false)
if len(d.is.BlockTimes) != 1 {
t.Fatal("Expecting is.BlockTimes 1, got ", len(d.is.BlockTimes))
}
// connect 2nd block - use some outputs from the 1st block as the inputs and 1 input uses tx from the same block
block2 := dbtestdata.GetTestBitcoinTypeBlock2(d.chainParser)
if err := d.ConnectBlock(block2); err != nil {
@@ -546,6 +554,10 @@ func TestRocksDB_Index_BitcoinType(t *testing.T) {
}
verifyAfterBitcoinTypeBlock2(t, d)
if len(d.is.BlockTimes) != 2 {
t.Fatal("Expecting is.BlockTimes 1, got ", len(d.is.BlockTimes))
}
// get transactions for various addresses / low-high ranges
verifyGetTransactions(t, d, dbtestdata.Addr2, 0, 1000000, []txidIndex{
{dbtestdata.TxidB2T1, ^1},
@@ -649,12 +661,20 @@ func TestRocksDB_Index_BitcoinType(t *testing.T) {
}
}
if len(d.is.BlockTimes) != 1 {
t.Fatal("Expecting is.BlockTimes 1, got ", len(d.is.BlockTimes))
}
// connect block again and verify the state of db
if err := d.ConnectBlock(block2); err != nil {
t.Fatal(err)
}
verifyAfterBitcoinTypeBlock2(t, d)
if len(d.is.BlockTimes) != 2 {
t.Fatal("Expecting is.BlockTimes 1, got ", len(d.is.BlockTimes))
}
// test public methods for address balance and tx addresses
ab, err := d.GetAddressBalance(dbtestdata.Addr5, AddressBalanceDetailUTXO)
if err != nil {
@@ -744,6 +764,10 @@ func Test_BulkConnect_BitcoinType(t *testing.T) {
t.Fatal("DB not in DbStateInconsistent")
}
if len(d.is.BlockTimes) != 0 {
t.Fatal("Expecting is.BlockTimes 0, got ", len(d.is.BlockTimes))
}
if err := bc.ConnectBlock(dbtestdata.GetTestBitcoinTypeBlock1(d.chainParser), false); err != nil {
t.Fatal(err)
}
@@ -766,6 +790,10 @@ func Test_BulkConnect_BitcoinType(t *testing.T) {
}
verifyAfterBitcoinTypeBlock2(t, d)
if len(d.is.BlockTimes) != 225495 {
t.Fatal("Expecting is.BlockTimes 225495, got ", len(d.is.BlockTimes))
}
}
func Test_packBigint_unpackBigint(t *testing.T) {

@@ -1035,17 +1035,26 @@ func (s *PublicServer) apiBalanceHistory(r *http.Request, apiVersion int) (interface{}, er
func (s *PublicServer) apiBalanceHistory(r *http.Request, apiVersion int) (interface{}, error) {
var history []api.BalanceHistory
var fromTime, toTime time.Time
var err error
if i := strings.LastIndexByte(r.URL.Path, '/'); i > 0 {
gap, ec := strconv.Atoi(r.URL.Query().Get("gap"))
if ec != nil {
gap = 0
}
t := r.URL.Query().Get("from")
if t != "" {
fromTime, _ = time.Parse("2006-01-02", t)
}
t = r.URL.Query().Get("to")
if t != "" {
toTime, _ = time.Parse("2006-01-02", t)
}
history, err = s.api.GetUtxoBalanceHistory(r.URL.Path[i+1:], gap)
if err == nil {
s.metrics.ExplorerViews.With(common.Labels{"action": "api-xpub-balancehistory"}).Inc()
} else {
history, err = s.api.GetBalanceHistory(r.URL.Path[i+1:])
history, err = s.api.GetBalanceHistory(r.URL.Path[i+1:], fromTime, toTime)
s.metrics.ExplorerViews.With(common.Labels{"action": "api-address-balancehistory"}).Inc()
}
}
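
A standalone sketch of how the handler above treats the from/to query values (parseDateParam is a hypothetical helper): dates in YYYY-MM-DD form, with the zero time.Time acting as "no bound" (GetBalanceHistory checks it with IsZero), and a parse failure falling back to that zero value:

package main

import (
    "fmt"
    "time"
)

// parseDateParam mirrors how the handler above treats the from/to query
// values: a YYYY-MM-DD date, with the zero time.Time meaning "no bound".
func parseDateParam(v string) time.Time {
    if v == "" {
        return time.Time{}
    }
    t, err := time.Parse("2006-01-02", v)
    if err != nil {
        return time.Time{} // an unparsable value falls back to no bound
    }
    return t
}

func main() {
    fmt.Println(parseDateParam("2019-11-19").Unix()) // 1574121600
    fmt.Println(parseDateParam("").IsZero())         // true
}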

@@ -50,8 +50,13 @@ func setupRocksDB(t *testing.T, parser bchain.BlockChainParser) (*db.RocksDB, *c
t.Fatal(err)
}
d.SetInternalState(is)
block1 := dbtestdata.GetTestBitcoinTypeBlock1(parser)
// setup internal state BlockTimes
for i := uint32(0); i < block1.Height; i++ {
is.BlockTimes = append(is.BlockTimes, 0)
}
// import data
if err := d.ConnectBlock(dbtestdata.GetTestBitcoinTypeBlock1(parser)); err != nil {
if err := d.ConnectBlock(block1); err != nil {
t.Fatal(err)
}
block2 := dbtestdata.GetTestBitcoinTypeBlock2(parser)
@@ -610,7 +615,7 @@ func httpTestsBitcoinType(t *testing.T, ts *httptest.Server) {
status: http.StatusOK,
contentType: "application/json; charset=utf-8",
body: []string{
`[{"blockTime":1521514800,"txs":1,"received":"12345","sent":"0","balance":"12345"},{"blockTime":1521594000,"txs":1,"received":"0","sent":"12345","balance":"0"}]`,
`[{"blockTime":1521514800,"txs":1,"received":"12345","sent":"0"},{"blockTime":1521594000,"txs":1,"received":"0","sent":"12345"}]`,
},
},
{
@@ -619,7 +624,7 @@ func httpTestsBitcoinType(t *testing.T, ts *httptest.Server) {
status: http.StatusOK,
contentType: "application/json; charset=utf-8",
body: []string{
`[{"blockTime":1521514800,"txs":1,"received":"9876","sent":"0","balance":"9876"},{"blockTime":1521594000,"txs":1,"received":"9000","sent":"9876","balance":"9000"}]`,
`[{"blockTime":1521514800,"txs":1,"received":"9876","sent":"0"},{"blockTime":1521594000,"txs":1,"received":"9000","sent":"9876"}]`,
},
},
{