Modify bitcoin type mempool resync to preserve first seen time order
parent
3f973bf47d
commit
b227a8e777
|
@ -65,11 +65,24 @@ func (a MempoolTxidEntries) Less(i, j int) bool {
|
|||
return hi > hj
|
||||
}
|
||||
|
||||
func (m *BaseMempool) updateMappings(newTxEntries map[string]txEntry, newAddrDescToTx map[string][]Outpoint) {
|
||||
m.mux.Lock()
|
||||
m.txEntries = newTxEntries
|
||||
m.addrDescToTx = newAddrDescToTx
|
||||
m.mux.Unlock()
|
||||
func (m *BaseMempool) removeEntryFromMempool(txid string, entry txEntry) {
|
||||
delete(m.txEntries, txid)
|
||||
for _, si := range entry.addrIndexes {
|
||||
outpoints, found := m.addrDescToTx[si.addrDesc]
|
||||
if found {
|
||||
newOutpoints := make([]Outpoint, 0, len(outpoints)-1)
|
||||
for _, o := range outpoints {
|
||||
if o.Txid != txid {
|
||||
newOutpoints = append(newOutpoints, o)
|
||||
}
|
||||
}
|
||||
if len(newOutpoints) > 0 {
|
||||
m.addrDescToTx[si.addrDesc] = newOutpoints
|
||||
} else {
|
||||
delete(m.addrDescToTx, si.addrDesc)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// GetAllEntries returns all mempool entries sorted by fist seen time in descending order
|
||||
|
|
|
@ -19,7 +19,9 @@ type MempoolBitcoinType struct {
|
|||
func NewMempoolBitcoinType(chain BlockChain, workers int, subworkers int) *MempoolBitcoinType {
|
||||
m := &MempoolBitcoinType{
|
||||
BaseMempool: BaseMempool{
|
||||
chain: chain,
|
||||
chain: chain,
|
||||
txEntries: make(map[string]txEntry),
|
||||
addrDescToTx: make(map[string][]Outpoint),
|
||||
},
|
||||
chanTxid: make(chan string, 1),
|
||||
chanAddrIndex: make(chan txidio, 1),
|
||||
|
@ -137,29 +139,30 @@ func (m *MempoolBitcoinType) Resync() (int, error) {
|
|||
return 0, err
|
||||
}
|
||||
glog.V(2).Info("mempool: resync ", len(txs), " txs")
|
||||
// allocate slightly larger capacity of the maps
|
||||
newTxEntries := make(map[string]txEntry, len(m.txEntries)+5)
|
||||
newAddrDescToTx := make(map[string][]Outpoint, len(m.addrDescToTx)+5)
|
||||
dispatched := 0
|
||||
txTime := uint32(time.Now().Unix())
|
||||
onNewData := func(txid string, entry txEntry) {
|
||||
onNewEntry := func(txid string, entry txEntry) {
|
||||
if len(entry.addrIndexes) > 0 {
|
||||
newTxEntries[txid] = entry
|
||||
m.mux.Lock()
|
||||
m.txEntries[txid] = entry
|
||||
for _, si := range entry.addrIndexes {
|
||||
newAddrDescToTx[si.addrDesc] = append(newAddrDescToTx[si.addrDesc], Outpoint{txid, si.n})
|
||||
m.addrDescToTx[si.addrDesc] = append(m.addrDescToTx[si.addrDesc], Outpoint{txid, si.n})
|
||||
}
|
||||
m.mux.Unlock()
|
||||
}
|
||||
}
|
||||
txsMap := make(map[string]struct{}, len(txs))
|
||||
dispatched := 0
|
||||
txTime := uint32(time.Now().Unix())
|
||||
// get transaction in parallel using goroutines created in NewUTXOMempool
|
||||
for _, txid := range txs {
|
||||
io, exists := m.txEntries[txid]
|
||||
txsMap[txid] = struct{}{}
|
||||
_, exists := m.txEntries[txid]
|
||||
if !exists {
|
||||
loop:
|
||||
for {
|
||||
select {
|
||||
// store as many processed transactions as possible
|
||||
case tio := <-m.chanAddrIndex:
|
||||
onNewData(tio.txid, txEntry{tio.io, txTime})
|
||||
onNewEntry(tio.txid, txEntry{tio.io, txTime})
|
||||
dispatched--
|
||||
// send transaction to be processed
|
||||
case m.chanTxid <- txid:
|
||||
|
@ -167,15 +170,20 @@ func (m *MempoolBitcoinType) Resync() (int, error) {
|
|||
break loop
|
||||
}
|
||||
}
|
||||
} else {
|
||||
onNewData(txid, io)
|
||||
}
|
||||
}
|
||||
for i := 0; i < dispatched; i++ {
|
||||
tio := <-m.chanAddrIndex
|
||||
onNewData(tio.txid, txEntry{tio.io, txTime})
|
||||
onNewEntry(tio.txid, txEntry{tio.io, txTime})
|
||||
}
|
||||
|
||||
for txid, entry := range m.txEntries {
|
||||
if _, exists := txsMap[txid]; !exists {
|
||||
m.mux.Lock()
|
||||
m.removeEntryFromMempool(txid, entry)
|
||||
m.mux.Unlock()
|
||||
}
|
||||
}
|
||||
m.updateMappings(newTxEntries, newAddrDescToTx)
|
||||
glog.Info("mempool: resync finished in ", time.Since(start), ", ", len(m.txEntries), " transactions in mempool")
|
||||
return len(m.txEntries), nil
|
||||
}
|
||||
|
|
|
@ -132,26 +132,6 @@ func (m *MempoolEthereumType) AddTransactionToMempool(txid string) {
|
|||
}
|
||||
}
|
||||
|
||||
func (m *MempoolEthereumType) removeEntryFromMempool(txid string, entry txEntry) {
|
||||
delete(m.txEntries, txid)
|
||||
for _, si := range entry.addrIndexes {
|
||||
outpoints, found := m.addrDescToTx[si.addrDesc]
|
||||
if found {
|
||||
newOutpoints := make([]Outpoint, 0, len(outpoints)-1)
|
||||
for _, o := range outpoints {
|
||||
if o.Txid != txid {
|
||||
newOutpoints = append(newOutpoints, o)
|
||||
}
|
||||
}
|
||||
if len(newOutpoints) > 0 {
|
||||
m.addrDescToTx[si.addrDesc] = newOutpoints
|
||||
} else {
|
||||
delete(m.addrDescToTx, si.addrDesc)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// RemoveTransactionFromMempool removes transaction from mempool
|
||||
func (m *MempoolEthereumType) RemoveTransactionFromMempool(txid string) {
|
||||
m.mux.Lock()
|
||||
|
|
Loading…
Reference in New Issue