blockbook/db/rocksdb_ethereumtype.go

463 lines
14 KiB
Go

package db
import (
"bytes"
"encoding/hex"
vlq "github.com/bsm/go-vlq"
"github.com/golang/glog"
"github.com/juju/errors"
"github.com/tecbot/gorocksdb"
"spacecruft.org/spacecruft/blockbook/bchain"
"spacecruft.org/spacecruft/blockbook/bchain/coins/eth"
)
// AddrContract is Contract address with number of transactions done by given address
type AddrContract struct {
Contract bchain.AddressDescriptor
Txs uint
}
// AddrContracts contains number of transactions and contracts for an address
type AddrContracts struct {
TotalTxs uint
NonContractTxs uint
Contracts []AddrContract
}
func (d *RocksDB) storeAddressContracts(wb *gorocksdb.WriteBatch, acm map[string]*AddrContracts) error {
buf := make([]byte, 64)
varBuf := make([]byte, vlq.MaxLen64)
for addrDesc, acs := range acm {
// address with 0 contracts is removed from db - happens on disconnect
if acs == nil || (acs.NonContractTxs == 0 && len(acs.Contracts) == 0) {
wb.DeleteCF(d.cfh[cfAddressContracts], bchain.AddressDescriptor(addrDesc))
} else {
buf = buf[:0]
l := packVaruint(acs.TotalTxs, varBuf)
buf = append(buf, varBuf[:l]...)
l = packVaruint(acs.NonContractTxs, varBuf)
buf = append(buf, varBuf[:l]...)
for _, ac := range acs.Contracts {
buf = append(buf, ac.Contract...)
l = packVaruint(ac.Txs, varBuf)
buf = append(buf, varBuf[:l]...)
}
wb.PutCF(d.cfh[cfAddressContracts], bchain.AddressDescriptor(addrDesc), buf)
}
}
return nil
}
// GetAddrDescContracts returns AddrContracts for given addrDesc
func (d *RocksDB) GetAddrDescContracts(addrDesc bchain.AddressDescriptor) (*AddrContracts, error) {
val, err := d.db.GetCF(d.ro, d.cfh[cfAddressContracts], addrDesc)
if err != nil {
return nil, err
}
defer val.Free()
buf := val.Data()
if len(buf) == 0 {
return nil, nil
}
tt, l := unpackVaruint(buf)
buf = buf[l:]
nct, l := unpackVaruint(buf)
buf = buf[l:]
c := make([]AddrContract, 0, 4)
for len(buf) > 0 {
if len(buf) < eth.EthereumTypeAddressDescriptorLen {
return nil, errors.New("Invalid data stored in cfAddressContracts for AddrDesc " + addrDesc.String())
}
txs, l := unpackVaruint(buf[eth.EthereumTypeAddressDescriptorLen:])
contract := append(bchain.AddressDescriptor(nil), buf[:eth.EthereumTypeAddressDescriptorLen]...)
c = append(c, AddrContract{
Contract: contract,
Txs: txs,
})
buf = buf[eth.EthereumTypeAddressDescriptorLen+l:]
}
return &AddrContracts{
TotalTxs: tt,
NonContractTxs: nct,
Contracts: c,
}, nil
}
func findContractInAddressContracts(contract bchain.AddressDescriptor, contracts []AddrContract) (int, bool) {
for i := range contracts {
if bytes.Equal(contract, contracts[i].Contract) {
return i, true
}
}
return 0, false
}
func isZeroAddress(addrDesc bchain.AddressDescriptor) bool {
for _, b := range addrDesc {
if b != 0 {
return false
}
}
return true
}
func (d *RocksDB) addToAddressesAndContractsEthereumType(addrDesc bchain.AddressDescriptor, btxID []byte, index int32, contract bchain.AddressDescriptor, addresses addressesMap, addressContracts map[string]*AddrContracts, addTxCount bool) error {
var err error
strAddrDesc := string(addrDesc)
ac, e := addressContracts[strAddrDesc]
if !e {
ac, err = d.GetAddrDescContracts(addrDesc)
if err != nil {
return err
}
if ac == nil {
ac = &AddrContracts{}
}
addressContracts[strAddrDesc] = ac
d.cbs.balancesMiss++
} else {
d.cbs.balancesHit++
}
if contract == nil {
if addTxCount {
ac.NonContractTxs++
}
} else {
// do not store contracts for 0x0000000000000000000000000000000000000000 address
if !isZeroAddress(addrDesc) {
// locate the contract and set i to the index in the array of contracts
i, found := findContractInAddressContracts(contract, ac.Contracts)
if !found {
i = len(ac.Contracts)
ac.Contracts = append(ac.Contracts, AddrContract{Contract: contract})
}
// index 0 is for ETH transfers, contract indexes start with 1
if index < 0 {
index = ^int32(i + 1)
} else {
index = int32(i + 1)
}
if addTxCount {
ac.Contracts[i].Txs++
}
}
}
counted := addToAddressesMap(addresses, strAddrDesc, btxID, index)
if !counted {
ac.TotalTxs++
}
return nil
}
type ethBlockTxContract struct {
addr, contract bchain.AddressDescriptor
}
type ethBlockTx struct {
btxID []byte
from, to bchain.AddressDescriptor
contracts []ethBlockTxContract
}
func (d *RocksDB) processAddressesEthereumType(block *bchain.Block, addresses addressesMap, addressContracts map[string]*AddrContracts) ([]ethBlockTx, error) {
blockTxs := make([]ethBlockTx, len(block.Txs))
for txi, tx := range block.Txs {
btxID, err := d.chainParser.PackTxid(tx.Txid)
if err != nil {
return nil, err
}
blockTx := &blockTxs[txi]
blockTx.btxID = btxID
var from, to bchain.AddressDescriptor
// there is only one output address in EthereumType transaction, store it in format txid 0
if len(tx.Vout) == 1 && len(tx.Vout[0].ScriptPubKey.Addresses) == 1 {
to, err = d.chainParser.GetAddrDescFromAddress(tx.Vout[0].ScriptPubKey.Addresses[0])
if err != nil {
// do not log ErrAddressMissing, transactions can be without to address (for example eth contracts)
if err != bchain.ErrAddressMissing {
glog.Warningf("rocksdb: addrDesc: %v - height %d, tx %v, output", err, block.Height, tx.Txid)
}
continue
}
if err = d.addToAddressesAndContractsEthereumType(to, btxID, 0, nil, addresses, addressContracts, true); err != nil {
return nil, err
}
blockTx.to = to
}
// there is only one input address in EthereumType transaction, store it in format txid ^0
if len(tx.Vin) == 1 && len(tx.Vin[0].Addresses) == 1 {
from, err = d.chainParser.GetAddrDescFromAddress(tx.Vin[0].Addresses[0])
if err != nil {
if err != bchain.ErrAddressMissing {
glog.Warningf("rocksdb: addrDesc: %v - height %d, tx %v, input", err, block.Height, tx.Txid)
}
continue
}
if err = d.addToAddressesAndContractsEthereumType(from, btxID, ^int32(0), nil, addresses, addressContracts, !bytes.Equal(from, to)); err != nil {
return nil, err
}
blockTx.from = from
}
// store erc20 transfers
erc20, err := d.chainParser.EthereumTypeGetErc20FromTx(&tx)
if err != nil {
glog.Warningf("rocksdb: GetErc20FromTx %v - height %d, tx %v", err, block.Height, tx.Txid)
}
blockTx.contracts = make([]ethBlockTxContract, len(erc20)*2)
j := 0
for i, t := range erc20 {
var contract, from, to bchain.AddressDescriptor
contract, err = d.chainParser.GetAddrDescFromAddress(t.Contract)
if err == nil {
from, err = d.chainParser.GetAddrDescFromAddress(t.From)
if err == nil {
to, err = d.chainParser.GetAddrDescFromAddress(t.To)
}
}
if err != nil {
glog.Warningf("rocksdb: GetErc20FromTx %v - height %d, tx %v, transfer %v", err, block.Height, tx.Txid, t)
continue
}
if err = d.addToAddressesAndContractsEthereumType(to, btxID, int32(i), contract, addresses, addressContracts, true); err != nil {
return nil, err
}
eq := bytes.Equal(from, to)
bc := &blockTx.contracts[j]
j++
bc.addr = from
bc.contract = contract
if err = d.addToAddressesAndContractsEthereumType(from, btxID, ^int32(i), contract, addresses, addressContracts, !eq); err != nil {
return nil, err
}
// add to address to blockTx.contracts only if it is different from from address
if !eq {
bc = &blockTx.contracts[j]
j++
bc.addr = to
bc.contract = contract
}
}
blockTx.contracts = blockTx.contracts[:j]
}
return blockTxs, nil
}
func (d *RocksDB) storeAndCleanupBlockTxsEthereumType(wb *gorocksdb.WriteBatch, block *bchain.Block, blockTxs []ethBlockTx) error {
pl := d.chainParser.PackedTxidLen()
buf := make([]byte, 0, (pl+2*eth.EthereumTypeAddressDescriptorLen)*len(blockTxs))
varBuf := make([]byte, vlq.MaxLen64)
zeroAddress := make([]byte, eth.EthereumTypeAddressDescriptorLen)
appendAddress := func(a bchain.AddressDescriptor) {
if len(a) != eth.EthereumTypeAddressDescriptorLen {
buf = append(buf, zeroAddress...)
} else {
buf = append(buf, a...)
}
}
for i := range blockTxs {
blockTx := &blockTxs[i]
buf = append(buf, blockTx.btxID...)
appendAddress(blockTx.from)
appendAddress(blockTx.to)
l := packVaruint(uint(len(blockTx.contracts)), varBuf)
buf = append(buf, varBuf[:l]...)
for j := range blockTx.contracts {
c := &blockTx.contracts[j]
appendAddress(c.addr)
appendAddress(c.contract)
}
}
key := packUint(block.Height)
wb.PutCF(d.cfh[cfBlockTxs], key, buf)
return d.cleanupBlockTxs(wb, block)
}
func (d *RocksDB) getBlockTxsEthereumType(height uint32) ([]ethBlockTx, error) {
pl := d.chainParser.PackedTxidLen()
val, err := d.db.GetCF(d.ro, d.cfh[cfBlockTxs], packUint(height))
if err != nil {
return nil, err
}
defer val.Free()
buf := val.Data()
// nil data means the key was not found in DB
if buf == nil {
return nil, nil
}
// buf can be empty slice, this means the block did not contain any transactions
bt := make([]ethBlockTx, 0, 8)
getAddress := func(i int) (bchain.AddressDescriptor, int, error) {
if len(buf)-i < eth.EthereumTypeAddressDescriptorLen {
glog.Error("rocksdb: Inconsistent data in blockTxs ", hex.EncodeToString(buf))
return nil, 0, errors.New("Inconsistent data in blockTxs")
}
a := append(bchain.AddressDescriptor(nil), buf[i:i+eth.EthereumTypeAddressDescriptorLen]...)
// return null addresses as nil
for _, b := range a {
if b != 0 {
return a, i + eth.EthereumTypeAddressDescriptorLen, nil
}
}
return nil, i + eth.EthereumTypeAddressDescriptorLen, nil
}
var from, to bchain.AddressDescriptor
for i := 0; i < len(buf); {
if len(buf)-i < pl {
glog.Error("rocksdb: Inconsistent data in blockTxs ", hex.EncodeToString(buf))
return nil, errors.New("Inconsistent data in blockTxs")
}
txid := append([]byte(nil), buf[i:i+pl]...)
i += pl
from, i, err = getAddress(i)
if err != nil {
return nil, err
}
to, i, err = getAddress(i)
if err != nil {
return nil, err
}
cc, l := unpackVaruint(buf[i:])
i += l
contracts := make([]ethBlockTxContract, cc)
for j := range contracts {
contracts[j].addr, i, err = getAddress(i)
if err != nil {
return nil, err
}
contracts[j].contract, i, err = getAddress(i)
if err != nil {
return nil, err
}
}
bt = append(bt, ethBlockTx{
btxID: txid,
from: from,
to: to,
contracts: contracts,
})
}
return bt, nil
}
func (d *RocksDB) disconnectBlockTxsEthereumType(wb *gorocksdb.WriteBatch, height uint32, blockTxs []ethBlockTx, contracts map[string]*AddrContracts) error {
glog.Info("Disconnecting block ", height, " containing ", len(blockTxs), " transactions")
addresses := make(map[string]map[string]struct{})
disconnectAddress := func(btxID []byte, addrDesc, contract bchain.AddressDescriptor) error {
var err error
// do not process empty address
if len(addrDesc) == 0 {
return nil
}
s := string(addrDesc)
txid := string(btxID)
// find if tx for this address was already encountered
mtx, ftx := addresses[s]
if !ftx {
mtx = make(map[string]struct{})
mtx[txid] = struct{}{}
addresses[s] = mtx
} else {
_, ftx = mtx[txid]
if !ftx {
mtx[txid] = struct{}{}
}
}
c, fc := contracts[s]
if !fc {
c, err = d.GetAddrDescContracts(addrDesc)
if err != nil {
return err
}
contracts[s] = c
}
if c != nil {
if !ftx {
c.TotalTxs--
}
if contract == nil {
if c.NonContractTxs > 0 {
c.NonContractTxs--
} else {
glog.Warning("AddressContracts ", addrDesc, ", EthTxs would be negative, tx ", hex.EncodeToString(btxID))
}
} else {
i, found := findContractInAddressContracts(contract, c.Contracts)
if found {
if c.Contracts[i].Txs > 0 {
c.Contracts[i].Txs--
if c.Contracts[i].Txs == 0 {
c.Contracts = append(c.Contracts[:i], c.Contracts[i+1:]...)
}
} else {
glog.Warning("AddressContracts ", addrDesc, ", contract ", i, " Txs would be negative, tx ", hex.EncodeToString(btxID))
}
} else {
glog.Warning("AddressContracts ", addrDesc, ", contract ", contract, " not found, tx ", hex.EncodeToString(btxID))
}
}
} else {
glog.Warning("AddressContracts ", addrDesc, " not found, tx ", hex.EncodeToString(btxID))
}
return nil
}
for i := range blockTxs {
blockTx := &blockTxs[i]
if err := disconnectAddress(blockTx.btxID, blockTx.from, nil); err != nil {
return err
}
// if from==to, tx is counted only once and does not have to be disconnected again
if !bytes.Equal(blockTx.from, blockTx.to) {
if err := disconnectAddress(blockTx.btxID, blockTx.to, nil); err != nil {
return err
}
}
for _, c := range blockTx.contracts {
if err := disconnectAddress(blockTx.btxID, c.addr, c.contract); err != nil {
return err
}
}
wb.DeleteCF(d.cfh[cfTransactions], blockTx.btxID)
}
for a := range addresses {
key := packAddressKey([]byte(a), height)
wb.DeleteCF(d.cfh[cfAddresses], key)
}
return nil
}
// DisconnectBlockRangeEthereumType removes all data belonging to blocks in range lower-higher
// it is able to disconnect only blocks for which there are data in the blockTxs column
func (d *RocksDB) DisconnectBlockRangeEthereumType(lower uint32, higher uint32) error {
blocks := make([][]ethBlockTx, higher-lower+1)
for height := lower; height <= higher; height++ {
blockTxs, err := d.getBlockTxsEthereumType(height)
if err != nil {
return err
}
// nil blockTxs means blockTxs were not found in db
if blockTxs == nil {
return errors.Errorf("Cannot disconnect blocks with height %v and lower. It is necessary to rebuild index.", height)
}
blocks[height-lower] = blockTxs
}
wb := gorocksdb.NewWriteBatch()
defer wb.Destroy()
contracts := make(map[string]*AddrContracts)
for height := higher; height >= lower; height-- {
if err := d.disconnectBlockTxsEthereumType(wb, height, blocks[height-lower], contracts); err != nil {
return err
}
key := packUint(height)
wb.DeleteCF(d.cfh[cfBlockTxs], key)
wb.DeleteCF(d.cfh[cfHeight], key)
}
d.storeAddressContracts(wb, contracts)
err := d.db.Write(d.wo, wb)
if err == nil {
d.is.RemoveLastBlockTimes(int(higher-lower) + 1)
glog.Infof("rocksdb: blocks %d-%d disconnected", lower, higher)
}
return err
}