Revert "Update balances in extra goroutine during import block"

This reverts commit 099321126f.
The parallel processing of balances did not bring any speedup
(actually it was a little bit slower)
pull/68/head
Martin Boehm 2018-10-01 13:22:03 +02:00
parent f171fe8362
commit 776bebdf23
1 changed files with 52 additions and 115 deletions

View File

@ -42,20 +42,17 @@ type connectBlockStats struct {
// RocksDB handle
type RocksDB struct {
path string
db *gorocksdb.DB
wo *gorocksdb.WriteOptions
ro *gorocksdb.ReadOptions
cfh []*gorocksdb.ColumnFamilyHandle
chainParser bchain.BlockChainParser
is *common.InternalState
metrics *common.Metrics
cache *gorocksdb.Cache
maxOpenFiles int
cbs connectBlockStats
chanUpdateBalance chan updateBalanceData
chanUpdateBalanceResult chan error
updateBalancesMap map[string]*AddrBalance
path string
db *gorocksdb.DB
wo *gorocksdb.WriteOptions
ro *gorocksdb.ReadOptions
cfh []*gorocksdb.ColumnFamilyHandle
chainParser bchain.BlockChainParser
is *common.InternalState
metrics *common.Metrics
cache *gorocksdb.Cache
maxOpenFiles int
cbs connectBlockStats
}
const (
@ -93,20 +90,7 @@ func NewRocksDB(path string, cacheSize, maxOpenFiles int, parser bchain.BlockCha
db, cfh, err := openDB(path, c, maxOpenFiles)
wo := gorocksdb.NewDefaultWriteOptions()
ro := gorocksdb.NewDefaultReadOptions()
rdb := &RocksDB{
path: path,
db: db,
wo: wo,
ro: ro,
cfh: cfh,
chainParser: parser,
metrics: metrics,
cache: c,
maxOpenFiles: maxOpenFiles,
cbs: connectBlockStats{},
}
rdb.initUpdateBalancesWorker()
return rdb, nil
return &RocksDB{path, db, wo, ro, cfh, parser, nil, metrics, c, maxOpenFiles, connectBlockStats{}}, nil
}
func (d *RocksDB) closeDB() error {
@ -290,8 +274,6 @@ func (d *RocksDB) writeBlock(block *bchain.Block, op int) error {
txAddressesMap := make(map[string]*TxAddresses)
balances := make(map[string]*AddrBalance)
if err := d.processAddressesUTXO(block, addresses, txAddressesMap, balances); err != nil {
// reinitialize balanceWorker so that there are no left balances in the queue
d.initUpdateBalancesWorker()
return err
}
if err := d.storeAddresses(wb, block.Height, addresses); err != nil {
@ -380,80 +362,7 @@ func (d *RocksDB) GetAndResetConnectBlockStats() string {
return s
}
type updateBalanceData struct {
valueSat big.Int
strAddrDesc string
addrDesc bchain.AddressDescriptor
processed, output bool
}
func (d *RocksDB) initUpdateBalancesWorker() {
if d.chanUpdateBalance != nil {
close(d.chanUpdateBalance)
}
d.chanUpdateBalance = make(chan updateBalanceData, 16)
d.chanUpdateBalanceResult = make(chan error, 16)
go d.updateBalancesWorker()
}
// updateBalancesWorker is a single worker used to update balances in parallel to processAddressesUTXO
func (d *RocksDB) updateBalancesWorker() {
var err error
for bd := range d.chanUpdateBalance {
ab, e := d.updateBalancesMap[bd.strAddrDesc]
if !e {
ab, err = d.GetAddrDescBalance(bd.addrDesc)
if err != nil {
d.chanUpdateBalanceResult <- err
continue
}
if ab == nil {
ab = &AddrBalance{}
}
d.updateBalancesMap[bd.strAddrDesc] = ab
d.cbs.balancesMiss++
} else {
d.cbs.balancesHit++
}
// add number of trx in balance only once, address can be multiple times in tx
if !bd.processed {
ab.Txs++
}
if bd.output {
ab.BalanceSat.Add(&ab.BalanceSat, &bd.valueSat)
} else {
ab.BalanceSat.Sub(&ab.BalanceSat, &bd.valueSat)
if ab.BalanceSat.Sign() < 0 {
d.resetValueSatToZero(&ab.BalanceSat, bd.addrDesc, "balance")
}
ab.SentSat.Add(&ab.SentSat, &bd.valueSat)
}
d.chanUpdateBalanceResult <- nil
}
}
func (d *RocksDB) dispatchUpdateBalance(dispatchedBalances int, valueSat *big.Int, strAddrDesc string, addrDesc bchain.AddressDescriptor, processed, output bool) (int, error) {
loop:
for {
select {
// process as many results as possible
case err := <-d.chanUpdateBalanceResult:
if err != nil {
return 0, err
}
dispatchedBalances--
// send input to be processed
case d.chanUpdateBalance <- updateBalanceData{*valueSat, strAddrDesc, addrDesc, processed, output}:
dispatchedBalances++
break loop
}
}
return dispatchedBalances, nil
}
func (d *RocksDB) processAddressesUTXO(block *bchain.Block, addresses map[string][]outpoint, txAddressesMap map[string]*TxAddresses, balances map[string]*AddrBalance) error {
d.updateBalancesMap = balances
dispatchedBalances := 0
blockTxIDs := make([][]byte, len(block.Txs))
blockTxAddresses := make([]*TxAddresses, len(block.Txs))
// first process all outputs so that inputs can point to txs in this block
@ -495,10 +404,25 @@ func (d *RocksDB) processAddressesUTXO(block *bchain.Block, addresses map[string
btxID: btxID,
index: int32(i),
})
dispatchedBalances, err = d.dispatchUpdateBalance(dispatchedBalances, &output.ValueSat, strAddrDesc, addrDesc, processed, true)
if err != nil {
return err
ab, e := balances[strAddrDesc]
if !e {
ab, err = d.GetAddrDescBalance(addrDesc)
if err != nil {
return err
}
if ab == nil {
ab = &AddrBalance{}
}
balances[strAddrDesc] = ab
d.cbs.balancesMiss++
} else {
d.cbs.balancesHit++
}
// add number of trx in balance only once, address can be multiple times in tx
if !processed {
ab.Txs++
}
ab.BalanceSat.Add(&ab.BalanceSat, &output.ValueSat)
}
}
// process inputs
@ -564,16 +488,29 @@ func (d *RocksDB) processAddressesUTXO(block *bchain.Block, addresses map[string
btxID: spendingTxid,
index: ^int32(i),
})
dispatchedBalances, err = d.dispatchUpdateBalance(dispatchedBalances, &ot.ValueSat, strAddrDesc, ot.AddrDesc, processed, false)
if err != nil {
return err
ab, e := balances[strAddrDesc]
if !e {
ab, err = d.GetAddrDescBalance(ot.AddrDesc)
if err != nil {
return err
}
if ab == nil {
ab = &AddrBalance{}
}
balances[strAddrDesc] = ab
d.cbs.balancesMiss++
} else {
d.cbs.balancesHit++
}
}
}
for i := 0; i < dispatchedBalances; i++ {
err := <-d.chanUpdateBalanceResult
if err != nil {
return err
// add number of trx in balance only once, address can be multiple times in tx
if !processed {
ab.Txs++
}
ab.BalanceSat.Sub(&ab.BalanceSat, &ot.ValueSat)
if ab.BalanceSat.Sign() < 0 {
d.resetValueSatToZero(&ab.BalanceSat, ot.AddrDesc, "balance")
}
ab.SentSat.Add(&ab.SentSat, &ot.ValueSat)
}
}
return nil