Check for error in utxo DB

pull/380/head
Martin Boehm 2020-01-26 20:12:12 +01:00
parent 273b880245
commit 0751ed452c
2 changed files with 79 additions and 0 deletions

View File

@ -54,6 +54,7 @@ var (
synchronize = flag.Bool("sync", false, "synchronizes until tip, if together with zeromq, keeps index synchronized")
repair = flag.Bool("repair", false, "repair the database")
fixUtxo = flag.Bool("fixutxo", false, "check and fix utxo db and exit")
prof = flag.String("prof", "", "http server binding [address]:port of the interface to profiling data /debug/pprof/ (default no profiling)")
syncChunk = flag.Int("chunk", 100, "block chunk size for processing in bulk mode")
@ -178,6 +179,15 @@ func mainWithExitCode() int {
}
defer index.Close()
if *fixUtxo {
err = index.FixUtxos(chanOsSignal)
if err != nil {
glog.Error("fixUtxos: ", err)
return exitCodeFatal
}
return exitCodeOK
}
internalState, err = newInternalState(coin, coinShortcut, coinLabel, index)
if err != nil {
glog.Error("internalState: ", err)
@ -556,6 +566,7 @@ func storeInternalStateLoop() {
close(stopCompute)
close(chanStoreInternalStateDone)
}()
signal.Notify(stopCompute, syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM)
var computeRunning bool
lastCompute := time.Now()
lastAppInfo := time.Now()

View File

@ -1758,6 +1758,74 @@ func (d *RocksDB) ComputeInternalStateColumnStats(stopCompute chan os.Signal) er
return nil
}
func (d *RocksDB) fixUtxo(ba *AddrBalance) error {
var checksum big.Int
for i := range ba.Utxos {
checksum.Add(&checksum, &ba.Utxos[i].ValueSat)
}
if checksum.Cmp(&ba.BalanceSat) != 0 {
return errors.Errorf("balance %s, checksum %s, txs %d", ba.BalanceSat.String(), checksum.String(), ba.Txs)
}
return nil
}
// FixUtxos checks and fixes possible
func (d *RocksDB) FixUtxos(stop chan os.Signal) error {
if d.chainParser.GetChainType() != bchain.ChainBitcoinType {
glog.Info("FixUtxos: applicable only for bitcoin type coins")
return nil
}
glog.Info("FixUtxos: starting")
var row, errorsCount int64
var seekKey []byte
// do not use cache
ro := gorocksdb.NewDefaultReadOptions()
ro.SetFillCache(false)
for {
var addrDesc bchain.AddressDescriptor
it := d.db.NewIteratorCF(ro, d.cfh[cfAddressBalance])
if row == 0 {
it.SeekToFirst()
} else {
glog.Info("FixUtxos: row ", row, ", errors ", errorsCount)
it.Seek(seekKey)
it.Next()
}
for count := 0; it.Valid() && count < refreshIterator; it.Next() {
select {
case <-stop:
return errors.New("Interrupted")
default:
}
addrDesc = it.Key().Data()
buf := it.Value().Data()
count++
row++
if len(buf) < 3 {
glog.Error("FixUtxos: row ", row, ", addrDesc ", addrDesc, ", empty data")
continue
}
ba, err := unpackAddrBalance(buf, d.chainParser.PackedTxidLen(), AddressBalanceDetailUTXO)
if err != nil {
glog.Error("FixUtxos: row ", row, ", addrDesc ", addrDesc, ", unpackAddrBalance error ", err)
continue
}
err = d.fixUtxo(ba)
if err != nil {
glog.Error("FixUtxos: row ", row, ", addrDesc ", addrDesc, ", error ", err)
}
}
seekKey = append([]byte{}, addrDesc...)
valid := it.Valid()
it.Close()
if !valid {
break
}
}
glog.Info("FixUtxos: finished, scanned ", row, " rows, found ", errorsCount, " errors")
return nil
}
// Helpers
func packAddressKey(addrDesc bchain.AddressDescriptor, height uint32) []byte {