diff --git a/db/rocksdb.go b/db/rocksdb.go index d4f75825..fc48e42e 100644 --- a/db/rocksdb.go +++ b/db/rocksdb.go @@ -612,7 +612,7 @@ func (ab *AddrBalance) markUtxoAsSpent(btxID []byte, vout int32) { } } } - glog.Errorf("Utxo %s:%d not found, using in map %v", hex.EncodeToString(btxID), vout, len(ab.utxosMap) != 0) + glog.Errorf("Utxo %s:%d not found, utxosMap size %d", hex.EncodeToString(btxID), vout, len(ab.utxosMap)) } type blockTxs struct { @@ -1320,33 +1320,16 @@ func (d *RocksDB) writeHeight(wb *gorocksdb.WriteBatch, height uint32, bi *Block // Disconnect blocks -func (d *RocksDB) disconnectTxAddresses(wb *gorocksdb.WriteBatch, height uint32, btxID []byte, inputs []outpoint, txa *TxAddresses, - txAddressesToUpdate map[string]*TxAddresses, balances map[string]*AddrBalance) error { +func (d *RocksDB) disconnectTxAddressesInputs(wb *gorocksdb.WriteBatch, btxID []byte, inputs []outpoint, txa *TxAddresses, txAddressesToUpdate map[string]*TxAddresses, + getAddressBalance func(addrDesc bchain.AddressDescriptor) (*AddrBalance, error), + addressFoundInTx func(addrDesc bchain.AddressDescriptor, btxID []byte) bool) error { var err error var balance *AddrBalance - addresses := make(map[string]struct{}) - getAddressBalance := func(addrDesc bchain.AddressDescriptor) (*AddrBalance, error) { - var err error - s := string(addrDesc) - b, fb := balances[s] - if !fb { - b, err = d.GetAddrDescBalance(addrDesc, addressBalanceDetailUTXOIndexed) - if err != nil { - return nil, err - } - balances[s] = b - } - return b, nil - } for i, t := range txa.Inputs { if len(t.AddrDesc) > 0 { input := &inputs[i] - s := string(t.AddrDesc) - _, exist := addresses[s] - if !exist { - addresses[s] = struct{}{} - } - s = string(input.btxID) + exist := addressFoundInTx(t.AddrDesc, btxID) + s := string(input.btxID) sa, found := txAddressesToUpdate[s] if !found { sa, err = d.getTxAddresses(input.btxID) @@ -1390,13 +1373,15 @@ func (d *RocksDB) disconnectTxAddresses(wb *gorocksdb.WriteBatch, height uint32, } } } + return nil +} + +func (d *RocksDB) disconnectTxAddressesOutputs(wb *gorocksdb.WriteBatch, btxID []byte, txa *TxAddresses, + getAddressBalance func(addrDesc bchain.AddressDescriptor) (*AddrBalance, error), + addressFoundInTx func(addrDesc bchain.AddressDescriptor, btxID []byte) bool) error { for i, t := range txa.Outputs { if len(t.AddrDesc) > 0 { - s := string(t.AddrDesc) - _, exist := addresses[s] - if !exist { - addresses[s] = struct{}{} - } + exist := addressFoundInTx(t.AddrDesc, btxID) if d.chainParser.IsAddrDescIndexable(t.AddrDesc) { balance, err := getAddressBalance(t.AddrDesc) if err != nil { @@ -1419,11 +1404,96 @@ func (d *RocksDB) disconnectTxAddresses(wb *gorocksdb.WriteBatch, height uint32, } } } - for a := range addresses { + return nil +} + +func (d *RocksDB) disconnectBlock(height uint32, blockTxs []blockTxs) error { + wb := gorocksdb.NewWriteBatch() + defer wb.Destroy() + txAddressesToUpdate := make(map[string]*TxAddresses) + txAddresses := make([]*TxAddresses, len(blockTxs)) + txsToDelete := make(map[string]struct{}) + + balances := make(map[string]*AddrBalance) + getAddressBalance := func(addrDesc bchain.AddressDescriptor) (*AddrBalance, error) { + var err error + s := string(addrDesc) + b, fb := balances[s] + if !fb { + // do not use addressBalanceDetailUTXOIndexed as the utxo may be in wrong order for the helper map + b, err = d.GetAddrDescBalance(addrDesc, AddressBalanceDetailUTXO) + if err != nil { + return nil, err + } + balances[s] = b + } + return b, nil + } + + // all addresses in the block are stored in blockAddressesTxs, together with a map of transactions where they appear + blockAddressesTxs := make(map[string]map[string]struct{}) + // addressFoundInTx handles updates of the blockAddressesTxs map and returns true if the address+tx was already encountered + addressFoundInTx := func(addrDesc bchain.AddressDescriptor, btxID []byte) bool { + sAddrDesc := string(addrDesc) + sBtxID := string(btxID) + a, exist := blockAddressesTxs[sAddrDesc] + if !exist { + blockAddressesTxs[sAddrDesc] = map[string]struct{}{sBtxID: struct{}{}} + } else { + _, exist = a[sBtxID] + if !exist { + a[sBtxID] = struct{}{} + } + } + return exist + } + + glog.Info("Disconnecting block ", height, " containing ", len(blockTxs), " transactions") + // when connecting block, outputs are processed first + // when disconnecting, inputs must be reversed first + for i := range blockTxs { + btxID := blockTxs[i].btxID + s := string(btxID) + txsToDelete[s] = struct{}{} + txa, err := d.getTxAddresses(btxID) + if err != nil { + return err + } + if txa == nil { + ut, _ := d.chainParser.UnpackTxid(btxID) + glog.Warning("TxAddress for txid ", ut, " not found") + continue + } + txAddresses[i] = txa + if err := d.disconnectTxAddressesInputs(wb, btxID, blockTxs[i].inputs, txa, txAddressesToUpdate, getAddressBalance, addressFoundInTx); err != nil { + return err + } + } + for i := range blockTxs { + btxID := blockTxs[i].btxID + txa := txAddresses[i] + if txa == nil { + continue + } + if err := d.disconnectTxAddressesOutputs(wb, btxID, txa, getAddressBalance, addressFoundInTx); err != nil { + return err + } + } + for a := range blockAddressesTxs { key := packAddressKey([]byte(a), height) wb.DeleteCF(d.cfh[cfAddresses], key) } - return nil + key := packUint(height) + wb.DeleteCF(d.cfh[cfBlockTxs], key) + wb.DeleteCF(d.cfh[cfHeight], key) + d.storeTxAddresses(wb, txAddressesToUpdate) + d.storeBalancesDisconnect(wb, balances) + for s := range txsToDelete { + b := []byte(s) + wb.DeleteCF(d.cfh[cfTransactions], b) + wb.DeleteCF(d.cfh[cfTxAddresses], b) + } + return d.db.Write(d.wo, wb) } // DisconnectBlockRangeBitcoinType removes all data belonging to blocks in range lower-higher @@ -1440,51 +1510,15 @@ func (d *RocksDB) DisconnectBlockRangeBitcoinType(lower uint32, higher uint32) e } blocks[height-lower] = blockTxs } - wb := gorocksdb.NewWriteBatch() - defer wb.Destroy() - txAddressesToUpdate := make(map[string]*TxAddresses) - txsToDelete := make(map[string]struct{}) - balances := make(map[string]*AddrBalance) for height := higher; height >= lower; height-- { - blockTxs := blocks[height-lower] - glog.Info("Disconnecting block ", height, " containing ", len(blockTxs), " transactions") - // go backwards to avoid interim negative balance - // when connecting block, amount is first in tx on the output side, then in another tx on the input side - // when disconnecting, it must be done backwards - for i := len(blockTxs) - 1; i >= 0; i-- { - btxID := blockTxs[i].btxID - s := string(btxID) - txsToDelete[s] = struct{}{} - txa, err := d.getTxAddresses(btxID) - if err != nil { - return err - } - if txa == nil { - ut, _ := d.chainParser.UnpackTxid(btxID) - glog.Warning("TxAddress for txid ", ut, " not found") - continue - } - if err := d.disconnectTxAddresses(wb, height, btxID, blockTxs[i].inputs, txa, txAddressesToUpdate, balances); err != nil { - return err - } + err := d.disconnectBlock(height, blocks[height-lower]) + if err != nil { + return err } - key := packUint(height) - wb.DeleteCF(d.cfh[cfBlockTxs], key) - wb.DeleteCF(d.cfh[cfHeight], key) } - d.storeTxAddresses(wb, txAddressesToUpdate) - d.storeBalancesDisconnect(wb, balances) - for s := range txsToDelete { - b := []byte(s) - wb.DeleteCF(d.cfh[cfTransactions], b) - wb.DeleteCF(d.cfh[cfTxAddresses], b) - } - err := d.db.Write(d.wo, wb) - if err == nil { - d.is.RemoveLastBlockTimes(int(higher-lower) + 1) - glog.Infof("rocksdb: blocks %d-%d disconnected", lower, higher) - } - return err + d.is.RemoveLastBlockTimes(int(higher-lower) + 1) + glog.Infof("rocksdb: blocks %d-%d disconnected", lower, higher) + return nil } func (d *RocksDB) storeBalancesDisconnect(wb *gorocksdb.WriteBatch, balances map[string]*AddrBalance) {