Alter disconnect block procedure to avoid possible utxo db inconsistency

pull/380/head
Martin Boehm 2020-01-31 09:29:54 +01:00
parent fd4181d03f
commit 00b0a402ea
1 changed files with 106 additions and 72 deletions

View File

@ -612,7 +612,7 @@ func (ab *AddrBalance) markUtxoAsSpent(btxID []byte, vout int32) {
}
}
}
glog.Errorf("Utxo %s:%d not found, using in map %v", hex.EncodeToString(btxID), vout, len(ab.utxosMap) != 0)
glog.Errorf("Utxo %s:%d not found, utxosMap size %d", hex.EncodeToString(btxID), vout, len(ab.utxosMap))
}
type blockTxs struct {
@ -1320,33 +1320,16 @@ func (d *RocksDB) writeHeight(wb *gorocksdb.WriteBatch, height uint32, bi *Block
// Disconnect blocks
func (d *RocksDB) disconnectTxAddresses(wb *gorocksdb.WriteBatch, height uint32, btxID []byte, inputs []outpoint, txa *TxAddresses,
txAddressesToUpdate map[string]*TxAddresses, balances map[string]*AddrBalance) error {
func (d *RocksDB) disconnectTxAddressesInputs(wb *gorocksdb.WriteBatch, btxID []byte, inputs []outpoint, txa *TxAddresses, txAddressesToUpdate map[string]*TxAddresses,
getAddressBalance func(addrDesc bchain.AddressDescriptor) (*AddrBalance, error),
addressFoundInTx func(addrDesc bchain.AddressDescriptor, btxID []byte) bool) error {
var err error
var balance *AddrBalance
addresses := make(map[string]struct{})
getAddressBalance := func(addrDesc bchain.AddressDescriptor) (*AddrBalance, error) {
var err error
s := string(addrDesc)
b, fb := balances[s]
if !fb {
b, err = d.GetAddrDescBalance(addrDesc, addressBalanceDetailUTXOIndexed)
if err != nil {
return nil, err
}
balances[s] = b
}
return b, nil
}
for i, t := range txa.Inputs {
if len(t.AddrDesc) > 0 {
input := &inputs[i]
s := string(t.AddrDesc)
_, exist := addresses[s]
if !exist {
addresses[s] = struct{}{}
}
s = string(input.btxID)
exist := addressFoundInTx(t.AddrDesc, btxID)
s := string(input.btxID)
sa, found := txAddressesToUpdate[s]
if !found {
sa, err = d.getTxAddresses(input.btxID)
@ -1390,13 +1373,15 @@ func (d *RocksDB) disconnectTxAddresses(wb *gorocksdb.WriteBatch, height uint32,
}
}
}
return nil
}
func (d *RocksDB) disconnectTxAddressesOutputs(wb *gorocksdb.WriteBatch, btxID []byte, txa *TxAddresses,
getAddressBalance func(addrDesc bchain.AddressDescriptor) (*AddrBalance, error),
addressFoundInTx func(addrDesc bchain.AddressDescriptor, btxID []byte) bool) error {
for i, t := range txa.Outputs {
if len(t.AddrDesc) > 0 {
s := string(t.AddrDesc)
_, exist := addresses[s]
if !exist {
addresses[s] = struct{}{}
}
exist := addressFoundInTx(t.AddrDesc, btxID)
if d.chainParser.IsAddrDescIndexable(t.AddrDesc) {
balance, err := getAddressBalance(t.AddrDesc)
if err != nil {
@ -1419,11 +1404,96 @@ func (d *RocksDB) disconnectTxAddresses(wb *gorocksdb.WriteBatch, height uint32,
}
}
}
for a := range addresses {
return nil
}
func (d *RocksDB) disconnectBlock(height uint32, blockTxs []blockTxs) error {
wb := gorocksdb.NewWriteBatch()
defer wb.Destroy()
txAddressesToUpdate := make(map[string]*TxAddresses)
txAddresses := make([]*TxAddresses, len(blockTxs))
txsToDelete := make(map[string]struct{})
balances := make(map[string]*AddrBalance)
getAddressBalance := func(addrDesc bchain.AddressDescriptor) (*AddrBalance, error) {
var err error
s := string(addrDesc)
b, fb := balances[s]
if !fb {
// do not use addressBalanceDetailUTXOIndexed as the utxo may be in wrong order for the helper map
b, err = d.GetAddrDescBalance(addrDesc, AddressBalanceDetailUTXO)
if err != nil {
return nil, err
}
balances[s] = b
}
return b, nil
}
// all addresses in the block are stored in blockAddressesTxs, together with a map of transactions where they appear
blockAddressesTxs := make(map[string]map[string]struct{})
// addressFoundInTx handles updates of the blockAddressesTxs map and returns true if the address+tx was already encountered
addressFoundInTx := func(addrDesc bchain.AddressDescriptor, btxID []byte) bool {
sAddrDesc := string(addrDesc)
sBtxID := string(btxID)
a, exist := blockAddressesTxs[sAddrDesc]
if !exist {
blockAddressesTxs[sAddrDesc] = map[string]struct{}{sBtxID: struct{}{}}
} else {
_, exist = a[sBtxID]
if !exist {
a[sBtxID] = struct{}{}
}
}
return exist
}
glog.Info("Disconnecting block ", height, " containing ", len(blockTxs), " transactions")
// when connecting block, outputs are processed first
// when disconnecting, inputs must be reversed first
for i := range blockTxs {
btxID := blockTxs[i].btxID
s := string(btxID)
txsToDelete[s] = struct{}{}
txa, err := d.getTxAddresses(btxID)
if err != nil {
return err
}
if txa == nil {
ut, _ := d.chainParser.UnpackTxid(btxID)
glog.Warning("TxAddress for txid ", ut, " not found")
continue
}
txAddresses[i] = txa
if err := d.disconnectTxAddressesInputs(wb, btxID, blockTxs[i].inputs, txa, txAddressesToUpdate, getAddressBalance, addressFoundInTx); err != nil {
return err
}
}
for i := range blockTxs {
btxID := blockTxs[i].btxID
txa := txAddresses[i]
if txa == nil {
continue
}
if err := d.disconnectTxAddressesOutputs(wb, btxID, txa, getAddressBalance, addressFoundInTx); err != nil {
return err
}
}
for a := range blockAddressesTxs {
key := packAddressKey([]byte(a), height)
wb.DeleteCF(d.cfh[cfAddresses], key)
}
return nil
key := packUint(height)
wb.DeleteCF(d.cfh[cfBlockTxs], key)
wb.DeleteCF(d.cfh[cfHeight], key)
d.storeTxAddresses(wb, txAddressesToUpdate)
d.storeBalancesDisconnect(wb, balances)
for s := range txsToDelete {
b := []byte(s)
wb.DeleteCF(d.cfh[cfTransactions], b)
wb.DeleteCF(d.cfh[cfTxAddresses], b)
}
return d.db.Write(d.wo, wb)
}
// DisconnectBlockRangeBitcoinType removes all data belonging to blocks in range lower-higher
@ -1440,51 +1510,15 @@ func (d *RocksDB) DisconnectBlockRangeBitcoinType(lower uint32, higher uint32) e
}
blocks[height-lower] = blockTxs
}
wb := gorocksdb.NewWriteBatch()
defer wb.Destroy()
txAddressesToUpdate := make(map[string]*TxAddresses)
txsToDelete := make(map[string]struct{})
balances := make(map[string]*AddrBalance)
for height := higher; height >= lower; height-- {
blockTxs := blocks[height-lower]
glog.Info("Disconnecting block ", height, " containing ", len(blockTxs), " transactions")
// go backwards to avoid interim negative balance
// when connecting block, amount is first in tx on the output side, then in another tx on the input side
// when disconnecting, it must be done backwards
for i := len(blockTxs) - 1; i >= 0; i-- {
btxID := blockTxs[i].btxID
s := string(btxID)
txsToDelete[s] = struct{}{}
txa, err := d.getTxAddresses(btxID)
if err != nil {
return err
}
if txa == nil {
ut, _ := d.chainParser.UnpackTxid(btxID)
glog.Warning("TxAddress for txid ", ut, " not found")
continue
}
if err := d.disconnectTxAddresses(wb, height, btxID, blockTxs[i].inputs, txa, txAddressesToUpdate, balances); err != nil {
return err
}
err := d.disconnectBlock(height, blocks[height-lower])
if err != nil {
return err
}
key := packUint(height)
wb.DeleteCF(d.cfh[cfBlockTxs], key)
wb.DeleteCF(d.cfh[cfHeight], key)
}
d.storeTxAddresses(wb, txAddressesToUpdate)
d.storeBalancesDisconnect(wb, balances)
for s := range txsToDelete {
b := []byte(s)
wb.DeleteCF(d.cfh[cfTransactions], b)
wb.DeleteCF(d.cfh[cfTxAddresses], b)
}
err := d.db.Write(d.wo, wb)
if err == nil {
d.is.RemoveLastBlockTimes(int(higher-lower) + 1)
glog.Infof("rocksdb: blocks %d-%d disconnected", lower, higher)
}
return err
d.is.RemoveLastBlockTimes(int(higher-lower) + 1)
glog.Infof("rocksdb: blocks %d-%d disconnected", lower, higher)
return nil
}
func (d *RocksDB) storeBalancesDisconnect(wb *gorocksdb.WriteBatch, balances map[string]*AddrBalance) {