Improve ZeroMQ startup/shutdown
parent
e8b260a393
commit
531da09227
17
bchain/mq.go
17
bchain/mq.go
|
@ -33,13 +33,22 @@ func NewMQ(binding string, callback func(*MQMessage)) (*MQ, error) {
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
socket.SetSubscribe("hashblock")
|
||||
socket.SetSubscribe("hashtx")
|
||||
err = socket.SetSubscribe("hashblock")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err = socket.SetSubscribe("hashtx")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// for now do not use raw subscriptions - we would have to handle skipped/lost notifications from zeromq
|
||||
// on each notification we do sync or syncmempool respectively
|
||||
// socket.SetSubscribe("rawblock")
|
||||
// socket.SetSubscribe("rawtx")
|
||||
socket.Connect(binding)
|
||||
err = socket.Connect(binding)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
glog.Info("MQ listening to ", binding)
|
||||
mq := &MQ{context, socket, true, make(chan bool)}
|
||||
go mq.run(callback)
|
||||
|
@ -78,6 +87,8 @@ func (mq *MQ) run(callback func(*MQMessage)) {
|
|||
func (mq *MQ) Shutdown() error {
|
||||
glog.Info("MQ server shutdown")
|
||||
if mq.isRunning {
|
||||
mq.socket.SetUnsubscribe("hashtx")
|
||||
mq.socket.SetUnsubscribe("hashblock")
|
||||
// if errors in socket.Close or context.Term, let it close ungracefully
|
||||
if err := mq.socket.Close(); err != nil {
|
||||
return err
|
||||
|
|
Loading…
Reference in New Issue