Fix errors in utxo db

pull/380/head
Martin Boehm 2020-01-27 14:33:58 +01:00
parent 58f426207e
commit 2493c3d1af
1 changed files with 62 additions and 7 deletions

View File

@ -1765,15 +1765,64 @@ func (d *RocksDB) ComputeInternalStateColumnStats(stopCompute chan os.Signal) er
return nil
}
func (d *RocksDB) fixUtxo(ba *AddrBalance) error {
func (d *RocksDB) fixUtxo(addrDesc bchain.AddressDescriptor, ba *AddrBalance) (bool, error) {
var checksum big.Int
for i := range ba.Utxos {
checksum.Add(&checksum, &ba.Utxos[i].ValueSat)
}
if checksum.Cmp(&ba.BalanceSat) != 0 {
return errors.Errorf("balance %s, checksum %s, txs %d", ba.BalanceSat.String(), checksum.String(), ba.Txs)
var checksumFromTxs big.Int
var utxos []Utxo
err := d.GetAddrDescTransactions(addrDesc, 0, ^uint32(0), func(txid string, height uint32, indexes []int32) error {
var ta *TxAddresses
var err error
for _, index := range indexes {
// take only outputs
if index >= 0 {
if ta == nil {
ta, err = d.GetTxAddresses(txid)
if err != nil {
return err
}
}
if ta == nil {
return errors.New("DB inconsistency: tx " + txid + ": not found in txAddresses")
} else {
if len(ta.Outputs) <= int(index) {
glog.Warning("DB inconsistency: txAddresses " + txid + " does not have enough outputs")
} else {
tao := &ta.Outputs[index]
if !tao.Spent {
bTxid, _ := d.chainParser.PackTxid(txid)
checksumFromTxs.Add(&checksumFromTxs, &tao.ValueSat)
utxos = append(utxos, Utxo{BtxID: bTxid, Height: height, Vout: index, ValueSat: tao.ValueSat})
}
}
}
}
}
return nil
})
if err != nil {
return false, err
}
fixed := false
if checksumFromTxs.Cmp(&ba.BalanceSat) == 0 {
ba.Utxos = utxos
wb := gorocksdb.NewWriteBatch()
err = d.storeBalances(wb, map[string]*AddrBalance{string(addrDesc): ba})
if err == nil {
err = d.db.Write(d.wo, wb)
}
wb.Destroy()
if err != nil {
return false, errors.Errorf("balance %s, checksum %s, from txa %s, txs %d, error storing fixed utxos %v", ba.BalanceSat.String(), checksum.String(), checksumFromTxs.String(), ba.Txs, err)
}
fixed = true
}
return fixed, errors.Errorf("balance %s, checksum %s, from txa %s, txs %d", ba.BalanceSat.String(), checksum.String(), checksumFromTxs.String(), ba.Txs)
}
return nil
return false, nil
}
// FixUtxos checks and fixes possible
@ -1783,7 +1832,7 @@ func (d *RocksDB) FixUtxos(stop chan os.Signal) error {
return nil
}
glog.Info("FixUtxos: starting")
var row, errorsCount int64
var row, errorsCount, fixedCount int64
var seekKey []byte
// do not use cache
ro := gorocksdb.NewDefaultReadOptions()
@ -1810,16 +1859,22 @@ func (d *RocksDB) FixUtxos(stop chan os.Signal) error {
row++
if len(buf) < 3 {
glog.Error("FixUtxos: row ", row, ", addrDesc ", addrDesc, ", empty data")
errorsCount++
continue
}
ba, err := unpackAddrBalance(buf, d.chainParser.PackedTxidLen(), AddressBalanceDetailUTXO)
if err != nil {
glog.Error("FixUtxos: row ", row, ", addrDesc ", addrDesc, ", unpackAddrBalance error ", err)
errorsCount++
continue
}
err = d.fixUtxo(ba)
fixed, err := d.fixUtxo(addrDesc, ba)
if err != nil {
glog.Error("FixUtxos: row ", row, ", addrDesc ", addrDesc, ", error ", err)
errorsCount++
glog.Error("FixUtxos: row ", row, ", addrDesc ", addrDesc, ", error ", err, ", fixed ", fixed)
if fixed {
fixedCount++
}
}
}
seekKey = append([]byte{}, addrDesc...)
@ -1829,7 +1884,7 @@ func (d *RocksDB) FixUtxos(stop chan os.Signal) error {
break
}
}
glog.Info("FixUtxos: finished, scanned ", row, " rows, found ", errorsCount, " errors")
glog.Info("FixUtxos: finished, scanned ", row, " rows, found ", errorsCount, " errors, fixed ", fixedCount)
return nil
}