Add internal state of the application

pull/7/head
Martin Boehm 2018-05-22 12:56:51 +02:00
parent 1b3fc581ac
commit 291e99ba5f
3 changed files with 184 additions and 4 deletions

View File

@ -149,11 +149,26 @@ func main() {
}
defer index.Close()
common.IS, err = index.LoadInternalState()
if err != nil {
glog.Fatal("internalState: ", err)
}
if common.IS.DbState != common.DbStateClosed {
glog.Warning("internalState: database in not closed state ", common.IS.DbState, ", possibly previous ungraceful shutdown")
}
syncWorker, err = db.NewSyncWorker(index, chain, *syncWorkers, *syncChunk, *blockFrom, *dryRun, chanOsSignal, metrics)
if err != nil {
glog.Fatalf("NewSyncWorker %v", err)
}
// set the DbState to open at this moment, after all important workers are initialized
common.IS.DbState = common.DbStateOpen
err = index.StoreInternalState(common.IS)
if err != nil {
glog.Fatal("internalState: ", err)
}
if *rollbackHeight >= 0 {
bestHeight, bestHash, err := index.GetBestBlock()
if err != nil {

View File

@ -0,0 +1,106 @@
package common
import (
"encoding/json"
"sync"
"time"
)
const (
// DbStateClosed means db was closed gracefully
DbStateClosed = uint32(iota)
// DbStateOpen means db is open or application died without closing the db
DbStateOpen
)
// InternalStateColumn contains the data of a db column
type InternalStateColumn struct {
Name string `json:"name"`
Version uint32 `json:"version"`
Rows int64 `json:"rows"`
KeysSum int64 `json:"keysSum"`
ValuesSum int64 `json:"valuesSum"`
}
// InternalState contains the data of the internal state
type InternalState struct {
mux sync.Mutex
DbState uint32 `json:"dbState"`
LastStore time.Time `json:"lastStore"`
IsSynchronized bool `json:"isSynchronized"`
BestHeight uint32 `json:"bestHeight"`
LastSync time.Time `json:"lastSync"`
IsMempoolSynchronized bool `json:"isMempoolSynchronized"`
LastMempoolSync time.Time `json:"lastMempoolSync"`
DbColumns []InternalStateColumn `json:"dbColumns"`
}
// IS is a singleton holding internal state of the application
var IS *InternalState
func (is *InternalState) StartedSync() {
is.mux.Lock()
defer is.mux.Unlock()
is.IsSynchronized = false
}
func (is *InternalState) FinishedSync(bestHeight uint32) {
is.mux.Lock()
defer is.mux.Unlock()
is.IsSynchronized = true
is.BestHeight = bestHeight
is.LastSync = time.Now()
}
func (is *InternalState) GetSyncState() (bool, uint32, time.Time) {
is.mux.Lock()
defer is.mux.Unlock()
return is.IsSynchronized, is.BestHeight, is.LastSync
}
func (is *InternalState) StartedMempoolSync() {
is.mux.Lock()
defer is.mux.Unlock()
is.IsMempoolSynchronized = false
}
func (is *InternalState) FinishedMempoolSync() {
is.mux.Lock()
defer is.mux.Unlock()
is.IsMempoolSynchronized = true
is.LastMempoolSync = time.Now()
}
func (is *InternalState) GetMempoolSyncState() (bool, time.Time) {
is.mux.Lock()
defer is.mux.Unlock()
return is.IsMempoolSynchronized, is.LastMempoolSync
}
func (is *InternalState) AddDBColumnStats(c int, rowsDiff int64, keysSumDiff int64, valuesSumDiff int64) {
is.mux.Lock()
defer is.mux.Unlock()
is.DbColumns[c].Rows += rowsDiff
is.DbColumns[c].KeysSum += keysSumDiff
is.DbColumns[c].ValuesSum += valuesSumDiff
}
func (is *InternalState) Pack() ([]byte, error) {
is.mux.Lock()
defer is.mux.Unlock()
is.LastStore = time.Now()
return json.Marshal(is)
}
func UnpackInternalState(buf []byte) (*InternalState, error) {
var is InternalState
if err := json.Unmarshal(buf, &is); err != nil {
return nil, err
}
return &is, nil
}

View File

@ -2,6 +2,7 @@ package db
import (
"blockbook/bchain"
"blockbook/common"
"bytes"
"encoding/binary"
"encoding/hex"
@ -109,15 +110,25 @@ func (d *RocksDB) closeDB() error {
h.Destroy()
}
d.db.Close()
d.db = nil
return nil
}
// Close releases the RocksDB environment opened in NewRocksDB.
func (d *RocksDB) Close() error {
glog.Infof("rocksdb: close")
d.closeDB()
d.wo.Destroy()
d.ro.Destroy()
if d.db != nil {
// store the internal state of the app
if common.IS.DbState == common.DbStateOpen {
common.IS.DbState = common.DbStateClosed
if err := d.StoreInternalState(common.IS); err != nil {
glog.Infof("internalState: ", err)
}
}
glog.Infof("rocksdb: close")
d.closeDB()
d.wo.Destroy()
d.ro.Destroy()
}
return nil
}
@ -860,6 +871,54 @@ func (d *RocksDB) DeleteTx(txid string) error {
return d.db.DeleteCF(d.wo, d.cfh[cfTransactions], key)
}
// internal state
const internalStateKey = "internalState"
// LoadInternalState loads from db internal state or initializes a new one if not yet stored
func (d *RocksDB) LoadInternalState() (*common.InternalState, error) {
val, err := d.db.GetCF(d.ro, d.cfh[cfDefault], []byte(internalStateKey))
if err != nil {
return nil, err
}
defer val.Free()
data := val.Data()
var is *common.InternalState
if len(data) == 0 {
is = &common.InternalState{}
} else {
is, err = common.UnpackInternalState(data)
if err != nil {
return nil, err
}
}
// make sure that column stats match the columns
sc := is.DbColumns
nc := make([]common.InternalStateColumn, len(cfNames))
for i := 0; i < len(nc); i++ {
nc[i].Name = cfNames[i]
for j := 0; j < len(sc); j++ {
if sc[j].Name == nc[i].Name {
nc[i].Version = sc[j].Version
nc[i].Rows = sc[j].Rows
nc[i].KeysSum = sc[j].KeysSum
nc[i].ValuesSum = sc[j].ValuesSum
break
}
}
}
is.DbColumns = nc
return is, nil
}
// StoreInternalState stores the internal state to db
func (d *RocksDB) StoreInternalState(is *common.InternalState) error {
buf, err := is.Pack()
if err != nil {
return err
}
return d.db.PutCF(d.wo, d.cfh[cfDefault], []byte(internalStateKey), buf)
}
// Helpers
func packAddressKey(addrID []byte, height uint32) []byte {