diff --git a/blockbook.go b/blockbook.go index f7d71259..95bd4582 100644 --- a/blockbook.go +++ b/blockbook.go @@ -177,7 +177,7 @@ func main() { if *computeColumnStats { internalState.DbState = common.DbStateOpen - err = index.ComputeInternalStateColumnStats() + err = index.ComputeInternalStateColumnStats(chanOsSignal) if err != nil { glog.Error("internalState: ", err) } @@ -321,7 +321,7 @@ func main() { close(chanStoreInternalState) <-chanSyncIndexDone <-chanSyncMempoolDone - <-chanStoreInternalState + <-chanStoreInternalStateDone } } @@ -412,9 +412,26 @@ func syncMempoolLoop() { } func storeInternalStateLoop() { - defer close(chanStoreInternalStateDone) + stopCompute := make(chan os.Signal) + defer func() { + close(stopCompute) + close(chanStoreInternalStateDone) + }() glog.Info("storeInternalStateLoop starting") + lastCompute := time.Now() + var computeRunning bool tickAndDebounce(storeInternalStatePeriodMs*time.Millisecond, (storeInternalStatePeriodMs-1)*time.Millisecond, chanStoreInternalState, func() { + if !computeRunning && lastCompute.Add(10*time.Hour).Before(time.Now()) { + computeRunning = true + go func() { + err := index.ComputeInternalStateColumnStats(stopCompute) + if err != nil { + glog.Error("computeInternalStateColumnStats error: ", err) + } + lastCompute = time.Now() + computeRunning = false + }() + } if err := index.StoreInternalState(internalState); err != nil { glog.Error("storeInternalStateLoop ", errors.ErrorStack(err)) } diff --git a/db/rocksdb.go b/db/rocksdb.go index 8d835e84..44f75257 100644 --- a/db/rocksdb.go +++ b/db/rocksdb.go @@ -8,6 +8,7 @@ import ( "encoding/hex" "os" "path/filepath" + "time" "github.com/bsm/go-vlq" "github.com/golang/glog" @@ -958,7 +959,7 @@ func (d *RocksDB) StoreInternalState(is *common.InternalState) error { return d.db.PutCF(d.wo, d.cfh[cfDefault], []byte(internalStateKey), buf) } -func (d *RocksDB) computeColumnSize(col int) (int64, int64, int64, error) { +func (d *RocksDB) computeColumnSize(col int, stopCompute chan os.Signal) (int64, int64, int64, error) { var rows, keysSum, valuesSum int64 var seekKey []byte for { @@ -967,11 +968,16 @@ func (d *RocksDB) computeColumnSize(col int) (int64, int64, int64, error) { if rows == 0 { it.SeekToFirst() } else { - glog.Info("Column ", cfNames[col], ": rows ", rows, ", key bytes ", keysSum, ", value bytes ", valuesSum, ", in progress...") + glog.Info("db: Column ", cfNames[col], ": rows ", rows, ", key bytes ", keysSum, ", value bytes ", valuesSum, ", in progress...") it.Seek(seekKey) it.Next() } for count := 0; it.Valid() && count < refreshIterator; it.Next() { + select { + case <-stopCompute: + return 0, 0, 0, errors.New("Interrupted") + default: + } key = it.Key().Data() count++ rows++ @@ -990,15 +996,18 @@ func (d *RocksDB) computeColumnSize(col int) (int64, int64, int64, error) { // ComputeInternalStateColumnStats computes stats of all db columns and sets them to internal state // can be very slow operation -func (d *RocksDB) ComputeInternalStateColumnStats() error { +func (d *RocksDB) ComputeInternalStateColumnStats(stopCompute chan os.Signal) error { + start := time.Now() + glog.Info("db: ComputeInternalStateColumnStats start") for c := 0; c < len(cfNames); c++ { - rows, keysSum, valuesSum, err := d.computeColumnSize(c) + rows, keysSum, valuesSum, err := d.computeColumnSize(c, stopCompute) if err != nil { return err } d.is.SetDBColumnStats(c, rows, keysSum, valuesSum) - glog.Info("Column ", cfNames[c], ": rows ", rows, ", key bytes ", keysSum, ", value bytes ", valuesSum) + glog.Info("db: Column ", cfNames[c], ": rows ", rows, ", key bytes ", keysSum, ", value bytes ", valuesSum) } + glog.Info("db: ComputeInternalStateColumnStats finished in ", time.Since(start)) return nil }