diff --git a/blockbook.go b/blockbook.go index 9df288f1..7f70cbe4 100644 --- a/blockbook.go +++ b/blockbook.go @@ -235,8 +235,8 @@ func main() { return } } else if !*synchronize { - if err = syncWorker.ConnectBlocksParallelInChunks(height, until); err != nil { - glog.Error("connectBlocksParallelInChunks ", err) + if err = syncWorker.ConnectBlocksParallel(height, until); err != nil { + glog.Error("connectBlocksParallel ", err) return } } diff --git a/db/sync.go b/db/sync.go index e0216583..4557a738 100644 --- a/db/sync.go +++ b/db/sync.go @@ -115,7 +115,7 @@ func (w *SyncWorker) resyncIndex(onNewBlock func(hash string)) error { } if remoteBestHeight-w.startHeight > uint32(w.syncChunk) { glog.Infof("resync: parallel sync of blocks %d-%d, using %d workers", w.startHeight, remoteBestHeight, w.syncWorkers) - err = w.connectBlocksParallel(w.startHeight, remoteBestHeight) + err = w.ConnectBlocksParallel(w.startHeight, remoteBestHeight) if err != nil { return err } @@ -184,7 +184,7 @@ func (w *SyncWorker) connectBlocks(onNewBlock func(hash string)) error { return nil } -func (w *SyncWorker) connectBlocksParallel(lower, higher uint32) error { +func (w *SyncWorker) ConnectBlocksParallel(lower, higher uint32) error { type hashHeight struct { hash string height uint32 @@ -194,6 +194,11 @@ func (w *SyncWorker) connectBlocksParallel(lower, higher uint32) error { hch := make(chan hashHeight, w.syncWorkers) hchClosed := atomic.Value{} hchClosed.Store(false) + lastConnectedBlock := int(lower) - 1 + connectedCh := make([]chan struct{}, w.syncWorkers) + var connectedMux sync.Mutex + totalWaitDuration := time.Duration(0) + totalWaitCount := 0 work := func(i int) { defer wg.Done() var err error @@ -217,10 +222,52 @@ func (w *SyncWorker) connectBlocksParallel(lower, higher uint32) error { if w.dryRun { continue } + // check if the block is the next in line to be connected + // if not, wait for the previous block connect to complete + chi := int(hh.height) % w.syncWorkers + waitForBlock := false + waitDuration := time.Duration(0) + glog.Info(i, " Going to connect block ", hh.height) + connectedMux.Lock() + if uint32(lastConnectedBlock+1) != hh.height { + if connectedCh[chi] != nil { + glog.Fatal("Channel ", chi, " is not nil!") + } + connectedCh[chi] = make(chan struct{}) + waitForBlock = true + } + connectedMux.Unlock() + if waitForBlock { + start := time.Now() + glog.Info(i, " Waiting for block ", hh.height, " ", chi) + <-connectedCh[chi] + if hchClosed.Load() == true { + glog.Error("Worker ", i, " connect block error ", err, ". Exiting...") + return + } + waitDuration = time.Since(start) + connectedCh[chi] = nil + } err = w.db.ConnectBlock(block) if err != nil { glog.Error("Worker ", i, " connect block ", hh.height, " ", hh.hash, " error ", err) } + connectedMux.Lock() + if lastConnectedBlock < int(hh.height) { + lastConnectedBlock = int(hh.height) + } + chi = (chi + 1) % w.syncWorkers + if connectedCh[chi] != nil { + glog.Info(i, " closing channel ", chi) + close(connectedCh[chi]) + connectedCh[chi] = nil + } + totalWaitDuration += waitDuration + if waitDuration > 0 { + totalWaitCount++ + } + glog.Info("Connected block ", hh.height) + connectedMux.Unlock() } glog.Info("Worker ", i, " exiting...") } @@ -246,7 +293,7 @@ ConnectLoop: } hch <- hashHeight{hash, h} if h > 0 && h%1000 == 0 { - glog.Info("connecting block ", h, " ", hash) + glog.Info("connecting block ", h, " ", hash, " block wait time ", totalWaitDuration, " wait count ", totalWaitCount) } h++ } @@ -254,6 +301,13 @@ ConnectLoop: close(hch) // signal stop to workers that are in w.chain.GetBlockWithoutHeader error loop hchClosed.Store(true) + connectedMux.Lock() + for _, ch := range connectedCh { + if ch != nil { + close(ch) + } + } + connectedMux.Unlock() wg.Wait() return err } @@ -296,39 +350,6 @@ func (w *SyncWorker) connectBlockChunk(lower, higher uint32) error { return nil } -// ConnectBlocksParallelInChunks connect blocks in chunks -func (w *SyncWorker) ConnectBlocksParallelInChunks(lower, higher uint32) error { - var wg sync.WaitGroup - - work := func(i int) { - defer wg.Done() - - offset := uint32(w.syncChunk * i) - stride := uint32(w.syncChunk * w.syncWorkers) - - for low := lower + offset; low <= higher; low += stride { - high := low + uint32(w.syncChunk-1) - if high > higher { - high = higher - } - err := w.connectBlockChunk(low, high) - if err != nil { - if err == bchain.ErrBlockNotFound { - break - } - glog.Fatalf("connectBlocksParallel %d-%d %v", low, high, err) - } - } - } - for i := 0; i < w.syncWorkers; i++ { - wg.Add(1) - go work(i) - } - wg.Wait() - - return nil -} - func (w *SyncWorker) isBlockConnected(height uint32) (bool, error) { local, err := w.db.GetBlockHash(height) if err != nil {