Maintain utxo index on disconnect block

pull/187/head^2
Martin Boehm 2019-05-06 08:37:16 +02:00
parent 995d5c66b5
commit 733c966094
2 changed files with 114 additions and 59 deletions

View File

@ -10,8 +10,10 @@ import (
"math/big"
"os"
"path/filepath"
"sort"
"strconv"
"time"
"unsafe"
vlq "github.com/bsm/go-vlq"
"github.com/golang/glog"
@ -565,7 +567,7 @@ func (d *RocksDB) processAddressesBitcoinType(block *bchain.Block, addresses add
balance.Txs++
}
balance.BalanceSat.Sub(&balance.BalanceSat, &spentOutput.ValueSat)
balance.Utxos = removeUtxo(balance.Utxos, btxID, int32(input.Vout), spentOutput.AddrDesc)
markUtxoAsSpent(balance.Utxos, btxID, int32(input.Vout), spentOutput.AddrDesc)
if balance.BalanceSat.Sign() < 0 {
d.resetValueSatToZero(&balance.BalanceSat, spentOutput.AddrDesc, "balance")
}
@ -576,14 +578,18 @@ func (d *RocksDB) processAddressesBitcoinType(block *bchain.Block, addresses add
return nil
}
func removeUtxo(utxos []Utxo, btxID []byte, vout int32, addrDesc bchain.AddressDescriptor) []Utxo {
for i, utxo := range utxos {
if utxo.Vout == vout && bytes.Compare(utxo.BtxID, btxID) == 0 {
return append(utxos[:i], utxos[i+1:]...)
// markUtxoAsSpent finds outpoint btxID:vout in utxos and marks it as spent
// it is much faster than removing the utxo from the slice as it would cause in many memory copy operations
func markUtxoAsSpent(utxos []Utxo, btxID []byte, vout int32, addrDesc bchain.AddressDescriptor) {
for i := range utxos {
utxo := &utxos[i]
if utxo.Vout == vout && *(*int)(unsafe.Pointer(&utxo.BtxID[0])) == *(*int)(unsafe.Pointer(&btxID[0])) && bytes.Equal(utxo.BtxID, btxID) {
// mark utxo as spent by setting vout=-1
utxo.Vout = -1
return
}
}
glog.Errorf("Utxo %s:%d not found in addrDesc %s", hex.EncodeToString(btxID), vout, addrDesc)
return utxos
}
// addToAddressesMap maintains mapping between addresses and transactions in one block
@ -843,7 +849,8 @@ func unpackAddrBalance(buf []byte, txidUnpackedLen int) (*AddrBalance, error) {
sentSat, sl := unpackBigint(buf[l:])
balanceSat, bl := unpackBigint(buf[l+sl:])
l = l + sl + bl
utxos := make([]Utxo, 0)
// estimate the size of utxos to avoid reallocation
utxos := make([]Utxo, 0, len(buf[l:])/txidUnpackedLen+3)
for len(buf[l:]) >= txidUnpackedLen+3 {
btxID := append([]byte(nil), buf[l:l+txidUnpackedLen]...)
l += txidUnpackedLen
@ -877,13 +884,16 @@ func packAddrBalance(ab *AddrBalance, buf, varBuf []byte) []byte {
l = packBigint(&ab.BalanceSat, varBuf)
buf = append(buf, varBuf[:l]...)
for _, utxo := range ab.Utxos {
buf = append(buf, utxo.BtxID...)
l = packVaruint(uint(utxo.Vout), varBuf)
buf = append(buf, varBuf[:l]...)
l = packVaruint(uint(utxo.Height), varBuf)
buf = append(buf, varBuf[:l]...)
l = packBigint(&utxo.ValueSat, varBuf)
buf = append(buf, varBuf[:l]...)
// if Vout < 0, utxo is marked as spent
if utxo.Vout >= 0 {
buf = append(buf, utxo.BtxID...)
l = packVaruint(uint(utxo.Vout), varBuf)
buf = append(buf, varBuf[:l]...)
l = packVaruint(uint(utxo.Height), varBuf)
buf = append(buf, varBuf[:l]...)
l = packBigint(&utxo.ValueSat, varBuf)
buf = append(buf, varBuf[:l]...)
}
}
return buf
}
@ -1102,8 +1112,10 @@ func (d *RocksDB) writeHeight(wb *gorocksdb.WriteBatch, height uint32, bi *Block
// Disconnect blocks
func (d *RocksDB) disconnectTxAddresses(wb *gorocksdb.WriteBatch, height uint32, txid string, inputs []outpoint, txa *TxAddresses,
func (d *RocksDB) disconnectTxAddresses(wb *gorocksdb.WriteBatch, height uint32, btxID []byte, inputs []outpoint, txa *TxAddresses,
txAddressesToUpdate map[string]*TxAddresses, balances map[string]*AddrBalance) error {
var err error
var balance *AddrBalance
addresses := make(map[string]struct{})
getAddressBalance := func(addrDesc bchain.AddressDescriptor) (*AddrBalance, error) {
var err error
@ -1120,33 +1132,16 @@ func (d *RocksDB) disconnectTxAddresses(wb *gorocksdb.WriteBatch, height uint32,
}
for i, t := range txa.Inputs {
if len(t.AddrDesc) > 0 {
input := &inputs[i]
s := string(t.AddrDesc)
_, exist := addresses[s]
if !exist {
addresses[s] = struct{}{}
}
b, err := getAddressBalance(t.AddrDesc)
if err != nil {
return err
}
if b != nil {
// subtract number of txs only once
if !exist {
b.Txs--
}
b.SentSat.Sub(&b.SentSat, &t.ValueSat)
if b.SentSat.Sign() < 0 {
d.resetValueSatToZero(&b.SentSat, t.AddrDesc, "sent amount")
}
b.BalanceSat.Add(&b.BalanceSat, &t.ValueSat)
} else {
ad, _, _ := d.chainParser.GetAddressesFromAddrDesc(t.AddrDesc)
glog.Warningf("Balance for address %s (%s) not found", ad, t.AddrDesc)
}
s = string(inputs[i].btxID)
sa, exist := txAddressesToUpdate[s]
if !exist {
sa, err = d.getTxAddresses(inputs[i].btxID)
s = string(input.btxID)
sa, found := txAddressesToUpdate[s]
if !found {
sa, err = d.getTxAddresses(input.btxID)
if err != nil {
return err
}
@ -1154,34 +1149,65 @@ func (d *RocksDB) disconnectTxAddresses(wb *gorocksdb.WriteBatch, height uint32,
txAddressesToUpdate[s] = sa
}
}
var inputHeight uint32
if sa != nil {
sa.Outputs[inputs[i].index].Spent = false
sa.Outputs[input.index].Spent = false
inputHeight = sa.Height
}
if d.chainParser.IsAddrDescIndexable(t.AddrDesc) {
balance, err = getAddressBalance(t.AddrDesc)
if err != nil {
return err
}
if balance != nil {
// subtract number of txs only once
if !exist {
balance.Txs--
}
balance.SentSat.Sub(&balance.SentSat, &t.ValueSat)
if balance.SentSat.Sign() < 0 {
d.resetValueSatToZero(&balance.SentSat, t.AddrDesc, "sent amount")
}
balance.BalanceSat.Add(&balance.BalanceSat, &t.ValueSat)
balance.Utxos = append(balance.Utxos, Utxo{
BtxID: input.btxID,
Vout: input.index,
Height: inputHeight,
ValueSat: t.ValueSat,
})
} else {
ad, _, _ := d.chainParser.GetAddressesFromAddrDesc(t.AddrDesc)
glog.Warningf("Balance for address %s (%s) not found", ad, t.AddrDesc)
}
}
}
}
for _, t := range txa.Outputs {
for i, t := range txa.Outputs {
if len(t.AddrDesc) > 0 {
s := string(t.AddrDesc)
_, exist := addresses[s]
if !exist {
addresses[s] = struct{}{}
}
b, err := getAddressBalance(t.AddrDesc)
if err != nil {
return err
}
if b != nil {
// subtract number of txs only once
if !exist {
b.Txs--
if d.chainParser.IsAddrDescIndexable(t.AddrDesc) {
balance, err := getAddressBalance(t.AddrDesc)
if err != nil {
return err
}
b.BalanceSat.Sub(&b.BalanceSat, &t.ValueSat)
if b.BalanceSat.Sign() < 0 {
d.resetValueSatToZero(&b.BalanceSat, t.AddrDesc, "balance")
if balance != nil {
// subtract number of txs only once
if !exist {
balance.Txs--
}
balance.BalanceSat.Sub(&balance.BalanceSat, &t.ValueSat)
if balance.BalanceSat.Sign() < 0 {
d.resetValueSatToZero(&balance.BalanceSat, t.AddrDesc, "balance")
}
markUtxoAsSpent(balance.Utxos, btxID, int32(i), t.AddrDesc)
} else {
ad, _, _ := d.chainParser.GetAddressesFromAddrDesc(t.AddrDesc)
glog.Warningf("Balance for address %s (%s) not found", ad, t.AddrDesc)
}
} else {
ad, _, _ := d.chainParser.GetAddressesFromAddrDesc(t.AddrDesc)
glog.Warningf("Balance for address %s (%s) not found", ad, t.AddrDesc)
}
}
}
@ -1218,19 +1244,19 @@ func (d *RocksDB) DisconnectBlockRangeBitcoinType(lower uint32, higher uint32) e
// when connecting block, amount is first in tx on the output side, then in another tx on the input side
// when disconnecting, it must be done backwards
for i := len(blockTxs) - 1; i >= 0; i-- {
txid := blockTxs[i].btxID
s := string(txid)
btxID := blockTxs[i].btxID
s := string(btxID)
txsToDelete[s] = struct{}{}
txa, err := d.getTxAddresses(txid)
txa, err := d.getTxAddresses(btxID)
if err != nil {
return err
}
if txa == nil {
ut, _ := d.chainParser.UnpackTxid(txid)
ut, _ := d.chainParser.UnpackTxid(btxID)
glog.Warning("TxAddress for txid ", ut, " not found")
continue
}
if err := d.disconnectTxAddresses(wb, height, s, blockTxs[i].inputs, txa, txAddressesToUpdate, balances); err != nil {
if err := d.disconnectTxAddresses(wb, height, btxID, blockTxs[i].inputs, txa, txAddressesToUpdate, balances); err != nil {
return err
}
}
@ -1239,7 +1265,7 @@ func (d *RocksDB) DisconnectBlockRangeBitcoinType(lower uint32, higher uint32) e
wb.DeleteCF(d.cfh[cfHeight], key)
}
d.storeTxAddresses(wb, txAddressesToUpdate)
d.storeBalances(wb, balances)
d.storeBalancesDisconnect(wb, balances)
for s := range txsToDelete {
b := []byte(s)
wb.DeleteCF(d.cfh[cfTransactions], b)
@ -1252,6 +1278,27 @@ func (d *RocksDB) DisconnectBlockRangeBitcoinType(lower uint32, higher uint32) e
return err
}
func (d *RocksDB) storeBalancesDisconnect(wb *gorocksdb.WriteBatch, balances map[string]*AddrBalance) {
for _, b := range balances {
if b != nil {
// remove spent utxos
us := make([]Utxo, 0, len(b.Utxos))
for _, u := range b.Utxos {
// remove utxos marked as spent
if u.Vout >= 0 {
us = append(us, u)
}
}
b.Utxos = us
// sort utxos by height
sort.SliceStable(b.Utxos, func(i, j int) bool {
return b.Utxos[i].Height < b.Utxos[j].Height
})
}
}
d.storeBalances(wb, balances)
}
func dirSize(path string) (int64, error) {
var size int64
err := filepath.Walk(path, func(_ string, info os.FileInfo, err error) error {

View File

@ -664,6 +664,14 @@ func TestRocksDB_Index_BitcoinType(t *testing.T) {
Txs: 2,
SentSat: *dbtestdata.SatB1T2A5,
BalanceSat: *dbtestdata.SatB2T3A5,
Utxos: []Utxo{
Utxo{
BtxID: hexToBytes(dbtestdata.TxidB2T3),
Vout: 0,
Height: 225494,
ValueSat: *dbtestdata.SatB2T3A5,
},
},
}
if !reflect.DeepEqual(ab, abw) {
t.Errorf("GetAddressBalance() = %+v, want %+v", ab, abw)