// blockbook/db/rocksdb.go

package db
import (
"bytes"
"encoding/binary"
"encoding/hex"
"encoding/json"
"fmt"
"math/big"
"os"
"path/filepath"
"sort"
"strconv"
"time"
"unsafe"
vlq "github.com/bsm/go-vlq"
"github.com/golang/glog"
"github.com/juju/errors"
"github.com/tecbot/gorocksdb"
"spacecruft.org/spacecruft/blockbook/bchain"
"spacecruft.org/spacecruft/blockbook/common"
)
const dbVersion = 5
const packedHeightBytes = 4
const maxAddrDescLen = 1024
// an iterator creates a snapshot, which takes a lot of resources
// when doing a huge scan, it is better to close the iterator and reopen it from time to time to free the resources
const refreshIterator = 5000000
// FiatRatesTimeFormat is a format string for storing FiatRates timestamps in rocksdb
const FiatRatesTimeFormat = "20060102150405" // YYYYMMDDhhmmss
// CurrencyRatesTicker contains coin ticker data fetched from API
type CurrencyRatesTicker struct {
Timestamp *time.Time // returned as unix timestamp in API
Rates map[string]float64
}
// ResultTickerAsString contains formatted CurrencyRatesTicker data
type ResultTickerAsString struct {
Timestamp int64 `json:"ts,omitempty"`
Rates map[string]float64 `json:"rates"`
Error string `json:"error,omitempty"`
}
// ResultTickersAsString contains a formatted CurrencyRatesTicker list
type ResultTickersAsString struct {
Tickers []ResultTickerAsString `json:"tickers"`
}
// ResultTickerListAsString contains formatted data about available currency tickers
type ResultTickerListAsString struct {
Timestamp int64 `json:"ts,omitempty"`
Tickers []string `json:"available_currencies"`
Error string `json:"error,omitempty"`
}
// RepairRocksDB calls the RocksDB repair function
func RepairRocksDB(name string) error {
glog.Infof("rocksdb: repair")
opts := gorocksdb.NewDefaultOptions()
return gorocksdb.RepairDb(name, opts)
}
type connectBlockStats struct {
txAddressesHit int
txAddressesMiss int
balancesHit int
balancesMiss int
}
// AddressBalanceDetail specifies what data are returned by GetAddressBalance
type AddressBalanceDetail int
const (
// AddressBalanceDetailNoUTXO returns address balance without utxos
AddressBalanceDetailNoUTXO = 0
// AddressBalanceDetailUTXO returns address balance with utxos
AddressBalanceDetailUTXO = 1
// addressBalanceDetailUTXOIndexed returns address balance with utxos and index for updates, used only internally
addressBalanceDetailUTXOIndexed = 2
)
// RocksDB handle
type RocksDB struct {
path string
db *gorocksdb.DB
wo *gorocksdb.WriteOptions
ro *gorocksdb.ReadOptions
cfh []*gorocksdb.ColumnFamilyHandle
chainParser bchain.BlockChainParser
is *common.InternalState
metrics *common.Metrics
cache *gorocksdb.Cache
maxOpenFiles int
cbs connectBlockStats
}
const (
cfDefault = iota
cfHeight
cfAddresses
cfBlockTxs
cfTransactions
cfFiatRates
// BitcoinType
cfAddressBalance
cfTxAddresses
// EthereumType
cfAddressContracts = cfAddressBalance
)
// common columns
var cfNames []string
var cfBaseNames = []string{"default", "height", "addresses", "blockTxs", "transactions", "fiatRates"}
// type specific columns
var cfNamesBitcoinType = []string{"addressBalance", "txAddresses"}
var cfNamesEthereumType = []string{"addressContracts"}
func openDB(path string, c *gorocksdb.Cache, openFiles int) (*gorocksdb.DB, []*gorocksdb.ColumnFamilyHandle, error) {
// opts with bloom filter
opts := createAndSetDBOptions(10, c, openFiles)
// opts for addresses without bloom filter
// from documentation: if most of your queries are executed using iterators, you shouldn't set bloom filter
optsAddresses := createAndSetDBOptions(0, c, openFiles)
// default, height, addresses, blockTxs, transactions, fiatRates
cfOptions := []*gorocksdb.Options{opts, opts, optsAddresses, opts, opts, opts}
// append type specific options
count := len(cfNames) - len(cfOptions)
for i := 0; i < count; i++ {
cfOptions = append(cfOptions, opts)
}
db, cfh, err := gorocksdb.OpenDbColumnFamilies(opts, path, cfNames, cfOptions)
if err != nil {
return nil, nil, err
}
return db, cfh, nil
}
// NewRocksDB opens an internal handle to RocksDB environment. Close
// needs to be called to release it.
func NewRocksDB(path string, cacheSize, maxOpenFiles int, parser bchain.BlockChainParser, metrics *common.Metrics) (d *RocksDB, err error) {
glog.Infof("rocksdb: opening %s, required data version %v, cache size %v, max open files %v", path, dbVersion, cacheSize, maxOpenFiles)
cfNames = append([]string{}, cfBaseNames...)
chainType := parser.GetChainType()
if chainType == bchain.ChainBitcoinType {
cfNames = append(cfNames, cfNamesBitcoinType...)
} else if chainType == bchain.ChainEthereumType {
cfNames = append(cfNames, cfNamesEthereumType...)
} else {
return nil, errors.New("Unknown chain type")
}
c := gorocksdb.NewLRUCache(uint64(cacheSize))
db, cfh, err := openDB(path, c, maxOpenFiles)
if err != nil {
return nil, err
}
wo := gorocksdb.NewDefaultWriteOptions()
ro := gorocksdb.NewDefaultReadOptions()
return &RocksDB{path, db, wo, ro, cfh, parser, nil, metrics, c, maxOpenFiles, connectBlockStats{}}, nil
}
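// exampleOpenClose is an illustrative sketch of the NewRocksDB/Close lifecycle,
// not used by the indexer itself; the parser is supplied by the caller and the
// path and sizes are arbitrary placeholder values.
func exampleOpenClose(parser bchain.BlockChainParser) error {
	d, err := NewRocksDB("/tmp/exampledb", 1<<29, 512, parser, nil)
	if err != nil {
		return err
	}
	defer d.Close()
	height, hash, err := d.GetBestBlock()
	if err != nil {
		return err
	}
	glog.Infof("best block %d %s", height, hash)
	return nil
}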
func (d *RocksDB) closeDB() error {
for _, h := range d.cfh {
h.Destroy()
}
d.db.Close()
d.db = nil
return nil
}
// FiatRatesConvertDate checks if the date is in correct format and returns the Time object.
// Possible formats are: YYYYMMDDhhmmss, YYYYMMDDhhmm, YYYYMMDDhh, YYYYMMDD
func FiatRatesConvertDate(date string) (*time.Time, error) {
for format := FiatRatesTimeFormat; len(format) >= 8; format = format[:len(format)-2] {
convertedDate, err := time.Parse(format, date)
if err == nil {
return &convertedDate, nil
}
}
msg := "Date \"" + date + "\" does not match any of available formats. "
msg += "Possible formats are: YYYYMMDDhhmmss, YYYYMMDDhhmm, YYYYMMDDhh, YYYYMMDD"
return nil, errors.New(msg)
}
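// exampleConvertDate is an illustrative sketch of FiatRatesConvertDate:
// shorter date strings match the progressively truncated formats.
func exampleConvertDate() {
	for _, s := range []string{"20200531123000", "202005311230", "2020053112", "20200531"} {
		t, err := FiatRatesConvertDate(s)
		fmt.Println(t, err) // all four strings parse successfully
	}
}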
// FiatRatesStoreTicker stores ticker data at the specified time
func (d *RocksDB) FiatRatesStoreTicker(ticker *CurrencyRatesTicker) error {
if len(ticker.Rates) == 0 {
return errors.New("Error storing ticker: empty rates")
} else if ticker.Timestamp == nil {
return errors.New("Error storing ticker: empty timestamp")
}
ratesMarshalled, err := json.Marshal(ticker.Rates)
if err != nil {
glog.Error("Error marshalling ticker rates: ", err)
return err
}
timeFormatted := ticker.Timestamp.UTC().Format(FiatRatesTimeFormat)
err = d.db.PutCF(d.wo, d.cfh[cfFiatRates], []byte(timeFormatted), ratesMarshalled)
if err != nil {
glog.Error("Error storing ticker: ", err)
return err
}
return nil
}
// FiatRatesFindTicker gets FiatRates data of the first ticker at or after the specified timestamp
func (d *RocksDB) FiatRatesFindTicker(tickerTime *time.Time) (*CurrencyRatesTicker, error) {
ticker := &CurrencyRatesTicker{}
tickerTimeFormatted := tickerTime.UTC().Format(FiatRatesTimeFormat)
it := d.db.NewIteratorCF(d.ro, d.cfh[cfFiatRates])
defer it.Close()
for it.Seek([]byte(tickerTimeFormatted)); it.Valid(); it.Next() {
timeObj, err := time.Parse(FiatRatesTimeFormat, string(it.Key().Data()))
if err != nil {
glog.Error("FiatRatesFindTicker time parse error: ", err)
return nil, err
}
timeObj = timeObj.UTC()
ticker.Timestamp = &timeObj
err = json.Unmarshal(it.Value().Data(), &ticker.Rates)
if err != nil {
glog.Error("FiatRatesFindTicker error unpacking rates: ", err)
return nil, err
}
break
}
if err := it.Err(); err != nil {
glog.Error("FiatRatesFindTicker Iterator error: ", err)
return nil, err
}
if !it.Valid() {
return nil, nil // ticker not found
}
return ticker, nil
}
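// exampleFindRate is an illustrative sketch of a rate lookup built on
// FiatRatesFindTicker; "usd" is a placeholder currency code that is assumed
// to be present in the stored rates.
func (d *RocksDB) exampleFindRate(when time.Time) (float64, error) {
	ticker, err := d.FiatRatesFindTicker(&when)
	if err != nil || ticker == nil {
		return 0, err
	}
	return ticker.Rates["usd"], nil
}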
// FiatRatesFindLastTicker gets the last FiatRates record
func (d *RocksDB) FiatRatesFindLastTicker() (*CurrencyRatesTicker, error) {
ticker := &CurrencyRatesTicker{}
it := d.db.NewIteratorCF(d.ro, d.cfh[cfFiatRates])
defer it.Close()
for it.SeekToLast(); it.Valid(); it.Next() {
timeObj, err := time.Parse(FiatRatesTimeFormat, string(it.Key().Data()))
if err != nil {
glog.Error("FiatRatesFindTicker time parse error: ", err)
return nil, err
}
timeObj = timeObj.UTC()
ticker.Timestamp = &timeObj
err = json.Unmarshal(it.Value().Data(), &ticker.Rates)
if err != nil {
glog.Error("FiatRatesFindTicker error unpacking rates: ", err)
return nil, err
}
break
}
if err := it.Err(); err != nil {
glog.Error("FiatRatesFindLastTicker Iterator error: ", err)
return nil, err
}
if !it.Valid() {
return nil, nil // ticker not found
}
return ticker, nil
}
// Close releases the RocksDB environment opened in NewRocksDB.
func (d *RocksDB) Close() error {
if d.db != nil {
// store the internal state of the app
if d.is != nil && d.is.DbState == common.DbStateOpen {
d.is.DbState = common.DbStateClosed
if err := d.StoreInternalState(d.is); err != nil {
glog.Info("internalState: ", err)
}
}
glog.Infof("rocksdb: close")
d.closeDB()
d.wo.Destroy()
d.ro.Destroy()
}
return nil
}
// Reopen reopens the database
// It closes and reopens db, nobody can access the database during the operation!
func (d *RocksDB) Reopen() error {
err := d.closeDB()
if err != nil {
return err
}
d.db = nil
db, cfh, err := openDB(d.path, d.cache, d.maxOpenFiles)
if err != nil {
return err
}
d.db, d.cfh = db, cfh
return nil
}
func atoUint64(s string) uint64 {
i, err := strconv.Atoi(s)
if err != nil {
return 0
}
return uint64(i)
}
// GetMemoryStats returns memory usage statistics as reported by RocksDB
func (d *RocksDB) GetMemoryStats() string {
var total, indexAndFilter, memtable uint64
type columnStats struct {
name string
indexAndFilter string
memtable string
}
cs := make([]columnStats, len(cfNames))
for i := 0; i < len(cfNames); i++ {
cs[i].name = cfNames[i]
cs[i].indexAndFilter = d.db.GetPropertyCF("rocksdb.estimate-table-readers-mem", d.cfh[i])
cs[i].memtable = d.db.GetPropertyCF("rocksdb.cur-size-all-mem-tables", d.cfh[i])
indexAndFilter += atoUint64(cs[i].indexAndFilter)
memtable += atoUint64(cs[i].memtable)
}
m := struct {
cacheUsage uint64
pinnedCacheUsage uint64
columns []columnStats
}{
cacheUsage: d.cache.GetUsage(),
pinnedCacheUsage: d.cache.GetPinnedUsage(),
columns: cs,
}
total = m.cacheUsage + indexAndFilter + memtable
return fmt.Sprintf("Total %d, indexAndFilter %d, memtable %d, %+v", total, indexAndFilter, memtable, m)
}
// StopIteration is returned by callback function to signal stop of iteration
type StopIteration struct{}
func (e *StopIteration) Error() string {
return ""
}
// GetTransactionsCallback is called by GetTransactions/GetAddrDescTransactions for each found tx
// indexes contains an array of indexes (negative for inputs, positive for outputs) of the positions in the tx where the given address appears
type GetTransactionsCallback func(txid string, height uint32, indexes []int32) error
// GetTransactions finds all input/output transactions for address
// Transactions are passed to the callback function.
func (d *RocksDB) GetTransactions(address string, lower uint32, higher uint32, fn GetTransactionsCallback) (err error) {
if glog.V(1) {
glog.Infof("rocksdb: address get %s %d-%d ", address, lower, higher)
}
addrDesc, err := d.chainParser.GetAddrDescFromAddress(address)
if err != nil {
return err
}
return d.GetAddrDescTransactions(addrDesc, lower, higher, fn)
}
// GetAddrDescTransactions finds all input/output transactions for address descriptor
// Transactions are passed to the callback function in order from the newest block to the oldest
func (d *RocksDB) GetAddrDescTransactions(addrDesc bchain.AddressDescriptor, lower uint32, higher uint32, fn GetTransactionsCallback) (err error) {
txidUnpackedLen := d.chainParser.PackedTxidLen()
addrDescLen := len(addrDesc)
startKey := packAddressKey(addrDesc, higher)
stopKey := packAddressKey(addrDesc, lower)
indexes := make([]int32, 0, 16)
it := d.db.NewIteratorCF(d.ro, d.cfh[cfAddresses])
defer it.Close()
for it.Seek(startKey); it.Valid(); it.Next() {
key := it.Key().Data()
if bytes.Compare(key, stopKey) > 0 {
break
}
if len(key) != addrDescLen+packedHeightBytes {
if glog.V(2) {
glog.Warningf("rocksdb: addrDesc %s - mixed with %s", addrDesc, hex.EncodeToString(key))
}
continue
}
val := it.Value().Data()
if glog.V(2) {
glog.Infof("rocksdb: addresses %s: %s", hex.EncodeToString(key), hex.EncodeToString(val))
}
_, height, err := unpackAddressKey(key)
if err != nil {
return err
}
for len(val) > txidUnpackedLen {
tx, err := d.chainParser.UnpackTxid(val[:txidUnpackedLen])
if err != nil {
return err
}
indexes = indexes[:0]
val = val[txidUnpackedLen:]
for {
index, l := unpackVarint32(val)
indexes = append(indexes, index>>1)
val = val[l:]
if index&1 == 1 {
break
} else if len(val) == 0 {
glog.Warningf("rocksdb: addresses contain incorrect data %s: %s", hex.EncodeToString(key), hex.EncodeToString(val))
break
}
}
if err := fn(tx, height, indexes); err != nil {
if _, ok := err.(*StopIteration); ok {
return nil
}
return err
}
}
if len(val) != 0 {
glog.Warningf("rocksdb: addresses contain incorrect data %s: %s", hex.EncodeToString(key), hex.EncodeToString(val))
}
}
return nil
}
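// exampleNewestTxForAddress is an illustrative sketch of the callback contract:
// transactions arrive from the newest block to the oldest, and returning
// *StopIteration ends the scan early without reporting an error.
func (d *RocksDB) exampleNewestTxForAddress(address string) (string, error) {
	var newest string
	err := d.GetTransactions(address, 0, ^uint32(0), func(txid string, height uint32, indexes []int32) error {
		newest = txid
		return &StopIteration{} // stop after the first (newest) tx
	})
	return newest, err
}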
const (
opInsert = 0
opDelete = 1
)
// ConnectBlock indexes addresses in the block and stores them in db
func (d *RocksDB) ConnectBlock(block *bchain.Block) error {
wb := gorocksdb.NewWriteBatch()
defer wb.Destroy()
if glog.V(2) {
glog.Infof("rocksdb: insert %d %s", block.Height, block.Hash)
}
chainType := d.chainParser.GetChainType()
if err := d.writeHeightFromBlock(wb, block, opInsert); err != nil {
return err
}
addresses := make(addressesMap)
if chainType == bchain.ChainBitcoinType {
txAddressesMap := make(map[string]*TxAddresses)
balances := make(map[string]*AddrBalance)
if err := d.processAddressesBitcoinType(block, addresses, txAddressesMap, balances); err != nil {
return err
}
if err := d.storeTxAddresses(wb, txAddressesMap); err != nil {
return err
}
if err := d.storeBalances(wb, balances); err != nil {
return err
}
if err := d.storeAndCleanupBlockTxs(wb, block); err != nil {
return err
}
} else if chainType == bchain.ChainEthereumType {
addressContracts := make(map[string]*AddrContracts)
blockTxs, err := d.processAddressesEthereumType(block, addresses, addressContracts)
if err != nil {
return err
}
if err := d.storeAddressContracts(wb, addressContracts); err != nil {
return err
}
if err := d.storeAndCleanupBlockTxsEthereumType(wb, block, blockTxs); err != nil {
return err
}
} else {
return errors.New("Unknown chain type")
}
if err := d.storeAddresses(wb, block.Height, addresses); err != nil {
return err
}
if err := d.db.Write(d.wo, wb); err != nil {
return err
}
d.is.AppendBlockTime(uint32(block.Time))
return nil
}
// Addresses index
type txIndexes struct {
btxID []byte
indexes []int32
}
// addressesMap is a map of addresses in a block
// each address contains a slice of transactions with indexes where the address appears
// a slice is used instead of a map so that the order is defined and linear search is fast for the typically few items
type addressesMap map[string][]txIndexes
type outpoint struct {
btxID []byte
index int32
}
// TxInput holds input data of the transaction in TxAddresses
type TxInput struct {
AddrDesc bchain.AddressDescriptor
ValueSat big.Int
}
// Addresses converts AddressDescriptor of the input to array of strings
func (ti *TxInput) Addresses(p bchain.BlockChainParser) ([]string, bool, error) {
return p.GetAddressesFromAddrDesc(ti.AddrDesc)
}
// TxOutput holds output data of the transaction in TxAddresses
type TxOutput struct {
AddrDesc bchain.AddressDescriptor
Spent bool
ValueSat big.Int
}
// Addresses converts AddressDescriptor of the output to array of strings
func (to *TxOutput) Addresses(p bchain.BlockChainParser) ([]string, bool, error) {
return p.GetAddressesFromAddrDesc(to.AddrDesc)
}
// TxAddresses stores transaction inputs and outputs with amounts
type TxAddresses struct {
Height uint32
Inputs []TxInput
Outputs []TxOutput
}
// Utxo holds information about unspent transaction output
type Utxo struct {
BtxID []byte
Vout int32
Height uint32
ValueSat big.Int
}
// AddrBalance stores number of transactions and balances of an address
type AddrBalance struct {
Txs uint32
SentSat big.Int
BalanceSat big.Int
Utxos []Utxo
utxosMap map[string]int
}
// ReceivedSat computes received amount from total balance and sent amount
func (ab *AddrBalance) ReceivedSat() *big.Int {
var r big.Int
r.Add(&ab.BalanceSat, &ab.SentSat)
return &r
}
// addUtxo appends the utxo to the address' list of utxos and maintains the utxosMap index
func (ab *AddrBalance) addUtxo(u *Utxo) {
ab.Utxos = append(ab.Utxos, *u)
ab.manageUtxoMap(u)
}
func (ab *AddrBalance) manageUtxoMap(u *Utxo) {
l := len(ab.Utxos)
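// the hashmap index is built lazily, only after the utxo slice grows to 16 or more items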
if l >= 16 {
if len(ab.utxosMap) == 0 {
ab.utxosMap = make(map[string]int, 32)
for i := 0; i < l; i++ {
s := string(ab.Utxos[i].BtxID)
if _, e := ab.utxosMap[s]; !e {
ab.utxosMap[s] = i
}
}
} else {
s := string(u.BtxID)
if _, e := ab.utxosMap[s]; !e {
ab.utxosMap[s] = l - 1
}
}
}
}
// on disconnect, the added utxos must be inserted in the right position so that utxosMap index works
func (ab *AddrBalance) addUtxoInDisconnect(u *Utxo) {
insert := -1
if len(ab.utxosMap) > 0 {
if i, e := ab.utxosMap[string(u.BtxID)]; e {
insert = i
}
} else {
for i := range ab.Utxos {
utxo := &ab.Utxos[i]
if *(*int)(unsafe.Pointer(&utxo.BtxID[0])) == *(*int)(unsafe.Pointer(&u.BtxID[0])) && bytes.Equal(utxo.BtxID, u.BtxID) {
insert = i
break
}
}
}
if insert > -1 {
// check if it is necessary to insert the utxo into the array
for i := insert; i < len(ab.Utxos); i++ {
utxo := &ab.Utxos[i]
// either the vout is greater than the inserted vout or it is a different tx
if utxo.Vout > u.Vout || *(*int)(unsafe.Pointer(&utxo.BtxID[0])) != *(*int)(unsafe.Pointer(&u.BtxID[0])) || !bytes.Equal(utxo.BtxID, u.BtxID) {
// found the right place, insert the utxo
ab.Utxos = append(ab.Utxos, *u)
copy(ab.Utxos[i+1:], ab.Utxos[i:])
ab.Utxos[i] = *u
// reset utxosMap after insert, the index will have to be rebuilt if needed
ab.utxosMap = nil
return
}
}
}
ab.Utxos = append(ab.Utxos, *u)
ab.manageUtxoMap(u)
}
// markUtxoAsSpent finds outpoint btxID:vout in utxos and marks it as spent
// for a small number of utxos a linear search is used, for a larger number a hashmap index is maintained
// this is much faster than removing the utxo from the slice, which would cause in-memory reallocations
func (ab *AddrBalance) markUtxoAsSpent(btxID []byte, vout int32) {
if len(ab.utxosMap) == 0 {
for i := range ab.Utxos {
utxo := &ab.Utxos[i]
if utxo.Vout == vout && *(*int)(unsafe.Pointer(&utxo.BtxID[0])) == *(*int)(unsafe.Pointer(&btxID[0])) && bytes.Equal(utxo.BtxID, btxID) {
// mark utxo as spent by setting vout=-1
utxo.Vout = -1
return
}
}
} else {
if i, e := ab.utxosMap[string(btxID)]; e {
l := len(ab.Utxos)
for ; i < l; i++ {
utxo := &ab.Utxos[i]
if utxo.Vout == vout {
if bytes.Equal(utxo.BtxID, btxID) {
// mark utxo as spent by setting vout=-1
utxo.Vout = -1
return
}
break
}
}
}
}
glog.Errorf("Utxo %s:%d not found, utxosMap size %d", hex.EncodeToString(btxID), vout, len(ab.utxosMap))
}
type blockTxs struct {
btxID []byte
inputs []outpoint
}
func (d *RocksDB) resetValueSatToZero(valueSat *big.Int, addrDesc bchain.AddressDescriptor, logText string) {
ad, _, err := d.chainParser.GetAddressesFromAddrDesc(addrDesc)
if err != nil {
glog.Warningf("rocksdb: unparsable address hex '%v' reached negative %s %v, resetting to 0. Parser error %v", addrDesc, logText, valueSat.String(), err)
} else {
glog.Warningf("rocksdb: address %v hex '%v' reached negative %s %v, resetting to 0", ad, addrDesc, logText, valueSat.String())
}
valueSat.SetInt64(0)
}
// GetAndResetConnectBlockStats gets statistics about cache usage in connect blocks and resets the counters
func (d *RocksDB) GetAndResetConnectBlockStats() string {
s := fmt.Sprintf("%+v", d.cbs)
d.cbs = connectBlockStats{}
return s
}
func (d *RocksDB) processAddressesBitcoinType(block *bchain.Block, addresses addressesMap, txAddressesMap map[string]*TxAddresses, balances map[string]*AddrBalance) error {
blockTxIDs := make([][]byte, len(block.Txs))
blockTxAddresses := make([]*TxAddresses, len(block.Txs))
// first process all outputs so that inputs can refer to txs in this block
for txi := range block.Txs {
tx := &block.Txs[txi]
btxID, err := d.chainParser.PackTxid(tx.Txid)
if err != nil {
return err
}
blockTxIDs[txi] = btxID
ta := TxAddresses{Height: block.Height}
ta.Outputs = make([]TxOutput, len(tx.Vout))
txAddressesMap[string(btxID)] = &ta
blockTxAddresses[txi] = &ta
for i, output := range tx.Vout {
tao := &ta.Outputs[i]
tao.ValueSat = output.ValueSat
addrDesc, err := d.chainParser.GetAddrDescFromVout(&output)
if err != nil || len(addrDesc) == 0 || len(addrDesc) > maxAddrDescLen {
if err != nil {
// do not log ErrAddressMissing, transactions can be without a to address (for example eth contract creation)
if err != bchain.ErrAddressMissing {
glog.Warningf("rocksdb: addrDesc: %v - height %d, tx %v, output %v, error %v", err, block.Height, tx.Txid, output, err)
}
} else {
glog.V(1).Infof("rocksdb: height %d, tx %v, vout %v, skipping addrDesc of length %d", block.Height, tx.Txid, i, len(addrDesc))
}
continue
}
tao.AddrDesc = addrDesc
if d.chainParser.IsAddrDescIndexable(addrDesc) {
strAddrDesc := string(addrDesc)
balance, e := balances[strAddrDesc]
if !e {
balance, err = d.GetAddrDescBalance(addrDesc, addressBalanceDetailUTXOIndexed)
if err != nil {
return err
}
if balance == nil {
balance = &AddrBalance{}
}
balances[strAddrDesc] = balance
d.cbs.balancesMiss++
} else {
d.cbs.balancesHit++
}
balance.BalanceSat.Add(&balance.BalanceSat, &output.ValueSat)
balance.addUtxo(&Utxo{
BtxID: btxID,
Vout: int32(i),
Height: block.Height,
ValueSat: output.ValueSat,
})
counted := addToAddressesMap(addresses, strAddrDesc, btxID, int32(i))
if !counted {
balance.Txs++
}
}
}
}
// process inputs
for txi := range block.Txs {
tx := &block.Txs[txi]
spendingTxid := blockTxIDs[txi]
ta := blockTxAddresses[txi]
ta.Inputs = make([]TxInput, len(tx.Vin))
logged := false
for i, input := range tx.Vin {
tai := &ta.Inputs[i]
btxID, err := d.chainParser.PackTxid(input.Txid)
if err != nil {
// do not process inputs without input txid
if err == bchain.ErrTxidMissing {
continue
}
return err
}
stxID := string(btxID)
ita, e := txAddressesMap[stxID]
if !e {
ita, err = d.getTxAddresses(btxID)
if err != nil {
return err
}
if ita == nil {
// allow parser to process unknown input, some coins may implement special handling, default is to log warning
tai.AddrDesc = d.chainParser.GetAddrDescForUnknownInput(tx, i)
continue
}
txAddressesMap[stxID] = ita
d.cbs.txAddressesMiss++
} else {
d.cbs.txAddressesHit++
}
if len(ita.Outputs) <= int(input.Vout) {
glog.Warningf("rocksdb: height %d, tx %v, input tx %v vout %v is out of bounds of stored tx", block.Height, tx.Txid, input.Txid, input.Vout)
continue
}
spentOutput := &ita.Outputs[int(input.Vout)]
if spentOutput.Spent {
glog.Warningf("rocksdb: height %d, tx %v, input tx %v vout %v is double spend", block.Height, tx.Txid, input.Txid, input.Vout)
}
tai.AddrDesc = spentOutput.AddrDesc
tai.ValueSat = spentOutput.ValueSat
// mark the output as spent in tx
spentOutput.Spent = true
if len(spentOutput.AddrDesc) == 0 {
if !logged {
glog.V(1).Infof("rocksdb: height %d, tx %v, input tx %v vout %v skipping empty address", block.Height, tx.Txid, input.Txid, input.Vout)
logged = true
}
continue
}
if d.chainParser.IsAddrDescIndexable(spentOutput.AddrDesc) {
strAddrDesc := string(spentOutput.AddrDesc)
balance, e := balances[strAddrDesc]
if !e {
balance, err = d.GetAddrDescBalance(spentOutput.AddrDesc, addressBalanceDetailUTXOIndexed)
if err != nil {
return err
}
if balance == nil {
balance = &AddrBalance{}
}
balances[strAddrDesc] = balance
d.cbs.balancesMiss++
} else {
d.cbs.balancesHit++
}
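// input indexes are stored as the bitwise complement of the position to distinguish them from output indexes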
counted := addToAddressesMap(addresses, strAddrDesc, spendingTxid, ^int32(i))
if !counted {
balance.Txs++
}
balance.BalanceSat.Sub(&balance.BalanceSat, &spentOutput.ValueSat)
balance.markUtxoAsSpent(btxID, int32(input.Vout))
if balance.BalanceSat.Sign() < 0 {
d.resetValueSatToZero(&balance.BalanceSat, spentOutput.AddrDesc, "balance")
}
balance.SentSat.Add(&balance.SentSat, &spentOutput.ValueSat)
}
}
}
return nil
}
// addToAddressesMap maintains mapping between addresses and transactions in one block
// the method assumes that outputs in the block are processed before the inputs
// the return value is true if the tx was processed before, so that the tx is not counted multiple times
func addToAddressesMap(addresses addressesMap, strAddrDesc string, btxID []byte, index int32) bool {
// check whether the address was already processed in this block
// if not found, it has certainly not been counted
at, found := addresses[strAddrDesc]
if found {
// if the tx is already in the slice, append the index to the array of indexes
for i, t := range at {
if bytes.Equal(btxID, t.btxID) {
at[i].indexes = append(t.indexes, index)
return true
}
}
}
addresses[strAddrDesc] = append(at, txIndexes{
btxID: btxID,
indexes: []int32{index},
})
return false
}
func (d *RocksDB) storeAddresses(wb *gorocksdb.WriteBatch, height uint32, addresses addressesMap) error {
for addrDesc, txi := range addresses {
ba := bchain.AddressDescriptor(addrDesc)
key := packAddressKey(ba, height)
val := d.packTxIndexes(txi)
wb.PutCF(d.cfh[cfAddresses], key, val)
}
return nil
}
func (d *RocksDB) storeTxAddresses(wb *gorocksdb.WriteBatch, am map[string]*TxAddresses) error {
varBuf := make([]byte, maxPackedBigintBytes)
buf := make([]byte, 1024)
for txID, ta := range am {
buf = packTxAddresses(ta, buf, varBuf)
wb.PutCF(d.cfh[cfTxAddresses], []byte(txID), buf)
}
return nil
}
func (d *RocksDB) storeBalances(wb *gorocksdb.WriteBatch, abm map[string]*AddrBalance) error {
// allocate initial buffer
buf := make([]byte, 1024)
varBuf := make([]byte, maxPackedBigintBytes)
for addrDesc, ab := range abm {
// balance with 0 transactions is removed from db - happens on disconnect
if ab == nil || ab.Txs <= 0 {
wb.DeleteCF(d.cfh[cfAddressBalance], bchain.AddressDescriptor(addrDesc))
} else {
buf = packAddrBalance(ab, buf, varBuf)
wb.PutCF(d.cfh[cfAddressBalance], bchain.AddressDescriptor(addrDesc), buf)
}
}
return nil
}
func (d *RocksDB) cleanupBlockTxs(wb *gorocksdb.WriteBatch, block *bchain.Block) error {
keep := d.chainParser.KeepBlockAddresses()
// cleanup blockTxs records older than the keep window
if block.Height > uint32(keep) {
for rh := block.Height - uint32(keep); rh > 0; rh-- {
key := packUint(rh)
val, err := d.db.GetCF(d.ro, d.cfh[cfBlockTxs], key)
if err != nil {
return err
}
// nil data means the key was not found in DB
if val.Data() == nil {
break
}
val.Free()
d.db.DeleteCF(d.wo, d.cfh[cfBlockTxs], key)
}
}
return nil
}
func (d *RocksDB) storeAndCleanupBlockTxs(wb *gorocksdb.WriteBatch, block *bchain.Block) error {
pl := d.chainParser.PackedTxidLen()
buf := make([]byte, 0, pl*len(block.Txs))
varBuf := make([]byte, vlq.MaxLen64)
zeroTx := make([]byte, pl)
for i := range block.Txs {
tx := &block.Txs[i]
o := make([]outpoint, len(tx.Vin))
for v := range tx.Vin {
vin := &tx.Vin[v]
btxID, err := d.chainParser.PackTxid(vin.Txid)
if err != nil {
// do not process inputs without input txid
if err == bchain.ErrTxidMissing {
btxID = zeroTx
} else {
return err
}
}
o[v].btxID = btxID
o[v].index = int32(vin.Vout)
}
btxID, err := d.chainParser.PackTxid(tx.Txid)
if err != nil {
return err
}
buf = append(buf, btxID...)
l := packVaruint(uint(len(o)), varBuf)
buf = append(buf, varBuf[:l]...)
buf = append(buf, d.packOutpoints(o)...)
}
key := packUint(block.Height)
wb.PutCF(d.cfh[cfBlockTxs], key, buf)
return d.cleanupBlockTxs(wb, block)
}
func (d *RocksDB) getBlockTxs(height uint32) ([]blockTxs, error) {
pl := d.chainParser.PackedTxidLen()
val, err := d.db.GetCF(d.ro, d.cfh[cfBlockTxs], packUint(height))
if err != nil {
return nil, err
}
defer val.Free()
buf := val.Data()
bt := make([]blockTxs, 0, 8)
for i := 0; i < len(buf); {
if len(buf)-i < pl {
glog.Error("rocksdb: Inconsistent data in blockTxs ", hex.EncodeToString(buf))
return nil, errors.New("Inconsistent data in blockTxs")
}
txid := append([]byte(nil), buf[i:i+pl]...)
i += pl
o, ol, err := d.unpackNOutpoints(buf[i:])
if err != nil {
glog.Error("rocksdb: Inconsistent data in blockTxs ", hex.EncodeToString(buf))
return nil, errors.New("Inconsistent data in blockTxs")
}
bt = append(bt, blockTxs{
btxID: txid,
inputs: o,
})
i += ol
}
return bt, nil
}
// GetAddrDescBalance returns AddrBalance for given addrDesc
func (d *RocksDB) GetAddrDescBalance(addrDesc bchain.AddressDescriptor, detail AddressBalanceDetail) (*AddrBalance, error) {
val, err := d.db.GetCF(d.ro, d.cfh[cfAddressBalance], addrDesc)
if err != nil {
return nil, err
}
defer val.Free()
buf := val.Data()
// 3 is minimum length of addrBalance - 1 byte txs, 1 byte sent, 1 byte balance
if len(buf) < 3 {
return nil, nil
}
return unpackAddrBalance(buf, d.chainParser.PackedTxidLen(), detail)
}
// GetAddressBalance returns address balance for an address or nil if address not found
func (d *RocksDB) GetAddressBalance(address string, detail AddressBalanceDetail) (*AddrBalance, error) {
addrDesc, err := d.chainParser.GetAddrDescFromAddress(address)
if err != nil {
return nil, err
}
return d.GetAddrDescBalance(addrDesc, detail)
}
func (d *RocksDB) getTxAddresses(btxID []byte) (*TxAddresses, error) {
val, err := d.db.GetCF(d.ro, d.cfh[cfTxAddresses], btxID)
if err != nil {
return nil, err
}
defer val.Free()
buf := val.Data()
// 3 is minimum length of txAddresses - 1 byte height, 1 byte inputs len, 1 byte outputs len
if len(buf) < 3 {
return nil, nil
}
return unpackTxAddresses(buf)
}
// GetTxAddresses returns TxAddresses for given txid or nil if not found
func (d *RocksDB) GetTxAddresses(txid string) (*TxAddresses, error) {
btxID, err := d.chainParser.PackTxid(txid)
if err != nil {
return nil, err
}
return d.getTxAddresses(btxID)
}
// AddrDescForOutpoint returns the address descriptor and value for the given outpoint, or nil if the outpoint is not found
func (d *RocksDB) AddrDescForOutpoint(outpoint bchain.Outpoint) (bchain.AddressDescriptor, *big.Int) {
ta, err := d.GetTxAddresses(outpoint.Txid)
if err != nil || ta == nil {
return nil, nil
}
if outpoint.Vout < 0 {
vin := ^outpoint.Vout
if len(ta.Inputs) <= int(vin) {
return nil, nil
}
return ta.Inputs[vin].AddrDesc, &ta.Inputs[vin].ValueSat
}
if len(ta.Outputs) <= int(outpoint.Vout) {
return nil, nil
}
return ta.Outputs[outpoint.Vout].AddrDesc, &ta.Outputs[outpoint.Vout].ValueSat
}
func packTxAddresses(ta *TxAddresses, buf []byte, varBuf []byte) []byte {
buf = buf[:0]
l := packVaruint(uint(ta.Height), varBuf)
buf = append(buf, varBuf[:l]...)
l = packVaruint(uint(len(ta.Inputs)), varBuf)
buf = append(buf, varBuf[:l]...)
for i := range ta.Inputs {
buf = appendTxInput(&ta.Inputs[i], buf, varBuf)
}
l = packVaruint(uint(len(ta.Outputs)), varBuf)
buf = append(buf, varBuf[:l]...)
for i := range ta.Outputs {
buf = appendTxOutput(&ta.Outputs[i], buf, varBuf)
}
return buf
}
func appendTxInput(txi *TxInput, buf []byte, varBuf []byte) []byte {
la := len(txi.AddrDesc)
l := packVaruint(uint(la), varBuf)
buf = append(buf, varBuf[:l]...)
buf = append(buf, txi.AddrDesc...)
l = packBigint(&txi.ValueSat, varBuf)
buf = append(buf, varBuf[:l]...)
return buf
}
func appendTxOutput(txo *TxOutput, buf []byte, varBuf []byte) []byte {
la := len(txo.AddrDesc)
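// a spent output is marked by storing the bitwise complement of the addrDesc length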
if txo.Spent {
la = ^la
}
l := packVarint(la, varBuf)
buf = append(buf, varBuf[:l]...)
buf = append(buf, txo.AddrDesc...)
l = packBigint(&txo.ValueSat, varBuf)
buf = append(buf, varBuf[:l]...)
return buf
}
func unpackAddrBalance(buf []byte, txidUnpackedLen int, detail AddressBalanceDetail) (*AddrBalance, error) {
txs, l := unpackVaruint(buf)
sentSat, sl := unpackBigint(buf[l:])
balanceSat, bl := unpackBigint(buf[l+sl:])
l = l + sl + bl
ab := &AddrBalance{
Txs: uint32(txs),
SentSat: sentSat,
BalanceSat: balanceSat,
}
if detail != AddressBalanceDetailNoUTXO {
// estimate the size of utxos to avoid reallocation
ab.Utxos = make([]Utxo, 0, len(buf[l:])/txidUnpackedLen+3)
// ab.utxosMap = make(map[string]int, cap(ab.Utxos))
for len(buf[l:]) >= txidUnpackedLen+3 {
btxID := append([]byte(nil), buf[l:l+txidUnpackedLen]...)
l += txidUnpackedLen
vout, ll := unpackVaruint(buf[l:])
l += ll
height, ll := unpackVaruint(buf[l:])
l += ll
valueSat, ll := unpackBigint(buf[l:])
l += ll
u := Utxo{
BtxID: btxID,
Vout: int32(vout),
Height: uint32(height),
ValueSat: valueSat,
}
if detail == AddressBalanceDetailUTXO {
ab.Utxos = append(ab.Utxos, u)
} else {
ab.addUtxo(&u)
}
}
}
return ab, nil
}
func packAddrBalance(ab *AddrBalance, buf, varBuf []byte) []byte {
buf = buf[:0]
l := packVaruint(uint(ab.Txs), varBuf)
buf = append(buf, varBuf[:l]...)
l = packBigint(&ab.SentSat, varBuf)
buf = append(buf, varBuf[:l]...)
l = packBigint(&ab.BalanceSat, varBuf)
buf = append(buf, varBuf[:l]...)
for _, utxo := range ab.Utxos {
// if Vout < 0, utxo is marked as spent
if utxo.Vout >= 0 {
buf = append(buf, utxo.BtxID...)
l = packVaruint(uint(utxo.Vout), varBuf)
buf = append(buf, varBuf[:l]...)
l = packVaruint(uint(utxo.Height), varBuf)
buf = append(buf, varBuf[:l]...)
l = packBigint(&utxo.ValueSat, varBuf)
buf = append(buf, varBuf[:l]...)
}
}
return buf
}
func unpackTxAddresses(buf []byte) (*TxAddresses, error) {
ta := TxAddresses{}
height, l := unpackVaruint(buf)
ta.Height = uint32(height)
inputs, ll := unpackVaruint(buf[l:])
l += ll
ta.Inputs = make([]TxInput, inputs)
for i := uint(0); i < inputs; i++ {
l += unpackTxInput(&ta.Inputs[i], buf[l:])
}
outputs, ll := unpackVaruint(buf[l:])
l += ll
ta.Outputs = make([]TxOutput, outputs)
for i := uint(0); i < outputs; i++ {
l += unpackTxOutput(&ta.Outputs[i], buf[l:])
}
return &ta, nil
}
func unpackTxInput(ti *TxInput, buf []byte) int {
al, l := unpackVaruint(buf)
ti.AddrDesc = append([]byte(nil), buf[l:l+int(al)]...)
al += uint(l)
ti.ValueSat, l = unpackBigint(buf[al:])
return l + int(al)
}
func unpackTxOutput(to *TxOutput, buf []byte) int {
al, l := unpackVarint(buf)
if al < 0 {
to.Spent = true
al = ^al
}
to.AddrDesc = append([]byte(nil), buf[l:l+al]...)
al += l
to.ValueSat, l = unpackBigint(buf[al:])
return l + al
}
func (d *RocksDB) packTxIndexes(txi []txIndexes) []byte {
buf := make([]byte, 0, 32)
bvout := make([]byte, vlq.MaxLen32)
// store the txs in reverse order for ordering from newest to oldest
for j := len(txi) - 1; j >= 0; j-- {
t := &txi[j]
buf = append(buf, []byte(t.btxID)...)
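// each index is shifted left by one bit; the lowest bit set to 1 marks the last index of the tx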
for i, index := range t.indexes {
index <<= 1
if i == len(t.indexes)-1 {
index |= 1
}
l := packVarint32(index, bvout)
buf = append(buf, bvout[:l]...)
}
}
return buf
}
func (d *RocksDB) packOutpoints(outpoints []outpoint) []byte {
buf := make([]byte, 0, 32)
bvout := make([]byte, vlq.MaxLen32)
for _, o := range outpoints {
l := packVarint32(o.index, bvout)
buf = append(buf, []byte(o.btxID)...)
buf = append(buf, bvout[:l]...)
}
return buf
}
func (d *RocksDB) unpackNOutpoints(buf []byte) ([]outpoint, int, error) {
txidUnpackedLen := d.chainParser.PackedTxidLen()
n, p := unpackVaruint(buf)
outpoints := make([]outpoint, n)
for i := uint(0); i < n; i++ {
if p+txidUnpackedLen >= len(buf) {
return nil, 0, errors.New("Inconsistent data in unpackNOutpoints")
}
btxID := append([]byte(nil), buf[p:p+txidUnpackedLen]...)
p += txidUnpackedLen
vout, voutLen := unpackVarint32(buf[p:])
p += voutLen
outpoints[i] = outpoint{
btxID: btxID,
index: vout,
}
}
return outpoints, p, nil
}
// Block index
// BlockInfo holds information about blocks kept in column height
type BlockInfo struct {
Hash string
Time int64
Txs uint32
Size uint32
Height uint32 // Height is not packed!
}
func (d *RocksDB) packBlockInfo(block *BlockInfo) ([]byte, error) {
packed := make([]byte, 0, 64)
varBuf := make([]byte, vlq.MaxLen64)
b, err := d.chainParser.PackBlockHash(block.Hash)
if err != nil {
return nil, err
}
pl := d.chainParser.PackedTxidLen()
if len(b) != pl {
glog.Warning("Non standard block hash for height ", block.Height, ", hash [", block.Hash, "]")
if len(b) > pl {
b = b[:pl]
} else {
b = append(b, make([]byte, pl-len(b))...)
}
}
packed = append(packed, b...)
packed = append(packed, packUint(uint32(block.Time))...)
l := packVaruint(uint(block.Txs), varBuf)
packed = append(packed, varBuf[:l]...)
l = packVaruint(uint(block.Size), varBuf)
packed = append(packed, varBuf[:l]...)
return packed, nil
}
func (d *RocksDB) unpackBlockInfo(buf []byte) (*BlockInfo, error) {
pl := d.chainParser.PackedTxidLen()
// minimum length is PackedTxidLen + 4 bytes time + 1 byte txs + 1 byte size
if len(buf) < pl+4+2 {
return nil, nil
}
txid, err := d.chainParser.UnpackBlockHash(buf[:pl])
if err != nil {
return nil, err
}
t := unpackUint(buf[pl:])
txs, l := unpackVaruint(buf[pl+4:])
size, _ := unpackVaruint(buf[pl+4+l:])
return &BlockInfo{
Hash: txid,
Time: int64(t),
Txs: uint32(txs),
Size: uint32(size),
}, nil
}
// GetBestBlock returns the block hash of the block with highest height in the db
func (d *RocksDB) GetBestBlock() (uint32, string, error) {
it := d.db.NewIteratorCF(d.ro, d.cfh[cfHeight])
defer it.Close()
if it.SeekToLast(); it.Valid() {
bestHeight := unpackUint(it.Key().Data())
info, err := d.unpackBlockInfo(it.Value().Data())
if info != nil {
if glog.V(1) {
glog.Infof("rocksdb: bestblock %d %+v", bestHeight, info)
}
return bestHeight, info.Hash, err
}
}
return 0, "", nil
}
// GetBlockHash returns block hash at given height or empty string if not found
func (d *RocksDB) GetBlockHash(height uint32) (string, error) {
key := packUint(height)
val, err := d.db.GetCF(d.ro, d.cfh[cfHeight], key)
if err != nil {
return "", err
}
defer val.Free()
info, err := d.unpackBlockInfo(val.Data())
if info == nil {
return "", err
}
return info.Hash, nil
}
// GetBlockInfo returns block info stored in db
func (d *RocksDB) GetBlockInfo(height uint32) (*BlockInfo, error) {
key := packUint(height)
val, err := d.db.GetCF(d.ro, d.cfh[cfHeight], key)
if err != nil {
return nil, err
}
defer val.Free()
bi, err := d.unpackBlockInfo(val.Data())
if err != nil || bi == nil {
return nil, err
}
bi.Height = height
return bi, err
}
func (d *RocksDB) writeHeightFromBlock(wb *gorocksdb.WriteBatch, block *bchain.Block, op int) error {
return d.writeHeight(wb, block.Height, &BlockInfo{
Hash: block.Hash,
Time: block.Time,
Txs: uint32(len(block.Txs)),
Size: uint32(block.Size),
Height: block.Height,
}, op)
}
func (d *RocksDB) writeHeight(wb *gorocksdb.WriteBatch, height uint32, bi *BlockInfo, op int) error {
key := packUint(height)
switch op {
case opInsert:
val, err := d.packBlockInfo(bi)
if err != nil {
return err
}
wb.PutCF(d.cfh[cfHeight], key, val)
d.is.UpdateBestHeight(height)
case opDelete:
wb.DeleteCF(d.cfh[cfHeight], key)
d.is.UpdateBestHeight(height - 1)
}
return nil
}
// Disconnect blocks
func (d *RocksDB) disconnectTxAddressesInputs(wb *gorocksdb.WriteBatch, btxID []byte, inputs []outpoint, txa *TxAddresses, txAddressesToUpdate map[string]*TxAddresses,
getAddressBalance func(addrDesc bchain.AddressDescriptor) (*AddrBalance, error),
addressFoundInTx func(addrDesc bchain.AddressDescriptor, btxID []byte) bool) error {
var err error
var balance *AddrBalance
for i, t := range txa.Inputs {
if len(t.AddrDesc) > 0 {
input := &inputs[i]
exist := addressFoundInTx(t.AddrDesc, btxID)
s := string(input.btxID)
sa, found := txAddressesToUpdate[s]
if !found {
sa, err = d.getTxAddresses(input.btxID)
if err != nil {
return err
}
if sa != nil {
txAddressesToUpdate[s] = sa
}
}
var inputHeight uint32
if sa != nil {
sa.Outputs[input.index].Spent = false
inputHeight = sa.Height
}
if d.chainParser.IsAddrDescIndexable(t.AddrDesc) {
balance, err = getAddressBalance(t.AddrDesc)
if err != nil {
return err
}
if balance != nil {
// subtract number of txs only once
if !exist {
balance.Txs--
}
balance.SentSat.Sub(&balance.SentSat, &t.ValueSat)
if balance.SentSat.Sign() < 0 {
d.resetValueSatToZero(&balance.SentSat, t.AddrDesc, "sent amount")
}
balance.BalanceSat.Add(&balance.BalanceSat, &t.ValueSat)
balance.addUtxoInDisconnect(&Utxo{
BtxID: input.btxID,
Vout: input.index,
Height: inputHeight,
ValueSat: t.ValueSat,
})
} else {
ad, _, _ := d.chainParser.GetAddressesFromAddrDesc(t.AddrDesc)
glog.Warningf("Balance for address %s (%s) not found", ad, t.AddrDesc)
}
}
}
}
return nil
}
func (d *RocksDB) disconnectTxAddressesOutputs(wb *gorocksdb.WriteBatch, btxID []byte, txa *TxAddresses,
getAddressBalance func(addrDesc bchain.AddressDescriptor) (*AddrBalance, error),
addressFoundInTx func(addrDesc bchain.AddressDescriptor, btxID []byte) bool) error {
for i, t := range txa.Outputs {
if len(t.AddrDesc) > 0 {
exist := addressFoundInTx(t.AddrDesc, btxID)
if d.chainParser.IsAddrDescIndexable(t.AddrDesc) {
balance, err := getAddressBalance(t.AddrDesc)
if err != nil {
return err
}
if balance != nil {
// subtract number of txs only once
if !exist {
balance.Txs--
}
balance.BalanceSat.Sub(&balance.BalanceSat, &t.ValueSat)
if balance.BalanceSat.Sign() < 0 {
d.resetValueSatToZero(&balance.BalanceSat, t.AddrDesc, "balance")
}
balance.markUtxoAsSpent(btxID, int32(i))
} else {
ad, _, _ := d.chainParser.GetAddressesFromAddrDesc(t.AddrDesc)
glog.Warningf("Balance for address %s (%s) not found", ad, t.AddrDesc)
}
}
}
}
return nil
}
func (d *RocksDB) disconnectBlock(height uint32, blockTxs []blockTxs) error {
wb := gorocksdb.NewWriteBatch()
defer wb.Destroy()
txAddressesToUpdate := make(map[string]*TxAddresses)
txAddresses := make([]*TxAddresses, len(blockTxs))
txsToDelete := make(map[string]struct{})
balances := make(map[string]*AddrBalance)
getAddressBalance := func(addrDesc bchain.AddressDescriptor) (*AddrBalance, error) {
var err error
s := string(addrDesc)
b, fb := balances[s]
if !fb {
b, err = d.GetAddrDescBalance(addrDesc, addressBalanceDetailUTXOIndexed)
if err != nil {
return nil, err
}
balances[s] = b
}
return b, nil
}
// all addresses in the block are stored in blockAddressesTxs, together with a map of transactions where they appear
blockAddressesTxs := make(map[string]map[string]struct{})
// addressFoundInTx handles updates of the blockAddressesTxs map and returns true if the address+tx was already encountered
addressFoundInTx := func(addrDesc bchain.AddressDescriptor, btxID []byte) bool {
sAddrDesc := string(addrDesc)
sBtxID := string(btxID)
a, exist := blockAddressesTxs[sAddrDesc]
if !exist {
blockAddressesTxs[sAddrDesc] = map[string]struct{}{sBtxID: {}}
} else {
_, exist = a[sBtxID]
if !exist {
a[sBtxID] = struct{}{}
}
}
return exist
}
glog.Info("Disconnecting block ", height, " containing ", len(blockTxs), " transactions")
// when connecting block, outputs are processed first
// when disconnecting, inputs must be reversed first
for i := range blockTxs {
btxID := blockTxs[i].btxID
s := string(btxID)
txsToDelete[s] = struct{}{}
txa, err := d.getTxAddresses(btxID)
if err != nil {
return err
}
if txa == nil {
ut, _ := d.chainParser.UnpackTxid(btxID)
glog.Warning("TxAddress for txid ", ut, " not found")
continue
}
txAddresses[i] = txa
if err := d.disconnectTxAddressesInputs(wb, btxID, blockTxs[i].inputs, txa, txAddressesToUpdate, getAddressBalance, addressFoundInTx); err != nil {
return err
}
}
for i := range blockTxs {
btxID := blockTxs[i].btxID
txa := txAddresses[i]
if txa == nil {
continue
}
if err := d.disconnectTxAddressesOutputs(wb, btxID, txa, getAddressBalance, addressFoundInTx); err != nil {
return err
}
}
for a := range blockAddressesTxs {
key := packAddressKey([]byte(a), height)
wb.DeleteCF(d.cfh[cfAddresses], key)
}
key := packUint(height)
wb.DeleteCF(d.cfh[cfBlockTxs], key)
wb.DeleteCF(d.cfh[cfHeight], key)
d.storeTxAddresses(wb, txAddressesToUpdate)
d.storeBalancesDisconnect(wb, balances)
for s := range txsToDelete {
b := []byte(s)
wb.DeleteCF(d.cfh[cfTransactions], b)
wb.DeleteCF(d.cfh[cfTxAddresses], b)
}
return d.db.Write(d.wo, wb)
}
// DisconnectBlockRangeBitcoinType removes all data belonging to blocks in range lower-higher
// it can disconnect only blocks for which there is data in the blockTxs column
func (d *RocksDB) DisconnectBlockRangeBitcoinType(lower uint32, higher uint32) error {
blocks := make([][]blockTxs, higher-lower+1)
for height := lower; height <= higher; height++ {
blockTxs, err := d.getBlockTxs(height)
if err != nil {
return err
}
if len(blockTxs) == 0 {
return errors.Errorf("Cannot disconnect blocks with height %v and lower. It is necessary to rebuild index.", height)
}
blocks[height-lower] = blockTxs
}
for height := higher; height >= lower; height-- {
err := d.disconnectBlock(height, blocks[height-lower])
if err != nil {
return err
}
}
d.is.RemoveLastBlockTimes(int(higher-lower) + 1)
glog.Infof("rocksdb: blocks %d-%d disconnected", lower, higher)
return nil
}
func (d *RocksDB) storeBalancesDisconnect(wb *gorocksdb.WriteBatch, balances map[string]*AddrBalance) {
for _, b := range balances {
if b != nil {
// remove spent utxos
us := make([]Utxo, 0, len(b.Utxos))
for _, u := range b.Utxos {
// remove utxos marked as spent
if u.Vout >= 0 {
us = append(us, u)
}
}
b.Utxos = us
// sort utxos by height
sort.SliceStable(b.Utxos, func(i, j int) bool {
return b.Utxos[i].Height < b.Utxos[j].Height
})
}
}
d.storeBalances(wb, balances)
}
func dirSize(path string) (int64, error) {
var size int64
err := filepath.Walk(path, func(_ string, info os.FileInfo, err error) error {
if err == nil {
if !info.IsDir() {
size += info.Size()
}
}
return err
})
return size, err
}
// DatabaseSizeOnDisk returns size of the database in bytes
func (d *RocksDB) DatabaseSizeOnDisk() int64 {
size, err := dirSize(d.path)
if err != nil {
glog.Warning("rocksdb: DatabaseSizeOnDisk: ", err)
return 0
}
return size
}
// GetTx returns transaction stored in db and height of the block containing it
func (d *RocksDB) GetTx(txid string) (*bchain.Tx, uint32, error) {
key, err := d.chainParser.PackTxid(txid)
if err != nil {
return nil, 0, err
}
val, err := d.db.GetCF(d.ro, d.cfh[cfTransactions], key)
if err != nil {
return nil, 0, err
}
defer val.Free()
data := val.Data()
if len(data) > 4 {
return d.chainParser.UnpackTx(data)
}
return nil, 0, nil
}
// PutTx stores a transaction in db
func (d *RocksDB) PutTx(tx *bchain.Tx, height uint32, blockTime int64) error {
key, err := d.chainParser.PackTxid(tx.Txid)
if err != nil {
return err
}
buf, err := d.chainParser.PackTx(tx, height, blockTime)
if err != nil {
return err
}
err = d.db.PutCF(d.wo, d.cfh[cfTransactions], key, buf)
if err == nil {
d.is.AddDBColumnStats(cfTransactions, 1, int64(len(key)), int64(len(buf)))
}
return err
}
// DeleteTx removes a transaction from db
func (d *RocksDB) DeleteTx(txid string) error {
key, err := d.chainParser.PackTxid(txid)
if err != nil {
return err
}
// use write batch so that this delete matches other deletes
wb := gorocksdb.NewWriteBatch()
defer wb.Destroy()
d.internalDeleteTx(wb, key)
return d.db.Write(d.wo, wb)
}
// internalDeleteTx checks if tx is cached and updates internal state accordingly
func (d *RocksDB) internalDeleteTx(wb *gorocksdb.WriteBatch, key []byte) {
val, err := d.db.GetCF(d.ro, d.cfh[cfTransactions], key)
// ignore error, it is only for statistics
if err == nil {
l := len(val.Data())
if l > 0 {
d.is.AddDBColumnStats(cfTransactions, -1, int64(-len(key)), int64(-l))
}
defer val.Free()
}
wb.DeleteCF(d.cfh[cfTransactions], key)
}
// internal state
const internalStateKey = "internalState"
func (d *RocksDB) loadBlockTimes() ([]uint32, error) {
var times []uint32
it := d.db.NewIteratorCF(d.ro, d.cfh[cfHeight])
defer it.Close()
counter := uint32(0)
time := uint32(0)
for it.SeekToFirst(); it.Valid(); it.Next() {
height := unpackUint(it.Key().Data())
if height > counter {
glog.Warning("gap in cfHeight: expecting ", counter, ", got ", height)
for ; counter < height; counter++ {
times = append(times, time)
}
}
counter++
info, err := d.unpackBlockInfo(it.Value().Data())
if err != nil {
return nil, err
}
if info != nil {
time = uint32(info.Time)
}
times = append(times, time)
}
glog.Info("loaded ", len(times), " block times")
return times, nil
}
// LoadInternalState loads the internal state from db or initializes a new one if not yet stored
func (d *RocksDB) LoadInternalState(rpcCoin string) (*common.InternalState, error) {
val, err := d.db.GetCF(d.ro, d.cfh[cfDefault], []byte(internalStateKey))
if err != nil {
return nil, err
}
defer val.Free()
data := val.Data()
var is *common.InternalState
if len(data) == 0 {
is = &common.InternalState{Coin: rpcCoin, UtxoChecked: true}
} else {
is, err = common.UnpackInternalState(data)
if err != nil {
return nil, err
}
// verify that the rpc coin matches DB coin
// running it mismatched would corrupt the database
if is.Coin == "" {
is.Coin = rpcCoin
} else if is.Coin != rpcCoin {
return nil, errors.Errorf("Coins do not match. DB coin %v, RPC coin %v", is.Coin, rpcCoin)
}
}
// make sure that column stats match the columns
sc := is.DbColumns
nc := make([]common.InternalStateColumn, len(cfNames))
for i := 0; i < len(nc); i++ {
nc[i].Name = cfNames[i]
nc[i].Version = dbVersion
for j := 0; j < len(sc); j++ {
if sc[j].Name == nc[i].Name {
// check the version of the column, if it does not match, the db is not compatible
if sc[j].Version != dbVersion {
return nil, errors.Errorf("DB version %v of column '%v' does not match the required version %v. DB is not compatible.", sc[j].Version, sc[j].Name, dbVersion)
}
nc[i].Rows = sc[j].Rows
nc[i].KeyBytes = sc[j].KeyBytes
nc[i].ValueBytes = sc[j].ValueBytes
nc[i].Updated = sc[j].Updated
break
}
}
}
is.DbColumns = nc
is.BlockTimes, err = d.loadBlockTimes()
if err != nil {
return nil, err
}
// after load, reset the synchronization data
is.IsSynchronized = false
is.IsMempoolSynchronized = false
var t time.Time
is.LastMempoolSync = t
is.SyncMode = false
return is, nil
}
// SetInconsistentState sets the internal state to DbStateInconsistent or DbStateOpen based on inconsistent parameter
// a db left in the DbStateInconsistent state cannot be used and must be recreated
func (d *RocksDB) SetInconsistentState(inconsistent bool) error {
if d.is == nil {
return errors.New("Internal state not created")
}
if inconsistent {
d.is.DbState = common.DbStateInconsistent
} else {
d.is.DbState = common.DbStateOpen
}
return d.storeState(d.is)
}
// SetInternalState sets the InternalState to be used by db to collect internal state
func (d *RocksDB) SetInternalState(is *common.InternalState) {
d.is = is
}
// StoreInternalState stores the internal state to db
func (d *RocksDB) StoreInternalState(is *common.InternalState) error {
if d.metrics != nil {
for c := 0; c < len(cfNames); c++ {
rows, keyBytes, valueBytes := d.is.GetDBColumnStatValues(c)
d.metrics.DbColumnRows.With(common.Labels{"column": cfNames[c]}).Set(float64(rows))
d.metrics.DbColumnSize.With(common.Labels{"column": cfNames[c]}).Set(float64(keyBytes + valueBytes))
}
}
return d.storeState(is)
}
func (d *RocksDB) storeState(is *common.InternalState) error {
buf, err := is.Pack()
if err != nil {
return err
}
return d.db.PutCF(d.wo, d.cfh[cfDefault], []byte(internalStateKey), buf)
}
func (d *RocksDB) computeColumnSize(col int, stopCompute chan os.Signal) (int64, int64, int64, error) {
var rows, keysSum, valuesSum int64
var seekKey []byte
// do not use cache
ro := gorocksdb.NewDefaultReadOptions()
ro.SetFillCache(false)
for {
var key []byte
it := d.db.NewIteratorCF(ro, d.cfh[col])
if rows == 0 {
it.SeekToFirst()
} else {
glog.Info("db: Column ", cfNames[col], ": rows ", rows, ", key bytes ", keysSum, ", value bytes ", valuesSum, ", in progress...")
it.Seek(seekKey)
it.Next()
}
for count := 0; it.Valid() && count < refreshIterator; it.Next() {
select {
case <-stopCompute:
return 0, 0, 0, errors.New("Interrupted")
default:
}
key = it.Key().Data()
count++
rows++
keysSum += int64(len(key))
valuesSum += int64(len(it.Value().Data()))
}
seekKey = append([]byte{}, key...)
valid := it.Valid()
it.Close()
if !valid {
break
}
}
return rows, keysSum, valuesSum, nil
}
// ComputeInternalStateColumnStats computes stats of all db columns and sets them to internal state
// this can be a very slow operation
func (d *RocksDB) ComputeInternalStateColumnStats(stopCompute chan os.Signal) error {
start := time.Now()
glog.Info("db: ComputeInternalStateColumnStats start")
for c := 0; c < len(cfNames); c++ {
rows, keysSum, valuesSum, err := d.computeColumnSize(c, stopCompute)
if err != nil {
return err
}
d.is.SetDBColumnStats(c, rows, keysSum, valuesSum)
glog.Info("db: Column ", cfNames[c], ": rows ", rows, ", key bytes ", keysSum, ", value bytes ", valuesSum)
}
glog.Info("db: ComputeInternalStateColumnStats finished in ", time.Since(start))
return nil
}
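// reorderUtxo sorts by vout the consecutive run of utxos that share the txid of utxos[index]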
func reorderUtxo(utxos []Utxo, index int) {
var from, to int
for from = index; from >= 0; from-- {
if !bytes.Equal(utxos[from].BtxID, utxos[index].BtxID) {
break
}
}
from++
for to = index + 1; to < len(utxos); to++ {
if !bytes.Equal(utxos[to].BtxID, utxos[index].BtxID) {
break
}
}
toSort := utxos[from:to]
sort.SliceStable(toSort, func(i, j int) bool {
return toSort[i].Vout < toSort[j].Vout
})
}
func (d *RocksDB) fixUtxo(addrDesc bchain.AddressDescriptor, ba *AddrBalance) (bool, bool, error) {
reorder := false
var checksum big.Int
var prevUtxo *Utxo
for i := range ba.Utxos {
utxo := &ba.Utxos[i]
checksum.Add(&checksum, &utxo.ValueSat)
if prevUtxo != nil {
if prevUtxo.Vout > utxo.Vout && *(*int)(unsafe.Pointer(&utxo.BtxID[0])) == *(*int)(unsafe.Pointer(&prevUtxo.BtxID[0])) && bytes.Equal(utxo.BtxID, prevUtxo.BtxID) {
reorderUtxo(ba.Utxos, i)
reorder = true
}
}
prevUtxo = utxo
}
if reorder {
// get the checksum again after reorder
checksum.SetInt64(0)
for i := range ba.Utxos {
utxo := &ba.Utxos[i]
checksum.Add(&checksum, &utxo.ValueSat)
}
}
if checksum.Cmp(&ba.BalanceSat) != 0 {
var checksumFromTxs big.Int
var utxos []Utxo
err := d.GetAddrDescTransactions(addrDesc, 0, ^uint32(0), func(txid string, height uint32, indexes []int32) error {
var ta *TxAddresses
var err error
// sort the indexes so that the utxos are appended in the reverse order
sort.Slice(indexes, func(i, j int) bool {
return indexes[i] > indexes[j]
})
for _, index := range indexes {
// take only outputs
if index < 0 {
break
}
if ta == nil {
ta, err = d.GetTxAddresses(txid)
if err != nil {
return err
}
}
if ta == nil {
return errors.New("DB inconsistency: tx " + txid + ": not found in txAddresses")
}
if len(ta.Outputs) <= int(index) {
glog.Warning("DB inconsistency: txAddresses " + txid + " does not have enough outputs")
} else {
tao := &ta.Outputs[index]
if !tao.Spent {
bTxid, _ := d.chainParser.PackTxid(txid)
checksumFromTxs.Add(&checksumFromTxs, &tao.ValueSat)
utxos = append(utxos, Utxo{BtxID: bTxid, Height: height, Vout: index, ValueSat: tao.ValueSat})
if checksumFromTxs.Cmp(&ba.BalanceSat) == 0 {
return &StopIteration{}
}
}
}
}
return nil
})
if err != nil {
return false, false, err
}
fixed := false
if checksumFromTxs.Cmp(&ba.BalanceSat) == 0 {
// reverse the utxos as they are added in descending order by height
for i := len(utxos)/2 - 1; i >= 0; i-- {
opp := len(utxos) - 1 - i
utxos[i], utxos[opp] = utxos[opp], utxos[i]
}
ba.Utxos = utxos
wb := gorocksdb.NewWriteBatch()
err = d.storeBalances(wb, map[string]*AddrBalance{string(addrDesc): ba})
if err == nil {
err = d.db.Write(d.wo, wb)
}
wb.Destroy()
if err != nil {
return false, false, errors.Errorf("balance %s, checksum %s, from txa %s, txs %d, error storing fixed utxos %v", ba.BalanceSat.String(), checksum.String(), checksumFromTxs.String(), ba.Txs, err)
}
fixed = true
}
return fixed, false, errors.Errorf("balance %s, checksum %s, from txa %s, txs %d", ba.BalanceSat.String(), checksum.String(), checksumFromTxs.String(), ba.Txs)
} else if reorder {
wb := gorocksdb.NewWriteBatch()
err := d.storeBalances(wb, map[string]*AddrBalance{string(addrDesc): ba})
if err == nil {
err = d.db.Write(d.wo, wb)
}
wb.Destroy()
if err != nil {
return false, false, errors.Errorf("error storing reordered utxos %v", err)
}
}
return false, reorder, nil
}
// FixUtxos checks and fixes possible inconsistencies in the utxos of address balances
func (d *RocksDB) FixUtxos(stop chan os.Signal) error {
if d.chainParser.GetChainType() != bchain.ChainBitcoinType {
glog.Info("FixUtxos: applicable only for bitcoin type coins")
return nil
}
glog.Info("FixUtxos: starting")
var row, errorsCount, fixedCount int64
var seekKey []byte
// do not use cache
ro := gorocksdb.NewDefaultReadOptions()
ro.SetFillCache(false)
for {
var addrDesc bchain.AddressDescriptor
it := d.db.NewIteratorCF(ro, d.cfh[cfAddressBalance])
if row == 0 {
it.SeekToFirst()
} else {
glog.Info("FixUtxos: row ", row, ", errors ", errorsCount)
it.Seek(seekKey)
it.Next()
}
for count := 0; it.Valid() && count < refreshIterator; it.Next() {
select {
case <-stop:
return errors.New("Interrupted")
default:
}
addrDesc = it.Key().Data()
buf := it.Value().Data()
count++
row++
if len(buf) < 3 {
glog.Error("FixUtxos: row ", row, ", addrDesc ", addrDesc, ", empty data")
errorsCount++
continue
}
ba, err := unpackAddrBalance(buf, d.chainParser.PackedTxidLen(), AddressBalanceDetailUTXO)
if err != nil {
glog.Error("FixUtxos: row ", row, ", addrDesc ", addrDesc, ", unpackAddrBalance error ", err)
errorsCount++
continue
}
fixed, reordered, err := d.fixUtxo(addrDesc, ba)
if err != nil {
errorsCount++
glog.Error("FixUtxos: row ", row, ", addrDesc ", addrDesc, ", error ", err, ", fixed ", fixed)
if fixed {
fixedCount++
}
} else if reordered {
glog.Error("FixUtxos: row ", row, ", addrDesc ", addrDesc, " reordered")
fixedCount++
}
}
seekKey = append([]byte{}, addrDesc...)
valid := it.Valid()
it.Close()
if !valid {
break
}
}
glog.Info("FixUtxos: finished, scanned ", row, " rows, found ", errorsCount, " errors, fixed ", fixedCount)
return nil
}
// Helpers
func packAddressKey(addrDesc bchain.AddressDescriptor, height uint32) []byte {
buf := make([]byte, len(addrDesc)+packedHeightBytes)
copy(buf, addrDesc)
// pack height as binary complement to achieve ordering from newest to oldest block
binary.BigEndian.PutUint32(buf[len(addrDesc):], ^height)
return buf
}
func unpackAddressKey(key []byte) ([]byte, uint32, error) {
i := len(key) - packedHeightBytes
if i <= 0 {
return nil, 0, errors.New("Invalid address key")
}
// height is packed in binary complement, convert it
return key[:i], ^unpackUint(key[i : i+packedHeightBytes]), nil
}
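// exampleAddressKeyOrder is an illustrative sketch (with a made-up addrDesc)
// of why the height is packed as a binary complement: a higher block height
// produces a lexicographically smaller key, so iteration runs newest to oldest.
func exampleAddressKeyOrder() {
	addrDesc := bchain.AddressDescriptor{0xab, 0xcd}
	newer := packAddressKey(addrDesc, 500000)
	older := packAddressKey(addrDesc, 400000)
	fmt.Println(bytes.Compare(newer, older) < 0) // true, the newer block sorts first
}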
func packUint(i uint32) []byte {
buf := make([]byte, 4)
binary.BigEndian.PutUint32(buf, i)
return buf
}
func unpackUint(buf []byte) uint32 {
return binary.BigEndian.Uint32(buf)
}
func packVarint32(i int32, buf []byte) int {
return vlq.PutInt(buf, int64(i))
}
func packVarint(i int, buf []byte) int {
return vlq.PutInt(buf, int64(i))
}
func packVaruint(i uint, buf []byte) int {
return vlq.PutUint(buf, uint64(i))
}
func unpackVarint32(buf []byte) (int32, int) {
i, ofs := vlq.Int(buf)
return int32(i), ofs
}
func unpackVarint(buf []byte) (int, int) {
i, ofs := vlq.Int(buf)
return int(i), ofs
}
func unpackVaruint(buf []byte) (uint, int) {
i, ofs := vlq.Uint(buf)
return uint(i), ofs
}
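// exampleVaruintRoundTrip is an illustrative sketch of the varint helpers used
// throughout this file: packVaruint writes a variable-length quantity and
// unpackVaruint reads it back together with the number of bytes consumed.
func exampleVaruintRoundTrip() {
	buf := make([]byte, vlq.MaxLen64)
	l := packVaruint(300, buf)
	v, read := unpackVaruint(buf[:l])
	fmt.Println(v == 300 && read == l) // true
}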
const (
// number of bits in a big.Word
wordBits = 32 << (uint64(^big.Word(0)) >> 63)
// number of bytes in a big.Word
wordBytes = wordBits / 8
// max packed bigint words
maxPackedBigintWords = (256 - wordBytes) / wordBytes
maxPackedBigintBytes = 249
)
// big int is packed in BigEndian order without memory allocation as 1 byte length followed by bytes of big int
// number of written bytes is returned
// limitation: bigints longer than 248 bytes are truncated to 248 bytes
// caution: the buffer must be big enough to hold the packed big int; a 249 byte buffer is always safe
func packBigint(bi *big.Int, buf []byte) int {
w := bi.Bits()
lw := len(w)
// zero returns only one byte - zero length
if lw == 0 {
buf[0] = 0
return 1
}
// pack the most significant word in a special way - skip leading zeros
w0 := w[lw-1]
fb := 8
mask := big.Word(0xff) << (wordBits - 8)
for w0&mask == 0 {
fb--
mask >>= 8
}
for i := fb; i > 0; i-- {
buf[i] = byte(w0)
w0 >>= 8
}
// if the big int is too big (> 2^1984), the number of bytes would not fit into 1 byte
// in this case, truncate the number; amounts are not expected to be this large
s := 0
if lw > maxPackedBigintWords {
s = lw - maxPackedBigintWords
}
// pack the rest of the words in reverse order
for j := lw - 2; j >= s; j-- {
d := w[j]
for i := fb + wordBytes; i > fb; i-- {
buf[i] = byte(d)
d >>= 8
}
fb += wordBytes
}
buf[0] = byte(fb)
return fb + 1
}
func unpackBigint(buf []byte) (big.Int, int) {
var r big.Int
l := int(buf[0]) + 1
r.SetBytes(buf[1:l])
return r, l
}
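// examplePackBigint is an illustrative sketch of the packBigint round trip:
// the value is stored as a 1-byte length followed by the big-endian bytes.
func examplePackBigint() {
	var v big.Int
	v.SetInt64(123456789)
	buf := make([]byte, maxPackedBigintBytes)
	l := packBigint(&v, buf) // buf[0] holds the byte length, buf[1:l] the value
	r, _ := unpackBigint(buf[:l])
	fmt.Println(v.Cmp(&r) == 0) // true
}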