Improve performance of utxo indexing

indexv5
Martin Boehm 2019-05-12 02:19:51 +02:00
parent 31fb59b1a5
commit 6b16814d0b
4 changed files with 113 additions and 51 deletions

View File

@ -685,7 +685,7 @@ func (w *Worker) GetAddress(address string, page int, txsOnPage int, option Acco
nonce = strconv.Itoa(int(n))
} else {
// ba can be nil if the address is only in mempool!
ba, err = w.db.GetAddrDescBalance(addrDesc)
ba, err = w.db.GetAddrDescBalance(addrDesc, db.AddressBalanceDetailNoUTXO)
if err != nil {
return nil, NewAPIError(fmt.Sprintf("Address not found, %v", err), true)
}
@ -839,7 +839,7 @@ func (w *Worker) getAddrDescUtxo(addrDesc bchain.AddressDescriptor, ba *db.AddrB
if !onlyMempool {
// get utxo from index
if ba == nil {
ba, err = w.db.GetAddrDescBalance(addrDesc)
ba, err = w.db.GetAddrDescBalance(addrDesc, db.AddressBalanceDetailUTXO)
if err != nil {
return nil, NewAPIError(fmt.Sprintf("Address not found, %v", err), true)
}

View File

@ -160,7 +160,7 @@ func (w *Worker) xpubCheckAndLoadTxids(ad *xpubAddress, filter *AddressFilter, m
func (w *Worker) xpubDerivedAddressBalance(data *xpubData, ad *xpubAddress) (bool, error) {
var err error
if ad.balance, err = w.db.GetAddrDescBalance(ad.addrDesc); err != nil {
if ad.balance, err = w.db.GetAddrDescBalance(ad.addrDesc, db.AddressBalanceDetailUTXO); err != nil {
return false, err
}
if ad.balance != nil {

View File

@ -44,6 +44,18 @@ type connectBlockStats struct {
balancesMiss int
}
// AddressBalanceDetail specifies what data are returned by GetAddressBalance
type AddressBalanceDetail int
const (
// AddressBalanceDetailNoUTXO returns address balance without utxos
AddressBalanceDetailNoUTXO = 0
// AddressBalanceDetailUTXO returns address balance with utxos
AddressBalanceDetailUTXO = 1
// addressBalanceDetailUTXOIndexed returns address balance with utxos and index for updates, used only internally
addressBalanceDetailUTXOIndexed = 2
)
// RocksDB handle
type RocksDB struct {
path string
@ -401,6 +413,7 @@ type AddrBalance struct {
SentSat big.Int
BalanceSat big.Int
Utxos []Utxo
utxosMap map[string]int
}
// ReceivedSat computes received amount from total balance and sent amount
@ -410,6 +423,61 @@ func (ab *AddrBalance) ReceivedSat() *big.Int {
return &r
}
// addUtxo
func (ab *AddrBalance) addUtxo(u *Utxo) {
ab.Utxos = append(ab.Utxos, *u)
l := len(ab.Utxos)
if l >= 16 {
if len(ab.utxosMap) == 0 {
ab.utxosMap = make(map[string]int, 32)
for i := 0; i < l; i++ {
s := string(ab.Utxos[i].BtxID)
if _, e := ab.utxosMap[s]; !e {
ab.utxosMap[s] = i
}
}
} else {
s := string(u.BtxID)
if _, e := ab.utxosMap[s]; !e {
ab.utxosMap[s] = l - 1
}
}
}
}
// markUtxoAsSpent finds outpoint btxID:vout in utxos and marks it as spent
// for small number of utxos the linear search is done, for larger number there is a hashmap index
// it is much faster than removing the utxo from the slice as it would cause in memory copy operations
func (ab *AddrBalance) markUtxoAsSpent(btxID []byte, vout int32) {
if len(ab.utxosMap) == 0 {
for i := range ab.Utxos {
utxo := &ab.Utxos[i]
if utxo.Vout == vout && *(*int)(unsafe.Pointer(&utxo.BtxID[0])) == *(*int)(unsafe.Pointer(&btxID[0])) && bytes.Equal(utxo.BtxID, btxID) {
// mark utxo as spent by setting vout=-1
utxo.Vout = -1
return
}
}
} else {
if i, e := ab.utxosMap[string(btxID)]; e {
l := len(ab.Utxos)
for ; i < l; i++ {
utxo := &ab.Utxos[i]
if utxo.Vout == vout {
if bytes.Equal(utxo.BtxID, btxID) {
// mark utxo as spent by setting vout=-1
utxo.Vout = -1
return
} else {
break
}
}
}
}
}
glog.Errorf("Utxo %s:%d not found, using in map %v", hex.EncodeToString(btxID), vout, len(ab.utxosMap) != 0)
}
type blockTxs struct {
btxID []byte
inputs []outpoint
@ -467,7 +535,7 @@ func (d *RocksDB) processAddressesBitcoinType(block *bchain.Block, addresses add
strAddrDesc := string(addrDesc)
balance, e := balances[strAddrDesc]
if !e {
balance, err = d.GetAddrDescBalance(addrDesc)
balance, err = d.GetAddrDescBalance(addrDesc, addressBalanceDetailUTXOIndexed)
if err != nil {
return err
}
@ -480,7 +548,7 @@ func (d *RocksDB) processAddressesBitcoinType(block *bchain.Block, addresses add
d.cbs.balancesHit++
}
balance.BalanceSat.Add(&balance.BalanceSat, &output.ValueSat)
balance.Utxos = append(balance.Utxos, Utxo{
balance.addUtxo(&Utxo{
BtxID: btxID,
Vout: int32(i),
Height: block.Height,
@ -550,7 +618,7 @@ func (d *RocksDB) processAddressesBitcoinType(block *bchain.Block, addresses add
strAddrDesc := string(spentOutput.AddrDesc)
balance, e := balances[strAddrDesc]
if !e {
balance, err = d.GetAddrDescBalance(spentOutput.AddrDesc)
balance, err = d.GetAddrDescBalance(spentOutput.AddrDesc, addressBalanceDetailUTXOIndexed)
if err != nil {
return err
}
@ -567,7 +635,7 @@ func (d *RocksDB) processAddressesBitcoinType(block *bchain.Block, addresses add
balance.Txs++
}
balance.BalanceSat.Sub(&balance.BalanceSat, &spentOutput.ValueSat)
markUtxoAsSpent(balance.Utxos, btxID, int32(input.Vout), spentOutput.AddrDesc)
balance.markUtxoAsSpent(btxID, int32(input.Vout))
if balance.BalanceSat.Sign() < 0 {
d.resetValueSatToZero(&balance.BalanceSat, spentOutput.AddrDesc, "balance")
}
@ -578,20 +646,6 @@ func (d *RocksDB) processAddressesBitcoinType(block *bchain.Block, addresses add
return nil
}
// markUtxoAsSpent finds outpoint btxID:vout in utxos and marks it as spent
// it is much faster than removing the utxo from the slice as it would cause in many memory copy operations
func markUtxoAsSpent(utxos []Utxo, btxID []byte, vout int32, addrDesc bchain.AddressDescriptor) {
for i := range utxos {
utxo := &utxos[i]
if utxo.Vout == vout && *(*int)(unsafe.Pointer(&utxo.BtxID[0])) == *(*int)(unsafe.Pointer(&btxID[0])) && bytes.Equal(utxo.BtxID, btxID) {
// mark utxo as spent by setting vout=-1
utxo.Vout = -1
return
}
}
glog.Errorf("Utxo %s:%d not found in addrDesc %s", hex.EncodeToString(btxID), vout, addrDesc)
}
// addToAddressesMap maintains mapping between addresses and transactions in one block
// the method assumes that outpus in the block are processed before the inputs
// the return value is true if the tx was processed before, to not to count the tx multiple times
@ -739,7 +793,7 @@ func (d *RocksDB) getBlockTxs(height uint32) ([]blockTxs, error) {
}
// GetAddrDescBalance returns AddrBalance for given addrDesc
func (d *RocksDB) GetAddrDescBalance(addrDesc bchain.AddressDescriptor) (*AddrBalance, error) {
func (d *RocksDB) GetAddrDescBalance(addrDesc bchain.AddressDescriptor, detail AddressBalanceDetail) (*AddrBalance, error) {
val, err := d.db.GetCF(d.ro, d.cfh[cfAddressBalance], addrDesc)
if err != nil {
return nil, err
@ -750,16 +804,16 @@ func (d *RocksDB) GetAddrDescBalance(addrDesc bchain.AddressDescriptor) (*AddrBa
if len(buf) < 3 {
return nil, nil
}
return unpackAddrBalance(buf, d.chainParser.PackedTxidLen())
return unpackAddrBalance(buf, d.chainParser.PackedTxidLen(), detail)
}
// GetAddressBalance returns address balance for an address or nil if address not found
func (d *RocksDB) GetAddressBalance(address string) (*AddrBalance, error) {
func (d *RocksDB) GetAddressBalance(address string, detail AddressBalanceDetail) (*AddrBalance, error) {
addrDesc, err := d.chainParser.GetAddrDescFromAddress(address)
if err != nil {
return nil, err
}
return d.GetAddrDescBalance(addrDesc)
return d.GetAddrDescBalance(addrDesc, detail)
}
func (d *RocksDB) getTxAddresses(btxID []byte) (*TxAddresses, error) {
@ -844,35 +898,43 @@ func appendTxOutput(txo *TxOutput, buf []byte, varBuf []byte) []byte {
return buf
}
func unpackAddrBalance(buf []byte, txidUnpackedLen int) (*AddrBalance, error) {
func unpackAddrBalance(buf []byte, txidUnpackedLen int, detail AddressBalanceDetail) (*AddrBalance, error) {
txs, l := unpackVaruint(buf)
sentSat, sl := unpackBigint(buf[l:])
balanceSat, bl := unpackBigint(buf[l+sl:])
l = l + sl + bl
// estimate the size of utxos to avoid reallocation
utxos := make([]Utxo, 0, len(buf[l:])/txidUnpackedLen+3)
for len(buf[l:]) >= txidUnpackedLen+3 {
btxID := append([]byte(nil), buf[l:l+txidUnpackedLen]...)
l += txidUnpackedLen
vout, ll := unpackVaruint(buf[l:])
l += ll
height, ll := unpackVaruint(buf[l:])
l += ll
valueSat, ll := unpackBigint(buf[l:])
l += ll
utxos = append(utxos, Utxo{
BtxID: btxID,
Vout: int32(vout),
Height: uint32(height),
ValueSat: valueSat,
})
}
return &AddrBalance{
ab := &AddrBalance{
Txs: uint32(txs),
SentSat: sentSat,
BalanceSat: balanceSat,
Utxos: utxos,
}, nil
}
if detail != AddressBalanceDetailNoUTXO {
// estimate the size of utxos to avoid reallocation
ab.Utxos = make([]Utxo, 0, len(buf[l:])/txidUnpackedLen+3)
// ab.utxosMap = make(map[string]int, cap(ab.Utxos))
for len(buf[l:]) >= txidUnpackedLen+3 {
btxID := append([]byte(nil), buf[l:l+txidUnpackedLen]...)
l += txidUnpackedLen
vout, ll := unpackVaruint(buf[l:])
l += ll
height, ll := unpackVaruint(buf[l:])
l += ll
valueSat, ll := unpackBigint(buf[l:])
l += ll
u := Utxo{
BtxID: btxID,
Vout: int32(vout),
Height: uint32(height),
ValueSat: valueSat,
}
if detail == AddressBalanceDetailUTXO {
ab.Utxos = append(ab.Utxos, u)
} else {
ab.addUtxo(&u)
}
}
}
return ab, nil
}
func packAddrBalance(ab *AddrBalance, buf, varBuf []byte) []byte {
@ -1122,7 +1184,7 @@ func (d *RocksDB) disconnectTxAddresses(wb *gorocksdb.WriteBatch, height uint32,
s := string(addrDesc)
b, fb := balances[s]
if !fb {
b, err = d.GetAddrDescBalance(addrDesc)
b, err = d.GetAddrDescBalance(addrDesc, addressBalanceDetailUTXOIndexed)
if err != nil {
return nil, err
}
@ -1203,7 +1265,7 @@ func (d *RocksDB) disconnectTxAddresses(wb *gorocksdb.WriteBatch, height uint32,
if balance.BalanceSat.Sign() < 0 {
d.resetValueSatToZero(&balance.BalanceSat, t.AddrDesc, "balance")
}
markUtxoAsSpent(balance.Utxos, btxID, int32(i), t.AddrDesc)
balance.markUtxoAsSpent(btxID, int32(i))
} else {
ad, _, _ := d.chainParser.GetAddressesFromAddrDesc(t.AddrDesc)
glog.Warningf("Balance for address %s (%s) not found", ad, t.AddrDesc)

View File

@ -656,7 +656,7 @@ func TestRocksDB_Index_BitcoinType(t *testing.T) {
verifyAfterBitcoinTypeBlock2(t, d)
// test public methods for address balance and tx addresses
ab, err := d.GetAddressBalance(dbtestdata.Addr5)
ab, err := d.GetAddressBalance(dbtestdata.Addr5, AddressBalanceDetailUTXO)
if err != nil {
t.Fatal(err)
}
@ -1060,7 +1060,7 @@ func Test_packAddrBalance_unpackAddrBalance(t *testing.T) {
if !reflect.DeepEqual(hex, tt.hex) {
t.Errorf("packTxAddresses() = %v, want %v", hex, tt.hex)
}
got1, err := unpackAddrBalance(b, parser.PackedTxidLen())
got1, err := unpackAddrBalance(b, parser.PackedTxidLen(), AddressBalanceDetailUTXO)
if err != nil {
t.Errorf("unpackTxAddresses() error = %v", err)
return