Implement DisconnectBlocks in index v2 - WIP
parent
ce485099a7
commit
ed027a68c9
|
@ -248,15 +248,23 @@ type outpoint struct {
|
|||
vout int32
|
||||
}
|
||||
|
||||
func packBlockAddress(addrID []byte) []byte {
|
||||
func (d *RocksDB) packBlockAddress(addrID []byte, removedUnspentTxs map[string][]outpoint) []byte {
|
||||
vBuf := make([]byte, vlq.MaxLen32)
|
||||
vl := packVarint(int32(len(addrID)), vBuf)
|
||||
blockAddress := append([]byte(nil), vBuf[:vl]...)
|
||||
blockAddress = append(blockAddress, addrID...)
|
||||
if removedUnspentTxs == nil {
|
||||
} else {
|
||||
addrUnspentTxs := removedUnspentTxs[string(addrID)]
|
||||
vl = packVarint(int32(len(addrUnspentTxs)), vBuf)
|
||||
blockAddress = append(blockAddress, vBuf[:vl]...)
|
||||
buf := d.packOutpoints(addrUnspentTxs)
|
||||
blockAddress = append(blockAddress, buf...)
|
||||
}
|
||||
return blockAddress
|
||||
}
|
||||
|
||||
func (d *RocksDB) writeAddressRecords(wb *gorocksdb.WriteBatch, block *bchain.Block, op int, records map[string][]outpoint) error {
|
||||
func (d *RocksDB) writeAddressRecords(wb *gorocksdb.WriteBatch, block *bchain.Block, op int, records map[string][]outpoint, removedUnspentTxs map[string][]outpoint) error {
|
||||
keep := d.chainParser.KeepBlockAddresses()
|
||||
blockAddresses := make([]byte, 0)
|
||||
for addrID, outpoints := range records {
|
||||
|
@ -268,16 +276,12 @@ func (d *RocksDB) writeAddressRecords(wb *gorocksdb.WriteBatch, block *bchain.Bl
|
|||
}
|
||||
switch op {
|
||||
case opInsert:
|
||||
val, err := d.packOutpoints(outpoints)
|
||||
if err != nil {
|
||||
glog.Warningf("rocksdb: packOutputValue: %v", err)
|
||||
continue
|
||||
}
|
||||
val := d.packOutpoints(outpoints)
|
||||
wb.PutCF(d.cfh[cfAddresses], key, val)
|
||||
if keep > 0 {
|
||||
// collect all addresses be stored in blockaddresses
|
||||
// they are used in disconnect blocks
|
||||
blockAddress := packBlockAddress(baddrID)
|
||||
blockAddress := d.packBlockAddress(baddrID, removedUnspentTxs)
|
||||
blockAddresses = append(blockAddresses, blockAddress...)
|
||||
}
|
||||
case opDelete:
|
||||
|
@ -380,6 +384,7 @@ func (d *RocksDB) writeAddressesUTXO(wb *gorocksdb.WriteBatch, block *bchain.Blo
|
|||
}
|
||||
addresses := make(map[string][]outpoint)
|
||||
unspentTxs := make(map[string][]byte)
|
||||
removedUnspentTxs := make(map[string][]outpoint)
|
||||
btxIDs := make([][]byte, len(block.Txs))
|
||||
// first process all outputs, build mapping of addresses to outpoints and mappings of unspent txs to addresses
|
||||
for txi, tx := range block.Txs {
|
||||
|
@ -438,6 +443,9 @@ func (d *RocksDB) writeAddressesUTXO(wb *gorocksdb.WriteBatch, block *bchain.Blo
|
|||
glog.Warningf("rocksdb: height %d, tx %v vin %v in inputs but missing in unspentTxs", block.Height, tx.Txid, i)
|
||||
continue
|
||||
}
|
||||
rut := removedUnspentTxs[string(addrID)]
|
||||
rut = append(rut, outpoint{btxID, int32(input.Vout)})
|
||||
removedUnspentTxs[string(addrID)] = rut
|
||||
err = d.addAddrIDToRecords(op, wb, addresses, addrID, spendingTxid, int32(^i), block.Height)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -445,7 +453,7 @@ func (d *RocksDB) writeAddressesUTXO(wb *gorocksdb.WriteBatch, block *bchain.Blo
|
|||
unspentTxs[stxID] = unspentAddrs
|
||||
}
|
||||
}
|
||||
if err := d.writeAddressRecords(wb, block, op, addresses); err != nil {
|
||||
if err := d.writeAddressRecords(wb, block, op, addresses, removedUnspentTxs); err != nil {
|
||||
return err
|
||||
}
|
||||
// save unspent txs from current block
|
||||
|
@ -495,27 +503,34 @@ func (d *RocksDB) writeAddressesNonUTXO(wb *gorocksdb.WriteBatch, block *bchain.
|
|||
}
|
||||
}
|
||||
}
|
||||
return d.writeAddressRecords(wb, block, op, addresses)
|
||||
return d.writeAddressRecords(wb, block, op, addresses, nil)
|
||||
}
|
||||
|
||||
func unpackBlockAddresses(buf []byte) ([][]byte, error) {
|
||||
func (d *RocksDB) unpackBlockAddresses(buf []byte) ([][]byte, [][]outpoint, error) {
|
||||
addresses := make([][]byte, 0)
|
||||
outpointsArray := make([][]outpoint, 0)
|
||||
// the addresses are packed as lenaddrID addrID vout, where lenaddrID and vout are varints
|
||||
for i := 0; i < len(buf); {
|
||||
l, lv := unpackVarint(buf[i:])
|
||||
j := i + int(l) + lv
|
||||
if j > len(buf) {
|
||||
glog.Error("rocksdb: Inconsistent data in blockAddresses ", hex.EncodeToString(buf))
|
||||
return nil, errors.New("Inconsistent data in blockAddresses")
|
||||
return nil, nil, errors.New("Inconsistent data in blockAddresses")
|
||||
}
|
||||
addrID := append([]byte(nil), buf[i+lv:j]...)
|
||||
outpoints, ol, err := d.unpackNOutpoints(buf[j:])
|
||||
if err != nil {
|
||||
glog.Error("rocksdb: Inconsistent data in blockAddresses ", hex.EncodeToString(buf))
|
||||
return nil, nil, errors.New("Inconsistent data in blockAddresses")
|
||||
}
|
||||
addresses = append(addresses, addrID)
|
||||
i = j
|
||||
outpointsArray = append(outpointsArray, outpoints)
|
||||
i = j + ol
|
||||
}
|
||||
return addresses, nil
|
||||
return addresses, outpointsArray, nil
|
||||
}
|
||||
|
||||
func (d *RocksDB) packOutpoints(outpoints []outpoint) ([]byte, error) {
|
||||
func (d *RocksDB) packOutpoints(outpoints []outpoint) []byte {
|
||||
buf := make([]byte, 0)
|
||||
bvout := make([]byte, vlq.MaxLen32)
|
||||
for _, o := range outpoints {
|
||||
|
@ -523,25 +538,45 @@ func (d *RocksDB) packOutpoints(outpoints []outpoint) ([]byte, error) {
|
|||
buf = append(buf, []byte(o.btxID)...)
|
||||
buf = append(buf, bvout[:l]...)
|
||||
}
|
||||
return buf, nil
|
||||
return buf
|
||||
}
|
||||
|
||||
func (d *RocksDB) unpackOutpoints(buf []byte) ([]outpoint, error) {
|
||||
txidUnpackedLen := d.chainParser.PackedTxidLen()
|
||||
outpoints := make([]outpoint, 0)
|
||||
for i := 0; i < len(buf); {
|
||||
btxid := buf[i : i+txidUnpackedLen]
|
||||
btxID := buf[i : i+txidUnpackedLen]
|
||||
i += txidUnpackedLen
|
||||
vout, voutLen := unpackVarint(buf[i:])
|
||||
i += voutLen
|
||||
outpoints = append(outpoints, outpoint{
|
||||
btxID: btxid,
|
||||
btxID: btxID,
|
||||
vout: vout,
|
||||
})
|
||||
}
|
||||
return outpoints, nil
|
||||
}
|
||||
|
||||
func (d *RocksDB) unpackNOutpoints(buf []byte) ([]outpoint, int, error) {
|
||||
txidUnpackedLen := d.chainParser.PackedTxidLen()
|
||||
n, p := unpackVarint(buf)
|
||||
outpoints := make([]outpoint, n)
|
||||
for i := int32(0); i < n; i++ {
|
||||
if p+txidUnpackedLen >= len(buf) {
|
||||
return nil, 0, errors.New("Inconsistent data in unpackNOutpoints")
|
||||
}
|
||||
btxID := buf[p : p+txidUnpackedLen]
|
||||
p += txidUnpackedLen
|
||||
vout, voutLen := unpackVarint(buf[p:])
|
||||
p += voutLen
|
||||
outpoints[i] = outpoint{
|
||||
btxID: btxID,
|
||||
vout: vout,
|
||||
}
|
||||
}
|
||||
return outpoints, p, nil
|
||||
}
|
||||
|
||||
func (d *RocksDB) packOutpoint(txid string, vout int32) ([]byte, error) {
|
||||
btxid, err := d.chainParser.PackTxid(txid)
|
||||
if err != nil {
|
||||
|
@ -611,17 +646,17 @@ func (d *RocksDB) writeHeight(
|
|||
return nil
|
||||
}
|
||||
|
||||
func (d *RocksDB) getBlockAddresses(key []byte) ([][]byte, error) {
|
||||
func (d *RocksDB) getBlockAddresses(key []byte) ([][]byte, [][]outpoint, error) {
|
||||
b, err := d.db.GetCF(d.ro, d.cfh[cfBlockAddresses], key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
defer b.Free()
|
||||
// block is missing in DB
|
||||
if b.Data() == nil {
|
||||
return nil, errors.New("Block addresses missing")
|
||||
return nil, nil, errors.New("Block addresses missing")
|
||||
}
|
||||
return unpackBlockAddresses(b.Data())
|
||||
return d.unpackBlockAddresses(b.Data())
|
||||
}
|
||||
|
||||
func (d *RocksDB) fullAddressesScan(lower uint32, higher uint32) ([][]byte, [][]byte, error) {
|
||||
|
@ -673,21 +708,25 @@ func (d *RocksDB) fullAddressesScan(lower uint32, higher uint32) ([][]byte, [][]
|
|||
// it finds the data in blockaddresses column if available,
|
||||
// otherwise by doing quite slow full scan of addresses column
|
||||
func (d *RocksDB) DisconnectBlockRange(lower uint32, higher uint32) error {
|
||||
|
||||
// TODO - it is still a mess
|
||||
|
||||
glog.Infof("db: disconnecting blocks %d-%d", lower, higher)
|
||||
addrKeys := [][]byte{}
|
||||
addrValues := [][]byte{}
|
||||
addrUnspentOutpoints := [][]outpoint{}
|
||||
keep := d.chainParser.KeepBlockAddresses()
|
||||
var err error
|
||||
doFullScan := true
|
||||
if keep > 0 {
|
||||
for height := lower; height <= higher; height++ {
|
||||
key := packUint(height)
|
||||
addresses, err := d.getBlockAddresses(key)
|
||||
addresses, unspentOutpoints, err := d.getBlockAddresses(key)
|
||||
if err != nil {
|
||||
glog.Error(err)
|
||||
goto GoFullScan
|
||||
}
|
||||
for _, addrID := range addresses {
|
||||
for i, addrID := range addresses {
|
||||
addrKey := append(addrID, key...)
|
||||
val, err := d.db.GetCF(d.ro, d.cfh[cfAddresses], addrKey)
|
||||
if err != nil {
|
||||
|
@ -695,6 +734,7 @@ func (d *RocksDB) DisconnectBlockRange(lower uint32, higher uint32) error {
|
|||
}
|
||||
addrKeys = append(addrKeys, addrKey)
|
||||
addrValue := append([]byte(nil), val.Data()...)
|
||||
addrUnspentOutpoints = append(addrUnspentOutpoints, unspentOutpoints[i])
|
||||
val.Free()
|
||||
addrValues = append(addrValues, addrValue)
|
||||
}
|
||||
|
@ -717,10 +757,7 @@ func (d *RocksDB) DisconnectBlockRange(lower uint32, higher uint32) error {
|
|||
glog.Info("address ", hex.EncodeToString(addrKey))
|
||||
}
|
||||
wb.DeleteCF(d.cfh[cfAddresses], addrKey)
|
||||
outpoints, err := d.unpackOutpoints(addrValues[i])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
outpoints := addrUnspentOutpoints[i]
|
||||
addrID, height, err := unpackAddressKey(addrKey)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
|
@ -16,6 +16,10 @@ import (
|
|||
"github.com/juju/errors"
|
||||
)
|
||||
|
||||
// simplified explanation of signed varint packing, used in many index data structures
|
||||
// for number n, the packing is: 2*n if n>=0 else 2*(-n)-1
|
||||
// take only 1 byte if abs(n)<127
|
||||
|
||||
func setupRocksDB(t *testing.T, p bchain.BlockChainParser) *RocksDB {
|
||||
tmp, err := ioutil.TempDir("", "testdb")
|
||||
if err != nil {
|
||||
|
@ -45,7 +49,6 @@ func addressToPubKeyHex(addr string, t *testing.T, d *RocksDB) string {
|
|||
|
||||
func addressToPubKeyHexWithLength(addr string, t *testing.T, d *RocksDB) string {
|
||||
h := addressToPubKeyHex(addr, t, d)
|
||||
// length is signed varint, therefore 2 times big, we can take len(h) as the correct value
|
||||
return strconv.FormatInt(int64(len(h)), 16) + h
|
||||
}
|
||||
|
||||
|
@ -56,14 +59,18 @@ type keyPair struct {
|
|||
CompareFunc func(string) bool
|
||||
}
|
||||
|
||||
func compareFuncBlockAddresses(v string, expected []string) bool {
|
||||
func compareFuncBlockAddresses(t *testing.T, v string, expected []string) bool {
|
||||
for _, e := range expected {
|
||||
lb := len(v)
|
||||
v = strings.Replace(v, e, "", 1)
|
||||
if lb == len(v) {
|
||||
t.Error(e, " not found in ", v)
|
||||
return false
|
||||
}
|
||||
}
|
||||
if len(v) != 0 {
|
||||
t.Error("not expected content ", v)
|
||||
}
|
||||
return len(v) == 0
|
||||
}
|
||||
|
||||
|
@ -262,11 +269,11 @@ func verifyAfterUTXOBlock1(t *testing.T, d *RocksDB, noBlockAddresses bool) {
|
|||
blockAddressesKp = []keyPair{
|
||||
keyPair{"000370d5", "",
|
||||
func(v string) bool {
|
||||
return compareFuncBlockAddresses(v, []string{
|
||||
addressToPubKeyHexWithLength("mfcWp7DB6NuaZsExybTTXpVgWz559Np4Ti", t, d),
|
||||
addressToPubKeyHexWithLength("mtGXQvBowMkBpnhLckhxhbwYK44Gs9eEtz", t, d),
|
||||
addressToPubKeyHexWithLength("mv9uLThosiEnGRbVPS7Vhyw6VssbVRsiAw", t, d),
|
||||
addressToPubKeyHexWithLength("2Mz1CYoppGGsLNUGF2YDhTif6J661JitALS", t, d),
|
||||
return compareFuncBlockAddresses(t, v, []string{
|
||||
addressToPubKeyHexWithLength("mfcWp7DB6NuaZsExybTTXpVgWz559Np4Ti", t, d) + "00",
|
||||
addressToPubKeyHexWithLength("mtGXQvBowMkBpnhLckhxhbwYK44Gs9eEtz", t, d) + "00",
|
||||
addressToPubKeyHexWithLength("mv9uLThosiEnGRbVPS7Vhyw6VssbVRsiAw", t, d) + "00",
|
||||
addressToPubKeyHexWithLength("2Mz1CYoppGGsLNUGF2YDhTif6J661JitALS", t, d) + "00",
|
||||
})
|
||||
},
|
||||
},
|
||||
|
@ -329,14 +336,14 @@ func verifyAfterUTXOBlock2(t *testing.T, d *RocksDB) {
|
|||
if err := checkColumn(d, cfBlockAddresses, []keyPair{
|
||||
keyPair{"000370d6", "",
|
||||
func(v string) bool {
|
||||
return compareFuncBlockAddresses(v, []string{
|
||||
addressToPubKeyHexWithLength("mzB8cYrfRwFRFAGTDzV8LkUQy5BQicxGhX", t, d),
|
||||
addressToPubKeyHexWithLength("mtR97eM2HPWVM6c8FGLGcukgaHHQv7THoL", t, d),
|
||||
addressToPubKeyHexWithLength("mwwoKQE5Lb1G4picHSHDQKg8jw424PF9SC", t, d),
|
||||
addressToPubKeyHexWithLength("mmJx9Y8ayz9h14yd9fgCW1bUKoEpkBAquP", t, d),
|
||||
addressToPubKeyHexWithLength("mv9uLThosiEnGRbVPS7Vhyw6VssbVRsiAw", t, d),
|
||||
addressToPubKeyHexWithLength("mtGXQvBowMkBpnhLckhxhbwYK44Gs9eEtz", t, d),
|
||||
addressToPubKeyHexWithLength("2Mz1CYoppGGsLNUGF2YDhTif6J661JitALS", t, d),
|
||||
return compareFuncBlockAddresses(t, v, []string{
|
||||
addressToPubKeyHexWithLength("mzB8cYrfRwFRFAGTDzV8LkUQy5BQicxGhX", t, d) + "02" + "7c3be24063f268aaa1ed81b64776798f56088757641a34fb156c4f51ed2e9d25" + "00",
|
||||
addressToPubKeyHexWithLength("mtR97eM2HPWVM6c8FGLGcukgaHHQv7THoL", t, d) + "00",
|
||||
addressToPubKeyHexWithLength("mwwoKQE5Lb1G4picHSHDQKg8jw424PF9SC", t, d) + "00",
|
||||
addressToPubKeyHexWithLength("mmJx9Y8ayz9h14yd9fgCW1bUKoEpkBAquP", t, d) + "00",
|
||||
addressToPubKeyHexWithLength("mv9uLThosiEnGRbVPS7Vhyw6VssbVRsiAw", t, d) + "02" + "effd9ef509383d536b1c8af5bf434c8efbf521a4f2befd4022bbd68694b4ac75" + "00",
|
||||
addressToPubKeyHexWithLength("mtGXQvBowMkBpnhLckhxhbwYK44Gs9eEtz", t, d) + "02" + "00b2c06055e5e90e9c82bd4181fde310104391a7fa4f289b1704e5d90caa3840" + "02",
|
||||
addressToPubKeyHexWithLength("2Mz1CYoppGGsLNUGF2YDhTif6J661JitALS", t, d) + "02" + "effd9ef509383d536b1c8af5bf434c8efbf521a4f2befd4022bbd68694b4ac75" + "02",
|
||||
})
|
||||
},
|
||||
},
|
||||
|
@ -570,7 +577,14 @@ func Test_findAndRemoveUnspentAddr(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
type hexoutpoint struct {
|
||||
txID string
|
||||
vout int32
|
||||
}
|
||||
|
||||
func Test_unpackBlockAddresses(t *testing.T) {
|
||||
d := setupRocksDB(t, &testBitcoinParser{BitcoinParser: &btc.BitcoinParser{Params: btc.GetChainParams("test")}})
|
||||
defer closeAnddestroyRocksDB(t, d)
|
||||
type args struct {
|
||||
buf string
|
||||
}
|
||||
|
@ -578,12 +592,25 @@ func Test_unpackBlockAddresses(t *testing.T) {
|
|||
name string
|
||||
args args
|
||||
want []string
|
||||
want2 [][]hexoutpoint
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "1",
|
||||
args: args{"029c10517a011588745287127093935888939356870e646351670068680e765193518800870a7b7b0115873276a9144150837fb91d9461d6b95059842ab85262c2923f88ac08636751680457870291"},
|
||||
want: []string{"9c", "517a011588745287", "709393588893935687", "64635167006868", "76519351880087", "7b7b011587", "76a9144150837fb91d9461d6b95059842ab85262c2923f88ac", "63675168", "5787", "91"},
|
||||
args: args{"029c0010517a011588745287047c3be24063f268aaa1ed81b64776798f56088757641a34fb156c4f51ed2e9d250000b2c06055e5e90e9c82bd4181fde310104391a7fa4f289b1704e5d90caa38400612709393588893935687000e64635167006868000e7651935188008702effd9ef509383d536b1c8af5bf434c8efbf521a4f2befd4022bbd68694b4ac7502"},
|
||||
want: []string{"9c", "517a011588745287", "709393588893935687", "64635167006868", "76519351880087"},
|
||||
want2: [][]hexoutpoint{
|
||||
[]hexoutpoint{},
|
||||
[]hexoutpoint{
|
||||
hexoutpoint{"7c3be24063f268aaa1ed81b64776798f56088757641a34fb156c4f51ed2e9d25", 0},
|
||||
hexoutpoint{"00b2c06055e5e90e9c82bd4181fde310104391a7fa4f289b1704e5d90caa3840", 3},
|
||||
},
|
||||
[]hexoutpoint{},
|
||||
[]hexoutpoint{},
|
||||
[]hexoutpoint{
|
||||
hexoutpoint{"effd9ef509383d536b1c8af5bf434c8efbf521a4f2befd4022bbd68694b4ac75", 1},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
|
@ -592,7 +619,7 @@ func Test_unpackBlockAddresses(t *testing.T) {
|
|||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
got, err := unpackBlockAddresses(b)
|
||||
got, got2, err := d.unpackBlockAddresses(b)
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Errorf("unpackBlockAddresses() error = %v, wantErr %v", err, tt.wantErr)
|
||||
return
|
||||
|
@ -602,7 +629,18 @@ func Test_unpackBlockAddresses(t *testing.T) {
|
|||
h[i] = hex.EncodeToString(g)
|
||||
}
|
||||
if !reflect.DeepEqual(h, tt.want) {
|
||||
t.Errorf("unpackBlockAddresses() = %v, want %v", got, tt.want)
|
||||
t.Errorf("unpackBlockAddresses() = %v, want %v", h, tt.want)
|
||||
}
|
||||
h2 := make([][]hexoutpoint, len(got2))
|
||||
for i, g := range got2 {
|
||||
ho := make([]hexoutpoint, len(g))
|
||||
for j, o := range g {
|
||||
ho[j] = hexoutpoint{hex.EncodeToString(o.btxID), o.vout}
|
||||
}
|
||||
h2[i] = ho
|
||||
}
|
||||
if !reflect.DeepEqual(h2, tt.want2) {
|
||||
t.Errorf("unpackBlockAddresses() = %v, want %v", h2, tt.want2)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue