Store time of mempool transaction
parent
827cbcd1d8
commit
f2dc4a56d8
|
@ -152,8 +152,8 @@ func (c *blockChainWithMetrics) CreateMempool() (bchain.Mempool, error) {
|
|||
return c.b.CreateMempool()
|
||||
}
|
||||
|
||||
func (c *blockChainWithMetrics) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOutpointFunc) error {
|
||||
return c.b.InitializeMempool(addrDescForOutpoint)
|
||||
func (c *blockChainWithMetrics) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOutpointFunc, onNewTxAddr bchain.OnNewTxAddrFunc) error {
|
||||
return c.b.InitializeMempool(addrDescForOutpoint, onNewTxAddr)
|
||||
}
|
||||
|
||||
func (c *blockChainWithMetrics) Shutdown(ctx context.Context) error {
|
||||
|
@ -293,9 +293,9 @@ func (c *mempoolWithMetrics) observeRPCLatency(method string, start time.Time, e
|
|||
c.m.RPCLatency.With(common.Labels{"method": method, "error": e}).Observe(float64(time.Since(start)) / 1e6) // in milliseconds
|
||||
}
|
||||
|
||||
func (c *mempoolWithMetrics) Resync(onNewTxAddr bchain.OnNewTxAddrFunc) (count int, err error) {
|
||||
func (c *mempoolWithMetrics) Resync() (count int, err error) {
|
||||
defer func(s time.Time) { c.observeRPCLatency("ResyncMempool", s, err) }(time.Now())
|
||||
count, err = c.mempool.Resync(onNewTxAddr)
|
||||
count, err = c.mempool.Resync()
|
||||
if err == nil {
|
||||
c.m.MempoolSize.Set(float64(count))
|
||||
}
|
||||
|
|
|
@ -139,11 +139,12 @@ func (b *BitcoinRPC) CreateMempool() (bchain.Mempool, error) {
|
|||
}
|
||||
|
||||
// InitializeMempool creates ZeroMQ subscription and sets AddrDescForOutpointFunc to the Mempool
|
||||
func (b *BitcoinRPC) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOutpointFunc) error {
|
||||
func (b *BitcoinRPC) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOutpointFunc, onNewTxAddr bchain.OnNewTxAddrFunc) error {
|
||||
if b.Mempool == nil {
|
||||
return errors.New("Mempool not created")
|
||||
}
|
||||
b.Mempool.AddrDescForOutpoint = addrDescForOutpoint
|
||||
b.Mempool.OnNewTxAddr = onNewTxAddr
|
||||
if b.mq == nil {
|
||||
mq, err := bchain.NewMQ(b.ChainConfig.MessageQueueBinding, b.pushHandler)
|
||||
if err != nil {
|
||||
|
|
|
@ -172,10 +172,11 @@ func (b *EthereumRPC) CreateMempool() (bchain.Mempool, error) {
|
|||
}
|
||||
|
||||
// InitializeMempool creates subscriptions to newHeads and newPendingTransactions
|
||||
func (b *EthereumRPC) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOutpointFunc) error {
|
||||
func (b *EthereumRPC) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOutpointFunc, onNewTxAddr bchain.OnNewTxAddrFunc) error {
|
||||
if b.Mempool == nil {
|
||||
return errors.New("Mempool not created")
|
||||
}
|
||||
b.Mempool.OnNewTxAddr = onNewTxAddr
|
||||
if b.isETC {
|
||||
glog.Info(b.ChainConfig.CoinName, " does not support subscription to newHeads")
|
||||
} else {
|
||||
|
|
|
@ -12,6 +12,11 @@ type addrIndex struct {
|
|||
n int32
|
||||
}
|
||||
|
||||
type txEntry struct {
|
||||
addrIndexes []addrIndex
|
||||
time uint32
|
||||
}
|
||||
|
||||
type txidio struct {
|
||||
txid string
|
||||
io []addrIndex
|
||||
|
@ -21,11 +26,11 @@ type txidio struct {
|
|||
type MempoolBitcoinType struct {
|
||||
chain BlockChain
|
||||
mux sync.Mutex
|
||||
txToInputOutput map[string][]addrIndex
|
||||
txEntries map[string]txEntry
|
||||
addrDescToTx map[string][]Outpoint
|
||||
chanTxid chan string
|
||||
chanAddrIndex chan txidio
|
||||
onNewTxAddr OnNewTxAddrFunc
|
||||
OnNewTxAddr OnNewTxAddrFunc
|
||||
AddrDescForOutpoint AddrDescForOutpointFunc
|
||||
}
|
||||
|
||||
|
@ -79,10 +84,10 @@ func (m *MempoolBitcoinType) GetAddrDescTransactions(addrDesc AddressDescriptor)
|
|||
return append([]Outpoint(nil), m.addrDescToTx[string(addrDesc)]...), nil
|
||||
}
|
||||
|
||||
func (m *MempoolBitcoinType) updateMappings(newTxToInputOutput map[string][]addrIndex, newAddrDescToTx map[string][]Outpoint) {
|
||||
func (m *MempoolBitcoinType) updateMappings(newTxEntries map[string]txEntry, newAddrDescToTx map[string][]Outpoint) {
|
||||
m.mux.Lock()
|
||||
defer m.mux.Unlock()
|
||||
m.txToInputOutput = newTxToInputOutput
|
||||
m.txEntries = newTxEntries
|
||||
m.addrDescToTx = newAddrDescToTx
|
||||
}
|
||||
|
||||
|
@ -128,8 +133,8 @@ func (m *MempoolBitcoinType) getTxAddrs(txid string, chanInput chan Outpoint, ch
|
|||
if len(addrDesc) > 0 {
|
||||
io = append(io, addrIndex{string(addrDesc), int32(output.N)})
|
||||
}
|
||||
if m.onNewTxAddr != nil {
|
||||
m.onNewTxAddr(tx, addrDesc)
|
||||
if m.OnNewTxAddr != nil {
|
||||
m.OnNewTxAddr(tx, addrDesc)
|
||||
}
|
||||
}
|
||||
dispatched := 0
|
||||
|
@ -166,37 +171,37 @@ func (m *MempoolBitcoinType) getTxAddrs(txid string, chanInput chan Outpoint, ch
|
|||
// Resync gets mempool transactions and maps outputs to transactions.
|
||||
// Resync is not reentrant, it should be called from a single thread.
|
||||
// Read operations (GetTransactions) are safe.
|
||||
func (m *MempoolBitcoinType) Resync(onNewTxAddr OnNewTxAddrFunc) (int, error) {
|
||||
func (m *MempoolBitcoinType) Resync() (int, error) {
|
||||
start := time.Now()
|
||||
glog.V(1).Info("mempool: resync")
|
||||
m.onNewTxAddr = onNewTxAddr
|
||||
txs, err := m.chain.GetMempoolTransactions()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
glog.V(2).Info("mempool: resync ", len(txs), " txs")
|
||||
// allocate slightly larger capacity of the maps
|
||||
newTxToInputOutput := make(map[string][]addrIndex, len(m.txToInputOutput)+5)
|
||||
newTxEntries := make(map[string]txEntry, len(m.txEntries)+5)
|
||||
newAddrDescToTx := make(map[string][]Outpoint, len(m.addrDescToTx)+5)
|
||||
dispatched := 0
|
||||
onNewData := func(txid string, io []addrIndex) {
|
||||
if len(io) > 0 {
|
||||
newTxToInputOutput[txid] = io
|
||||
for _, si := range io {
|
||||
txTime := uint32(time.Now().Unix())
|
||||
onNewData := func(txid string, entry txEntry) {
|
||||
if len(entry.addrIndexes) > 0 {
|
||||
newTxEntries[txid] = entry
|
||||
for _, si := range entry.addrIndexes {
|
||||
newAddrDescToTx[si.addrDesc] = append(newAddrDescToTx[si.addrDesc], Outpoint{txid, si.n})
|
||||
}
|
||||
}
|
||||
}
|
||||
// get transaction in parallel using goroutines created in NewUTXOMempool
|
||||
for _, txid := range txs {
|
||||
io, exists := m.txToInputOutput[txid]
|
||||
io, exists := m.txEntries[txid]
|
||||
if !exists {
|
||||
loop:
|
||||
for {
|
||||
select {
|
||||
// store as many processed transactions as possible
|
||||
case tio := <-m.chanAddrIndex:
|
||||
onNewData(tio.txid, tio.io)
|
||||
onNewData(tio.txid, txEntry{tio.io, txTime})
|
||||
dispatched--
|
||||
// send transaction to be processed
|
||||
case m.chanTxid <- txid:
|
||||
|
@ -210,10 +215,9 @@ func (m *MempoolBitcoinType) Resync(onNewTxAddr OnNewTxAddrFunc) (int, error) {
|
|||
}
|
||||
for i := 0; i < dispatched; i++ {
|
||||
tio := <-m.chanAddrIndex
|
||||
onNewData(tio.txid, tio.io)
|
||||
onNewData(tio.txid, txEntry{tio.io, txTime})
|
||||
}
|
||||
m.updateMappings(newTxToInputOutput, newAddrDescToTx)
|
||||
m.onNewTxAddr = nil
|
||||
glog.Info("mempool: resync finished in ", time.Since(start), ", ", len(m.txToInputOutput), " transactions in mempool")
|
||||
return len(m.txToInputOutput), nil
|
||||
m.updateMappings(newTxEntries, newAddrDescToTx)
|
||||
glog.Info("mempool: resync finished in ", time.Since(start), ", ", len(m.txEntries), " transactions in mempool")
|
||||
return len(m.txEntries), nil
|
||||
}
|
||||
|
|
|
@ -9,10 +9,11 @@ import (
|
|||
|
||||
// MempoolEthereumType is mempool handle of EthereumType chains
|
||||
type MempoolEthereumType struct {
|
||||
chain BlockChain
|
||||
mux sync.Mutex
|
||||
txToInputOutput map[string][]addrIndex
|
||||
addrDescToTx map[string][]Outpoint
|
||||
chain BlockChain
|
||||
mux sync.Mutex
|
||||
txEntries map[string]txEntry
|
||||
addrDescToTx map[string][]Outpoint
|
||||
OnNewTxAddr OnNewTxAddrFunc
|
||||
}
|
||||
|
||||
// NewMempoolEthereumType creates new mempool handler.
|
||||
|
@ -37,10 +38,10 @@ func (m *MempoolEthereumType) GetAddrDescTransactions(addrDesc AddressDescriptor
|
|||
return append([]Outpoint(nil), m.addrDescToTx[string(addrDesc)]...), nil
|
||||
}
|
||||
|
||||
func (m *MempoolEthereumType) updateMappings(newTxToInputOutput map[string][]addrIndex, newAddrDescToTx map[string][]Outpoint) {
|
||||
func (m *MempoolEthereumType) updateMappings(newTxEntries map[string]txEntry, newAddrDescToTx map[string][]Outpoint) {
|
||||
m.mux.Lock()
|
||||
defer m.mux.Unlock()
|
||||
m.txToInputOutput = newTxToInputOutput
|
||||
m.txEntries = newTxEntries
|
||||
m.addrDescToTx = newAddrDescToTx
|
||||
}
|
||||
|
||||
|
@ -56,73 +57,83 @@ func appendAddress(io []addrIndex, i int32, a string, parser BlockChainParser) [
|
|||
return io
|
||||
}
|
||||
|
||||
func (m *MempoolEthereumType) createTxEntry(txid string, txTime uint32) (txEntry, bool) {
|
||||
tx, err := m.chain.GetTransactionForMempool(txid)
|
||||
if err != nil {
|
||||
if err != ErrTxNotFound {
|
||||
glog.Warning("cannot get transaction ", txid, ": ", err)
|
||||
}
|
||||
return txEntry{}, false
|
||||
}
|
||||
parser := m.chain.GetChainParser()
|
||||
addrIndexes := make([]addrIndex, 0, len(tx.Vout)+len(tx.Vin))
|
||||
for _, output := range tx.Vout {
|
||||
addrDesc, err := parser.GetAddrDescFromVout(&output)
|
||||
if err != nil {
|
||||
if err != ErrAddressMissing {
|
||||
glog.Error("error in output addrDesc in ", txid, " ", output.N, ": ", err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
if len(addrDesc) > 0 {
|
||||
addrIndexes = append(addrIndexes, addrIndex{string(addrDesc), int32(output.N)})
|
||||
}
|
||||
}
|
||||
for _, input := range tx.Vin {
|
||||
for i, a := range input.Addresses {
|
||||
addrIndexes = appendAddress(addrIndexes, ^int32(i), a, parser)
|
||||
}
|
||||
}
|
||||
t, err := parser.EthereumTypeGetErc20FromTx(tx)
|
||||
if err != nil {
|
||||
glog.Error("GetErc20FromTx for tx ", txid, ", ", err)
|
||||
} else {
|
||||
for i := range t {
|
||||
addrIndexes = appendAddress(addrIndexes, ^int32(i+1), t[i].From, parser)
|
||||
addrIndexes = appendAddress(addrIndexes, int32(i+1), t[i].To, parser)
|
||||
}
|
||||
}
|
||||
if m.OnNewTxAddr != nil {
|
||||
sent := make(map[string]struct{})
|
||||
for _, si := range addrIndexes {
|
||||
if _, found := sent[si.addrDesc]; !found {
|
||||
m.OnNewTxAddr(tx, AddressDescriptor(si.addrDesc))
|
||||
sent[si.addrDesc] = struct{}{}
|
||||
}
|
||||
}
|
||||
}
|
||||
return txEntry{addrIndexes: addrIndexes, time: txTime}, true
|
||||
}
|
||||
|
||||
// Resync gets mempool transactions and maps outputs to transactions.
|
||||
// Resync is not reentrant, it should be called from a single thread.
|
||||
// Read operations (GetTransactions) are safe.
|
||||
func (m *MempoolEthereumType) Resync(onNewTxAddr OnNewTxAddrFunc) (int, error) {
|
||||
func (m *MempoolEthereumType) Resync() (int, error) {
|
||||
start := time.Now()
|
||||
glog.V(1).Info("Mempool: resync")
|
||||
txs, err := m.chain.GetMempoolTransactions()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
parser := m.chain.GetChainParser()
|
||||
// allocate slightly larger capacity of the maps
|
||||
newTxToInputOutput := make(map[string][]addrIndex, len(m.txToInputOutput)+5)
|
||||
newTxEntries := make(map[string]txEntry, len(m.txEntries)+5)
|
||||
newAddrDescToTx := make(map[string][]Outpoint, len(m.addrDescToTx)+5)
|
||||
txTime := uint32(time.Now().Unix())
|
||||
var ok bool
|
||||
for _, txid := range txs {
|
||||
io, exists := m.txToInputOutput[txid]
|
||||
entry, exists := m.txEntries[txid]
|
||||
if !exists {
|
||||
tx, err := m.chain.GetTransactionForMempool(txid)
|
||||
if err != nil {
|
||||
if err != ErrTxNotFound {
|
||||
glog.Warning("cannot get transaction ", txid, ": ", err)
|
||||
}
|
||||
entry, ok = m.createTxEntry(txid, txTime)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
io = make([]addrIndex, 0, len(tx.Vout)+len(tx.Vin))
|
||||
for _, output := range tx.Vout {
|
||||
addrDesc, err := parser.GetAddrDescFromVout(&output)
|
||||
if err != nil {
|
||||
if err != ErrAddressMissing {
|
||||
glog.Error("error in output addrDesc in ", txid, " ", output.N, ": ", err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
if len(addrDesc) > 0 {
|
||||
io = append(io, addrIndex{string(addrDesc), int32(output.N)})
|
||||
}
|
||||
}
|
||||
for _, input := range tx.Vin {
|
||||
for i, a := range input.Addresses {
|
||||
appendAddress(io, ^int32(i), a, parser)
|
||||
}
|
||||
}
|
||||
t, err := parser.EthereumTypeGetErc20FromTx(tx)
|
||||
if err != nil {
|
||||
glog.Error("GetErc20FromTx for tx ", txid, ", ", err)
|
||||
} else {
|
||||
for i := range t {
|
||||
io = appendAddress(io, ^int32(i+1), t[i].From, parser)
|
||||
io = appendAddress(io, int32(i+1), t[i].To, parser)
|
||||
}
|
||||
}
|
||||
if onNewTxAddr != nil {
|
||||
sent := make(map[string]struct{})
|
||||
for _, si := range io {
|
||||
if _, found := sent[si.addrDesc]; !found {
|
||||
onNewTxAddr(tx, AddressDescriptor(si.addrDesc))
|
||||
sent[si.addrDesc] = struct{}{}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
newTxToInputOutput[txid] = io
|
||||
for _, si := range io {
|
||||
newTxEntries[txid] = entry
|
||||
for _, si := range entry.addrIndexes {
|
||||
newAddrDescToTx[si.addrDesc] = append(newAddrDescToTx[si.addrDesc], Outpoint{txid, si.n})
|
||||
}
|
||||
}
|
||||
m.updateMappings(newTxToInputOutput, newAddrDescToTx)
|
||||
glog.Info("Mempool: resync finished in ", time.Since(start), ", ", len(m.txToInputOutput), " transactions in mempool")
|
||||
return len(m.txToInputOutput), nil
|
||||
m.updateMappings(newTxEntries, newAddrDescToTx)
|
||||
glog.Info("Mempool: resync finished in ", time.Since(start), ", ", len(m.txEntries), " transactions in mempool")
|
||||
return len(m.txEntries), nil
|
||||
}
|
||||
|
|
|
@ -202,7 +202,7 @@ type BlockChain interface {
|
|||
// create mempool but do not initialize it
|
||||
CreateMempool() (Mempool, error)
|
||||
// initialize mempool, create ZeroMQ (or other) subscription
|
||||
InitializeMempool(AddrDescForOutpointFunc) error
|
||||
InitializeMempool(AddrDescForOutpointFunc, OnNewTxAddrFunc) error
|
||||
// shutdown mempool, ZeroMQ and block chain connections
|
||||
Shutdown(ctx context.Context) error
|
||||
// chain info
|
||||
|
@ -278,7 +278,7 @@ type BlockChainParser interface {
|
|||
|
||||
// Mempool defines common interface to mempool
|
||||
type Mempool interface {
|
||||
Resync(onNewTxAddr OnNewTxAddrFunc) (int, error)
|
||||
Resync() (int, error)
|
||||
GetTransactions(address string) ([]Outpoint, error)
|
||||
GetAddrDescTransactions(addrDesc AddressDescriptor) ([]Outpoint, error)
|
||||
}
|
||||
|
|
10
blockbook.go
10
blockbook.go
|
@ -231,13 +231,13 @@ func main() {
|
|||
if chain.GetChainParser().GetChainType() == bchain.ChainBitcoinType {
|
||||
addrDescForOutpoint = index.AddrDescForOutpoint
|
||||
}
|
||||
err = chain.InitializeMempool(addrDescForOutpoint)
|
||||
err = chain.InitializeMempool(addrDescForOutpoint, onNewTxAddr)
|
||||
if err != nil {
|
||||
glog.Error("initializeMempool ", err)
|
||||
return
|
||||
}
|
||||
var mempoolCount int
|
||||
if mempoolCount, err = mempool.Resync(nil); err != nil {
|
||||
if mempoolCount, err = mempool.Resync(); err != nil {
|
||||
glog.Error("resyncMempool ", err)
|
||||
return
|
||||
}
|
||||
|
@ -250,6 +250,8 @@ func main() {
|
|||
|
||||
if publicServer != nil {
|
||||
// start full public interface
|
||||
callbacksOnNewBlock = append(callbacksOnNewBlock, publicServer.OnNewBlock)
|
||||
callbacksOnNewTxAddr = append(callbacksOnNewTxAddr, publicServer.OnNewTxAddr)
|
||||
publicServer.ConnectFullPublicInterface()
|
||||
}
|
||||
|
||||
|
@ -342,8 +344,6 @@ func startPublicServer() (*server.PublicServer, error) {
|
|||
}
|
||||
}
|
||||
}()
|
||||
callbacksOnNewBlock = append(callbacksOnNewBlock, publicServer.OnNewBlock)
|
||||
callbacksOnNewTxAddr = append(callbacksOnNewTxAddr, publicServer.OnNewTxAddr)
|
||||
return publicServer, err
|
||||
}
|
||||
|
||||
|
@ -474,7 +474,7 @@ func syncMempoolLoop() {
|
|||
// resync mempool about every minute if there are no chanSyncMempool requests, with debounce 1 second
|
||||
tickAndDebounce(time.Duration(*resyncMempoolPeriodMs)*time.Millisecond, debounceResyncMempoolMs*time.Millisecond, chanSyncMempool, func() {
|
||||
internalState.StartedMempoolSync()
|
||||
if count, err := mempool.Resync(onNewTxAddr); err != nil {
|
||||
if count, err := mempool.Resync(); err != nil {
|
||||
glog.Error("syncMempoolLoop ", errors.ErrorStack(err))
|
||||
} else {
|
||||
internalState.FinishedMempoolSync(count)
|
||||
|
|
|
@ -25,7 +25,7 @@ func (c *fakeBlockChain) Initialize() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (c *fakeBlockChain) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOutpointFunc) error {
|
||||
func (c *fakeBlockChain) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOutpointFunc, onNewTxAddr bchain.OnNewTxAddrFunc) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -174,7 +174,7 @@ func initBlockChain(coinName string, cfg json.RawMessage) (bchain.BlockChain, bc
|
|||
return nil, nil, fmt.Errorf("Mempool creation failed: %s", err)
|
||||
}
|
||||
|
||||
err = cli.InitializeMempool(nil)
|
||||
err = cli.InitializeMempool(nil, nil)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("Mempool initialization failed: %s", err)
|
||||
}
|
||||
|
|
|
@ -200,7 +200,7 @@ func testMempoolSync(t *testing.T, h *TestHandler) {
|
|||
for i := 0; i < 3; i++ {
|
||||
txs := getMempool(t, h)
|
||||
|
||||
n, err := h.Mempool.Resync(nil)
|
||||
n, err := h.Mempool.Resync()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue