From db39b5ef51c7309ec9ef92d15d4cd6c9492b57c2 Mon Sep 17 00:00:00 2001 From: Jakub Matys Date: Tue, 25 Sep 2018 11:20:58 +0200 Subject: [PATCH] Interrupt connectBlocks on OS signal while initial sync --- blockbook.go | 4 ++-- db/sync.go | 51 ++++++++++++++++++++++++++++++++++++++++----------- 2 files changed, 42 insertions(+), 13 deletions(-) diff --git a/blockbook.go b/blockbook.go index a02ed158..c4888ce1 100644 --- a/blockbook.go +++ b/blockbook.go @@ -260,7 +260,7 @@ func main() { } if *synchronize { - if err := syncWorker.ResyncIndex(nil); err != nil { + if err := syncWorker.ResyncIndex(nil, true); err != nil { glog.Error("resyncIndex ", err) return } @@ -392,7 +392,7 @@ func syncIndexLoop() { glog.Info("syncIndexLoop starting") // resync index about every 15 minutes if there are no chanSyncIndex requests, with debounce 1 second tickAndDebounce(time.Duration(*resyncIndexPeriodMs)*time.Millisecond, debounceResyncIndexMs*time.Millisecond, chanSyncIndex, func() { - if err := syncWorker.ResyncIndex(onNewBlockHash); err != nil { + if err := syncWorker.ResyncIndex(onNewBlockHash, false); err != nil { glog.Error("syncIndexLoop ", errors.ErrorStack(err)) } }) diff --git a/db/sync.go b/db/sync.go index ca31dff3..32093ab6 100644 --- a/db/sync.go +++ b/db/sync.go @@ -47,11 +47,11 @@ var errSynced = errors.New("synced") // ResyncIndex synchronizes index to the top of the blockchain // onNewBlock is called when new block is connected, but not in initial parallel sync -func (w *SyncWorker) ResyncIndex(onNewBlock func(hash string)) error { +func (w *SyncWorker) ResyncIndex(onNewBlock func(hash string), initialSync bool) error { start := time.Now() w.is.StartedSync() - err := w.resyncIndex(onNewBlock) + err := w.resyncIndex(onNewBlock, initialSync) switch err { case nil: @@ -75,7 +75,7 @@ func (w *SyncWorker) ResyncIndex(onNewBlock func(hash string)) error { return err } -func (w *SyncWorker) resyncIndex(onNewBlock func(hash string)) error { +func (w *SyncWorker) resyncIndex(onNewBlock func(hash string), initialSync bool) error { remoteBestHash, err := w.chain.GetBestBlockHash() if err != nil { return err @@ -98,7 +98,7 @@ func (w *SyncWorker) resyncIndex(onNewBlock func(hash string)) error { if remoteHash != localBestHash { // forked - the remote hash differs from the local hash at the same height glog.Info("resync: local is forked at height ", localBestHeight, ", local hash ", localBestHash, ", remote hash", remoteHash) - return w.handleFork(localBestHeight, localBestHash, onNewBlock) + return w.handleFork(localBestHeight, localBestHash, onNewBlock, initialSync) } glog.Info("resync: local at ", localBestHeight, " is behind") w.startHeight = localBestHeight + 1 @@ -129,13 +129,13 @@ func (w *SyncWorker) resyncIndex(onNewBlock func(hash string)) error { } // after parallel load finish the sync using standard way, // new blocks may have been created in the meantime - return w.resyncIndex(onNewBlock) + return w.resyncIndex(onNewBlock, initialSync) } } - return w.connectBlocks(onNewBlock) + return w.connectBlocks(onNewBlock, initialSync) } -func (w *SyncWorker) handleFork(localBestHeight uint32, localBestHash string, onNewBlock func(hash string)) error { +func (w *SyncWorker) handleFork(localBestHeight uint32, localBestHash string, onNewBlock func(hash string), initialSync bool) error { // find forked blocks, disconnect them and then synchronize again var height uint32 hashes := []string{localBestHash} @@ -160,18 +160,19 @@ func (w *SyncWorker) handleFork(localBestHeight uint32, localBestHash string, on if err := w.DisconnectBlocks(height+1, localBestHeight, hashes); err != nil { return err } - return w.resyncIndex(onNewBlock) + return w.resyncIndex(onNewBlock, initialSync) } -func (w *SyncWorker) connectBlocks(onNewBlock func(hash string)) error { +func (w *SyncWorker) connectBlocks(onNewBlock func(hash string), initialSync bool) error { bch := make(chan blockResult, 8) done := make(chan struct{}) defer close(done) go w.getBlockChain(bch, done) - var lastRes blockResult - for res := range bch { + var lastRes, empty blockResult + + connect := func(res blockResult) error { lastRes = res if res.err != nil { return res.err @@ -186,6 +187,34 @@ func (w *SyncWorker) connectBlocks(onNewBlock func(hash string)) error { if res.block.Height > 0 && res.block.Height%1000 == 0 { glog.Info("connected block ", res.block.Height, " ", res.block.Hash) } + + return nil + } + + if initialSync { + ConnectLoop: + for { + select { + case <-w.chanOsSignal: + return errors.Errorf("connectBlocks interrupted at height %d", lastRes.block.Height) + case res := <-bch: + if res == empty { + break ConnectLoop + } + err := connect(res) + if err != nil { + return err + } + } + } + } else { + // while regular sync, OS sig is handled by waitForSignalAndShutdown + for res := range bch { + err := connect(res) + if err != nil { + return err + } + } } if lastRes.block != nil {