blockbook/bchain/mq.go

160 lines
3.9 KiB
Go

package bchain
import (
"context"
"encoding/binary"
"time"
"github.com/golang/glog"
zmq "github.com/pebbe/zmq4"
)
// MQ is message queue listener handle
// It holds the ZeroMQ context and socket created by NewMQ together with the
// state shared between the receive loop (run) and Shutdown.
type MQ struct {
// context is the ZeroMQ context the socket was created from; Shutdown terminates it
context *zmq.Context
// socket is the SUB socket that NewMQ subscribes to "hashblock" and "hashtx"
socket *zmq.Socket
// isRunning is set to true while the receive loop is active; Shutdown checks it
isRunning bool
// finished is signalled when the receive loop exits; Shutdown waits on it
finished chan error
// binding is the endpoint address given to NewMQ, which the socket connects to
binding string
}
// NotificationType identifies the kind of event reported by the message queue
type NotificationType int

// Notification kinds, numbered consecutively starting at zero
const (
	// NotificationUnknown stands for a message with an unrecognized topic
	NotificationUnknown NotificationType = iota
	// NotificationNewBlock is sent when there is a new block to be imported
	NotificationNewBlock
	// NotificationNewTx is sent when there is a new mempool transaction
	NotificationNewTx
)
// NewMQ creates new Bitcoind ZeroMQ listener
// callback function receives messages
//
// It creates a SUB socket, subscribes it to the "hashblock" and "hashtx"
// topics, connects it to binding and starts the receive loop in its own
// goroutine; callback is invoked from that goroutine for every notification.
// If any setup step fails, the socket and context created so far are released
// before the error is returned.
func NewMQ(binding string, callback func(NotificationType)) (*MQ, error) {
	// named zctx so that it does not shadow the imported context package
	zctx, err := zmq.NewContext()
	if err != nil {
		return nil, err
	}
	socket, err := zctx.NewSocket(zmq.SUB)
	if err != nil {
		// best effort cleanup; the setup error is the one reported to the caller
		if termErr := zctx.Term(); termErr != nil {
			glog.Error("MQ context term error ", termErr)
		}
		return nil, err
	}
	// release frees the socket and the context after a failed setup step
	release := func() {
		if closeErr := socket.Close(); closeErr != nil {
			glog.Error("MQ socket close error ", closeErr)
		}
		if termErr := zctx.Term(); termErr != nil {
			glog.Error("MQ context term error ", termErr)
		}
	}
	// for now do not use raw subscriptions - we would have to handle skipped/lost notifications from zeromq
	// on each notification we do sync or syncmempool respectively
	// socket.SetSubscribe("rawblock")
	// socket.SetSubscribe("rawtx")
	for _, topic := range []string{"hashblock", "hashtx"} {
		if err = socket.SetSubscribe(topic); err != nil {
			release()
			return nil, err
		}
	}
	if err = socket.Connect(binding); err != nil {
		release()
		return nil, err
	}
	glog.Info("MQ listening to ", binding)
	mq := &MQ{
		context:   zctx,
		socket:    socket,
		isRunning: true,
		// buffered so that the receive loop can signal its exit and terminate
		// even when nobody is waiting for the signal
		finished: make(chan error, 1),
		binding:  binding,
	}
	go mq.run(callback)
	return mq, nil
}
// run is the receive loop of the listener; NewMQ starts it in its own goroutine.
// Every received message with at least three frames is mapped by its first
// frame (the topic) to a NotificationType and passed to callback. The loop
// ends when the ZeroMQ context is terminated or the socket is closed. On exit,
// including after a recovered panic, it clears isRunning and signals finished.
func (mq *MQ) run(callback func(NotificationType)) {
	defer func() {
		if p := recover(); p != nil {
			glog.Error("MQ loop recovered from ", p)
		}
		mq.isRunning = false
		glog.Info("MQ loop terminated")
		mq.finished <- nil
	}()
	mq.isRunning = true
	// failedBefore is true when the previous receive attempt failed as well
	failedBefore := false
	for {
		frames, err := mq.socket.RecvMessageBytes(0)
		if err == nil {
			failedBefore = false
		} else {
			if zmq.AsErrno(err) == zmq.Errno(zmq.ETERM) || err.Error() == "Socket is closed" {
				return
			}
			// suppress logging of error for the first time
			// programs built with Go 1.14 will receive more signals
			// the error should be resolved by retrying the call
			// see https://golang.org/doc/go1.14#runtime
			if failedBefore {
				glog.Error("MQ RecvMessageBytes error ", err, ", ", zmq.AsErrno(err))
			}
			failedBefore = true
			time.Sleep(100 * time.Millisecond)
		}
		// messages with fewer than three frames are ignored
		if len(frames) < 3 {
			continue
		}
		topic := string(frames[0])
		var nt NotificationType
		switch topic {
		case "hashblock":
			nt = NotificationNewBlock
		case "hashtx":
			nt = NotificationNewTx
		default:
			nt = NotificationUnknown
			glog.Infof("MQ: NotificationUnknown %v", topic)
		}
		if glog.V(2) {
			// the last frame is decoded as a little-endian sequence number when it is 4 bytes long
			var sequence uint32
			if last := frames[len(frames)-1]; len(last) == 4 {
				sequence = binary.LittleEndian.Uint32(last)
			}
			glog.Infof("MQ: %v %s-%d", nt, topic, sequence)
		}
		callback(nt)
	}
}
// Shutdown stops listening to the ZeroMQ and closes the connection
//
// The closing sequence (unsubscribe, disconnect, close the socket, terminate
// the context) runs in its own goroutine. Shutdown waits until the receive
// loop signals that it has finished, the closing sequence fails, or ctx is
// done, whichever comes first. It returns the closing error or ctx.Err() in
// the latter two cases and nil otherwise. When the listener is not running,
// it returns nil immediately.
func (mq *MQ) Shutdown(ctx context.Context) error {
	glog.Info("MQ server shutdown")
	if mq.isRunning {
		// closeErr is buffered so that the closing goroutine can always deliver
		// its result and exit, even after Shutdown has already returned
		closeErr := make(chan error, 1)
		go func() {
			closeErr <- func() error {
				// if errors in the closing sequence, let it close ungracefully
				if err := mq.socket.SetUnsubscribe("hashtx"); err != nil {
					return err
				}
				if err := mq.socket.SetUnsubscribe("hashblock"); err != nil {
					return err
				}
				// the socket was attached with Connect, so Disconnect is the matching call
				if err := mq.socket.Disconnect(mq.binding); err != nil {
					return err
				}
				if err := mq.socket.Close(); err != nil {
					return err
				}
				return mq.context.Term()
			}()
		}()
		select {
		case <-ctx.Done():
			return ctx.Err()
		case err := <-closeErr:
			if err != nil {
				return err
			}
			// the connection is closed; wait for the receive loop to notice it
			select {
			case <-ctx.Done():
				return ctx.Err()
			case err := <-mq.finished:
				if err != nil {
					return err
				}
			}
		case err := <-mq.finished:
			if err != nil {
				return err
			}
		}
	}
	glog.Info("MQ server shutdown finished")
	return nil
}