Merge branch 'master' into ethereum

indexv1
Martin Boehm 2018-03-19 11:47:24 +01:00
commit f336217c1d
8 changed files with 112 additions and 48 deletions

View File

@ -6,14 +6,15 @@ import (
"blockbook/bchain/coins/eth"
"blockbook/bchain/coins/zec"
"blockbook/common"
"encoding/json"
"fmt"
"io/ioutil"
"reflect"
"time"
"github.com/juju/errors"
)
type blockChainFactory func(url string, user string, password string, timeout time.Duration, parse bool, metrics *common.Metrics) (bchain.BlockChain, error)
type blockChainFactory func(config json.RawMessage, pushHandler func(*bchain.MQMessage), metrics *common.Metrics) (bchain.BlockChain, error)
var blockChainFactories = make(map[string]blockChainFactory)
@ -26,10 +27,19 @@ func init() {
}
// NewBlockChain creates bchain.BlockChain of type defined by parameter coin
func NewBlockChain(coin string, url string, user string, password string, timeout time.Duration, parse bool, metrics *common.Metrics) (bchain.BlockChain, error) {
func NewBlockChain(coin string, configfile string, pushHandler func(*bchain.MQMessage), metrics *common.Metrics) (bchain.BlockChain, error) {
bcf, ok := blockChainFactories[coin]
if !ok {
return nil, errors.New(fmt.Sprint("Unsupported coin ", coin, ". Must be one of ", reflect.ValueOf(blockChainFactories).MapKeys()))
}
return bcf(url, user, password, timeout, parse, metrics)
data, err := ioutil.ReadFile(configfile)
if err != nil {
return nil, errors.Annotatef(err, "Error reading file %v", configfile)
}
var config json.RawMessage
err = json.Unmarshal(data, &config)
if err != nil {
return nil, errors.Annotatef(err, "Error parsing file %v", configfile)
}
return bcf(config, pushHandler, metrics)
}

View File

@ -30,21 +30,38 @@ type BitcoinRPC struct {
Mempool *bchain.Mempool
ParseBlocks bool
metrics *common.Metrics
mq *bchain.MQ
}
type configuration struct {
RPCURL string `json:"rpcURL"`
RPCUser string `json:"rpcUser"`
RPCPass string `json:"rpcPass"`
RPCTimeout int `json:"rpcTimeout"`
Parse bool `json:"parse"`
ZeroMQBinding string `json:"zeroMQBinding"`
}
// NewBitcoinRPC returns new BitcoinRPC instance.
func NewBitcoinRPC(url string, user string, password string, timeout time.Duration, parse bool, metrics *common.Metrics) (bchain.BlockChain, error) {
func NewBitcoinRPC(config json.RawMessage, pushHandler func(*bchain.MQMessage), metrics *common.Metrics) (bchain.BlockChain, error) {
var err error
var c configuration
err = json.Unmarshal(config, &c)
if err != nil {
return nil, errors.Annotatef(err, "Invalid configuragion file")
}
transport := &http.Transport{
Dial: (&net.Dialer{KeepAlive: 600 * time.Second}).Dial,
MaxIdleConns: 100,
MaxIdleConnsPerHost: 100, // necessary to not to deplete ports
}
s := &BitcoinRPC{
client: http.Client{Timeout: timeout, Transport: transport},
rpcURL: url,
user: user,
password: password,
ParseBlocks: parse,
client: http.Client{Timeout: time.Duration(c.RPCTimeout) * time.Second, Transport: transport},
rpcURL: c.RPCURL,
user: c.RPCUser,
password: c.RPCPass,
ParseBlocks: c.Parse,
metrics: metrics,
}
chainName, err := s.GetBlockChainInfo()
@ -66,12 +83,30 @@ func NewBitcoinRPC(url string, user string, password string, timeout time.Durati
s.Network = "testnet"
}
glog.Info("rpc: block chain ", s.Parser.Params.Name)
mq, err := bchain.NewMQ(c.ZeroMQBinding, pushHandler)
if err != nil {
glog.Error("mq: ", err)
return nil, err
}
s.mq = mq
s.Mempool = bchain.NewMempool(s, metrics)
glog.Info("rpc: block chain ", s.Parser.Params.Name)
return s, nil
}
func (b *BitcoinRPC) Shutdown() error {
if b.mq != nil {
if err := b.mq.Shutdown(); err != nil {
glog.Error("MQ.Shutdown error: ", err)
return err
}
}
return nil
}
func (b *BitcoinRPC) IsTestnet() bool {
return b.Testnet
}

View File

@ -24,12 +24,24 @@ bitcoin-0.15.1/bin/bitcoind -datadir=/data/btc/bitcoin -rpcworkqueue=32 -zmqpubh
```
Run the *run-btc-bitcoind.sh* to get initial import of data.
Create blockchain configuration file */data/testnet/blockbook/btc.json*
```
{
"rpcURL": "http://127.0.0.1:8332",
"rpcUser": "rpc",
"rpcPass": "rpc",
"rpcTimeout": 25,
"parse": true,
"zeroMQBinding": "tcp://127.0.0.1:8334"
}
```
Create script that runs blockbook *run-btc-blockbook.sh*
```
#!/bin/bash
cd go/src/blockbook
./blockbook -path=/data/btc/blockbook/db -sync -parse -rpcurl=http://127.0.0.1:8332 -httpserver=:8335 -socketio=:8336 -certfile=server/testcert -zeromq=tcp://127.0.0.1:8334 -explorer=https://bitcore1.trezor.io/ -coin=btc $1
./blockbook -coin=btc -blockchaincfg=/data/btc/blockbook/btc.json -datadir=/data/btc/blockbook/db -sync -httpserver=:8335 -socketio=:8336 -certfile=server/testcert -explorer=https://bitcore1.trezor.io/ $1
```
To run blockbook with logging to file (run with nohup or daemonize or using screen)
```

View File

@ -24,12 +24,24 @@ bitcoin-0.15.1/bin/bitcoind -datadir=/data/testnet/bitcoin -rpcworkqueue=32 -zmq
```
Run the *run-testnet-bitcoind.sh* to get initial import of data.
Create blockchain configuration file */data/testnet/blockbook/btc-testnet.json*
```
{
"rpcURL": "http://127.0.0.1:18332",
"rpcUser": "rpc",
"rpcPass": "rpc",
"rpcTimeout": 25,
"parse": true,
"zeroMQBinding": "tcp://127.0.0.1:18334"
}
```
Create script that runs blockbook *run-testnet-blockbook.sh*
```
#!/bin/bash
cd go/src/blockbook
./blockbook -path=/data/testnet/blockbook/db -sync -parse -rpcurl=http://127.0.0.1:18332 -httpserver=:18335 -socketio=:18336 -certfile=server/testcert -zeromq=tcp://127.0.0.1:18334 -explorer=https://testnet-bitcore1.trezor.io -coin=btc-testnet $1
./blockbook -coin=btc-testnet -blockchaincfg=/data/testnet/blockbook/btc-testnet.json -datadir=/data/testnet/blockbook/db -sync -httpserver=:18335 -socketio=:18336 -certfile=server/testcert -explorer=https://testnet-bitcore1.trezor.io $1
```
To run blockbook with logging to file (run with nohup or daemonize or using screen)
```

View File

@ -4,15 +4,15 @@ import (
"blockbook/bchain"
"blockbook/bchain/coins/btc"
"blockbook/common"
"time"
"encoding/json"
)
type ZCashRPC struct {
*btc.BitcoinRPC
}
func NewZCashRPC(url string, user string, password string, timeout time.Duration, parse bool, metrics *common.Metrics) (bchain.BlockChain, error) {
b, err := btc.NewBitcoinRPC(url, user, password, timeout, parse, metrics)
func NewZCashRPC(config json.RawMessage, pushHandler func(*bchain.MQMessage), metrics *common.Metrics) (bchain.BlockChain, error) {
b, err := btc.NewBitcoinRPC(config, pushHandler, metrics)
if err != nil {
return nil, err
}

View File

@ -86,6 +86,8 @@ func (e *RPCError) Error() string {
}
type BlockChain interface {
// cleanup
Shutdown() error
// chain info
IsTestnet() bool
GetNetworkName() string

View File

@ -37,12 +37,9 @@ const resyncMempoolPeriodMs = 60017
const debounceResyncMempoolMs = 1009
var (
rpcURL = flag.String("rpcurl", "http://localhost:8332", "url of blockchain RPC service")
rpcUser = flag.String("rpcuser", "rpc", "rpc username")
rpcPass = flag.String("rpcpass", "rpc", "rpc password")
rpcTimeout = flag.Uint("rpctimeout", 25, "rpc timeout in seconds")
blockchain = flag.String("blockchaincfg", "", "path to blockchain RPC service configuration json file")
dbPath = flag.String("path", "./data", "path to address index directory")
dbPath = flag.String("datadir", "./data", "path to database directory")
blockFrom = flag.Int("blockheight", -1, "height of the starting block")
blockUntil = flag.Int("blockuntil", -1, "height of the final block")
@ -57,7 +54,6 @@ var (
syncChunk = flag.Int("chunk", 100, "block chunk size for processing")
syncWorkers = flag.Int("workers", 8, "number of workers to process blocks")
dryRun = flag.Bool("dryrun", false, "do not index blocks, only download")
parse = flag.Bool("parse", false, "use in-process block parsing")
httpServerBinding = flag.String("httpserver", "", "http server binding [address]:port, (default no http server)")
@ -65,8 +61,6 @@ var (
certFiles = flag.String("certfile", "", "to enable SSL specify path to certificate files without extension, expecting <certfile>.crt and <certfile>.key, (default no SSL)")
zeroMQBinding = flag.String("zeromq", "", "binding to zeromq, if missing no zeromq connection")
explorerURL = flag.String("explorer", "", "address of blockchain explorer")
coin = flag.String("coin", "btc", "coin name (default btc)")
@ -115,7 +109,11 @@ func main() {
glog.Fatal("GetMetrics: ", err)
}
if chain, err = coins.NewBlockChain(*coin, *rpcURL, *rpcUser, *rpcPass, time.Duration(*rpcTimeout)*time.Second, *parse, metrics); err != nil {
if *blockchain == "" {
glog.Fatal("Missing blockchaincfg configuration parameter")
}
if chain, err = coins.NewBlockChain(*coin, *blockchain, pushSynchronizationHandler, metrics); err != nil {
glog.Fatal("rpc: ", err)
}
@ -222,19 +220,6 @@ func main() {
go syncMempoolLoop()
}
var mq *bchain.MQ
if *zeroMQBinding != "" {
if !*synchronize {
glog.Error("zeromq connection without synchronization does not make sense, ignoring zeromq parameter")
} else {
mq, err = bchain.NewMQ(*zeroMQBinding, mqHandler)
if err != nil {
glog.Error("mq: ", err)
return
}
}
}
if *blockFrom >= 0 {
if *blockUntil < 0 {
*blockUntil = *blockFrom
@ -261,8 +246,8 @@ func main() {
}
}
if httpServer != nil || socketIoServer != nil || mq != nil {
waitForSignalAndShutdown(httpServer, socketIoServer, mq, 5*time.Second)
if httpServer != nil || socketIoServer != nil || chain != nil {
waitForSignalAndShutdown(httpServer, socketIoServer, chain, 5*time.Second)
}
if *synchronize {
@ -332,7 +317,7 @@ func onNewTxAddr(txid string, addr string) {
}
}
func mqHandler(m *bchain.MQMessage) {
func pushSynchronizationHandler(m *bchain.MQMessage) {
// TODO - is coin specific, item for abstraction
body := hex.EncodeToString(m.Body)
glog.V(1).Infof("MQ: %s-%d %s", m.Topic, m.Sequence, body)
@ -345,7 +330,7 @@ func mqHandler(m *bchain.MQMessage) {
}
}
func waitForSignalAndShutdown(https *server.HTTPServer, socketio *server.SocketIoServer, mq *bchain.MQ, timeout time.Duration) {
func waitForSignalAndShutdown(https *server.HTTPServer, socketio *server.SocketIoServer, chain bchain.BlockChain, timeout time.Duration) {
sig := <-chanOsSignal
ctx, cancel := context.WithTimeout(context.Background(), timeout)
@ -353,12 +338,6 @@ func waitForSignalAndShutdown(https *server.HTTPServer, socketio *server.SocketI
glog.Infof("Shutdown: %v", sig)
if mq != nil {
if err := mq.Shutdown(); err != nil {
glog.Error("MQ.Shutdown error: ", err)
}
}
if https != nil {
if err := https.Shutdown(ctx); err != nil {
glog.Error("HttpServer.Shutdown error: ", err)
@ -370,6 +349,12 @@ func waitForSignalAndShutdown(https *server.HTTPServer, socketio *server.SocketI
glog.Error("SocketIo.Shutdown error: ", err)
}
}
if chain != nil {
if err := chain.Shutdown(); err != nil {
glog.Error("BlockChain.Shutdown error: ", err)
}
}
}
func printResult(txid string, vout uint32, isOutput bool) error {

View File

@ -0,0 +1,8 @@
{
"rpcURL": "http://localhost:8332",
"rpcUser": "rpc",
"rpcPass": "rpc",
"rpcTimeout": 25,
"parse": true,
"zeroMQBinding": "tcp://127.0.0.1:28332"
}