add resync, store only single addresses

pull/1/head
Jan Pochyla 2017-09-11 12:20:21 +02:00
parent 145ff58148
commit 787a8ea906
3 changed files with 256 additions and 125 deletions

View File

@ -75,9 +75,7 @@ func (b *BitcoinRPC) GetBlockAndParse(hash string) (block *Block, err error) {
}
block, err = b.Parser.ParseBlock(data)
if err == nil {
block.Hash = header.Hash
block.Height = header.Height
block.Next = header.Next
block.BlockHeader = header
}
return
}
@ -117,6 +115,14 @@ func (b *BitcoinRPC) GetBlockHash(height uint32) (hash string, err error) {
return
}
// GetBlockHeader returns header of block with given hash.
func (b *BitcoinRPC) GetBlockHeader(hash string) (header *BlockHeader, err error) {
log.Printf("rpc: getblockheader")
header = &BlockHeader{}
err = b.client.Call("getblockheader", &header, hash)
return
}
// GetTransaction returns the number of blocks in the longest chain. If the
// transaction cache is turned on, returned Tx.Confirmations is stale.
func (b *BitcoinRPC) GetTransaction(txid string) (tx *Tx, err error) {
@ -135,11 +141,10 @@ func (b *BitcoinRPC) GetTransaction(txid string) (tx *Tx, err error) {
return
}
// GetAddresses returns all unique addresses from given transaction output.
func (b *BitcoinRPC) GetAddresses(txid string, vout uint32) ([]string, error) {
func (b *BitcoinRPC) GetAddress(txid string, vout uint32) (string, error) {
tx, err := b.GetTransaction(txid)
if err != nil {
return nil, err
return "", err
}
return tx.Vout[vout].ScriptPubKey.Addresses, nil
return tx.GetAddress(vout)
}

View File

@ -12,17 +12,21 @@ type BlockParser interface {
}
var (
ErrTxNotFound = errors.New("transaction not found")
ErrNotFound = errors.New("not found")
)
type Blocks interface {
GetBestBlockHash() (string, error)
GetBlockHash(height uint32) (string, error)
GetBlockHeader(hash string) (*BlockHeader, error)
GetBlock(hash string) (*Block, error)
}
type Outpoints interface {
GetAddresses(txid string, vout uint32) ([]string, error)
// GetAddress looks up a transaction output and returns its address.
// ErrNotFound is returned if the output is not found. Address can be
// empty string in case it's not intelligable.
GetAddress(txid string, vout uint32) (string, error)
}
type Addresses interface {
@ -32,6 +36,89 @@ type Addresses interface {
type Indexer interface {
ConnectBlock(block *Block, txids map[string][]string) error
DisconnectBlock(block *Block, txids map[string][]string) error
GetLastBlockHash() (string, error)
}
func (b *Block) GetAllAddresses(outpoints Outpoints) (map[string][]string, error) {
addrs := make(map[string][]string, 0) // Address to a list of txids.
for _, tx := range b.Txs {
ta, err := b.GetTxAddresses(outpoints, tx)
if err != nil {
return nil, err
}
for a, _ := range ta {
addrs[a] = append(addrs[a], tx.Txid)
}
}
return addrs, nil
}
func (b *Block) GetTxAddresses(outpoints Outpoints, tx *Tx) (map[string]struct{}, error) {
addrs := make(map[string]struct{}) // Only unique values.
// Process outputs.
for _, o := range tx.Vout {
a := o.GetAddress()
if a != "" {
addrs[a] = struct{}{}
}
}
// Process inputs. For each input, we need to take a look to the
// outpoint index.
for _, i := range tx.Vin {
if i.Coinbase != "" {
continue
}
// Lookup output in in the outpoint index. In case it's not
// found, take a look in this block.
a, err := outpoints.GetAddress(i.Txid, i.Vout)
if err == ErrNotFound {
a, err = b.GetAddress(i.Txid, i.Vout)
}
if err != nil {
return nil, err
}
if a != "" {
addrs[a] = struct{}{}
}
}
return addrs, nil
}
func (b *Block) GetAddress(txid string, vout uint32) (string, error) {
var t *Tx
for _, tx := range b.Txs {
if tx.Txid == txid {
t = tx
break
}
}
if t == nil {
// Transaction output was not found.
return "", ErrNotFound
}
return t.GetAddress(vout)
}
func (t *Tx) GetAddress(vout uint32) (string, error) {
if vout >= uint32(len(t.Vout)) {
// The output doesn't exist.
return "", ErrNotFound
}
return t.Vout[vout].GetAddress(), nil
}
func (o *Vout) GetAddress() string {
if len(o.ScriptPubKey.Addresses) != 1 {
// The output address is not intelligible.
return ""
}
return o.ScriptPubKey.Addresses[0]
}
var (
@ -48,6 +135,8 @@ var (
blockHeight = flag.Int("blockheight", -1, "height of the starting block")
blockUntil = flag.Int("blockuntil", -1, "height of the final block")
queryAddress = flag.String("address", "", "query contents of this address")
resync = flag.Bool("resync", false, "resync until tip")
)
func main() {
@ -89,11 +178,17 @@ func main() {
log.Fatal(err)
}
} else {
if err = indexBlocks(rpc, db, db, height, until); err != nil {
if err = connectBlockRange(rpc, db, db, height, until); err != nil {
log.Fatal(err)
}
}
}
if *resync {
if err := resyncIndex(rpc, db, db); err != nil {
log.Fatal(err)
}
}
}
func printResult(txids []string) error {
@ -103,75 +198,98 @@ func printResult(txids []string) error {
return nil
}
func (b *Block) GetAllAddresses(outpoints Outpoints) (map[string][]string, error) {
addrs := make(map[string][]string, 0)
func resyncIndex(
blocks Blocks,
outpoints Outpoints,
index Indexer,
) error {
best, err := blocks.GetBestBlockHash()
if err != nil {
return err
}
last, err := index.GetLastBlockHash()
if err != nil {
last = ""
}
for _, tx := range b.Txs {
ta, err := b.GetTxAddresses(outpoints, tx)
// If the local block is missing, we're indexing from the genesis block.
if last == "" {
log.Printf("resync: genesis")
hash, err := blocks.GetBlockHash(0)
if err != nil {
return nil, err
return err
}
for _, addr := range ta {
addrs[addr] = append(addrs[addr], tx.Txid)
return connectBlock(blocks, outpoints, index, hash)
}
// If the locally indexed block is the same as the best block on the
// network, we're done.
if last == best {
log.Printf("resync: synced on %s", last)
return nil
}
// Is local tip on the best chain?
header, err := blocks.GetBlockHeader(last)
forked := false
if err != nil {
if e, ok := err.(*cmdError); ok && e.Message == "Block not found" {
forked = true
} else {
return err
}
} else {
if header.Confirmations < 0 {
forked = true
}
}
return addrs, nil
if forked {
log.Printf("resync: local is forked")
// TODO: resync after disconnecting
return disconnectBlock(blocks, outpoints, index, header.Hash)
} else {
log.Printf("resync: local is behind")
return connectBlock(blocks, outpoints, index, header.Next)
}
}
func (b *Block) GetTxAddresses(outpoints Outpoints, tx *Tx) ([]string, error) {
seen := make(map[string]struct{}) // Only unique values.
// Process outputs.
for _, o := range tx.Vout {
for _, a := range o.ScriptPubKey.Addresses {
seen[a] = struct{}{}
}
}
// Process inputs. For each input, we need to take a look to the
// outpoint index.
for _, i := range tx.Vin {
if i.Coinbase != "" {
continue
}
// Lookup output in in the outpoint index. In case it's not
// found, take a look in this block.
va, err := outpoints.GetAddresses(i.Txid, i.Vout)
if err == ErrTxNotFound {
va, err = b.GetAddresses(i.Txid, i.Vout)
}
func connectBlock(
blocks Blocks,
outpoints Outpoints,
index Indexer,
hash string,
) error {
for hash != "" {
block, err := blocks.GetBlock(hash)
if err != nil {
return nil, err
return err
}
addrs, err := block.GetAllAddresses(outpoints)
if err != nil {
return err
}
if err := index.ConnectBlock(block, addrs); err != nil {
return err
}
for _, a := range va {
seen[a] = struct{}{}
}
hash = block.Next
}
// Convert the result set into a slice.
addrs := make([]string, len(seen))
i := 0
for a := range seen {
addrs[i] = a
i++
}
return addrs, nil
return nil
}
func (b *Block) GetAddresses(txid string, vout uint32) ([]string, error) {
// TODO: Lookup transaction in constant time.
for _, tx := range b.Txs {
if tx.Txid == txid {
return tx.Vout[vout].ScriptPubKey.Addresses, nil
}
}
return nil, ErrTxNotFound
func disconnectBlock(
blocks Blocks,
outpoints Outpoints,
index Indexer,
hash string,
) error {
return nil
}
func indexBlocks(
func connectBlockRange(
blocks Blocks,
outpoints Outpoints,
index Indexer,
@ -180,7 +298,7 @@ func indexBlocks(
) error {
bch := make(chan blockResult, 3)
go getBlocks(lower, higher, blocks, bch)
go getBlockRange(lower, higher, blocks, bch)
for res := range bch {
if res.err != nil {
@ -202,7 +320,12 @@ type blockResult struct {
err error
}
func getBlocks(lower uint32, higher uint32, blocks Blocks, results chan<- blockResult) {
func getBlockRange(
lower uint32,
higher uint32,
blocks Blocks,
results chan<- blockResult,
) {
defer close(results)
height := lower

View File

@ -55,18 +55,18 @@ func (d *RocksDB) Close() error {
return nil
}
func (d *RocksDB) GetAddresses(txid string, vout uint32) ([]string, error) {
func (d *RocksDB) GetAddress(txid string, vout uint32) (string, error) {
log.Printf("rocksdb: outpoint get %s:%d", txid, vout)
k, err := packOutpointKey(txid, vout)
if err != nil {
return nil, err
return "", err
}
v, err := d.db.Get(d.ro, k)
if err != nil {
return nil, err
return "", err
}
defer v.Free()
return unpackOutpointValue(v.Data())
return unpackAddress(v.Data())
}
func (d *RocksDB) GetTransactions(address string, lower uint32, higher uint32, fn func(txids []string) error) (err error) {
@ -184,11 +184,11 @@ func packAddressVal(txids []string) (b []byte, err error) {
return
}
const transactionIDLen = 32
const txidLen = 32
func unpackAddressVal(b []byte) (txids []string, err error) {
for i := 0; i < len(b); i += transactionIDLen {
t, err := unpackTxid(b[i : i+transactionIDLen])
for i := 0; i < len(b); i += txidLen {
t, err := unpackTxid(b[i : i+txidLen])
if err != nil {
return nil, err
}
@ -216,7 +216,7 @@ func (d *RocksDB) writeOutpoints(
if err != nil {
return err
}
v, err := packOutpointValue(vout.ScriptPubKey.Addresses)
v, err := packAddress(vout.GetAddress())
if err != nil {
return err
}
@ -241,42 +241,25 @@ func packOutpointKey(txid string, vout uint32) (b []byte, err error) {
return
}
func packOutpointValue(addrs []string) (b []byte, err error) {
for _, addr := range addrs {
a, err := packAddress(addr)
if err != nil {
return nil, err
}
i := packVarint(uint32(len(a)))
b = append(b, i...)
b = append(b, a...)
}
return
}
func unpackOutpointValue(b []byte) (addrs []string, err error) {
r := bytes.NewReader(b)
for r.Len() > 0 {
alen, err := vlq.ReadUint(r)
if err != nil {
return nil, err
}
abuf := make([]byte, alen)
_, err = r.Read(abuf)
if err != nil {
return nil, err
}
addr, err := unpackAddress(abuf)
if err != nil {
return nil, err
}
addrs = append(addrs, addr)
}
return
}
// Block index
const (
lastBlockHash = 0x00
)
var (
lastBlockHashKey = []byte{lastBlockHash}
)
func (d *RocksDB) GetLastBlockHash() (string, error) {
v, err := d.db.Get(d.ro, lastBlockHashKey)
if err != nil {
return "", err
}
defer v.Free()
return unpackBlockValue(v.Data())
}
func (d *RocksDB) writeHeight(
wb *gorocksdb.WriteBatch,
block *Block,
@ -288,12 +271,24 @@ func (d *RocksDB) writeHeight(
log.Printf("rocksdb: height put %d %s", block.Height, block.Hash)
}
bv, err := packBlockValue(block.Hash)
if err != nil {
return err
}
bk := packUint(block.Height)
wb.Put(bk, bv)
if delete {
bv, err := packBlockValue(block.Prev)
if err != nil {
return err
}
wb.Delete(bk)
wb.Put(lastBlockHashKey, bv)
} else {
bv, err := packBlockValue(block.Hash)
if err != nil {
return err
}
wb.Put(bk, bv)
wb.Put(lastBlockHashKey, bv)
}
return nil
}
@ -316,33 +311,41 @@ var (
ErrInvalidAddress = errors.New("invalid address")
)
func packAddress(s string) (b []byte, err error) {
func packAddress(s string) ([]byte, error) {
var b []byte
if len(s) == 0 {
return b, nil
}
b = base58.Decode(s)
if len(b) > 4 {
b = b[:len(b)-4]
} else {
err = ErrInvalidAddress
if len(b) <= 4 {
return nil, ErrInvalidAddress
}
return
b = b[:len(b)-4] // Slice off the checksum
return b, nil
}
func unpackAddress(b []byte) (s string, err error) {
if len(b) > 1 {
s = base58.CheckEncode(b[1:], b[0])
} else {
err = ErrInvalidAddress
func unpackAddress(b []byte) (string, error) {
if len(b) == 0 {
return "", nil
}
return
if len(b) == 1 {
return "", ErrInvalidAddress
}
return base58.CheckEncode(b[1:], b[0]), nil
}
func packTxid(s string) (b []byte, err error) {
func packTxid(s string) ([]byte, error) {
return hex.DecodeString(s)
}
func unpackTxid(b []byte) (s string, err error) {
func unpackTxid(b []byte) (string, error) {
return hex.EncodeToString(b), nil
}
func packBlockValue(hash string) ([]byte, error) {
return hex.DecodeString(hash)
}
func unpackBlockValue(b []byte) (string, error) {
return hex.EncodeToString(b), nil
}