Store internal state periodicaly

pull/7/head
Martin Boehm 2018-05-23 08:54:02 +02:00
parent 1a953ea2a4
commit 96f25ce11a
1 changed files with 30 additions and 11 deletions

View File

@ -35,6 +35,9 @@ const resyncMempoolPeriodMs = 60017
// debounce too close requests for resync mempool (ZeroMQ sends message for each tx, when new block there are many transactions)
const debounceResyncMempoolMs = 1009
// store internal state about once every minute
const storeInternalStatePeriodMs = 59699
var (
blockchain = flag.String("blockchaincfg", "", "path to blockchain RPC service configuration json file")
@ -68,17 +71,19 @@ var (
)
var (
chanSyncIndex = make(chan struct{})
chanSyncMempool = make(chan struct{})
chanSyncIndexDone = make(chan struct{})
chanSyncMempoolDone = make(chan struct{})
chain bchain.BlockChain
index *db.RocksDB
txCache *db.TxCache
syncWorker *db.SyncWorker
callbacksOnNewBlockHash []func(hash string)
callbacksOnNewTxAddr []func(txid string, addr string)
chanOsSignal chan os.Signal
chanSyncIndex = make(chan struct{})
chanSyncMempool = make(chan struct{})
chanStoreInternalState = make(chan struct{})
chanSyncIndexDone = make(chan struct{})
chanSyncMempoolDone = make(chan struct{})
chanStoreInternalStateDone = make(chan struct{})
chain bchain.BlockChain
index *db.RocksDB
txCache *db.TxCache
syncWorker *db.SyncWorker
callbacksOnNewBlockHash []func(hash string)
callbacksOnNewTxAddr []func(txid string, addr string)
chanOsSignal chan os.Signal
)
func init() {
@ -261,6 +266,7 @@ func main() {
// start the synchronization loops after the server interfaces are started
go syncIndexLoop()
go syncMempoolLoop()
go storeInternalStateLoop()
}
if *blockFrom >= 0 {
@ -291,8 +297,10 @@ func main() {
if *synchronize {
close(chanSyncIndex)
close(chanSyncMempool)
close(chanStoreInternalState)
<-chanSyncIndexDone
<-chanSyncMempoolDone
<-chanStoreInternalState
}
}
@ -362,6 +370,17 @@ func syncMempoolLoop() {
glog.Info("syncMempoolLoop stopped")
}
func storeInternalStateLoop() {
defer close(chanStoreInternalStateDone)
glog.Info("storeInternalStateLoop starting")
tickAndDebounce(storeInternalStatePeriodMs*time.Millisecond, (storeInternalStatePeriodMs-1)*time.Millisecond, chanStoreInternalState, func() {
if err := index.StoreInternalState(common.IS); err != nil {
glog.Error("storeInternalStateLoop ", errors.ErrorStack(err))
}
})
glog.Info("storeInternalStateLoop stopped")
}
func onNewTxAddr(txid string, addr string) {
for _, c := range callbacksOnNewTxAddr {
c(txid, addr)