Try to reconnect ethereum RPC

pull/197/head
Martin Boehm 2019-06-03 17:48:09 +02:00
parent 5c2b9f763e
commit c409a350c9
1 changed files with 49 additions and 11 deletions

View File

@ -72,11 +72,8 @@ func NewEthereumRPC(config json.RawMessage, pushHandler func(bchain.Notification
if c.BlockAddressesToKeep < 100 {
c.BlockAddressesToKeep = 100
}
rc, err := rpc.Dial(c.RPCURL)
if err != nil {
return nil, err
}
ec := ethclient.NewClient(rc)
rc, ec, err := openRPC(c.RPCURL)
s := &EthereumRPC{
BaseChain: &bchain.BaseChain{},
@ -133,6 +130,15 @@ func NewEthereumRPC(config json.RawMessage, pushHandler func(bchain.Notification
return s, nil
}
func openRPC(url string) (*rpc.Client, *ethclient.Client, error) {
rc, err := rpc.Dial(url)
if err != nil {
return nil, nil, err
}
ec := ethclient.NewClient(rc)
return rc, ec, nil
}
// Initialize initializes ethereum rpc interface
func (b *EthereumRPC) Initialize() error {
ctx, cancel := context.WithTimeout(context.Background(), b.timeout)
@ -187,6 +193,16 @@ func (b *EthereumRPC) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOu
b.Mempool.OnNewTxAddr = onNewTxAddr
if err = b.subscribeEvents(); err != nil {
return err
}
b.mempoolInitialized = true
return nil
}
func (b *EthereumRPC) subscribeEvents() error {
if b.isETC {
glog.Info(b.ChainConfig.CoinName, " does not support subscription to newHeads")
} else {
@ -224,8 +240,6 @@ func (b *EthereumRPC) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOu
return err
}
b.mempoolInitialized = true
return nil
}
@ -246,7 +260,7 @@ func (b *EthereumRPC) subscribe(f func() (*rpc.ClientSubscription, error)) error
}
glog.Error("Subscription error ", e)
timer := time.NewTimer(time.Second * 2)
// try in 1 second interval to resubscribe
// try in 2 second interval to resubscribe
for {
select {
case e = <-s.Err():
@ -260,7 +274,7 @@ func (b *EthereumRPC) subscribe(f func() (*rpc.ClientSubscription, error)) error
s = ns
continue Loop
}
glog.Error("Resubscribe error ", e)
glog.Error("Resubscribe error ", err)
timer.Reset(time.Second * 2)
}
}
@ -269,8 +283,7 @@ func (b *EthereumRPC) subscribe(f func() (*rpc.ClientSubscription, error)) error
return nil
}
// Shutdown cleans up rpc interface to ethereum
func (b *EthereumRPC) Shutdown(ctx context.Context) error {
func (b *EthereumRPC) closeRPC() {
if b.newBlockSubscription != nil {
b.newBlockSubscription.Unsubscribe()
}
@ -280,6 +293,23 @@ func (b *EthereumRPC) Shutdown(ctx context.Context) error {
if b.rpc != nil {
b.rpc.Close()
}
}
func (b *EthereumRPC) reconnectRPC() error {
glog.Info("Reconnecting RPC")
b.closeRPC()
rc, ec, err := openRPC(b.ChainConfig.RPCURL)
if err != nil {
return err
}
b.rpc = rc
b.client = ec
return b.subscribeEvents()
}
// Shutdown cleans up rpc interface to ethereum
func (b *EthereumRPC) Shutdown(ctx context.Context) error {
b.closeRPC()
close(b.chanNewBlock)
glog.Info("rpc: shutdown")
return nil
@ -339,6 +369,14 @@ func (b *EthereumRPC) getBestHeader() (*ethtypes.Header, error) {
b.bestHeader = nil
}
}
// if the best header was not updated for 15 minutes, there could be a subscription problem, reconnect RPC
if b.bestHeaderTime.Add(15 * time.Minute).Before(time.Now()) {
err := b.reconnectRPC()
if err != nil {
return nil, err
}
b.bestHeader = nil
}
if b.bestHeader == nil {
var err error
ctx, cancel := context.WithTimeout(context.Background(), b.timeout)