Reconnect subsciptions after recovery from network/geth failure
parent
9aec4ec717
commit
40619d126f
|
@ -139,25 +139,83 @@ func (b *EthereumRPC) Initialize() error {
|
|||
}
|
||||
glog.Info("rpc: block chain ", b.Network)
|
||||
|
||||
// Subscribe to new blocks
|
||||
sub, err := b.rpc.EthSubscribe(ctx, b.chanNewBlock, "newHeads")
|
||||
if err != nil {
|
||||
return errors.Annotatef(err, "EthSubscribe newHeads")
|
||||
// subscriptions
|
||||
if err = b.subscribe(func() (*rpc.ClientSubscription, error) {
|
||||
// invalidate the previous subscription - it is either the first one or there was an error
|
||||
b.newBlockSubscription = nil
|
||||
ctx, cancel := context.WithTimeout(context.Background(), b.timeout)
|
||||
defer cancel()
|
||||
sub, err := b.rpc.EthSubscribe(ctx, b.chanNewBlock, "newHeads")
|
||||
if err != nil {
|
||||
return nil, errors.Annotatef(err, "EthSubscribe newHeads")
|
||||
}
|
||||
b.newBlockSubscription = sub
|
||||
glog.Info("Subscribed to newHeads")
|
||||
return sub, nil
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
b.newBlockSubscription = sub
|
||||
|
||||
// Subscribe to new mempool transactions
|
||||
sub, err = b.rpc.EthSubscribe(ctx, b.chanNewTx, "newPendingTransactions")
|
||||
if err != nil {
|
||||
return errors.Annotatef(err, "EthSubscribe newPendingTransactions")
|
||||
if err = b.subscribe(func() (*rpc.ClientSubscription, error) {
|
||||
// invalidate the previous subscription - it is either the first one or there was an error
|
||||
b.newTxSubscription = nil
|
||||
ctx, cancel := context.WithTimeout(context.Background(), b.timeout)
|
||||
defer cancel()
|
||||
sub, err := b.rpc.EthSubscribe(ctx, b.chanNewTx, "newPendingTransactions")
|
||||
if err != nil {
|
||||
return nil, errors.Annotatef(err, "EthSubscribe newPendingTransactions")
|
||||
}
|
||||
b.newTxSubscription = sub
|
||||
glog.Info("Subscribed to newPendingTransactions")
|
||||
return sub, nil
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
b.newTxSubscription = sub
|
||||
|
||||
// create mempool
|
||||
b.Mempool = bchain.NewNonUTXOMempool(b)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// subscribeNewBlocks subscribes to new blocks notification
|
||||
func (b *EthereumRPC) subscribe(f func() (*rpc.ClientSubscription, error)) error {
|
||||
s, err := f()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
go func() {
|
||||
Loop:
|
||||
for {
|
||||
// wait for error in subscription
|
||||
e := <-s.Err()
|
||||
// nil error means sub.Unsubscribe called, exit goroutine
|
||||
if e == nil {
|
||||
return
|
||||
}
|
||||
glog.Error("Subscription error ", e)
|
||||
timer := time.NewTimer(time.Second)
|
||||
// try in 1 second interval to resubscribe
|
||||
for {
|
||||
select {
|
||||
case e = <-s.Err():
|
||||
if e == nil {
|
||||
return
|
||||
}
|
||||
case <-timer.C:
|
||||
ns, err := f()
|
||||
if err == nil {
|
||||
// subscription successful, restart wait for next error
|
||||
s = ns
|
||||
continue Loop
|
||||
}
|
||||
timer.Reset(time.Second)
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Shutdown cleans up rpc interface to ethereum
|
||||
func (b *EthereumRPC) Shutdown() error {
|
||||
if b.newBlockSubscription != nil {
|
||||
|
|
Loading…
Reference in New Issue