Get mempoool imput addresses concurrently

pull/7/head
Martin Boehm 2018-05-28 16:50:54 +02:00
parent e5d79b09bc
commit 919a26dfd6
1 changed files with 31 additions and 3 deletions

View File

@ -44,8 +44,18 @@ func NewUTXOMempool(chain BlockChain, workers int, subworkers int) *UTXOMempool
}
for i := 0; i < workers; i++ {
go func(i int) {
chanInput := make(chan *Vin, 1)
chanResult := make(chan *addrIndex, 1)
for j := 0; j < subworkers; j++ {
go func(j int) {
for input := range chanInput {
ai := m.getInputAddress(input)
chanResult <- ai
}
}(j)
}
for txid := range m.chanTxid {
io, ok := m.getMempoolTxAddrs(txid)
io, ok := m.getTxAddrs(txid, chanInput, chanResult)
if !ok {
io = []addrIndex{}
}
@ -101,7 +111,7 @@ func (m *UTXOMempool) getInputAddress(input *Vin) *addrIndex {
}
func (m *UTXOMempool) getMempoolTxAddrs(txid string) ([]addrIndex, bool) {
func (m *UTXOMempool) getTxAddrs(txid string, chanInput chan *Vin, chanResult chan *addrIndex) ([]addrIndex, bool) {
tx, err := m.chain.GetTransactionForMempool(txid)
if err != nil {
glog.Error("cannot get transaction ", txid, ": ", err)
@ -122,11 +132,29 @@ func (m *UTXOMempool) getMempoolTxAddrs(txid string) ([]addrIndex, bool) {
m.onNewTxAddr(tx.Txid, output.ScriptPubKey.Addresses[0])
}
}
dispatched := 0
for _, input := range tx.Vin {
if input.Coinbase != "" {
continue
}
ai := m.getInputAddress(&input)
loop:
for {
select {
// store as many processed results as possible
case ai := <-chanResult:
if ai != nil {
io = append(io, *ai)
}
dispatched--
// send input to be processed
case chanInput <- &input:
dispatched++
break loop
}
}
}
for i := 0; i < dispatched; i++ {
ai := <-chanResult
if ai != nil {
io = append(io, *ai)
}