From 7907bfeac7e262d43a24c1b1279a9c21a049f47b Mon Sep 17 00:00:00 2001 From: Jan Pochyla Date: Thu, 5 Oct 2017 14:35:07 +0200 Subject: [PATCH] rework index type --- bitcoinrpc.go | 9 -- blockbook.go | 299 ++++++++++++++++----------------------- rocksdb.go | 384 ++++++++++++++++++++++++++------------------------ types.go | 7 + 4 files changed, 323 insertions(+), 376 deletions(-) diff --git a/bitcoinrpc.go b/bitcoinrpc.go index 46ab0cf2..c7a018fe 100644 --- a/bitcoinrpc.go +++ b/bitcoinrpc.go @@ -293,15 +293,6 @@ func (b *BitcoinRPC) GetTransaction(txid string) (*Tx, error) { return &res.Result, nil } -// GetAddress returns address of transaction output. -func (b *BitcoinRPC) GetAddress(txid string, vout uint32) (string, error) { - tx, err := b.GetTransaction(txid) - if err != nil { - return "", err - } - return tx.GetAddress(vout), nil -} - func (b *BitcoinRPC) call(req interface{}, res interface{}) error { httpData, err := json.Marshal(req) if err != nil { diff --git a/blockbook.go b/blockbook.go index f29d02ac..3eae377e 100644 --- a/blockbook.go +++ b/blockbook.go @@ -3,6 +3,7 @@ package main import ( "flag" "log" + "sync" "time" "github.com/pkg/profile" @@ -12,105 +13,19 @@ type BlockParser interface { ParseBlock(b []byte) (*Block, error) } -type Blocks interface { +type Blockchain interface { GetBestBlockHash() (string, error) GetBlockHash(height uint32) (string, error) GetBlockHeader(hash string) (*BlockHeader, error) GetBlock(hash string) (*Block, error) } -type Outpoints interface { - // GetAddress looks up a transaction output and returns its address. - // Address can be empty string in case it's not found or not - // intelligable. - GetAddress(txid string, vout uint32) (string, error) -} - -type Addresses interface { - GetTransactions(address string, lower uint32, higher uint32, fn func(txids []string) error) error -} - -type Indexer interface { - ConnectBlock(block *Block, txids map[string][]string) error - DisconnectBlock(block *Block, txids map[string][]string) error - GetLastBlockHash() (string, error) -} - -func (b *Block) GetAllAddresses(outpoints Outpoints) (map[string][]string, error) { - addrs := make(map[string][]string, 0) // Address to a list of txids. - - for i, _ := range b.Txs { - tx := &b.Txs[i] - ta, err := b.GetTxAddresses(outpoints, tx) - if err != nil { - return nil, err - } - for a, _ := range ta { - addrs[a] = append(addrs[a], tx.Txid) - } - } - - return addrs, nil -} - -func (b *Block) GetTxAddresses(outpoints Outpoints, tx *Tx) (map[string]struct{}, error) { - addrs := make(map[string]struct{}) // Only unique values. - - // Process outputs. - for _, o := range tx.Vout { - a := o.GetAddress() - if a != "" { - addrs[a] = struct{}{} - } - } - - // Process inputs. For each input, we need to take a look to the - // outpoint index. - for _, i := range tx.Vin { - if i.Coinbase != "" { - continue - } - - // Lookup output in in the outpoint index. In case it's not - // found, take a look in this block. - a, err := outpoints.GetAddress(i.Txid, i.Vout) - if err != nil { - return nil, err - } - if a == "" { - a = b.GetAddress(i.Txid, i.Vout) - } - if a != "" { - addrs[a] = struct{}{} - } else { - log.Printf("warn: output not found: %s:%d", i.Txid, i.Vout) - } - } - - return addrs, nil -} - -func (b *Block) GetAddress(txid string, vout uint32) string { - for i, _ := range b.Txs { - if b.Txs[i].Txid == txid { - return b.Txs[i].GetAddress(vout) - } - } - return "" // tx not found -} - -func (t *Tx) GetAddress(vout uint32) string { - if vout < uint32(len(t.Vout)) { - return t.Vout[vout].GetAddress() - } - return "" // output not found -} - -func (o *Vout) GetAddress() string { - if len(o.ScriptPubKey.Addresses) == 1 { - return o.ScriptPubKey.Addresses[0] - } - return "" // output address not intelligible +type Index interface { + GetBestBlockHash() (string, error) + GetBlockHash(height uint32) (string, error) + GetTransactions(address string, lower uint32, higher uint32, fn func(txid string) error) error + ConnectBlock(block *Block) error + DisconnectBlock(block *Block) error } var ( @@ -121,8 +36,9 @@ var ( dbPath = flag.String("path", "./data", "path to address index directory") - blockHeight = flag.Int("blockheight", -1, "height of the starting block") - blockUntil = flag.Int("blockuntil", -1, "height of the final block") + blockHeight = flag.Int("blockheight", -1, "height of the starting block") + blockUntil = flag.Int("blockuntil", -1, "height of the final block") + queryAddress = flag.String("address", "", "query contents of this address") resync = flag.Bool("resync", false, "resync until tip") @@ -133,10 +49,6 @@ var ( func main() { flag.Parse() - if *prof { - defer profile.Start().Stop() - } - if *repair { if err := RepairRocksDB(*dbPath); err != nil { log.Fatal(err) @@ -144,8 +56,15 @@ func main() { return } - timeout := time.Duration(*rpcTimeout) * time.Second - rpc := NewBitcoinRPC(*rpcURL, *rpcUser, *rpcPass, timeout) + if *prof { + defer profile.Start().Stop() + } + + rpc := NewBitcoinRPC( + *rpcURL, + *rpcUser, + *rpcPass, + time.Duration(*rpcTimeout)*time.Second) db, err := NewRocksDB(*dbPath) if err != nil { @@ -154,7 +73,7 @@ func main() { defer db.Close() if *resync { - if err := resyncIndex(rpc, db, db); err != nil { + if err := resyncIndex(rpc, db); err != nil { log.Fatal(err) } } @@ -172,54 +91,48 @@ func main() { log.Fatal(err) } } else { - if err = connectBlockRange(rpc, db, db, height, until); err != nil { + if err = connectBlocksParallel(rpc, db, height, until); err != nil { log.Fatal(err) } } } } -func printResult(txids []string) error { - for i, txid := range txids { - log.Printf("%d: %s", i, txid) - } +func printResult(txid string) error { + log.Printf("%s", txid) return nil } -func resyncIndex( - blocks Blocks, - outpoints Outpoints, - index Indexer, -) error { - best, err := blocks.GetBestBlockHash() +func resyncIndex(chain Blockchain, index Index) error { + remote, err := chain.GetBestBlockHash() if err != nil { return err } - last, err := index.GetLastBlockHash() + local, err := index.GetBestBlockHash() if err != nil { - last = "" + local = "" } // If the local block is missing, we're indexing from the genesis block. - if last == "" { + if local == "" { log.Printf("resync: genesis") - hash, err := blocks.GetBlockHash(0) + hash, err := chain.GetBlockHash(0) if err != nil { return err } - return connectBlock(blocks, outpoints, index, hash) + return connectBlock(chain, index, hash) } // If the locally indexed block is the same as the best block on the // network, we're done. - if last == best { - log.Printf("resync: synced on %s", last) + if local == remote { + log.Printf("resync: synced on %s", local) return nil } // Is local tip on the best chain? - header, err := blocks.GetBlockHeader(last) + header, err := chain.GetBlockHeader(local) forked := false if err != nil { if e, ok := err.(*RPCError); ok && e.Message == "Block not found" { @@ -236,114 +149,142 @@ func resyncIndex( if forked { log.Printf("resync: local is forked") // TODO: resync after disconnecting - return disconnectBlock(blocks, outpoints, index, header.Hash) + return disconnectBlock(chain, index, header.Hash) } else { log.Printf("resync: local is behind") - return connectBlock(blocks, outpoints, index, header.Next) + return connectBlock(chain, index, header.Next) } } func connectBlock( - blocks Blocks, - outpoints Outpoints, - index Indexer, + chain Blockchain, + index Index, hash string, ) error { bch := make(chan blockResult, 8) done := make(chan struct{}) defer close(done) - go getBlockChain(hash, blocks, bch, done) + go getBlockChain(hash, chain, bch, done) for res := range bch { - err := res.err - block := res.block - - if err != nil { - return err + if res.err != nil { + return res.err } - addrs, err := block.GetAllAddresses(outpoints) + err := index.ConnectBlock(res.block) if err != nil { return err } - if err := index.ConnectBlock(block, addrs); err != nil { - return err - } } return nil } func disconnectBlock( - blocks Blocks, - outpoints Outpoints, - index Indexer, + chain Blockchain, + index Index, hash string, ) error { return nil } -func connectBlockRange( - blocks Blocks, - outpoints Outpoints, - index Indexer, +func connectBlocksParallel( + chain Blockchain, + index Index, lower uint32, higher uint32, ) error { - bch := make(chan blockResult, 3) + const chunkSize = 100 + const numWorkers = 8 - go getBlockRange(lower, higher, blocks, bch) + var wg sync.WaitGroup - for res := range bch { - if res.err != nil { - return res.err + work := func(i int) { + defer wg.Done() + + offset := uint32(chunkSize * i) + stride := uint32(chunkSize * numWorkers) + + for low := offset; low <= higher; low += stride { + high := low + chunkSize - 1 + if high > higher { + high = higher + } + err := connectBlockChunk(chain, index, low, high) + if err != nil { + log.Fatal(err) // TODO + } } - addrs, err := res.block.GetAllAddresses(outpoints) + } + for i := 0; i < numWorkers; i++ { + wg.Add(1) + go work(i) + } + wg.Wait() + + return nil +} + +func connectBlockChunk( + chain Blockchain, + index Index, + lower uint32, + higher uint32, +) error { + connected, err := isBlockConnected(chain, index, higher) + if err != nil || connected { + return err + } + + height := lower + hash, err := chain.GetBlockHash(lower) + if err != nil { + return err + } + + for height <= higher { + block, err := chain.GetBlock(hash) if err != nil { return err } - if err := index.ConnectBlock(res.block, addrs); err != nil { + hash = block.Next + height = block.Height + 1 + err = index.ConnectBlock(block) + if err != nil { return err } } + return nil } +func isBlockConnected( + chain Blockchain, + index Index, + height uint32, +) (bool, error) { + local, err := index.GetBlockHash(height) + if err != nil { + return false, err + } + remote, err := chain.GetBlockHash(height) + if err != nil { + return false, err + } + if local != remote { + return false, nil + } + return true, nil +} + type blockResult struct { block *Block err error } -func getBlockRange( - lower uint32, - higher uint32, - blocks Blocks, - results chan<- blockResult, -) { - defer close(results) - - height := lower - hash, err := blocks.GetBlockHash(height) - if err != nil { - results <- blockResult{err: err} - return - } - - for height <= higher { - block, err := blocks.GetBlock(hash) - if err != nil { - results <- blockResult{err: err} - return - } - hash = block.Next - height = block.Height + 1 - results <- blockResult{block: block} - } -} - func getBlockChain( hash string, - blocks Blocks, + chain Blockchain, out chan blockResult, done chan struct{}, ) { @@ -355,7 +296,7 @@ func getBlockChain( return default: } - block, err := blocks.GetBlock(hash) + block, err := chain.GetBlock(hash) if err != nil { out <- blockResult{err: err} return diff --git a/rocksdb.go b/rocksdb.go index 08f00617..dedaf5e1 100644 --- a/rocksdb.go +++ b/rocksdb.go @@ -66,29 +66,14 @@ func (d *RocksDB) Close() error { return nil } -func (d *RocksDB) GetAddress(txid string, vout uint32) (string, error) { - // log.Printf("rocksdb: outpoint get %s:%d", txid, vout) - - k, err := packOutpointKey(txid, vout) - if err != nil { - return "", err - } - v, err := d.db.Get(d.ro, k) - if err != nil { - return "", err - } - defer v.Free() - return unpackAddress(v.Data()) -} - -func (d *RocksDB) GetTransactions(address string, lower uint32, higher uint32, fn func(txids []string) error) (err error) { +func (d *RocksDB) GetTransactions(address string, lower uint32, higher uint32, fn func(txid string) error) (err error) { log.Printf("rocksdb: address get %d:%d %s", lower, higher, address) - kstart, err := packAddressKey(lower, address) + kstart, err := packOutputKey(address, lower) if err != nil { return err } - kstop, err := packAddressKey(higher, address) + kstop, err := packOutputKey(address, higher) if err != nil { return err } @@ -97,211 +82,237 @@ func (d *RocksDB) GetTransactions(address string, lower uint32, higher uint32, f defer it.Close() for it.Seek(kstart); it.Valid(); it.Next() { - k := it.Key() - v := it.Value() - if bytes.Compare(k.Data(), kstop) > 0 { + key := it.Key() + val := it.Value() + if bytes.Compare(key.Data(), kstop) > 0 { break } - txids, err := unpackAddressVal(v.Data()) + outpoints, err := unpackOutputValue(val.Data()) if err != nil { return err } - if err := fn(txids); err != nil { - return err + for _, o := range outpoints { + if err := fn(o.txid); err != nil { + return err + } } } return nil } -func (d *RocksDB) ConnectBlock(block *Block, txids map[string][]string) error { - return d.writeBlock(block, txids, false /* delete */) +const ( + opInsert = 0 + opDelete = 1 +) + +func (d *RocksDB) ConnectBlock(block *Block) error { + return d.writeBlock(block, opInsert) } -func (d *RocksDB) DisconnectBlock(block *Block, txids map[string][]string) error { - return d.writeBlock(block, txids, true /* delete */) +func (d *RocksDB) DisconnectBlock(block *Block) error { + return d.writeBlock(block, opDelete) } -func (d *RocksDB) writeBlock( - block *Block, - txids map[string][]string, - delete bool, -) error { +func (d *RocksDB) writeBlock(block *Block, op int) error { wb := gorocksdb.NewWriteBatch() defer wb.Destroy() - if err := d.writeHeight(wb, block, delete); err != nil { + if err := d.writeHeight(wb, block, op); err != nil { return err } - if err := d.writeOutpoints(wb, block, delete); err != nil { + if err := d.writeOutputs(wb, block, op); err != nil { return err } - if err := d.writeAddresses(wb, block, txids, delete); err != nil { + if err := d.writeInputs(wb, block, op); err != nil { return err } return d.db.Write(d.wo, wb) } -// Address Index +// Output Index -func (d *RocksDB) writeAddresses( +type outpoint struct { + txid string + vout uint32 +} + +func (d *RocksDB) writeOutputs( wb *gorocksdb.WriteBatch, block *Block, - txids map[string][]string, - delete bool, + op int, ) error { - if delete { - log.Printf("rocksdb: address delete %d in %d %s", len(txids), block.Height, block.Hash) - } else { - log.Printf("rocksdb: address put %d in %d %s", len(txids), block.Height, block.Hash) + switch op { + case opInsert: + log.Printf("rocksdb: outputs insert %d %s", block.Height, block.Hash) + case opDelete: + log.Printf("rocksdb: outputs delete %d %s", block.Height, block.Hash) } - for addr, txids := range txids { - k, err := packAddressKey(block.Height, addr) - if err != nil { - return err - } - v, err := packAddressVal(txids) - if err != nil { - return err - } - if delete { - wb.Delete(k) - } else { - wb.Put(k, v) - } - - } - return nil -} - -func packAddressKey(height uint32, address string) (b []byte, err error) { - b, err = packAddress(address) - if err != nil { - return - } - h := packUint(height) - b = append(b, h...) - return -} - -func packAddressVal(txids []string) (b []byte, err error) { - for _, txid := range txids { - t, err := packTxid(txid) - if err != nil { - return nil, err - } - b = append(b, t...) - } - return -} - -const txidLen = 32 - -func unpackAddressVal(b []byte) (txids []string, err error) { - for i := 0; i < len(b); i += txidLen { - t, err := unpackTxid(b[i : i+txidLen]) - if err != nil { - return nil, err - } - txids = append(txids, t) - } - return -} - -// Outpoint index - -func (d *RocksDB) writeOutpoints( - wb *gorocksdb.WriteBatch, - block *Block, - delete bool, -) error { - if delete { - log.Printf("rocksdb: outpoints delete %d in %d %s", len(block.Txs), block.Height, block.Hash) - } else { - log.Printf("rocksdb: outpoints put %d in %d %s", len(block.Txs), block.Height, block.Hash) - } + records := make(map[string][]outpoint) for _, tx := range block.Txs { - for _, vout := range tx.Vout { - k, err := packOutpointKey(tx.Txid, vout.N) - if err != nil { - return err - } - v, err := packAddress(vout.GetAddress()) - if err != nil { - return err - } - if delete { - wb.Delete(k) + for _, output := range tx.Vout { + address := output.GetAddress() + if address != "" { + records[address] = append(records[address], outpoint{ + txid: tx.Txid, + vout: output.N, + }) } else { - if len(v) > 0 { - wb.Put(k, v) - } + log.Printf("rocksdb: skipping %s:%d", tx.Txid, output.N) } } } + + for address, outpoints := range records { + key, err := packOutputKey(address, block.Height) + if err != nil { + return err + } + val, err := packOutputValue(outpoints) + + switch op { + case opInsert: + wb.Put(key, val) + case opDelete: + wb.Delete(key) + } + } + return nil } -func packOutpointKey(txid string, vout uint32) (b []byte, err error) { - t, err := packTxid(txid) +func packOutputKey(address string, height uint32) ([]byte, error) { + baddress, err := packAddress(address) if err != nil { return nil, err } - v := packVarint(vout) - b = append(b, t...) - b = append(b, v...) - return + bheight := packUint(height) + buf := make([]byte, 0, len(baddress)+len(bheight)) + buf = append(buf, baddress...) + buf = append(buf, bheight...) + return buf, nil +} + +func packOutputValue(outpoints []outpoint) ([]byte, error) { + buf := make([]byte, 0) + for _, o := range outpoints { + btxid, err := packTxid(o.txid) + if err != nil { + return nil, err + } + bvout := packVaruint(o.vout) + buf = append(buf, btxid...) + buf = append(buf, bvout...) + } + return buf, nil +} + +func unpackOutputValue(buf []byte) ([]outpoint, error) { + outpoints := make([]outpoint, 0) + for i := 0; i < len(buf); { + txid, err := unpackTxid(buf[i : i+txIdUnpackedLen]) + if err != nil { + return nil, err + } + i += txIdUnpackedLen + vout, voutLen := unpackVaruint(buf[i:]) + i += voutLen + outpoints = append(outpoints, outpoint{ + txid: txid, + vout: vout, + }) + } + return outpoints, nil +} + +// Input index + +func (d *RocksDB) writeInputs( + wb *gorocksdb.WriteBatch, + block *Block, + op int, +) error { + switch op { + case opInsert: + log.Printf("rocksdb: inputs insert %d %s", block.Height, block.Hash) + case opDelete: + log.Printf("rocksdb: inputs delete %d %s", block.Height, block.Hash) + } + + for _, tx := range block.Txs { + for i, input := range tx.Vin { + key, err := packOutpoint(input.Txid, input.Vout) + if err != nil { + return err + } + val, err := packOutpoint(tx.Txid, uint32(i)) + if err != nil { + return err + } + switch op { + case opInsert: + wb.Put(key, val) + case opDelete: + wb.Delete(key) + } + } + } + return nil +} + +func packOutpoint(txid string, vout uint32) ([]byte, error) { + btxid, err := packTxid(txid) + if err != nil { + return nil, err + } + bvout := packVaruint(vout) + buf := make([]byte, 0, len(btxid)+len(bvout)) + buf = append(buf, btxid...) + buf = append(buf, bvout...) + return buf, nil } // Block index -const ( - lastBlockHash = 0x00 -) +func (d *RocksDB) GetBestBlockHash() (string, error) { + return "", nil // TODO +} -var ( - lastBlockHashKey = []byte{lastBlockHash} -) - -func (d *RocksDB) GetLastBlockHash() (string, error) { - v, err := d.db.Get(d.ro, lastBlockHashKey) +func (d *RocksDB) GetBlockHash(height uint32) (string, error) { + key := packUint(height) + val, err := d.db.Get(d.ro, key) if err != nil { return "", err } - defer v.Free() - return unpackBlockValue(v.Data()) + defer val.Free() + return unpackBlockValue(val.Data()) } func (d *RocksDB) writeHeight( wb *gorocksdb.WriteBatch, block *Block, - delete bool, + op int, ) error { - if delete { - log.Printf("rocksdb: height delete %d %s", block.Height, block.Hash) - } else { + switch op { + case opInsert: log.Printf("rocksdb: height put %d %s", block.Height, block.Hash) + case opDelete: + log.Printf("rocksdb: height delete %d %s", block.Height, block.Hash) } - bk := packUint(block.Height) + key := packUint(block.Height) - if delete { - bv, err := packBlockValue(block.Prev) + switch op { + case opInsert: + val, err := packBlockValue(block.Hash) if err != nil { return err } - wb.Delete(bk) - wb.Put(lastBlockHashKey, bv) - - } else { - bv, err := packBlockValue(block.Hash) - if err != nil { - return err - } - wb.Put(bk, bv) - wb.Put(lastBlockHashKey, bv) + wb.Put(key, val) + case opDelete: + wb.Delete(key) } return nil @@ -309,57 +320,54 @@ func (d *RocksDB) writeHeight( // Helpers +const txIdUnpackedLen = 32 + +var ErrInvalidAddress = errors.New("invalid address") + func packUint(i uint32) []byte { - b := make([]byte, 4) - binary.BigEndian.PutUint32(b, i) - return b + buf := make([]byte, 4) + binary.BigEndian.PutUint32(buf, i) + return buf } -func packVarint(i uint32) []byte { - b := make([]byte, vlq.MaxLen32) - n := vlq.PutUint(b, uint64(i)) - return b[:n] +func packVaruint(i uint32) []byte { + buf := make([]byte, vlq.MaxLen32) + ofs := vlq.PutUint(buf, uint64(i)) + return buf[:ofs] } -var ( - ErrInvalidAddress = errors.New("invalid address") -) +func unpackVaruint(buf []byte) (uint32, int) { + i, ofs := vlq.Uint(buf) + return uint32(i), ofs +} -func packAddress(s string) ([]byte, error) { - var b []byte - if len(s) == 0 { - return b, nil - } - b = base58.Decode(s) - if len(b) <= 4 { +func packAddress(address string) ([]byte, error) { + buf := base58.Decode(address) + if len(buf) <= 4 { return nil, ErrInvalidAddress } - b = b[:len(b)-4] // Slice off the checksum - return b, nil + return buf[:len(buf)-4], nil // Slice off the checksum } -func unpackAddress(b []byte) (string, error) { - if len(b) == 0 { - return "", nil - } - if len(b) == 1 { +func unpackAddress(buf []byte) (string, error) { + if len(buf) < 2 { return "", ErrInvalidAddress } - return base58.CheckEncode(b[1:], b[0]), nil + return base58.CheckEncode(buf[1:], buf[0]), nil } -func packTxid(s string) ([]byte, error) { - return hex.DecodeString(s) +func packTxid(txid string) ([]byte, error) { + return hex.DecodeString(txid) } -func unpackTxid(b []byte) (string, error) { - return hex.EncodeToString(b), nil +func unpackTxid(buf []byte) (string, error) { + return hex.EncodeToString(buf), nil } func packBlockValue(hash string) ([]byte, error) { return hex.DecodeString(hash) } -func unpackBlockValue(b []byte) (string, error) { - return hex.EncodeToString(b), nil +func unpackBlockValue(buf []byte) (string, error) { + return hex.EncodeToString(buf), nil } diff --git a/types.go b/types.go index e051ff2f..96cb3c45 100644 --- a/types.go +++ b/types.go @@ -26,6 +26,13 @@ type Vout struct { ScriptPubKey ScriptPubKey `json:"scriptPubKey"` } +func (vout *Vout) GetAddress() string { + if len(vout.ScriptPubKey.Addresses) != 1 { + return "" // output address not intelligible + } + return vout.ScriptPubKey.Addresses[0] +} + type Tx struct { Txid string `json:"txid"` Version int32 `json:"version"`