Use context to limit shutdown time of bchain/mq

pull/7/head
Martin Boehm 2018-05-30 14:37:30 +02:00
parent faaff8643d
commit a899f9e97a
6 changed files with 58 additions and 31 deletions

View File

@ -7,6 +7,7 @@ import (
"blockbook/bchain/coins/eth"
"blockbook/bchain/coins/zec"
"blockbook/common"
"context"
"encoding/json"
"fmt"
"io/ioutil"
@ -74,8 +75,8 @@ func (c *blockChainWithMetrics) Initialize() error {
return c.b.Initialize()
}
func (c *blockChainWithMetrics) Shutdown() error {
return c.b.Shutdown()
func (c *blockChainWithMetrics) Shutdown(ctx context.Context) error {
return c.b.Shutdown(ctx)
}
func (c *blockChainWithMetrics) IsTestnet() bool {

View File

@ -3,6 +3,7 @@ package btc
import (
"blockbook/bchain"
"bytes"
"context"
"encoding/hex"
"encoding/json"
"io"
@ -135,9 +136,9 @@ func (b *BitcoinRPC) Initialize() error {
return nil
}
func (b *BitcoinRPC) Shutdown() error {
func (b *BitcoinRPC) Shutdown(ctx context.Context) error {
if b.mq != nil {
if err := b.mq.Shutdown(); err != nil {
if err := b.mq.Shutdown(ctx); err != nil {
glog.Error("MQ.Shutdown error: ", err)
return err
}

View File

@ -217,7 +217,7 @@ func (b *EthereumRPC) subscribe(f func() (*rpc.ClientSubscription, error)) error
}
// Shutdown cleans up rpc interface to ethereum
func (b *EthereumRPC) Shutdown() error {
func (b *EthereumRPC) Shutdown(ctx context.Context) error {
if b.newBlockSubscription != nil {
b.newBlockSubscription.Unsubscribe()
}

View File

@ -1,6 +1,7 @@
package bchain
import (
"context"
"encoding/binary"
"time"
@ -13,7 +14,7 @@ type MQ struct {
context *zmq.Context
socket *zmq.Socket
isRunning bool
finished chan bool
finished chan error
binding string
}
@ -57,7 +58,7 @@ func NewMQ(binding string, callback func(NotificationType)) (*MQ, error) {
return nil, err
}
glog.Info("MQ listening to ", binding)
mq := &MQ{context, socket, true, make(chan bool), binding}
mq := &MQ{context, socket, true, make(chan error), binding}
go mq.run(callback)
return mq, nil
}
@ -69,7 +70,7 @@ func (mq *MQ) run(callback func(NotificationType)) {
}
mq.isRunning = false
glog.Info("MQ loop terminated")
mq.finished <- true
mq.finished <- nil
}()
mq.isRunning = true
for {
@ -107,27 +108,42 @@ func (mq *MQ) run(callback func(NotificationType)) {
}
// Shutdown stops listening to the ZeroMQ and closes the connection
func (mq *MQ) Shutdown() error {
func (mq *MQ) Shutdown(ctx context.Context) error {
glog.Info("MQ server shutdown")
if mq.isRunning {
// if errors in the closing sequence, let it close ungracefully
if err := mq.socket.SetUnsubscribe("hashtx"); err != nil {
go func() {
// if errors in the closing sequence, let it close ungracefully
if err := mq.socket.SetUnsubscribe("hashtx"); err != nil {
mq.finished <- err
return
}
if err := mq.socket.SetUnsubscribe("hashblock"); err != nil {
mq.finished <- err
return
}
if err := mq.socket.Unbind(mq.binding); err != nil {
mq.finished <- err
return
}
if err := mq.socket.Close(); err != nil {
mq.finished <- err
return
}
if err := mq.context.Term(); err != nil {
mq.finished <- err
return
}
}()
var err error
select {
case <-ctx.Done():
err = ctx.Err()
case err = <-mq.finished:
}
if err != nil {
return err
}
if err := mq.socket.SetUnsubscribe("hashblock"); err != nil {
return err
}
if err := mq.socket.Unbind(mq.binding); err != nil {
return err
}
if err := mq.socket.Close(); err != nil {
return err
}
if err := mq.context.Term(); err != nil {
return err
}
<-mq.finished
glog.Info("MQ server shutdown finished")
}
glog.Info("MQ server shutdown finished")
return nil
}

View File

@ -1,6 +1,7 @@
package bchain
import (
"context"
"encoding/json"
"errors"
"fmt"
@ -115,7 +116,7 @@ func (e *RPCError) Error() string {
type BlockChain interface {
// life-cycle methods
Initialize() error
Shutdown() error
Shutdown(ctx context.Context) error
// chain info
IsTestnet() bool
GetNetworkName() string

View File

@ -6,6 +6,7 @@ import (
"log"
"os"
"os/signal"
"sync/atomic"
"syscall"
"time"
@ -29,7 +30,7 @@ const resyncIndexPeriodMs = 935093
// debounce too close requests for resync
const debounceResyncIndexMs = 1009
// resync mempool at least each resyncIndexPeriodMs (could be more often if invoked by message from ZeroMQ)
// resync mempool at least each resyncMempoolPeriodMs (could be more often if invoked by message from ZeroMQ)
const resyncMempoolPeriodMs = 60017
// debounce too close requests for resync mempool (ZeroMQ sends message for each tx, when new block there are many transactions)
@ -85,6 +86,7 @@ var (
callbacksOnNewBlockHash []func(hash string)
callbacksOnNewTxAddr []func(txid string, addr string)
chanOsSignal chan os.Signal
inShutdown int32
)
func init() {
@ -293,7 +295,7 @@ func main() {
}
if httpServer != nil || socketIoServer != nil || chain != nil {
waitForSignalAndShutdown(httpServer, socketIoServer, chain, 5*time.Second)
waitForSignalAndShutdown(httpServer, socketIoServer, chain, 10*time.Second)
}
if *synchronize {
@ -335,8 +337,10 @@ Loop:
timer.Reset(0)
}
case <-timer.C:
// do the action and start the loop again
f()
// do the action, if not in shutdown, then start the loop again
if atomic.LoadInt32(&inShutdown) == 0 {
f()
}
timer.Reset(tickTime)
firstDebounce = time.Time{}
}
@ -394,6 +398,9 @@ func onNewTxAddr(txid string, addr string) {
}
func pushSynchronizationHandler(nt bchain.NotificationType) {
if atomic.LoadInt32(&inShutdown) != 0 {
return
}
glog.V(1).Infof("MQ: notification ", nt)
if nt == bchain.NotificationNewBlock {
chanSyncIndex <- struct{}{}
@ -406,6 +413,7 @@ func pushSynchronizationHandler(nt bchain.NotificationType) {
func waitForSignalAndShutdown(https *server.HTTPServer, socketio *server.SocketIoServer, chain bchain.BlockChain, timeout time.Duration) {
sig := <-chanOsSignal
atomic.StoreInt32(&inShutdown, 1)
glog.Infof("Shutdown: %v", sig)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
@ -424,7 +432,7 @@ func waitForSignalAndShutdown(https *server.HTTPServer, socketio *server.SocketI
}
if chain != nil {
if err := chain.Shutdown(); err != nil {
if err := chain.Shutdown(ctx); err != nil {
glog.Error("BlockChain.Shutdown error: ", err)
}
}