rework index type

pull/1/head
Jan Pochyla 2017-10-05 14:35:07 +02:00
parent abaedd4f39
commit 7907bfeac7
4 changed files with 323 additions and 376 deletions

View File

@ -293,15 +293,6 @@ func (b *BitcoinRPC) GetTransaction(txid string) (*Tx, error) {
return &res.Result, nil
}
// GetAddress fetches the transaction identified by txid and returns the
// address of its output at index vout. Any error from GetTransaction is
// passed through together with an empty address.
func (b *BitcoinRPC) GetAddress(txid string, vout uint32) (string, error) {
	tx, err := b.GetTransaction(txid)
	if err == nil {
		return tx.GetAddress(vout), nil
	}
	return "", err
}
func (b *BitcoinRPC) call(req interface{}, res interface{}) error {
httpData, err := json.Marshal(req)
if err != nil {

View File

@ -3,6 +3,7 @@ package main
import (
"flag"
"log"
"sync"
"time"
"github.com/pkg/profile"
@ -12,105 +13,19 @@ type BlockParser interface {
ParseBlock(b []byte) (*Block, error)
}
type Blocks interface {
// Blockchain is the source of block data that the sync routines in this
// file read from (the remote side when compared against the local index).
type Blockchain interface {
// GetBestBlockHash returns the hash of the chain's current best block.
GetBestBlockHash() (string, error)
// GetBlockHash returns the hash of the block at the given height.
GetBlockHash(height uint32) (string, error)
// GetBlockHeader returns the header of the block with the given hash.
GetBlockHeader(hash string) (*BlockHeader, error)
// GetBlock returns the full block with the given hash.
GetBlock(hash string) (*Block, error)
}
// Outpoints resolves a transaction output (txid plus output index) to the
// address it pays to.
type Outpoints interface {
// GetAddress looks up a transaction output and returns its address.
// Address can be empty string in case it's not found or not
// intelligible.
GetAddress(txid string, vout uint32) (string, error)
}
// Addresses queries the address index.
type Addresses interface {
// GetTransactions passes the transaction IDs stored for address to fn,
// one batch per index entry, and stops at the first error fn returns.
// lower and higher presumably bound the block heights searched (both
// inclusive) — confirm against the implementation.
GetTransactions(address string, lower uint32, higher uint32, fn func(txids []string) error) error
}
// Indexer is the write side of the index together with its record of the
// last indexed block.
type Indexer interface {
// ConnectBlock adds block to the index. txids maps each address touched
// by the block to the IDs of the transactions that touch it.
ConnectBlock(block *Block, txids map[string][]string) error
// DisconnectBlock removes block from the index; txids has the same
// shape as for ConnectBlock.
DisconnectBlock(block *Block, txids map[string][]string) error
// GetLastBlockHash returns the hash of the last block recorded in the
// index.
GetLastBlockHash() (string, error)
}
// GetAllAddresses builds a map from every address touched by a transaction
// in the block to the IDs of those transactions, in block order. Input
// addresses are resolved through outpoints (see GetTxAddresses). The first
// lookup error aborts the scan and is returned with a nil map.
func (b *Block) GetAllAddresses(outpoints Outpoints) (map[string][]string, error) {
	byAddress := map[string][]string{}
	for idx := range b.Txs {
		tx := &b.Txs[idx]
		touched, err := b.GetTxAddresses(outpoints, tx)
		if err != nil {
			return nil, err
		}
		// touched is a set, so each txid is recorded at most once per address.
		for address := range touched {
			byAddress[address] = append(byAddress[address], tx.Txid)
		}
	}
	return byAddress, nil
}
// GetTxAddresses returns the set of addresses that tx touches: the
// addresses of its outputs plus the addresses of the outputs its inputs
// spend. Outputs without a usable address are ignored. An input whose
// spent output cannot be resolved is logged and skipped, not treated as an
// error.
func (b *Block) GetTxAddresses(outpoints Outpoints, tx *Tx) (map[string]struct{}, error) {
	seen := map[string]struct{}{} // set of unique addresses

	// Outputs carry their address directly.
	for _, out := range tx.Vout {
		if address := out.GetAddress(); address != "" {
			seen[address] = struct{}{}
		}
	}

	// Inputs only reference the output they spend, so that output has to
	// be resolved to an address first.
	for _, in := range tx.Vin {
		if in.Coinbase != "" {
			continue
		}
		// Ask the outpoint index first; if it has no answer, the spent
		// output may belong to a transaction in this very block.
		address, err := outpoints.GetAddress(in.Txid, in.Vout)
		if err != nil {
			return nil, err
		}
		if address == "" {
			address = b.GetAddress(in.Txid, in.Vout)
		}
		if address == "" {
			log.Printf("warn: output not found: %s:%d", in.Txid, in.Vout)
			continue
		}
		seen[address] = struct{}{}
	}
	return seen, nil
}
// GetAddress returns the address of output vout of the first transaction
// in the block whose ID equals txid. It returns "" when the block holds no
// such transaction, or when that transaction yields no address for vout.
func (b *Block) GetAddress(txid string, vout uint32) string {
	for idx := range b.Txs {
		tx := &b.Txs[idx]
		if tx.Txid != txid {
			continue
		}
		return tx.GetAddress(vout)
	}
	return "" // tx not found
}
// GetAddress returns the address of the transaction's output at index
// vout, or "" when the index is out of range.
func (t *Tx) GetAddress(vout uint32) string {
	if vout >= uint32(len(t.Vout)) {
		return "" // output not found
	}
	return t.Vout[vout].GetAddress()
}
func (o *Vout) GetAddress() string {
if len(o.ScriptPubKey.Addresses) == 1 {
return o.ScriptPubKey.Addresses[0]
}
return "" // output address not intelligible
// Index is the local index: it records which blocks have been connected
// and answers address queries.
type Index interface {
// GetBestBlockHash returns the hash of the index's best block.
GetBestBlockHash() (string, error)
// GetBlockHash returns the hash the index has stored for the given
// height.
GetBlockHash(height uint32) (string, error)
// GetTransactions calls fn once per transaction ID stored for address
// and stops at the first error fn returns. lower and higher presumably
// bound the block heights searched — confirm against the implementation.
GetTransactions(address string, lower uint32, higher uint32, fn func(txid string) error) error
// ConnectBlock adds block to the index.
ConnectBlock(block *Block) error
// DisconnectBlock removes block from the index.
DisconnectBlock(block *Block) error
}
var (
@ -121,8 +36,9 @@ var (
dbPath = flag.String("path", "./data", "path to address index directory")
blockHeight = flag.Int("blockheight", -1, "height of the starting block")
blockUntil = flag.Int("blockuntil", -1, "height of the final block")
blockHeight = flag.Int("blockheight", -1, "height of the starting block")
blockUntil = flag.Int("blockuntil", -1, "height of the final block")
queryAddress = flag.String("address", "", "query contents of this address")
resync = flag.Bool("resync", false, "resync until tip")
@ -133,10 +49,6 @@ var (
func main() {
flag.Parse()
if *prof {
defer profile.Start().Stop()
}
if *repair {
if err := RepairRocksDB(*dbPath); err != nil {
log.Fatal(err)
@ -144,8 +56,15 @@ func main() {
return
}
timeout := time.Duration(*rpcTimeout) * time.Second
rpc := NewBitcoinRPC(*rpcURL, *rpcUser, *rpcPass, timeout)
if *prof {
defer profile.Start().Stop()
}
rpc := NewBitcoinRPC(
*rpcURL,
*rpcUser,
*rpcPass,
time.Duration(*rpcTimeout)*time.Second)
db, err := NewRocksDB(*dbPath)
if err != nil {
@ -154,7 +73,7 @@ func main() {
defer db.Close()
if *resync {
if err := resyncIndex(rpc, db, db); err != nil {
if err := resyncIndex(rpc, db); err != nil {
log.Fatal(err)
}
}
@ -172,54 +91,48 @@ func main() {
log.Fatal(err)
}
} else {
if err = connectBlockRange(rpc, db, db, height, until); err != nil {
if err = connectBlocksParallel(rpc, db, height, until); err != nil {
log.Fatal(err)
}
}
}
}
func printResult(txids []string) error {
for i, txid := range txids {
log.Printf("%d: %s", i, txid)
}
func printResult(txid string) error {
log.Printf("%s", txid)
return nil
}
func resyncIndex(
blocks Blocks,
outpoints Outpoints,
index Indexer,
) error {
best, err := blocks.GetBestBlockHash()
func resyncIndex(chain Blockchain, index Index) error {
remote, err := chain.GetBestBlockHash()
if err != nil {
return err
}
last, err := index.GetLastBlockHash()
local, err := index.GetBestBlockHash()
if err != nil {
last = ""
local = ""
}
// If the local block is missing, we're indexing from the genesis block.
if last == "" {
if local == "" {
log.Printf("resync: genesis")
hash, err := blocks.GetBlockHash(0)
hash, err := chain.GetBlockHash(0)
if err != nil {
return err
}
return connectBlock(blocks, outpoints, index, hash)
return connectBlock(chain, index, hash)
}
// If the locally indexed block is the same as the best block on the
// network, we're done.
if last == best {
log.Printf("resync: synced on %s", last)
if local == remote {
log.Printf("resync: synced on %s", local)
return nil
}
// Is local tip on the best chain?
header, err := blocks.GetBlockHeader(last)
header, err := chain.GetBlockHeader(local)
forked := false
if err != nil {
if e, ok := err.(*RPCError); ok && e.Message == "Block not found" {
@ -236,114 +149,142 @@ func resyncIndex(
if forked {
log.Printf("resync: local is forked")
// TODO: resync after disconnecting
return disconnectBlock(blocks, outpoints, index, header.Hash)
return disconnectBlock(chain, index, header.Hash)
} else {
log.Printf("resync: local is behind")
return connectBlock(blocks, outpoints, index, header.Next)
return connectBlock(chain, index, header.Next)
}
}
func connectBlock(
blocks Blocks,
outpoints Outpoints,
index Indexer,
chain Blockchain,
index Index,
hash string,
) error {
bch := make(chan blockResult, 8)
done := make(chan struct{})
defer close(done)
go getBlockChain(hash, blocks, bch, done)
go getBlockChain(hash, chain, bch, done)
for res := range bch {
err := res.err
block := res.block
if err != nil {
return err
if res.err != nil {
return res.err
}
addrs, err := block.GetAllAddresses(outpoints)
err := index.ConnectBlock(res.block)
if err != nil {
return err
}
if err := index.ConnectBlock(block, addrs); err != nil {
return err
}
}
return nil
}
func disconnectBlock(
blocks Blocks,
outpoints Outpoints,
index Indexer,
chain Blockchain,
index Index,
hash string,
) error {
return nil
}
func connectBlockRange(
blocks Blocks,
outpoints Outpoints,
index Indexer,
func connectBlocksParallel(
chain Blockchain,
index Index,
lower uint32,
higher uint32,
) error {
bch := make(chan blockResult, 3)
const chunkSize = 100
const numWorkers = 8
go getBlockRange(lower, higher, blocks, bch)
var wg sync.WaitGroup
for res := range bch {
if res.err != nil {
return res.err
work := func(i int) {
defer wg.Done()
offset := uint32(chunkSize * i)
stride := uint32(chunkSize * numWorkers)
for low := offset; low <= higher; low += stride {
high := low + chunkSize - 1
if high > higher {
high = higher
}
err := connectBlockChunk(chain, index, low, high)
if err != nil {
log.Fatal(err) // TODO
}
}
addrs, err := res.block.GetAllAddresses(outpoints)
}
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go work(i)
}
wg.Wait()
return nil
}
func connectBlockChunk(
chain Blockchain,
index Index,
lower uint32,
higher uint32,
) error {
connected, err := isBlockConnected(chain, index, higher)
if err != nil || connected {
return err
}
height := lower
hash, err := chain.GetBlockHash(lower)
if err != nil {
return err
}
for height <= higher {
block, err := chain.GetBlock(hash)
if err != nil {
return err
}
if err := index.ConnectBlock(res.block, addrs); err != nil {
hash = block.Next
height = block.Height + 1
err = index.ConnectBlock(block)
if err != nil {
return err
}
}
return nil
}
// isBlockConnected reports whether the index stores, for the given height,
// the same block hash that the chain reports for that height. An error
// from either lookup is returned with a false result.
func isBlockConnected(
	chain Blockchain,
	index Index,
	height uint32,
) (bool, error) {
	indexed, err := index.GetBlockHash(height)
	if err != nil {
		return false, err
	}
	expected, err := chain.GetBlockHash(height)
	if err != nil {
		return false, err
	}
	return indexed == expected, nil
}
// blockResult is the message sent over the block-fetching channels: either
// a fetched block, or the error that stopped the fetch.
type blockResult struct {
block *Block // fetched block; left nil when err is set
err error // fetch error; left nil when block is set
}
// getBlockRange streams blocks to results, starting with the block at
// height lower and following each block's Next hash until the height
// passes higher. On the first failure it sends one blockResult carrying
// the error and stops. results is closed when the function returns, so
// the receiver can simply range over it.
func getBlockRange(
	lower uint32,
	higher uint32,
	blocks Blocks,
	results chan<- blockResult,
) {
	defer close(results)

	next, err := blocks.GetBlockHash(lower)
	if err != nil {
		results <- blockResult{err: err}
		return
	}

	for at := lower; at <= higher; {
		block, err := blocks.GetBlock(next)
		if err != nil {
			results <- blockResult{err: err}
			return
		}
		// Advance using the fetched block's own data rather than a counter.
		next, at = block.Next, block.Height+1
		results <- blockResult{block: block}
	}
}
func getBlockChain(
hash string,
blocks Blocks,
chain Blockchain,
out chan blockResult,
done chan struct{},
) {
@ -355,7 +296,7 @@ func getBlockChain(
return
default:
}
block, err := blocks.GetBlock(hash)
block, err := chain.GetBlock(hash)
if err != nil {
out <- blockResult{err: err}
return

View File

@ -66,29 +66,14 @@ func (d *RocksDB) Close() error {
return nil
}
// GetAddress reads the outpoint index entry stored under txid:vout and
// decodes it into an address. Key-packing and database errors are returned
// with an empty address.
func (d *RocksDB) GetAddress(txid string, vout uint32) (string, error) {
	// log.Printf("rocksdb: outpoint get %s:%d", txid, vout)
	key, err := packOutpointKey(txid, vout)
	if err != nil {
		return "", err
	}

	stored, err := d.db.Get(d.ro, key)
	if err != nil {
		return "", err
	}
	// Release the database-owned value after it has been decoded.
	defer stored.Free()

	return unpackAddress(stored.Data())
}
func (d *RocksDB) GetTransactions(address string, lower uint32, higher uint32, fn func(txids []string) error) (err error) {
func (d *RocksDB) GetTransactions(address string, lower uint32, higher uint32, fn func(txid string) error) (err error) {
log.Printf("rocksdb: address get %d:%d %s", lower, higher, address)
kstart, err := packAddressKey(lower, address)
kstart, err := packOutputKey(address, lower)
if err != nil {
return err
}
kstop, err := packAddressKey(higher, address)
kstop, err := packOutputKey(address, higher)
if err != nil {
return err
}
@ -97,211 +82,237 @@ func (d *RocksDB) GetTransactions(address string, lower uint32, higher uint32, f
defer it.Close()
for it.Seek(kstart); it.Valid(); it.Next() {
k := it.Key()
v := it.Value()
if bytes.Compare(k.Data(), kstop) > 0 {
key := it.Key()
val := it.Value()
if bytes.Compare(key.Data(), kstop) > 0 {
break
}
txids, err := unpackAddressVal(v.Data())
outpoints, err := unpackOutputValue(val.Data())
if err != nil {
return err
}
if err := fn(txids); err != nil {
return err
for _, o := range outpoints {
if err := fn(o.txid); err != nil {
return err
}
}
}
return nil
}
func (d *RocksDB) ConnectBlock(block *Block, txids map[string][]string) error {
return d.writeBlock(block, txids, false /* delete */)
// Operation codes passed to writeBlock and the per-index write helpers to
// choose between storing and removing a block's records.
const (
opInsert = 0 // put the block's records into the write batch
opDelete = 1 // delete the block's records via the write batch
)
// ConnectBlock writes the block's records to the database by calling
// writeBlock with opInsert.
func (d *RocksDB) ConnectBlock(block *Block) error {
return d.writeBlock(block, opInsert)
}
func (d *RocksDB) DisconnectBlock(block *Block, txids map[string][]string) error {
return d.writeBlock(block, txids, true /* delete */)
// DisconnectBlock removes the block's records from the database by calling
// writeBlock with opDelete.
func (d *RocksDB) DisconnectBlock(block *Block) error {
return d.writeBlock(block, opDelete)
}
func (d *RocksDB) writeBlock(
block *Block,
txids map[string][]string,
delete bool,
) error {
func (d *RocksDB) writeBlock(block *Block, op int) error {
wb := gorocksdb.NewWriteBatch()
defer wb.Destroy()
if err := d.writeHeight(wb, block, delete); err != nil {
if err := d.writeHeight(wb, block, op); err != nil {
return err
}
if err := d.writeOutpoints(wb, block, delete); err != nil {
if err := d.writeOutputs(wb, block, op); err != nil {
return err
}
if err := d.writeAddresses(wb, block, txids, delete); err != nil {
if err := d.writeInputs(wb, block, op); err != nil {
return err
}
return d.db.Write(d.wo, wb)
}
// Address Index
// Output Index
func (d *RocksDB) writeAddresses(
// outpoint identifies a single transaction output.
type outpoint struct {
txid string // transaction ID as a hex string (packed with hex decoding)
vout uint32 // index of the output within the transaction
}
func (d *RocksDB) writeOutputs(
wb *gorocksdb.WriteBatch,
block *Block,
txids map[string][]string,
delete bool,
op int,
) error {
if delete {
log.Printf("rocksdb: address delete %d in %d %s", len(txids), block.Height, block.Hash)
} else {
log.Printf("rocksdb: address put %d in %d %s", len(txids), block.Height, block.Hash)
switch op {
case opInsert:
log.Printf("rocksdb: outputs insert %d %s", block.Height, block.Hash)
case opDelete:
log.Printf("rocksdb: outputs delete %d %s", block.Height, block.Hash)
}
for addr, txids := range txids {
k, err := packAddressKey(block.Height, addr)
if err != nil {
return err
}
v, err := packAddressVal(txids)
if err != nil {
return err
}
if delete {
wb.Delete(k)
} else {
wb.Put(k, v)
}
}
return nil
}
// packAddressKey builds an address index key: the packed address followed
// by the block height as four big-endian bytes. If the address cannot be
// packed, packAddress's results are returned unchanged.
func packAddressKey(height uint32, address string) (b []byte, err error) {
	if b, err = packAddress(address); err != nil {
		return b, err
	}
	return append(b, packUint(height)...), nil
}
// packAddressVal builds an address index value by concatenating the packed
// form of every txid in order. It returns nil and the error for the first
// txid that cannot be packed; an empty input yields a nil slice.
func packAddressVal(txids []string) (b []byte, err error) {
	for i := range txids {
		raw, packErr := packTxid(txids[i])
		if packErr != nil {
			return nil, packErr
		}
		b = append(b, raw...)
	}
	return b, nil
}
const txidLen = 32
// unpackAddressVal splits an address index value into its transaction IDs.
// The value must be a concatenation of txidLen-byte packed txids; any other
// length is reported as an error instead of slicing past the end of b
// (which would panic or decode stale bytes from the slice's capacity).
// An empty value yields a nil slice.
func unpackAddressVal(b []byte) (txids []string, err error) {
	if len(b)%txidLen != 0 {
		return nil, errors.New("invalid address value length")
	}
	for i := 0; i < len(b); i += txidLen {
		t, err := unpackTxid(b[i : i+txidLen])
		if err != nil {
			return nil, err
		}
		txids = append(txids, t)
	}
	return txids, nil
}
// Outpoint index
func (d *RocksDB) writeOutpoints(
wb *gorocksdb.WriteBatch,
block *Block,
delete bool,
) error {
if delete {
log.Printf("rocksdb: outpoints delete %d in %d %s", len(block.Txs), block.Height, block.Hash)
} else {
log.Printf("rocksdb: outpoints put %d in %d %s", len(block.Txs), block.Height, block.Hash)
}
records := make(map[string][]outpoint)
for _, tx := range block.Txs {
for _, vout := range tx.Vout {
k, err := packOutpointKey(tx.Txid, vout.N)
if err != nil {
return err
}
v, err := packAddress(vout.GetAddress())
if err != nil {
return err
}
if delete {
wb.Delete(k)
for _, output := range tx.Vout {
address := output.GetAddress()
if address != "" {
records[address] = append(records[address], outpoint{
txid: tx.Txid,
vout: output.N,
})
} else {
if len(v) > 0 {
wb.Put(k, v)
}
log.Printf("rocksdb: skipping %s:%d", tx.Txid, output.N)
}
}
}
for address, outpoints := range records {
key, err := packOutputKey(address, block.Height)
if err != nil {
return err
}
val, err := packOutputValue(outpoints)
switch op {
case opInsert:
wb.Put(key, val)
case opDelete:
wb.Delete(key)
}
}
return nil
}
func packOutpointKey(txid string, vout uint32) (b []byte, err error) {
t, err := packTxid(txid)
func packOutputKey(address string, height uint32) ([]byte, error) {
baddress, err := packAddress(address)
if err != nil {
return nil, err
}
v := packVarint(vout)
b = append(b, t...)
b = append(b, v...)
return
bheight := packUint(height)
buf := make([]byte, 0, len(baddress)+len(bheight))
buf = append(buf, baddress...)
buf = append(buf, bheight...)
return buf, nil
}
// packOutputValue serializes outpoints as a sequence of records, each one
// the packed txid followed by the varuint-encoded output index. It returns
// nil and the error for the first txid that cannot be packed; an empty
// input yields an empty, non-nil slice.
func packOutputValue(outpoints []outpoint) ([]byte, error) {
	packed := []byte{}
	for i := range outpoints {
		txidBytes, err := packTxid(outpoints[i].txid)
		if err != nil {
			return nil, err
		}
		packed = append(packed, txidBytes...)
		packed = append(packed, packVaruint(outpoints[i].vout)...)
	}
	return packed, nil
}
// unpackOutputValue parses a value written by packOutputValue back into
// outpoints. Each record is txIdUnpackedLen bytes of packed txid followed
// by a varuint-encoded output index.
//
// Malformed input is reported as an error rather than slicing past the end
// of buf or silently decoding a bogus record: a record with fewer than
// txIdUnpackedLen bytes left for the txid, or whose output index cannot be
// decoded, aborts the parse. An empty buf yields an empty, non-nil slice.
func unpackOutputValue(buf []byte) ([]outpoint, error) {
	outpoints := make([]outpoint, 0)
	for i := 0; i < len(buf); {
		if len(buf)-i < txIdUnpackedLen {
			return nil, errors.New("invalid output value: truncated txid")
		}
		txid, err := unpackTxid(buf[i : i+txIdUnpackedLen])
		if err != nil {
			return nil, err
		}
		i += txIdUnpackedLen

		vout, voutLen := unpackVaruint(buf[i:])
		// NOTE(review): assumes the varuint decoder reports a length <= 0
		// when it cannot decode a value (the encoding/binary.Uvarint
		// convention) — confirm against the vlq package.
		if voutLen <= 0 {
			return nil, errors.New("invalid output value: malformed vout")
		}
		i += voutLen

		outpoints = append(outpoints, outpoint{
			txid: txid,
			vout: vout,
		})
	}
	return outpoints, nil
}
// Input index
// writeInputs adds the block's input index records to the write batch wb,
// or removes them, depending on op (opInsert or opDelete; any other value
// only skips the batch operations).
//
// For every non-coinbase input the key is the packed outpoint being spent
// and the value is the packed location of the spending input (spending
// txid plus input position). Coinbase inputs are skipped: they spend no
// outpoint, and packing their empty txid would put every coinbase under
// the same near-empty key. Nothing is written to the database here; the
// caller commits wb.
func (d *RocksDB) writeInputs(
	wb *gorocksdb.WriteBatch,
	block *Block,
	op int,
) error {
	switch op {
	case opInsert:
		log.Printf("rocksdb: inputs insert %d %s", block.Height, block.Hash)
	case opDelete:
		log.Printf("rocksdb: inputs delete %d %s", block.Height, block.Hash)
	}

	for _, tx := range block.Txs {
		for i, input := range tx.Vin {
			if input.Coinbase != "" {
				continue // coinbase inputs reference no previous output
			}
			key, err := packOutpoint(input.Txid, input.Vout)
			if err != nil {
				return err
			}
			val, err := packOutpoint(tx.Txid, uint32(i))
			if err != nil {
				return err
			}
			switch op {
			case opInsert:
				wb.Put(key, val)
			case opDelete:
				wb.Delete(key)
			}
		}
	}

	return nil
}
// packOutpoint serializes an outpoint as the packed txid followed by the
// varuint-encoded output index. It returns nil and the error when the txid
// cannot be packed.
func packOutpoint(txid string, vout uint32) ([]byte, error) {
	txidBytes, err := packTxid(txid)
	if err != nil {
		return nil, err
	}
	voutBytes := packVaruint(vout)

	// Allocate the exact size once, then fill it.
	packed := make([]byte, 0, len(txidBytes)+len(voutBytes))
	return append(append(packed, txidBytes...), voutBytes...), nil
}
// Block index
const (
lastBlockHash = 0x00
)
// GetBestBlockHash is not implemented yet: it always returns an empty hash
// and a nil error.
func (d *RocksDB) GetBestBlockHash() (string, error) {
return "", nil // TODO
}
var (
lastBlockHashKey = []byte{lastBlockHash}
)
func (d *RocksDB) GetLastBlockHash() (string, error) {
v, err := d.db.Get(d.ro, lastBlockHashKey)
func (d *RocksDB) GetBlockHash(height uint32) (string, error) {
key := packUint(height)
val, err := d.db.Get(d.ro, key)
if err != nil {
return "", err
}
defer v.Free()
return unpackBlockValue(v.Data())
defer val.Free()
return unpackBlockValue(val.Data())
}
func (d *RocksDB) writeHeight(
wb *gorocksdb.WriteBatch,
block *Block,
delete bool,
op int,
) error {
if delete {
log.Printf("rocksdb: height delete %d %s", block.Height, block.Hash)
} else {
switch op {
case opInsert:
log.Printf("rocksdb: height put %d %s", block.Height, block.Hash)
case opDelete:
log.Printf("rocksdb: height delete %d %s", block.Height, block.Hash)
}
bk := packUint(block.Height)
key := packUint(block.Height)
if delete {
bv, err := packBlockValue(block.Prev)
switch op {
case opInsert:
val, err := packBlockValue(block.Hash)
if err != nil {
return err
}
wb.Delete(bk)
wb.Put(lastBlockHashKey, bv)
} else {
bv, err := packBlockValue(block.Hash)
if err != nil {
return err
}
wb.Put(bk, bv)
wb.Put(lastBlockHashKey, bv)
wb.Put(key, val)
case opDelete:
wb.Delete(key)
}
return nil
@ -309,57 +320,54 @@ func (d *RocksDB) writeHeight(
// Helpers
const txIdUnpackedLen = 32
var ErrInvalidAddress = errors.New("invalid address")
func packUint(i uint32) []byte {
b := make([]byte, 4)
binary.BigEndian.PutUint32(b, i)
return b
buf := make([]byte, 4)
binary.BigEndian.PutUint32(buf, i)
return buf
}
func packVarint(i uint32) []byte {
b := make([]byte, vlq.MaxLen32)
n := vlq.PutUint(b, uint64(i))
return b[:n]
func packVaruint(i uint32) []byte {
buf := make([]byte, vlq.MaxLen32)
ofs := vlq.PutUint(buf, uint64(i))
return buf[:ofs]
}
var (
ErrInvalidAddress = errors.New("invalid address")
)
// unpackVaruint decodes a variable-length unsigned integer from the start
// of buf using vlq.Uint. It returns the decoded value converted to uint32
// and the second value reported by vlq.Uint, which callers in this file
// use as the number of bytes consumed.
// NOTE(review): values above the uint32 range are truncated by the
// conversion; vlq.Uint's behavior on malformed input is not visible here.
func unpackVaruint(buf []byte) (uint32, int) {
i, ofs := vlq.Uint(buf)
return uint32(i), ofs
}
func packAddress(s string) ([]byte, error) {
var b []byte
if len(s) == 0 {
return b, nil
}
b = base58.Decode(s)
if len(b) <= 4 {
func packAddress(address string) ([]byte, error) {
buf := base58.Decode(address)
if len(buf) <= 4 {
return nil, ErrInvalidAddress
}
b = b[:len(b)-4] // Slice off the checksum
return b, nil
return buf[:len(buf)-4], nil // Slice off the checksum
}
func unpackAddress(b []byte) (string, error) {
if len(b) == 0 {
return "", nil
}
if len(b) == 1 {
func unpackAddress(buf []byte) (string, error) {
if len(buf) < 2 {
return "", ErrInvalidAddress
}
return base58.CheckEncode(b[1:], b[0]), nil
return base58.CheckEncode(buf[1:], buf[0]), nil
}
func packTxid(s string) ([]byte, error) {
return hex.DecodeString(s)
func packTxid(txid string) ([]byte, error) {
return hex.DecodeString(txid)
}
func unpackTxid(b []byte) (string, error) {
return hex.EncodeToString(b), nil
func unpackTxid(buf []byte) (string, error) {
return hex.EncodeToString(buf), nil
}
// packBlockValue converts a hex-encoded block hash into its raw bytes for
// storage. The result and error of hex.DecodeString are returned as-is.
func packBlockValue(hash string) ([]byte, error) {
	raw, err := hex.DecodeString(hash)
	return raw, err
}
func unpackBlockValue(b []byte) (string, error) {
return hex.EncodeToString(b), nil
func unpackBlockValue(buf []byte) (string, error) {
return hex.EncodeToString(buf), nil
}

View File

@ -26,6 +26,13 @@ type Vout struct {
ScriptPubKey ScriptPubKey `json:"scriptPubKey"`
}
// GetAddress returns the output's address when its script lists exactly
// one address, and "" otherwise.
func (vout *Vout) GetAddress() string {
	if addrs := vout.ScriptPubKey.Addresses; len(addrs) == 1 {
		return addrs[0]
	}
	return "" // output address not intelligible
}
type Tx struct {
Txid string `json:"txid"`
Version int32 `json:"version"`