blockbook/bchain/mq.go

93 lines
2.2 KiB
Go
Raw Normal View History

2018-01-31 07:23:17 -07:00
package bchain
2018-01-19 07:58:46 -07:00
import (
"encoding/binary"
"github.com/golang/glog"
2018-01-19 07:58:46 -07:00
zmq "github.com/pebbe/zmq4"
)
// MQ is the handle of a running ZeroMQ message queue listener.
// It is created by NewMQ and released by Shutdown.
type MQ struct {
	// context is the ZeroMQ context the listener socket was created from.
	context *zmq.Context
	// socket is the subscriber socket the receive loop reads from.
	socket *zmq.Socket
	// isRunning is true while the receive loop is active; Shutdown only
	// tears the listener down when it is set.
	isRunning bool
	// finished is closed by the receive loop once it has terminated;
	// Shutdown blocks on it.
	finished chan bool
}
// MQMessage holds the data of one notification received from the Bitcoind
// message queue.
type MQMessage struct {
	// Topic is the first frame of the message, converted to a string.
	Topic string
	// Sequence is decoded as a little-endian uint32 from the last frame when
	// that frame is exactly 4 bytes long; otherwise it is 0.
	Sequence uint32
	// Body is the second frame of the message, passed through unmodified.
	Body []byte
}
2018-01-31 09:51:48 -07:00
// NewMQ creates new Bitcoind ZeroMQ listener
// callback function receives messages
2018-01-31 07:23:17 -07:00
func NewMQ(binding string, callback func(*MQMessage)) (*MQ, error) {
2018-01-19 07:58:46 -07:00
context, err := zmq.NewContext()
if err != nil {
return nil, err
2018-01-19 07:58:46 -07:00
}
socket, err := context.NewSocket(zmq.SUB)
if err != nil {
return nil, err
2018-01-19 07:58:46 -07:00
}
socket.SetSubscribe("hashblock")
socket.SetSubscribe("hashtx")
2018-01-31 07:03:06 -07:00
// for now do not use raw subscriptions - we would have to handle skipped/lost notifications from zeromq
// on each notification we do sync or syncmempool respectively
// socket.SetSubscribe("rawblock")
// socket.SetSubscribe("rawtx")
2018-01-19 07:58:46 -07:00
socket.Connect(binding)
glog.Info("MQ listening to ", binding)
mq := &MQ{context, socket, true, make(chan bool)}
go mq.run(callback)
return mq, nil
}
func (mq *MQ) run(callback func(*MQMessage)) {
mq.isRunning = true
for {
msg, err := mq.socket.RecvMessageBytes(0)
2018-01-19 07:58:46 -07:00
if err != nil {
if zmq.AsErrno(err) == zmq.Errno(zmq.ETERM) {
close(mq.finished)
glog.Info("MQ loop terminated")
break
}
glog.Error("MQ RecvMessageBytes error ", err)
}
if msg != nil && len(msg) >= 3 {
sequence := uint32(0)
if len(msg[len(msg)-1]) == 4 {
sequence = binary.LittleEndian.Uint32(msg[len(msg)-1])
}
m := &MQMessage{
Topic: string(msg[0]),
Sequence: sequence,
Body: msg[1],
}
callback(m)
}
}
mq.isRunning = false
}
// Shutdown stops listening to the ZeroMQ and closes the connection
func (mq *MQ) Shutdown() error {
glog.Info("MQ server shutdown")
if mq.isRunning {
// if errors in socket.Close or context.Term, let it close ungracefully
if err := mq.socket.Close(); err != nil {
return err
2018-01-19 07:58:46 -07:00
}
if err := mq.context.Term(); err != nil {
return err
2018-01-19 07:58:46 -07:00
}
_, _ = <-mq.finished
glog.Info("MQ server shutdown finished")
2018-01-19 07:58:46 -07:00
}
return nil
2018-01-19 07:58:46 -07:00
}