Repeatedly with debounce synchronize index and mempool

pull/1/head
Martin Boehm 2018-02-01 11:24:53 +01:00
parent ed47171406
commit a07c414e72
1 changed files with 31 additions and 7 deletions

View File

@ -189,31 +189,55 @@ func main() {
<-chanSyncMempoolDone
}
func tickAndDebounce(tickTime time.Duration, debounceTime time.Duration, input chan struct{}, f func()) {
timer := time.NewTimer(tickTime)
Loop:
for {
timerChan := timer.C
select {
case _, ok := <-input:
timer.Stop()
// exit loop on closed channel
if !ok {
break Loop
}
// debounce for debounceTime
timer = time.NewTimer(debounceTime)
case <-timerChan:
// do the action and start the loop again
f()
timer = time.NewTimer(tickTime)
}
}
}
func syncIndexLoop() {
defer close(chanSyncIndexDone)
glog.Info("syncIndexLoop starting")
for range chanSyncIndex {
// resync index about every 15 minutes if there are no chanSyncIndex requests, with debounce 1 second
tickAndDebounce(935093*time.Millisecond, 1009*time.Millisecond, chanSyncIndex, func() {
if err := resyncIndex(); err != nil {
glog.Error(err)
glog.Error("syncIndexLoop", err)
}
}
})
glog.Info("syncIndexLoop stopped")
}
func syncMempoolLoop() {
defer close(chanSyncMempoolDone)
glog.Info("syncMempoolLoop starting")
for range chanSyncMempool {
// resync mempool about every minute if there are no chanSyncMempool requests, with debounce 1 second
tickAndDebounce(60017*time.Millisecond, 1009*time.Millisecond, chanSyncMempool, func() {
if err := mempool.Resync(); err != nil {
glog.Error(err)
glog.Error("syncMempoolLoop", err)
}
}
})
glog.Info("syncMempoolLoop stopped")
}
func mqHandler(m *bchain.MQMessage) {
body := hex.EncodeToString(m.Body)
glog.V(2).Infof("MQ: %s-%d %s", m.Topic, m.Sequence, body)
glog.V(1).Infof("MQ: %s-%d %s", m.Topic, m.Sequence, body)
if m.Topic == "hashblock" {
chanSyncIndex <- struct{}{}
} else if m.Topic == "hashtx" {