Use single instance instead of global InternalState object

pull/7/head
Martin Boehm 2018-05-29 11:37:35 +02:00
parent f7ce91b4e4
commit 9147781772
6 changed files with 39 additions and 31 deletions

View File

@ -81,6 +81,7 @@ var (
index *db.RocksDB
txCache *db.TxCache
syncWorker *db.SyncWorker
internalState *common.InternalState
callbacksOnNewBlockHash []func(hash string)
callbacksOnNewTxAddr []func(txid string, addr string)
chanOsSignal chan os.Signal
@ -156,22 +157,23 @@ func main() {
}
defer index.Close()
common.IS, err = index.LoadInternalState(*coin)
internalState, err = newInternalState(*coin, index)
if err != nil {
glog.Fatal("internalState: ", err)
}
if common.IS.DbState != common.DbStateClosed {
glog.Warning("internalState: database in not closed state ", common.IS.DbState, ", possibly previous ungraceful shutdown")
index.SetInternalState(internalState)
if internalState.DbState != common.DbStateClosed {
glog.Warning("internalState: database in not closed state ", internalState.DbState, ", possibly previous ungraceful shutdown")
}
syncWorker, err = db.NewSyncWorker(index, chain, *syncWorkers, *syncChunk, *blockFrom, *dryRun, chanOsSignal, metrics)
syncWorker, err = db.NewSyncWorker(index, chain, *syncWorkers, *syncChunk, *blockFrom, *dryRun, chanOsSignal, metrics, internalState)
if err != nil {
glog.Fatalf("NewSyncWorker %v", err)
}
// set the DbState to open at this moment, after all important workers are initialized
common.IS.DbState = common.DbStateOpen
err = index.StoreInternalState(common.IS)
internalState.DbState = common.DbStateOpen
err = index.StoreInternalState(internalState)
if err != nil {
glog.Fatal("internalState: ", err)
}
@ -242,7 +244,7 @@ func main() {
var socketIoServer *server.SocketIoServer
if *socketIoBinding != "" {
socketIoServer, err = server.NewSocketIoServer(
*socketIoBinding, *certFiles, index, chain, txCache, *explorerURL, metrics)
*socketIoBinding, *certFiles, index, chain, txCache, *explorerURL, metrics, internalState)
if err != nil {
glog.Error("socketio: ", err)
return
@ -304,6 +306,10 @@ func main() {
}
}
func newInternalState(coin string, d *db.RocksDB) (*common.InternalState, error) {
return d.LoadInternalState(coin)
}
func tickAndDebounce(tickTime time.Duration, debounceTime time.Duration, input chan struct{}, f func()) {
timer := time.NewTimer(tickTime)
var firstDebounce time.Time
@ -360,11 +366,11 @@ func syncMempoolLoop() {
glog.Info("syncMempoolLoop starting")
// resync mempool about every minute if there are no chanSyncMempool requests, with debounce 1 second
tickAndDebounce(resyncMempoolPeriodMs*time.Millisecond, debounceResyncMempoolMs*time.Millisecond, chanSyncMempool, func() {
common.IS.StartedMempoolSync()
internalState.StartedMempoolSync()
if err := chain.ResyncMempool(onNewTxAddr); err != nil {
glog.Error("syncMempoolLoop ", errors.ErrorStack(err))
} else {
common.IS.FinishedMempoolSync()
internalState.FinishedMempoolSync()
}
})
glog.Info("syncMempoolLoop stopped")
@ -374,7 +380,7 @@ func storeInternalStateLoop() {
defer close(chanStoreInternalStateDone)
glog.Info("storeInternalStateLoop starting")
tickAndDebounce(storeInternalStatePeriodMs*time.Millisecond, (storeInternalStatePeriodMs-1)*time.Millisecond, chanStoreInternalState, func() {
if err := index.StoreInternalState(common.IS); err != nil {
if err := index.StoreInternalState(internalState); err != nil {
glog.Error("storeInternalStateLoop ", errors.ErrorStack(err))
}
})

View File

@ -42,9 +42,6 @@ type InternalState struct {
DbColumns []InternalStateColumn `json:"dbColumns"`
}
// IS is a singleton holding internal state of the application
var IS *InternalState
// StartedSync signals start of synchronization
func (is *InternalState) StartedSync() {
is.mux.Lock()

View File

@ -36,6 +36,7 @@ type RocksDB struct {
ro *gorocksdb.ReadOptions
cfh []*gorocksdb.ColumnFamilyHandle
chainParser bchain.BlockChainParser
is *common.InternalState
}
const (
@ -102,7 +103,7 @@ func NewRocksDB(path string, parser bchain.BlockChainParser) (d *RocksDB, err er
wo := gorocksdb.NewDefaultWriteOptions()
ro := gorocksdb.NewDefaultReadOptions()
ro.SetFillCache(false)
return &RocksDB{path, db, wo, ro, cfh, parser}, nil
return &RocksDB{path, db, wo, ro, cfh, parser, nil}, nil
}
func (d *RocksDB) closeDB() error {
@ -118,9 +119,9 @@ func (d *RocksDB) closeDB() error {
func (d *RocksDB) Close() error {
if d.db != nil {
// store the internal state of the app
if common.IS.DbState == common.DbStateOpen {
common.IS.DbState = common.DbStateClosed
if err := d.StoreInternalState(common.IS); err != nil {
if d.is != nil && d.is.DbState == common.DbStateOpen {
d.is.DbState = common.DbStateClosed
if err := d.StoreInternalState(d.is); err != nil {
glog.Infof("internalState: ", err)
}
}
@ -917,6 +918,11 @@ func (d *RocksDB) LoadInternalState(rpcCoin string) (*common.InternalState, erro
return is, nil
}
// SetInternalState sets the InternalState to be used by db to collect internal state
func (d *RocksDB) SetInternalState(is *common.InternalState) {
d.is = is
}
// StoreInternalState stores the internal state to db
func (d *RocksDB) StoreInternalState(is *common.InternalState) error {
buf, err := is.Pack()

View File

@ -3,7 +3,6 @@ package db
import (
"blockbook/bchain"
"blockbook/bchain/coins/btc"
"blockbook/common"
"encoding/hex"
"fmt"
"io/ioutil"
@ -19,7 +18,7 @@ import (
// simplified explanation of signed varint packing, used in many index data structures
// for number n, the packing is: 2*n if n>=0 else 2*(-n)-1
// take only 1 byte if abs(n)<127
// takes only 1 byte if abs(n)<127
func setupRocksDB(t *testing.T, p bchain.BlockChainParser) *RocksDB {
tmp, err := ioutil.TempDir("", "testdb")
@ -30,10 +29,6 @@ func setupRocksDB(t *testing.T, p bchain.BlockChainParser) *RocksDB {
if err != nil {
t.Fatal(err)
}
common.IS, err = d.LoadInternalState("btc-testnet")
if err != nil {
t.Fatal("internalState: ", err)
}
return d
}

View File

@ -22,10 +22,11 @@ type SyncWorker struct {
startHash string
chanOsSignal chan os.Signal
metrics *common.Metrics
is *common.InternalState
}
// NewSyncWorker creates new SyncWorker and returns its handle
func NewSyncWorker(db *RocksDB, chain bchain.BlockChain, syncWorkers, syncChunk int, minStartHeight int, dryRun bool, chanOsSignal chan os.Signal, metrics *common.Metrics) (*SyncWorker, error) {
func NewSyncWorker(db *RocksDB, chain bchain.BlockChain, syncWorkers, syncChunk int, minStartHeight int, dryRun bool, chanOsSignal chan os.Signal, metrics *common.Metrics, is *common.InternalState) (*SyncWorker, error) {
if minStartHeight < 0 {
minStartHeight = 0
}
@ -38,6 +39,7 @@ func NewSyncWorker(db *RocksDB, chain bchain.BlockChain, syncWorkers, syncChunk
startHeight: uint32(minStartHeight),
chanOsSignal: chanOsSignal,
metrics: metrics,
is: is,
}, nil
}
@ -47,7 +49,7 @@ var errSynced = errors.New("synced")
// onNewBlock is called when new block is connected, but not in initial parallel sync
func (w *SyncWorker) ResyncIndex(onNewBlock func(hash string)) error {
start := time.Now()
common.IS.StartedSync()
w.is.StartedSync()
err := w.resyncIndex(onNewBlock)
@ -59,12 +61,12 @@ func (w *SyncWorker) ResyncIndex(onNewBlock func(hash string)) error {
w.metrics.IndexDBSize.Set(float64(w.db.DatabaseSizeOnDisk()))
bh, _, err := w.db.GetBestBlock()
if err == nil {
common.IS.FinishedSync(bh)
w.is.FinishedSync(bh)
}
return nil
case errSynced:
// this is not actually error but flag that resync wasn't necessary
common.IS.FinishedSyncNoChange()
w.is.FinishedSyncNoChange()
return nil
}

View File

@ -33,10 +33,11 @@ type SocketIoServer struct {
chainParser bchain.BlockChainParser
explorerURL string
metrics *common.Metrics
is *common.InternalState
}
// NewSocketIoServer creates new SocketIo interface to blockbook and returns its handle
func NewSocketIoServer(binding string, certFiles string, db *db.RocksDB, chain bchain.BlockChain, txCache *db.TxCache, explorerURL string, metrics *common.Metrics) (*SocketIoServer, error) {
func NewSocketIoServer(binding string, certFiles string, db *db.RocksDB, chain bchain.BlockChain, txCache *db.TxCache, explorerURL string, metrics *common.Metrics, is *common.InternalState) (*SocketIoServer, error) {
server := gosocketio.NewServer(transport.GetDefaultWebsocketTransport())
server.On(gosocketio.OnConnection, func(c *gosocketio.Channel) {
@ -76,6 +77,7 @@ func NewSocketIoServer(binding string, certFiles string, db *db.RocksDB, chain b
chainParser: chain.GetChainParser(),
explorerURL: explorerURL,
metrics: metrics,
is: is,
}
// support for tests of socket.io interface
@ -155,10 +157,10 @@ type resAboutBlockbookPublic struct {
func (s *SocketIoServer) index(w http.ResponseWriter, r *http.Request) {
vi := common.GetVersionInfo()
ss, bh, st := common.IS.GetSyncState()
ms, mt := common.IS.GetMempoolSyncState()
ss, bh, st := s.is.GetSyncState()
ms, mt := s.is.GetMempoolSyncState()
a := resAboutBlockbookPublic{
Coin: common.IS.Coin,
Coin: s.is.Coin,
About: blockbookAbout,
Version: vi.Version,
GitCommit: vi.GitCommit,