blockbook/rocksdb.go

349 lines
6.9 KiB
Go

package main
import (
"bytes"
"encoding/binary"
"encoding/hex"
"errors"
"log"
"github.com/bsm/go-vlq"
"github.com/btcsuite/btcutil/base58"
"github.com/tecbot/gorocksdb"
)
type RocksDB struct {
db *gorocksdb.DB
wo *gorocksdb.WriteOptions
ro *gorocksdb.ReadOptions
}
// NewRocksDB opens an internal handle to RocksDB environment. Close
// needs to be called to release it.
func NewRocksDB(path string) (d *RocksDB, err error) {
log.Printf("rocksdb: open %s", path)
fp := gorocksdb.NewBloomFilter(10)
bbto := gorocksdb.NewDefaultBlockBasedTableOptions()
bbto.SetBlockCache(gorocksdb.NewLRUCache(3 << 30))
bbto.SetFilterPolicy(fp)
opts := gorocksdb.NewDefaultOptions()
opts.SetBlockBasedTableFactory(bbto)
opts.SetCreateIfMissing(true)
opts.SetMaxBackgroundCompactions(4)
db, err := gorocksdb.OpenDb(opts, path)
if err != nil {
return
}
wo := gorocksdb.NewDefaultWriteOptions()
ro := gorocksdb.NewDefaultReadOptions()
ro.SetFillCache(false)
return &RocksDB{db, wo, ro}, nil
}
// Close releases the RocksDB environment opened in NewRocksDB.
func (d *RocksDB) Close() error {
log.Printf("rocksdb: close")
d.wo.Destroy()
d.ro.Destroy()
d.db.Close()
return nil
}
func (d *RocksDB) GetOutpointAddresses(txid string, vout uint32) ([]string, error) {
log.Printf("rocksdb: outpoint get %s:%d", txid, vout)
k, err := packOutpointKey(txid, vout)
if err != nil {
return nil, err
}
v, err := d.db.Get(d.ro, k)
if err != nil {
return nil, err
}
defer v.Free()
return unpackOutpointValue(v.Data())
}
func (d *RocksDB) GetAddressTransactions(address string, lower uint32, higher uint32, fn func(txids []string) error) (err error) {
log.Printf("rocksdb: address get %d:%d %s", lower, higher, address)
kstart, err := packAddressKey(lower, address)
if err != nil {
return err
}
kstop, err := packAddressKey(higher, address)
if err != nil {
return err
}
it := d.db.NewIterator(d.ro)
defer it.Close()
for it.Seek(kstart); it.Valid(); it.Next() {
k := it.Key()
v := it.Value()
if bytes.Compare(k.Data(), kstop) > 0 {
break
}
txids, err := unpackAddressVal(v.Data())
if err != nil {
return err
}
if err := fn(txids); err != nil {
return err
}
}
return nil
}
func (d *RocksDB) ConnectBlock(block *Block, txids map[string][]string) error {
return d.writeBlock(block, txids, false /* delete */)
}
func (d *RocksDB) DisconnectBlock(block *Block, txids map[string][]string) error {
return d.writeBlock(block, txids, true /* delete */)
}
func (d *RocksDB) writeBlock(
block *Block,
txids map[string][]string,
delete bool,
) error {
wb := gorocksdb.NewWriteBatch()
defer wb.Destroy()
if err := d.writeHeight(wb, block, delete); err != nil {
return err
}
if err := d.writeOutpoints(wb, block, delete); err != nil {
return err
}
if err := d.writeAddresses(wb, block, txids, delete); err != nil {
return err
}
return d.db.Write(d.wo, wb)
}
// Address Index
func (d *RocksDB) writeAddresses(
wb *gorocksdb.WriteBatch,
block *Block,
txids map[string][]string,
delete bool,
) error {
if delete {
log.Printf("rocksdb: address delete %d in %d %s", len(txids), block.Height, block.Hash)
} else {
log.Printf("rocksdb: address put %d in %d %s", len(txids), block.Height, block.Hash)
}
for addr, txids := range txids {
k, err := packAddressKey(block.Height, addr)
if err != nil {
return err
}
v, err := packAddressVal(txids)
if err != nil {
return err
}
if delete {
wb.Delete(k)
} else {
wb.Put(k, v)
}
}
return nil
}
func packAddressKey(height uint32, address string) (b []byte, err error) {
b, err = packAddress(address)
if err != nil {
return
}
h := packUint(height)
b = append(b, h...)
return
}
func packAddressVal(txids []string) (b []byte, err error) {
for _, txid := range txids {
t, err := packTxid(txid)
if err != nil {
return nil, err
}
b = append(b, t...)
}
return
}
const transactionIDLen = 32
func unpackAddressVal(b []byte) (txids []string, err error) {
for i := 0; i < len(b); i += transactionIDLen {
t, err := unpackTxid(b[i : i+transactionIDLen])
if err != nil {
return nil, err
}
txids = append(txids, t)
}
return
}
// Outpoint index
func (d *RocksDB) writeOutpoints(
wb *gorocksdb.WriteBatch,
block *Block,
delete bool,
) error {
if delete {
log.Printf("rocksdb: outpoints delete %d in %d %s", len(block.Txs), block.Height, block.Hash)
} else {
log.Printf("rocksdb: outpoints put %d in %d %s", len(block.Txs), block.Height, block.Hash)
}
for _, tx := range block.Txs {
for _, vout := range tx.Vout {
k, err := packOutpointKey(tx.Txid, vout.N)
if err != nil {
return err
}
v, err := packOutpointValue(vout.ScriptPubKey.Addresses)
if err != nil {
return err
}
if delete {
wb.Delete(k)
} else {
wb.Put(k, v)
}
}
}
return nil
}
func packOutpointKey(txid string, vout uint32) (b []byte, err error) {
t, err := packTxid(txid)
if err != nil {
return nil, err
}
v := packVarint(vout)
b = append(b, t...)
b = append(b, v...)
return
}
func packOutpointValue(addrs []string) (b []byte, err error) {
for _, addr := range addrs {
a, err := packAddress(addr)
if err != nil {
return nil, err
}
i := packVarint(uint32(len(a)))
b = append(b, i...)
b = append(b, a...)
}
return
}
func unpackOutpointValue(b []byte) (addrs []string, err error) {
r := bytes.NewReader(b)
for r.Len() > 0 {
alen, err := vlq.ReadUint(r)
if err != nil {
return nil, err
}
abuf := make([]byte, alen)
_, err = r.Read(abuf)
if err != nil {
return nil, err
}
addr, err := unpackAddress(abuf)
if err != nil {
return nil, err
}
addrs = append(addrs, addr)
}
return
}
// Block index
func (d *RocksDB) writeHeight(
wb *gorocksdb.WriteBatch,
block *Block,
delete bool,
) error {
if delete {
log.Printf("rocksdb: height delete %d %s", block.Height, block.Hash)
} else {
log.Printf("rocksdb: height put %d %s", block.Height, block.Hash)
}
bv, err := packBlockValue(block.Hash)
if err != nil {
return err
}
bk := packUint(block.Height)
wb.Put(bk, bv)
return nil
}
// Helpers
func packUint(i uint32) []byte {
b := make([]byte, 4)
binary.BigEndian.PutUint32(b, i)
return b
}
func packVarint(i uint32) []byte {
b := make([]byte, vlq.MaxLen32)
n := vlq.PutUint(b, uint64(i))
return b[:n]
}
var (
ErrInvalidAddress = errors.New("invalid address")
)
func packAddress(s string) (b []byte, err error) {
b = base58.Decode(s)
if len(b) > 4 {
b = b[:len(b)-4]
} else {
err = ErrInvalidAddress
}
return
}
func unpackAddress(b []byte) (s string, err error) {
if len(b) > 1 {
s = base58.CheckEncode(b[1:], b[0])
} else {
err = ErrInvalidAddress
}
return
}
func packTxid(s string) (b []byte, err error) {
return hex.DecodeString(s)
}
func unpackTxid(b []byte) (s string, err error) {
return hex.EncodeToString(b), nil
}
func packBlockValue(hash string) ([]byte, error) {
return hex.DecodeString(hash)
}