diff --git a/db/rocksdb.go b/db/rocksdb.go index d65325f5..d3b216df 100644 --- a/db/rocksdb.go +++ b/db/rocksdb.go @@ -10,8 +10,10 @@ import ( "math/big" "os" "path/filepath" + "sort" "strconv" "time" + "unsafe" vlq "github.com/bsm/go-vlq" "github.com/golang/glog" @@ -565,7 +567,7 @@ func (d *RocksDB) processAddressesBitcoinType(block *bchain.Block, addresses add balance.Txs++ } balance.BalanceSat.Sub(&balance.BalanceSat, &spentOutput.ValueSat) - balance.Utxos = removeUtxo(balance.Utxos, btxID, int32(input.Vout), spentOutput.AddrDesc) + markUtxoAsSpent(balance.Utxos, btxID, int32(input.Vout), spentOutput.AddrDesc) if balance.BalanceSat.Sign() < 0 { d.resetValueSatToZero(&balance.BalanceSat, spentOutput.AddrDesc, "balance") } @@ -576,14 +578,18 @@ func (d *RocksDB) processAddressesBitcoinType(block *bchain.Block, addresses add return nil } -func removeUtxo(utxos []Utxo, btxID []byte, vout int32, addrDesc bchain.AddressDescriptor) []Utxo { - for i, utxo := range utxos { - if utxo.Vout == vout && bytes.Compare(utxo.BtxID, btxID) == 0 { - return append(utxos[:i], utxos[i+1:]...) +// markUtxoAsSpent finds outpoint btxID:vout in utxos and marks it as spent +// it is much faster than removing the utxo from the slice as it would cause in many memory copy operations +func markUtxoAsSpent(utxos []Utxo, btxID []byte, vout int32, addrDesc bchain.AddressDescriptor) { + for i := range utxos { + utxo := &utxos[i] + if utxo.Vout == vout && *(*int)(unsafe.Pointer(&utxo.BtxID[0])) == *(*int)(unsafe.Pointer(&btxID[0])) && bytes.Equal(utxo.BtxID, btxID) { + // mark utxo as spent by setting vout=-1 + utxo.Vout = -1 + return } } glog.Errorf("Utxo %s:%d not found in addrDesc %s", hex.EncodeToString(btxID), vout, addrDesc) - return utxos } // addToAddressesMap maintains mapping between addresses and transactions in one block @@ -843,7 +849,8 @@ func unpackAddrBalance(buf []byte, txidUnpackedLen int) (*AddrBalance, error) { sentSat, sl := unpackBigint(buf[l:]) balanceSat, bl := unpackBigint(buf[l+sl:]) l = l + sl + bl - utxos := make([]Utxo, 0) + // estimate the size of utxos to avoid reallocation + utxos := make([]Utxo, 0, len(buf[l:])/txidUnpackedLen+3) for len(buf[l:]) >= txidUnpackedLen+3 { btxID := append([]byte(nil), buf[l:l+txidUnpackedLen]...) l += txidUnpackedLen @@ -877,13 +884,16 @@ func packAddrBalance(ab *AddrBalance, buf, varBuf []byte) []byte { l = packBigint(&ab.BalanceSat, varBuf) buf = append(buf, varBuf[:l]...) for _, utxo := range ab.Utxos { - buf = append(buf, utxo.BtxID...) - l = packVaruint(uint(utxo.Vout), varBuf) - buf = append(buf, varBuf[:l]...) - l = packVaruint(uint(utxo.Height), varBuf) - buf = append(buf, varBuf[:l]...) - l = packBigint(&utxo.ValueSat, varBuf) - buf = append(buf, varBuf[:l]...) + // if Vout < 0, utxo is marked as spent + if utxo.Vout >= 0 { + buf = append(buf, utxo.BtxID...) + l = packVaruint(uint(utxo.Vout), varBuf) + buf = append(buf, varBuf[:l]...) + l = packVaruint(uint(utxo.Height), varBuf) + buf = append(buf, varBuf[:l]...) + l = packBigint(&utxo.ValueSat, varBuf) + buf = append(buf, varBuf[:l]...) + } } return buf } @@ -1102,8 +1112,10 @@ func (d *RocksDB) writeHeight(wb *gorocksdb.WriteBatch, height uint32, bi *Block // Disconnect blocks -func (d *RocksDB) disconnectTxAddresses(wb *gorocksdb.WriteBatch, height uint32, txid string, inputs []outpoint, txa *TxAddresses, +func (d *RocksDB) disconnectTxAddresses(wb *gorocksdb.WriteBatch, height uint32, btxID []byte, inputs []outpoint, txa *TxAddresses, txAddressesToUpdate map[string]*TxAddresses, balances map[string]*AddrBalance) error { + var err error + var balance *AddrBalance addresses := make(map[string]struct{}) getAddressBalance := func(addrDesc bchain.AddressDescriptor) (*AddrBalance, error) { var err error @@ -1120,33 +1132,16 @@ func (d *RocksDB) disconnectTxAddresses(wb *gorocksdb.WriteBatch, height uint32, } for i, t := range txa.Inputs { if len(t.AddrDesc) > 0 { + input := &inputs[i] s := string(t.AddrDesc) _, exist := addresses[s] if !exist { addresses[s] = struct{}{} } - b, err := getAddressBalance(t.AddrDesc) - if err != nil { - return err - } - if b != nil { - // subtract number of txs only once - if !exist { - b.Txs-- - } - b.SentSat.Sub(&b.SentSat, &t.ValueSat) - if b.SentSat.Sign() < 0 { - d.resetValueSatToZero(&b.SentSat, t.AddrDesc, "sent amount") - } - b.BalanceSat.Add(&b.BalanceSat, &t.ValueSat) - } else { - ad, _, _ := d.chainParser.GetAddressesFromAddrDesc(t.AddrDesc) - glog.Warningf("Balance for address %s (%s) not found", ad, t.AddrDesc) - } - s = string(inputs[i].btxID) - sa, exist := txAddressesToUpdate[s] - if !exist { - sa, err = d.getTxAddresses(inputs[i].btxID) + s = string(input.btxID) + sa, found := txAddressesToUpdate[s] + if !found { + sa, err = d.getTxAddresses(input.btxID) if err != nil { return err } @@ -1154,34 +1149,65 @@ func (d *RocksDB) disconnectTxAddresses(wb *gorocksdb.WriteBatch, height uint32, txAddressesToUpdate[s] = sa } } + var inputHeight uint32 if sa != nil { - sa.Outputs[inputs[i].index].Spent = false + sa.Outputs[input.index].Spent = false + inputHeight = sa.Height + } + if d.chainParser.IsAddrDescIndexable(t.AddrDesc) { + balance, err = getAddressBalance(t.AddrDesc) + if err != nil { + return err + } + if balance != nil { + // subtract number of txs only once + if !exist { + balance.Txs-- + } + balance.SentSat.Sub(&balance.SentSat, &t.ValueSat) + if balance.SentSat.Sign() < 0 { + d.resetValueSatToZero(&balance.SentSat, t.AddrDesc, "sent amount") + } + balance.BalanceSat.Add(&balance.BalanceSat, &t.ValueSat) + balance.Utxos = append(balance.Utxos, Utxo{ + BtxID: input.btxID, + Vout: input.index, + Height: inputHeight, + ValueSat: t.ValueSat, + }) + } else { + ad, _, _ := d.chainParser.GetAddressesFromAddrDesc(t.AddrDesc) + glog.Warningf("Balance for address %s (%s) not found", ad, t.AddrDesc) + } } } } - for _, t := range txa.Outputs { + for i, t := range txa.Outputs { if len(t.AddrDesc) > 0 { s := string(t.AddrDesc) _, exist := addresses[s] if !exist { addresses[s] = struct{}{} } - b, err := getAddressBalance(t.AddrDesc) - if err != nil { - return err - } - if b != nil { - // subtract number of txs only once - if !exist { - b.Txs-- + if d.chainParser.IsAddrDescIndexable(t.AddrDesc) { + balance, err := getAddressBalance(t.AddrDesc) + if err != nil { + return err } - b.BalanceSat.Sub(&b.BalanceSat, &t.ValueSat) - if b.BalanceSat.Sign() < 0 { - d.resetValueSatToZero(&b.BalanceSat, t.AddrDesc, "balance") + if balance != nil { + // subtract number of txs only once + if !exist { + balance.Txs-- + } + balance.BalanceSat.Sub(&balance.BalanceSat, &t.ValueSat) + if balance.BalanceSat.Sign() < 0 { + d.resetValueSatToZero(&balance.BalanceSat, t.AddrDesc, "balance") + } + markUtxoAsSpent(balance.Utxos, btxID, int32(i), t.AddrDesc) + } else { + ad, _, _ := d.chainParser.GetAddressesFromAddrDesc(t.AddrDesc) + glog.Warningf("Balance for address %s (%s) not found", ad, t.AddrDesc) } - } else { - ad, _, _ := d.chainParser.GetAddressesFromAddrDesc(t.AddrDesc) - glog.Warningf("Balance for address %s (%s) not found", ad, t.AddrDesc) } } } @@ -1218,19 +1244,19 @@ func (d *RocksDB) DisconnectBlockRangeBitcoinType(lower uint32, higher uint32) e // when connecting block, amount is first in tx on the output side, then in another tx on the input side // when disconnecting, it must be done backwards for i := len(blockTxs) - 1; i >= 0; i-- { - txid := blockTxs[i].btxID - s := string(txid) + btxID := blockTxs[i].btxID + s := string(btxID) txsToDelete[s] = struct{}{} - txa, err := d.getTxAddresses(txid) + txa, err := d.getTxAddresses(btxID) if err != nil { return err } if txa == nil { - ut, _ := d.chainParser.UnpackTxid(txid) + ut, _ := d.chainParser.UnpackTxid(btxID) glog.Warning("TxAddress for txid ", ut, " not found") continue } - if err := d.disconnectTxAddresses(wb, height, s, blockTxs[i].inputs, txa, txAddressesToUpdate, balances); err != nil { + if err := d.disconnectTxAddresses(wb, height, btxID, blockTxs[i].inputs, txa, txAddressesToUpdate, balances); err != nil { return err } } @@ -1239,7 +1265,7 @@ func (d *RocksDB) DisconnectBlockRangeBitcoinType(lower uint32, higher uint32) e wb.DeleteCF(d.cfh[cfHeight], key) } d.storeTxAddresses(wb, txAddressesToUpdate) - d.storeBalances(wb, balances) + d.storeBalancesDisconnect(wb, balances) for s := range txsToDelete { b := []byte(s) wb.DeleteCF(d.cfh[cfTransactions], b) @@ -1252,6 +1278,27 @@ func (d *RocksDB) DisconnectBlockRangeBitcoinType(lower uint32, higher uint32) e return err } +func (d *RocksDB) storeBalancesDisconnect(wb *gorocksdb.WriteBatch, balances map[string]*AddrBalance) { + for _, b := range balances { + if b != nil { + // remove spent utxos + us := make([]Utxo, 0, len(b.Utxos)) + for _, u := range b.Utxos { + // remove utxos marked as spent + if u.Vout >= 0 { + us = append(us, u) + } + } + b.Utxos = us + // sort utxos by height + sort.SliceStable(b.Utxos, func(i, j int) bool { + return b.Utxos[i].Height < b.Utxos[j].Height + }) + } + } + d.storeBalances(wb, balances) + +} func dirSize(path string) (int64, error) { var size int64 err := filepath.Walk(path, func(_ string, info os.FileInfo, err error) error { diff --git a/db/rocksdb_test.go b/db/rocksdb_test.go index 40f1e192..5fe27347 100644 --- a/db/rocksdb_test.go +++ b/db/rocksdb_test.go @@ -664,6 +664,14 @@ func TestRocksDB_Index_BitcoinType(t *testing.T) { Txs: 2, SentSat: *dbtestdata.SatB1T2A5, BalanceSat: *dbtestdata.SatB2T3A5, + Utxos: []Utxo{ + Utxo{ + BtxID: hexToBytes(dbtestdata.TxidB2T3), + Vout: 0, + Height: 225494, + ValueSat: *dbtestdata.SatB2T3A5, + }, + }, } if !reflect.DeepEqual(ab, abw) { t.Errorf("GetAddressBalance() = %+v, want %+v", ab, abw)