Remove bitcoin specific code from MQ push handler

indexv1
Martin Boehm 2018-03-27 23:39:06 +02:00
parent 3127e5b5c2
commit 19d071a184
6 changed files with 40 additions and 29 deletions

View File

@ -15,7 +15,7 @@ import (
"github.com/juju/errors"
)
type blockChainFactory func(config json.RawMessage, pushHandler func(*bchain.MQMessage)) (bchain.BlockChain, error)
type blockChainFactory func(config json.RawMessage, pushHandler func(bchain.NotificationType)) (bchain.BlockChain, error)
var blockChainFactories = make(map[string]blockChainFactory)
@ -28,7 +28,7 @@ func init() {
}
// NewBlockChain creates bchain.BlockChain of type defined by parameter coin
func NewBlockChain(coin string, configfile string, pushHandler func(*bchain.MQMessage), metrics *common.Metrics) (bchain.BlockChain, error) {
func NewBlockChain(coin string, configfile string, pushHandler func(bchain.NotificationType), metrics *common.Metrics) (bchain.BlockChain, error) {
bcf, ok := blockChainFactories[coin]
if !ok {
return nil, errors.New(fmt.Sprint("Unsupported coin ", coin, ". Must be one of ", reflect.ValueOf(blockChainFactories).MapKeys()))

View File

@ -41,7 +41,7 @@ type configuration struct {
}
// NewBitcoinRPC returns new BitcoinRPC instance.
func NewBitcoinRPC(config json.RawMessage, pushHandler func(*bchain.MQMessage)) (bchain.BlockChain, error) {
func NewBitcoinRPC(config json.RawMessage, pushHandler func(bchain.NotificationType)) (bchain.BlockChain, error) {
var err error
var c configuration
err = json.Unmarshal(config, &c)

View File

@ -48,7 +48,7 @@ type configuration struct {
}
// NewEthRPC returns new EthRPC instance.
func NewEthRPC(config json.RawMessage, pushHandler func(*bchain.MQMessage)) (bchain.BlockChain, error) {
func NewEthRPC(config json.RawMessage, pushHandler func(bchain.NotificationType)) (bchain.BlockChain, error) {
var err error
var c configuration
err = json.Unmarshal(config, &c)

View File

@ -13,7 +13,7 @@ type ZCashRPC struct {
*btc.BitcoinRPC
}
func NewZCashRPC(config json.RawMessage, pushHandler func(*bchain.MQMessage)) (bchain.BlockChain, error) {
func NewZCashRPC(config json.RawMessage, pushHandler func(bchain.NotificationType)) (bchain.BlockChain, error) {
b, err := btc.NewBitcoinRPC(config, pushHandler)
if err != nil {
return nil, err

View File

@ -16,16 +16,21 @@ type MQ struct {
finished chan bool
}
// MQMessage contains data received from Bitcoind message queue
type MQMessage struct {
Topic string
Sequence uint32
Body []byte
}
// NotificationType is type of notification
type NotificationType int
const (
// NotificationUnknown is unknown
NotificationUnknown NotificationType = iota
// NotificationNewBlock message is sent when there is a new block to be imported
NotificationNewBlock NotificationType = iota
// NotificationNewTx message is sent when there is a new mempool transaction
NotificationNewTx NotificationType = iota
)
// NewMQ creates new Bitcoind ZeroMQ listener
// callback function receives messages
func NewMQ(binding string, callback func(*MQMessage)) (*MQ, error) {
func NewMQ(binding string, callback func(NotificationType)) (*MQ, error) {
context, err := zmq.NewContext()
if err != nil {
return nil, err
@ -56,7 +61,7 @@ func NewMQ(binding string, callback func(*MQMessage)) (*MQ, error) {
return mq, nil
}
func (mq *MQ) run(callback func(*MQMessage)) {
func (mq *MQ) run(callback func(NotificationType)) {
mq.isRunning = true
for {
msg, err := mq.socket.RecvMessageBytes(0)
@ -70,16 +75,25 @@ func (mq *MQ) run(callback func(*MQMessage)) {
time.Sleep(100 * time.Millisecond)
}
if msg != nil && len(msg) >= 3 {
sequence := uint32(0)
if len(msg[len(msg)-1]) == 4 {
sequence = binary.LittleEndian.Uint32(msg[len(msg)-1])
var nt NotificationType
switch string(msg[0]) {
case "hashblock":
nt = NotificationNewBlock
break
case "hashtx":
nt = NotificationNewTx
break
default:
nt = NotificationUnknown
}
m := &MQMessage{
Topic: string(msg[0]),
Sequence: sequence,
Body: msg[1],
if glog.V(2) {
sequence := uint32(0)
if len(msg[len(msg)-1]) == 4 {
sequence = binary.LittleEndian.Uint32(msg[len(msg)-1])
}
glog.Infof("MQ: %v %s-%d", nt, string(msg[0]), sequence)
}
callback(m)
callback(nt)
}
}
mq.isRunning = false

View File

@ -2,7 +2,6 @@ package main
import (
"context"
"encoding/hex"
"flag"
"log"
"os"
@ -312,16 +311,14 @@ func onNewTxAddr(txid string, addr string) {
}
}
func pushSynchronizationHandler(m *bchain.MQMessage) {
// TODO - is coin specific, item for abstraction
body := hex.EncodeToString(m.Body)
glog.V(1).Infof("MQ: %s-%d %s", m.Topic, m.Sequence, body)
if m.Topic == "hashblock" {
func pushSynchronizationHandler(nt bchain.NotificationType) {
glog.V(1).Infof("MQ: notification ", nt)
if nt == bchain.NotificationNewBlock {
chanSyncIndex <- struct{}{}
} else if m.Topic == "hashtx" {
} else if nt == bchain.NotificationNewTx {
chanSyncMempool <- struct{}{}
} else {
glog.Errorf("MQ: unknown message %s-%d %s", m.Topic, m.Sequence, body)
glog.Error("MQ: unknown notification sent")
}
}