blockbook/rocksdb.go

366 lines
7.2 KiB
Go

package main
import (
"bytes"
"encoding/binary"
"encoding/hex"
"errors"
"log"
"github.com/bsm/go-vlq"
"github.com/btcsuite/btcutil/base58"
"github.com/tecbot/gorocksdb"
)
func RepairRocksDB(name string) error {
log.Printf("rocksdb: repair")
opts := gorocksdb.NewDefaultOptions()
return gorocksdb.RepairDb(name, opts)
}
type RocksDB struct {
db *gorocksdb.DB
wo *gorocksdb.WriteOptions
ro *gorocksdb.ReadOptions
}
// NewRocksDB opens an internal handle to RocksDB environment. Close
// needs to be called to release it.
func NewRocksDB(path string) (d *RocksDB, err error) {
log.Printf("rocksdb: open %s", path)
fp := gorocksdb.NewBloomFilter(10)
bbto := gorocksdb.NewDefaultBlockBasedTableOptions()
bbto.SetBlockSize(16 << 10) // 16kb
bbto.SetBlockCache(gorocksdb.NewLRUCache(8 << 30)) // 8 gb
bbto.SetFilterPolicy(fp)
opts := gorocksdb.NewDefaultOptions()
opts.SetBlockBasedTableFactory(bbto)
opts.SetCreateIfMissing(true)
opts.SetMaxBackgroundCompactions(4)
opts.SetMaxBackgroundFlushes(2)
opts.SetBytesPerSync(1 << 20) // 1mb
opts.SetWriteBufferSize(2 << 30) // 2 gb
opts.SetMaxOpenFiles(25000)
db, err := gorocksdb.OpenDb(opts, path)
if err != nil {
return
}
wo := gorocksdb.NewDefaultWriteOptions()
ro := gorocksdb.NewDefaultReadOptions()
ro.SetFillCache(false)
return &RocksDB{db, wo, ro}, nil
}
// Close releases the RocksDB environment opened in NewRocksDB.
func (d *RocksDB) Close() error {
log.Printf("rocksdb: close")
d.wo.Destroy()
d.ro.Destroy()
d.db.Close()
return nil
}
func (d *RocksDB) GetAddress(txid string, vout uint32) (string, error) {
// log.Printf("rocksdb: outpoint get %s:%d", txid, vout)
k, err := packOutpointKey(txid, vout)
if err != nil {
return "", err
}
v, err := d.db.Get(d.ro, k)
if err != nil {
return "", err
}
defer v.Free()
return unpackAddress(v.Data())
}
func (d *RocksDB) GetTransactions(address string, lower uint32, higher uint32, fn func(txids []string) error) (err error) {
log.Printf("rocksdb: address get %d:%d %s", lower, higher, address)
kstart, err := packAddressKey(lower, address)
if err != nil {
return err
}
kstop, err := packAddressKey(higher, address)
if err != nil {
return err
}
it := d.db.NewIterator(d.ro)
defer it.Close()
for it.Seek(kstart); it.Valid(); it.Next() {
k := it.Key()
v := it.Value()
if bytes.Compare(k.Data(), kstop) > 0 {
break
}
txids, err := unpackAddressVal(v.Data())
if err != nil {
return err
}
if err := fn(txids); err != nil {
return err
}
}
return nil
}
func (d *RocksDB) ConnectBlock(block *Block, txids map[string][]string) error {
return d.writeBlock(block, txids, false /* delete */)
}
func (d *RocksDB) DisconnectBlock(block *Block, txids map[string][]string) error {
return d.writeBlock(block, txids, true /* delete */)
}
func (d *RocksDB) writeBlock(
block *Block,
txids map[string][]string,
delete bool,
) error {
wb := gorocksdb.NewWriteBatch()
defer wb.Destroy()
if err := d.writeHeight(wb, block, delete); err != nil {
return err
}
if err := d.writeOutpoints(wb, block, delete); err != nil {
return err
}
if err := d.writeAddresses(wb, block, txids, delete); err != nil {
return err
}
return d.db.Write(d.wo, wb)
}
// Address Index
func (d *RocksDB) writeAddresses(
wb *gorocksdb.WriteBatch,
block *Block,
txids map[string][]string,
delete bool,
) error {
if delete {
log.Printf("rocksdb: address delete %d in %d %s", len(txids), block.Height, block.Hash)
} else {
log.Printf("rocksdb: address put %d in %d %s", len(txids), block.Height, block.Hash)
}
for addr, txids := range txids {
k, err := packAddressKey(block.Height, addr)
if err != nil {
return err
}
v, err := packAddressVal(txids)
if err != nil {
return err
}
if delete {
wb.Delete(k)
} else {
wb.Put(k, v)
}
}
return nil
}
func packAddressKey(height uint32, address string) (b []byte, err error) {
b, err = packAddress(address)
if err != nil {
return
}
h := packUint(height)
b = append(b, h...)
return
}
func packAddressVal(txids []string) (b []byte, err error) {
for _, txid := range txids {
t, err := packTxid(txid)
if err != nil {
return nil, err
}
b = append(b, t...)
}
return
}
const txidLen = 32
func unpackAddressVal(b []byte) (txids []string, err error) {
for i := 0; i < len(b); i += txidLen {
t, err := unpackTxid(b[i : i+txidLen])
if err != nil {
return nil, err
}
txids = append(txids, t)
}
return
}
// Outpoint index
func (d *RocksDB) writeOutpoints(
wb *gorocksdb.WriteBatch,
block *Block,
delete bool,
) error {
if delete {
log.Printf("rocksdb: outpoints delete %d in %d %s", len(block.Txs), block.Height, block.Hash)
} else {
log.Printf("rocksdb: outpoints put %d in %d %s", len(block.Txs), block.Height, block.Hash)
}
for _, tx := range block.Txs {
for _, vout := range tx.Vout {
k, err := packOutpointKey(tx.Txid, vout.N)
if err != nil {
return err
}
v, err := packAddress(vout.GetAddress())
if err != nil {
return err
}
if delete {
wb.Delete(k)
} else {
if len(v) > 0 {
wb.Put(k, v)
}
}
}
}
return nil
}
func packOutpointKey(txid string, vout uint32) (b []byte, err error) {
t, err := packTxid(txid)
if err != nil {
return nil, err
}
v := packVarint(vout)
b = append(b, t...)
b = append(b, v...)
return
}
// Block index
const (
lastBlockHash = 0x00
)
var (
lastBlockHashKey = []byte{lastBlockHash}
)
func (d *RocksDB) GetLastBlockHash() (string, error) {
v, err := d.db.Get(d.ro, lastBlockHashKey)
if err != nil {
return "", err
}
defer v.Free()
return unpackBlockValue(v.Data())
}
func (d *RocksDB) writeHeight(
wb *gorocksdb.WriteBatch,
block *Block,
delete bool,
) error {
if delete {
log.Printf("rocksdb: height delete %d %s", block.Height, block.Hash)
} else {
log.Printf("rocksdb: height put %d %s", block.Height, block.Hash)
}
bk := packUint(block.Height)
if delete {
bv, err := packBlockValue(block.Prev)
if err != nil {
return err
}
wb.Delete(bk)
wb.Put(lastBlockHashKey, bv)
} else {
bv, err := packBlockValue(block.Hash)
if err != nil {
return err
}
wb.Put(bk, bv)
wb.Put(lastBlockHashKey, bv)
}
return nil
}
// Helpers
func packUint(i uint32) []byte {
b := make([]byte, 4)
binary.BigEndian.PutUint32(b, i)
return b
}
func packVarint(i uint32) []byte {
b := make([]byte, vlq.MaxLen32)
n := vlq.PutUint(b, uint64(i))
return b[:n]
}
var (
ErrInvalidAddress = errors.New("invalid address")
)
func packAddress(s string) ([]byte, error) {
var b []byte
if len(s) == 0 {
return b, nil
}
b = base58.Decode(s)
if len(b) <= 4 {
return nil, ErrInvalidAddress
}
b = b[:len(b)-4] // Slice off the checksum
return b, nil
}
func unpackAddress(b []byte) (string, error) {
if len(b) == 0 {
return "", nil
}
if len(b) == 1 {
return "", ErrInvalidAddress
}
return base58.CheckEncode(b[1:], b[0]), nil
}
func packTxid(s string) ([]byte, error) {
return hex.DecodeString(s)
}
func unpackTxid(b []byte) (string, error) {
return hex.EncodeToString(b), nil
}
func packBlockValue(hash string) ([]byte, error) {
return hex.DecodeString(hash)
}
func unpackBlockValue(b []byte) (string, error) {
return hex.EncodeToString(b), nil
}