Interrupt connectBlocks on OS signal while initial sync
parent
2531c789dd
commit
db39b5ef51
|
@ -260,7 +260,7 @@ func main() {
|
||||||
}
|
}
|
||||||
|
|
||||||
if *synchronize {
|
if *synchronize {
|
||||||
if err := syncWorker.ResyncIndex(nil); err != nil {
|
if err := syncWorker.ResyncIndex(nil, true); err != nil {
|
||||||
glog.Error("resyncIndex ", err)
|
glog.Error("resyncIndex ", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -392,7 +392,7 @@ func syncIndexLoop() {
|
||||||
glog.Info("syncIndexLoop starting")
|
glog.Info("syncIndexLoop starting")
|
||||||
// resync index about every 15 minutes if there are no chanSyncIndex requests, with debounce 1 second
|
// resync index about every 15 minutes if there are no chanSyncIndex requests, with debounce 1 second
|
||||||
tickAndDebounce(time.Duration(*resyncIndexPeriodMs)*time.Millisecond, debounceResyncIndexMs*time.Millisecond, chanSyncIndex, func() {
|
tickAndDebounce(time.Duration(*resyncIndexPeriodMs)*time.Millisecond, debounceResyncIndexMs*time.Millisecond, chanSyncIndex, func() {
|
||||||
if err := syncWorker.ResyncIndex(onNewBlockHash); err != nil {
|
if err := syncWorker.ResyncIndex(onNewBlockHash, false); err != nil {
|
||||||
glog.Error("syncIndexLoop ", errors.ErrorStack(err))
|
glog.Error("syncIndexLoop ", errors.ErrorStack(err))
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
51
db/sync.go
51
db/sync.go
|
@ -47,11 +47,11 @@ var errSynced = errors.New("synced")
|
||||||
|
|
||||||
// ResyncIndex synchronizes index to the top of the blockchain
|
// ResyncIndex synchronizes index to the top of the blockchain
|
||||||
// onNewBlock is called when new block is connected, but not in initial parallel sync
|
// onNewBlock is called when new block is connected, but not in initial parallel sync
|
||||||
func (w *SyncWorker) ResyncIndex(onNewBlock func(hash string)) error {
|
func (w *SyncWorker) ResyncIndex(onNewBlock func(hash string), initialSync bool) error {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
w.is.StartedSync()
|
w.is.StartedSync()
|
||||||
|
|
||||||
err := w.resyncIndex(onNewBlock)
|
err := w.resyncIndex(onNewBlock, initialSync)
|
||||||
|
|
||||||
switch err {
|
switch err {
|
||||||
case nil:
|
case nil:
|
||||||
|
@ -75,7 +75,7 @@ func (w *SyncWorker) ResyncIndex(onNewBlock func(hash string)) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *SyncWorker) resyncIndex(onNewBlock func(hash string)) error {
|
func (w *SyncWorker) resyncIndex(onNewBlock func(hash string), initialSync bool) error {
|
||||||
remoteBestHash, err := w.chain.GetBestBlockHash()
|
remoteBestHash, err := w.chain.GetBestBlockHash()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -98,7 +98,7 @@ func (w *SyncWorker) resyncIndex(onNewBlock func(hash string)) error {
|
||||||
if remoteHash != localBestHash {
|
if remoteHash != localBestHash {
|
||||||
// forked - the remote hash differs from the local hash at the same height
|
// forked - the remote hash differs from the local hash at the same height
|
||||||
glog.Info("resync: local is forked at height ", localBestHeight, ", local hash ", localBestHash, ", remote hash", remoteHash)
|
glog.Info("resync: local is forked at height ", localBestHeight, ", local hash ", localBestHash, ", remote hash", remoteHash)
|
||||||
return w.handleFork(localBestHeight, localBestHash, onNewBlock)
|
return w.handleFork(localBestHeight, localBestHash, onNewBlock, initialSync)
|
||||||
}
|
}
|
||||||
glog.Info("resync: local at ", localBestHeight, " is behind")
|
glog.Info("resync: local at ", localBestHeight, " is behind")
|
||||||
w.startHeight = localBestHeight + 1
|
w.startHeight = localBestHeight + 1
|
||||||
|
@ -129,13 +129,13 @@ func (w *SyncWorker) resyncIndex(onNewBlock func(hash string)) error {
|
||||||
}
|
}
|
||||||
// after parallel load finish the sync using standard way,
|
// after parallel load finish the sync using standard way,
|
||||||
// new blocks may have been created in the meantime
|
// new blocks may have been created in the meantime
|
||||||
return w.resyncIndex(onNewBlock)
|
return w.resyncIndex(onNewBlock, initialSync)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return w.connectBlocks(onNewBlock)
|
return w.connectBlocks(onNewBlock, initialSync)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *SyncWorker) handleFork(localBestHeight uint32, localBestHash string, onNewBlock func(hash string)) error {
|
func (w *SyncWorker) handleFork(localBestHeight uint32, localBestHash string, onNewBlock func(hash string), initialSync bool) error {
|
||||||
// find forked blocks, disconnect them and then synchronize again
|
// find forked blocks, disconnect them and then synchronize again
|
||||||
var height uint32
|
var height uint32
|
||||||
hashes := []string{localBestHash}
|
hashes := []string{localBestHash}
|
||||||
|
@ -160,18 +160,19 @@ func (w *SyncWorker) handleFork(localBestHeight uint32, localBestHash string, on
|
||||||
if err := w.DisconnectBlocks(height+1, localBestHeight, hashes); err != nil {
|
if err := w.DisconnectBlocks(height+1, localBestHeight, hashes); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return w.resyncIndex(onNewBlock)
|
return w.resyncIndex(onNewBlock, initialSync)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *SyncWorker) connectBlocks(onNewBlock func(hash string)) error {
|
func (w *SyncWorker) connectBlocks(onNewBlock func(hash string), initialSync bool) error {
|
||||||
bch := make(chan blockResult, 8)
|
bch := make(chan blockResult, 8)
|
||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
defer close(done)
|
defer close(done)
|
||||||
|
|
||||||
go w.getBlockChain(bch, done)
|
go w.getBlockChain(bch, done)
|
||||||
|
|
||||||
var lastRes blockResult
|
var lastRes, empty blockResult
|
||||||
for res := range bch {
|
|
||||||
|
connect := func(res blockResult) error {
|
||||||
lastRes = res
|
lastRes = res
|
||||||
if res.err != nil {
|
if res.err != nil {
|
||||||
return res.err
|
return res.err
|
||||||
|
@ -186,6 +187,34 @@ func (w *SyncWorker) connectBlocks(onNewBlock func(hash string)) error {
|
||||||
if res.block.Height > 0 && res.block.Height%1000 == 0 {
|
if res.block.Height > 0 && res.block.Height%1000 == 0 {
|
||||||
glog.Info("connected block ", res.block.Height, " ", res.block.Hash)
|
glog.Info("connected block ", res.block.Height, " ", res.block.Hash)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if initialSync {
|
||||||
|
ConnectLoop:
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-w.chanOsSignal:
|
||||||
|
return errors.Errorf("connectBlocks interrupted at height %d", lastRes.block.Height)
|
||||||
|
case res := <-bch:
|
||||||
|
if res == empty {
|
||||||
|
break ConnectLoop
|
||||||
|
}
|
||||||
|
err := connect(res)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// while regular sync, OS sig is handled by waitForSignalAndShutdown
|
||||||
|
for res := range bch {
|
||||||
|
err := connect(res)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if lastRes.block != nil {
|
if lastRes.block != nil {
|
||||||
|
|
Loading…
Reference in New Issue