From 787a8ea90659f6deb26d3af5a4c412474cdc1df5 Mon Sep 17 00:00:00 2001 From: Jan Pochyla Date: Mon, 11 Sep 2017 12:20:21 +0200 Subject: [PATCH] add resync, store only single addresses --- bitcoinrpc.go | 19 ++-- blockbook.go | 237 ++++++++++++++++++++++++++++++++++++++------------ rocksdb.go | 125 +++++++++++++------------- 3 files changed, 256 insertions(+), 125 deletions(-) diff --git a/bitcoinrpc.go b/bitcoinrpc.go index 53f0ca78..e2847cbc 100644 --- a/bitcoinrpc.go +++ b/bitcoinrpc.go @@ -75,9 +75,7 @@ func (b *BitcoinRPC) GetBlockAndParse(hash string) (block *Block, err error) { } block, err = b.Parser.ParseBlock(data) if err == nil { - block.Hash = header.Hash - block.Height = header.Height - block.Next = header.Next + block.BlockHeader = header } return } @@ -117,6 +115,14 @@ func (b *BitcoinRPC) GetBlockHash(height uint32) (hash string, err error) { return } +// GetBlockHeader returns header of block with given hash. +func (b *BitcoinRPC) GetBlockHeader(hash string) (header *BlockHeader, err error) { + log.Printf("rpc: getblockheader") + header = &BlockHeader{} + err = b.client.Call("getblockheader", &header, hash) + return +} + // GetTransaction returns the number of blocks in the longest chain. If the // transaction cache is turned on, returned Tx.Confirmations is stale. func (b *BitcoinRPC) GetTransaction(txid string) (tx *Tx, err error) { @@ -135,11 +141,10 @@ func (b *BitcoinRPC) GetTransaction(txid string) (tx *Tx, err error) { return } -// GetAddresses returns all unique addresses from given transaction output. -func (b *BitcoinRPC) GetAddresses(txid string, vout uint32) ([]string, error) { +func (b *BitcoinRPC) GetAddress(txid string, vout uint32) (string, error) { tx, err := b.GetTransaction(txid) if err != nil { - return nil, err + return "", err } - return tx.Vout[vout].ScriptPubKey.Addresses, nil + return tx.GetAddress(vout) } diff --git a/blockbook.go b/blockbook.go index 91e7ae88..87f69726 100644 --- a/blockbook.go +++ b/blockbook.go @@ -12,17 +12,21 @@ type BlockParser interface { } var ( - ErrTxNotFound = errors.New("transaction not found") + ErrNotFound = errors.New("not found") ) type Blocks interface { GetBestBlockHash() (string, error) GetBlockHash(height uint32) (string, error) + GetBlockHeader(hash string) (*BlockHeader, error) GetBlock(hash string) (*Block, error) } type Outpoints interface { - GetAddresses(txid string, vout uint32) ([]string, error) + // GetAddress looks up a transaction output and returns its address. + // ErrNotFound is returned if the output is not found. Address can be + // empty string in case it's not intelligable. + GetAddress(txid string, vout uint32) (string, error) } type Addresses interface { @@ -32,6 +36,89 @@ type Addresses interface { type Indexer interface { ConnectBlock(block *Block, txids map[string][]string) error DisconnectBlock(block *Block, txids map[string][]string) error + GetLastBlockHash() (string, error) +} + +func (b *Block) GetAllAddresses(outpoints Outpoints) (map[string][]string, error) { + addrs := make(map[string][]string, 0) // Address to a list of txids. + + for _, tx := range b.Txs { + ta, err := b.GetTxAddresses(outpoints, tx) + if err != nil { + return nil, err + } + for a, _ := range ta { + addrs[a] = append(addrs[a], tx.Txid) + } + } + + return addrs, nil +} + +func (b *Block) GetTxAddresses(outpoints Outpoints, tx *Tx) (map[string]struct{}, error) { + addrs := make(map[string]struct{}) // Only unique values. + + // Process outputs. + for _, o := range tx.Vout { + a := o.GetAddress() + if a != "" { + addrs[a] = struct{}{} + } + } + + // Process inputs. For each input, we need to take a look to the + // outpoint index. + for _, i := range tx.Vin { + if i.Coinbase != "" { + continue + } + + // Lookup output in in the outpoint index. In case it's not + // found, take a look in this block. + a, err := outpoints.GetAddress(i.Txid, i.Vout) + if err == ErrNotFound { + a, err = b.GetAddress(i.Txid, i.Vout) + } + if err != nil { + return nil, err + } + if a != "" { + addrs[a] = struct{}{} + } + } + + return addrs, nil +} + +func (b *Block) GetAddress(txid string, vout uint32) (string, error) { + var t *Tx + for _, tx := range b.Txs { + if tx.Txid == txid { + t = tx + break + } + } + if t == nil { + // Transaction output was not found. + return "", ErrNotFound + } + return t.GetAddress(vout) +} + +func (t *Tx) GetAddress(vout uint32) (string, error) { + if vout >= uint32(len(t.Vout)) { + // The output doesn't exist. + return "", ErrNotFound + } + return t.Vout[vout].GetAddress(), nil +} + +func (o *Vout) GetAddress() string { + if len(o.ScriptPubKey.Addresses) != 1 { + // The output address is not intelligible. + return "" + } + return o.ScriptPubKey.Addresses[0] } var ( @@ -48,6 +135,8 @@ var ( blockHeight = flag.Int("blockheight", -1, "height of the starting block") blockUntil = flag.Int("blockuntil", -1, "height of the final block") queryAddress = flag.String("address", "", "query contents of this address") + + resync = flag.Bool("resync", false, "resync until tip") ) func main() { @@ -89,11 +178,17 @@ func main() { log.Fatal(err) } } else { - if err = indexBlocks(rpc, db, db, height, until); err != nil { + if err = connectBlockRange(rpc, db, db, height, until); err != nil { log.Fatal(err) } } } + + if *resync { + if err := resyncIndex(rpc, db, db); err != nil { + log.Fatal(err) + } + } } func printResult(txids []string) error { @@ -103,75 +198,98 @@ func printResult(txids []string) error { return nil } -func (b *Block) GetAllAddresses(outpoints Outpoints) (map[string][]string, error) { - addrs := make(map[string][]string, 0) +func resyncIndex( + blocks Blocks, + outpoints Outpoints, + index Indexer, +) error { + best, err := blocks.GetBestBlockHash() + if err != nil { + return err + } + last, err := index.GetLastBlockHash() + if err != nil { + last = "" + } - for _, tx := range b.Txs { - ta, err := b.GetTxAddresses(outpoints, tx) + // If the local block is missing, we're indexing from the genesis block. + if last == "" { + log.Printf("resync: genesis") + + hash, err := blocks.GetBlockHash(0) if err != nil { - return nil, err + return err } - for _, addr := range ta { - addrs[addr] = append(addrs[addr], tx.Txid) + return connectBlock(blocks, outpoints, index, hash) + } + + // If the locally indexed block is the same as the best block on the + // network, we're done. + if last == best { + log.Printf("resync: synced on %s", last) + return nil + } + + // Is local tip on the best chain? + header, err := blocks.GetBlockHeader(last) + forked := false + if err != nil { + if e, ok := err.(*cmdError); ok && e.Message == "Block not found" { + forked = true + } else { + return err + } + } else { + if header.Confirmations < 0 { + forked = true } } - return addrs, nil + if forked { + log.Printf("resync: local is forked") + // TODO: resync after disconnecting + return disconnectBlock(blocks, outpoints, index, header.Hash) + } else { + log.Printf("resync: local is behind") + return connectBlock(blocks, outpoints, index, header.Next) + } } -func (b *Block) GetTxAddresses(outpoints Outpoints, tx *Tx) ([]string, error) { - seen := make(map[string]struct{}) // Only unique values. - - // Process outputs. - for _, o := range tx.Vout { - for _, a := range o.ScriptPubKey.Addresses { - seen[a] = struct{}{} - } - } - - // Process inputs. For each input, we need to take a look to the - // outpoint index. - for _, i := range tx.Vin { - if i.Coinbase != "" { - continue - } - - // Lookup output in in the outpoint index. In case it's not - // found, take a look in this block. - va, err := outpoints.GetAddresses(i.Txid, i.Vout) - if err == ErrTxNotFound { - va, err = b.GetAddresses(i.Txid, i.Vout) - } +func connectBlock( + blocks Blocks, + outpoints Outpoints, + index Indexer, + hash string, +) error { + for hash != "" { + block, err := blocks.GetBlock(hash) if err != nil { - return nil, err + return err + } + addrs, err := block.GetAllAddresses(outpoints) + if err != nil { + return err + } + if err := index.ConnectBlock(block, addrs); err != nil { + return err } - for _, a := range va { - seen[a] = struct{}{} - } + hash = block.Next } - // Convert the result set into a slice. - addrs := make([]string, len(seen)) - i := 0 - for a := range seen { - addrs[i] = a - i++ - } - return addrs, nil + return nil } -func (b *Block) GetAddresses(txid string, vout uint32) ([]string, error) { - // TODO: Lookup transaction in constant time. - for _, tx := range b.Txs { - if tx.Txid == txid { - return tx.Vout[vout].ScriptPubKey.Addresses, nil - } - } - return nil, ErrTxNotFound +func disconnectBlock( + blocks Blocks, + outpoints Outpoints, + index Indexer, + hash string, +) error { + return nil } -func indexBlocks( +func connectBlockRange( blocks Blocks, outpoints Outpoints, index Indexer, @@ -180,7 +298,7 @@ func indexBlocks( ) error { bch := make(chan blockResult, 3) - go getBlocks(lower, higher, blocks, bch) + go getBlockRange(lower, higher, blocks, bch) for res := range bch { if res.err != nil { @@ -202,7 +320,12 @@ type blockResult struct { err error } -func getBlocks(lower uint32, higher uint32, blocks Blocks, results chan<- blockResult) { +func getBlockRange( + lower uint32, + higher uint32, + blocks Blocks, + results chan<- blockResult, +) { defer close(results) height := lower diff --git a/rocksdb.go b/rocksdb.go index 75d9c385..f1c0939e 100644 --- a/rocksdb.go +++ b/rocksdb.go @@ -55,18 +55,18 @@ func (d *RocksDB) Close() error { return nil } -func (d *RocksDB) GetAddresses(txid string, vout uint32) ([]string, error) { +func (d *RocksDB) GetAddress(txid string, vout uint32) (string, error) { log.Printf("rocksdb: outpoint get %s:%d", txid, vout) k, err := packOutpointKey(txid, vout) if err != nil { - return nil, err + return "", err } v, err := d.db.Get(d.ro, k) if err != nil { - return nil, err + return "", err } defer v.Free() - return unpackOutpointValue(v.Data()) + return unpackAddress(v.Data()) } func (d *RocksDB) GetTransactions(address string, lower uint32, higher uint32, fn func(txids []string) error) (err error) { @@ -184,11 +184,11 @@ func packAddressVal(txids []string) (b []byte, err error) { return } -const transactionIDLen = 32 +const txidLen = 32 func unpackAddressVal(b []byte) (txids []string, err error) { - for i := 0; i < len(b); i += transactionIDLen { - t, err := unpackTxid(b[i : i+transactionIDLen]) + for i := 0; i < len(b); i += txidLen { + t, err := unpackTxid(b[i : i+txidLen]) if err != nil { return nil, err } @@ -216,7 +216,7 @@ func (d *RocksDB) writeOutpoints( if err != nil { return err } - v, err := packOutpointValue(vout.ScriptPubKey.Addresses) + v, err := packAddress(vout.GetAddress()) if err != nil { return err } @@ -241,42 +241,25 @@ func packOutpointKey(txid string, vout uint32) (b []byte, err error) { return } -func packOutpointValue(addrs []string) (b []byte, err error) { - for _, addr := range addrs { - a, err := packAddress(addr) - if err != nil { - return nil, err - } - i := packVarint(uint32(len(a))) - b = append(b, i...) - b = append(b, a...) - } - return -} - -func unpackOutpointValue(b []byte) (addrs []string, err error) { - r := bytes.NewReader(b) - for r.Len() > 0 { - alen, err := vlq.ReadUint(r) - if err != nil { - return nil, err - } - abuf := make([]byte, alen) - _, err = r.Read(abuf) - if err != nil { - return nil, err - } - addr, err := unpackAddress(abuf) - if err != nil { - return nil, err - } - addrs = append(addrs, addr) - } - return -} - // Block index +const ( + lastBlockHash = 0x00 +) + +var ( + lastBlockHashKey = []byte{lastBlockHash} +) + +func (d *RocksDB) GetLastBlockHash() (string, error) { + v, err := d.db.Get(d.ro, lastBlockHashKey) + if err != nil { + return "", err + } + defer v.Free() + return unpackBlockValue(v.Data()) +} + func (d *RocksDB) writeHeight( wb *gorocksdb.WriteBatch, block *Block, @@ -288,12 +271,24 @@ func (d *RocksDB) writeHeight( log.Printf("rocksdb: height put %d %s", block.Height, block.Hash) } - bv, err := packBlockValue(block.Hash) - if err != nil { - return err - } bk := packUint(block.Height) - wb.Put(bk, bv) + + if delete { + bv, err := packBlockValue(block.Prev) + if err != nil { + return err + } + wb.Delete(bk) + wb.Put(lastBlockHashKey, bv) + + } else { + bv, err := packBlockValue(block.Hash) + if err != nil { + return err + } + wb.Put(bk, bv) + wb.Put(lastBlockHashKey, bv) + } return nil } @@ -316,33 +311,41 @@ var ( ErrInvalidAddress = errors.New("invalid address") ) -func packAddress(s string) (b []byte, err error) { +func packAddress(s string) ([]byte, error) { + var b []byte + if len(s) == 0 { + return b, nil + } b = base58.Decode(s) - if len(b) > 4 { - b = b[:len(b)-4] - } else { - err = ErrInvalidAddress + if len(b) <= 4 { + return nil, ErrInvalidAddress } - return + b = b[:len(b)-4] // Slice off the checksum + return b, nil } -func unpackAddress(b []byte) (s string, err error) { - if len(b) > 1 { - s = base58.CheckEncode(b[1:], b[0]) - } else { - err = ErrInvalidAddress +func unpackAddress(b []byte) (string, error) { + if len(b) == 0 { + return "", nil } - return + if len(b) == 1 { + return "", ErrInvalidAddress + } + return base58.CheckEncode(b[1:], b[0]), nil } -func packTxid(s string) (b []byte, err error) { +func packTxid(s string) ([]byte, error) { return hex.DecodeString(s) } -func unpackTxid(b []byte) (s string, err error) { +func unpackTxid(b []byte) (string, error) { return hex.EncodeToString(b), nil } func packBlockValue(hash string) ([]byte, error) { return hex.DecodeString(hash) } + +func unpackBlockValue(b []byte) (string, error) { + return hex.EncodeToString(b), nil +}