ZeroMQ handler in the background with graceful shutdown

pull/1/head
Martin Boehm 2018-01-22 16:46:54 +01:00
parent bc5b99003e
commit 4427af769b
2 changed files with 99 additions and 33 deletions

View File

@ -2,38 +2,89 @@ package bitcoin
import (
"encoding/binary"
"encoding/hex"
"log"
zmq "github.com/pebbe/zmq4"
)
func ZeroMQ(binding string) {
// MQ is message queue listener handle
type MQ struct {
context *zmq.Context
socket *zmq.Socket
isRunning bool
finished chan bool
}
// MQMessage contains data received from Bitcoind message queue
type MQMessage struct {
Topic string
Sequence uint32
Body []byte
}
// New creates new Bitcoind ZeroMQ listener
// callback function receives messages
func New(binding string, callback func(*MQMessage)) (*MQ, error) {
context, err := zmq.NewContext()
if err != nil {
log.Fatal(err)
return nil, err
}
socket, err := context.NewSocket(zmq.SUB)
if err != nil {
log.Fatal(err)
return nil, err
}
socket.SetSubscribe("hashblock")
socket.SetSubscribe("hashtx")
socket.SetSubscribe("rawblock")
socket.SetSubscribe("rawtx")
socket.Connect(binding)
defer socket.Close()
for i := 0; i < 101; i++ {
msg, err := socket.RecvMessageBytes(0)
if err != nil {
log.Fatal(err)
}
topic := string(msg[0])
body := hex.EncodeToString(msg[1])
sequence := uint32(0)
if len(msg[len(msg)-1]) == 4 {
sequence = binary.LittleEndian.Uint32(msg[len(msg)-1])
}
log.Printf("%s-%d (%v) %s", topic, sequence, msg[len(msg)-1], body)
}
log.Printf("MQ listening to %s", binding)
mq := &MQ{context, socket, true, make(chan bool)}
go mq.run(callback)
return mq, nil
}
func (mq *MQ) run(callback func(*MQMessage)) {
mq.isRunning = true
for {
msg, err := mq.socket.RecvMessageBytes(0)
if err != nil {
if zmq.AsErrno(err) == zmq.Errno(zmq.ETERM) {
close(mq.finished)
log.Print("MQ loop terminated")
break
}
log.Printf("MQ RecvMessageBytes error %v", err)
}
if msg != nil && len(msg) >= 3 {
sequence := uint32(0)
if len(msg[len(msg)-1]) == 4 {
sequence = binary.LittleEndian.Uint32(msg[len(msg)-1])
}
m := &MQMessage{
Topic: string(msg[0]),
Sequence: sequence,
Body: msg[1],
}
callback(m)
}
}
mq.isRunning = false
}
// Shutdown stops listening to the ZeroMQ and closes the connection
func (mq *MQ) Shutdown() error {
log.Printf("MQ server shutdown")
if mq.isRunning {
// if errors in socket.Close or context.Term, let it close ungracefully
if err := mq.socket.Close(); err != nil {
return err
}
if err := mq.context.Term(); err != nil {
return err
}
_, _ = <-mq.finished
log.Printf("MQ server shutdown finished")
}
return nil
}

View File

@ -2,6 +2,7 @@ package main
import (
"context"
"encoding/hex"
"flag"
"log"
"os"
@ -62,9 +63,8 @@ var (
func main() {
flag.Parse()
if *zeroMQBinding != "nil" {
bitcoin.ZeroMQ(*zeroMQBinding)
return
if *prof {
defer profile.Start().Stop()
}
if *repair {
@ -74,10 +74,6 @@ func main() {
return
}
if *prof {
defer profile.Start().Stop()
}
rpc := bitcoin.NewBitcoinRPC(
*rpcURL,
*rpcUser,
@ -101,16 +97,24 @@ func main() {
if *httpServerBinding != "nil" {
httpServer, err = server.New(*httpServerBinding, db)
if err != nil {
log.Fatalf("https: %s", err)
log.Fatalf("https: %v", err)
}
go func() {
err = httpServer.Run()
if err != nil {
log.Fatalf("https: %s", err)
log.Fatalf("https: %v", err)
}
}()
}
var mq *bitcoin.MQ
if *zeroMQBinding != "nil" {
mq, err = bitcoin.New(*zeroMQBinding, mqHandler)
if err != nil {
log.Fatalf("mq: %v", err)
}
}
if *resync {
if err := resyncIndex(rpc, db); err != nil {
log.Fatal(err)
@ -144,27 +148,38 @@ func main() {
}
if httpServer != nil {
waitForSignalAndShutdown(httpServer, 5*time.Second)
waitForSignalAndShutdown(httpServer, mq, 5*time.Second)
}
}
func waitForSignalAndShutdown(s *server.HttpServer, timeout time.Duration) {
stop := make(chan os.Signal, 1)
func mqHandler(m *bitcoin.MQMessage) {
body := hex.EncodeToString(m.Body)
log.Printf("MQ: %s-%d %s", m.Topic, m.Sequence, body)
}
signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM)
func waitForSignalAndShutdown(s *server.HttpServer, mq *bitcoin.MQ, timeout time.Duration) {
stop := make(chan os.Signal, 1)
signal.Notify(stop, syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM)
sig := <-stop
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
log.Printf("Shutdown %v", sig)
log.Printf("Shutdown with reason: %v", sig)
if mq != nil {
if err := mq.Shutdown(); err != nil {
log.Printf("MQ.Shutdown error: %v", err)
}
}
if s != nil {
if err := s.Shutdown(ctx); err != nil {
log.Printf("Error: %v", err)
log.Printf("HttpServer.Shutdown error: %v", err)
}
}
}
func printResult(txid string) error {