Improve handling of os interrupt signal in connectBlocksParallel
parent
98d91d4ce1
commit
cac412527d
32
db/sync.go
32
db/sync.go
|
@ -4,6 +4,7 @@ import (
|
|||
"blockbook/bchain"
|
||||
"os"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
|
@ -171,32 +172,36 @@ func (w *SyncWorker) connectBlocksParallel(lower, higher uint32) error {
|
|||
var err error
|
||||
var wg sync.WaitGroup
|
||||
hch := make(chan hashHeight, w.syncWorkers)
|
||||
running := make([]bool, w.syncWorkers)
|
||||
hchClosed := atomic.Value{}
|
||||
hchClosed.Store(false)
|
||||
work := func(i int) {
|
||||
defer wg.Done()
|
||||
var err error
|
||||
var block *bchain.Block
|
||||
for hh := range hch {
|
||||
running[i] = true
|
||||
for {
|
||||
block, err = w.chain.GetBlockWithoutHeader(hh.hash, hh.height)
|
||||
if err != nil {
|
||||
glog.Error("Connect block error ", err, ". Retrying...")
|
||||
// signal came while looping in the error loop
|
||||
if hchClosed.Load() == true {
|
||||
glog.Error("Worker ", i, " connect block error ", err, ". Exiting...")
|
||||
return
|
||||
}
|
||||
glog.Error("Worker ", i, " connect block error ", err, ". Retrying...")
|
||||
time.Sleep(time.Millisecond * 500)
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
if w.dryRun {
|
||||
running[i] = false
|
||||
continue
|
||||
}
|
||||
err = w.db.ConnectBlock(block)
|
||||
if err != nil {
|
||||
glog.Error("Connect block ", hh.height, " ", hh.hash, " error ", err)
|
||||
glog.Error("Worker ", i, " connect block ", hh.height, " ", hh.hash, " error ", err)
|
||||
}
|
||||
running[i] = false
|
||||
}
|
||||
glog.Info("Worker ", i, " exiting...")
|
||||
}
|
||||
for i := 0; i < w.syncWorkers; i++ {
|
||||
wg.Add(1)
|
||||
|
@ -208,19 +213,6 @@ ConnectLoop:
|
|||
for h := lower; h <= higher; {
|
||||
select {
|
||||
case <-w.chanOsSignal:
|
||||
// wait for the workers to finish block
|
||||
i := 0
|
||||
WaitAgain:
|
||||
for ; i < 60; i++ {
|
||||
for _, r := range running {
|
||||
if r {
|
||||
glog.Info("Waiting ", i, "s for workers to finish ", running)
|
||||
time.Sleep(time.Millisecond * 1000)
|
||||
continue WaitAgain
|
||||
}
|
||||
}
|
||||
break
|
||||
}
|
||||
err = errors.Errorf("connectBlocksParallel interrupted at height %d", h)
|
||||
break ConnectLoop
|
||||
default:
|
||||
|
@ -238,6 +230,8 @@ ConnectLoop:
|
|||
}
|
||||
}
|
||||
close(hch)
|
||||
// signal stop to workers that are in w.chain.GetBlockWithoutHeader error loop
|
||||
hchClosed.Store(true)
|
||||
wg.Wait()
|
||||
return err
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue