Implement TxCache stored in RocksDB

indexv1
Martin Boehm 2018-03-06 12:36:24 +01:00
parent 0cfc74a48d
commit d1c4c66c5f
5 changed files with 157 additions and 36 deletions

View File

@ -73,6 +73,7 @@ var (
chain *bchain.BitcoinRPC
mempool *bchain.Mempool
index *db.RocksDB
txCache *db.TxCache
syncWorker *db.SyncWorker
callbacksOnNewBlockHash []func(hash string)
callbacksOnNewTxAddr []func(txid string, addr string)
@ -152,6 +153,11 @@ func main() {
}
}
if txCache, err = db.NewTxCache(index, chain); err != nil {
glog.Error("txCache ", err)
return
}
var httpServer *server.HTTPServer
if *httpServerBinding != "" {
httpServer, err = server.NewHTTPServer(*httpServerBinding, *certFiles, index, mempool)
@ -174,7 +180,7 @@ func main() {
var socketIoServer *server.SocketIoServer
if *socketIoBinding != "" {
socketIoServer, err = server.NewSocketIoServer(*socketIoBinding, *certFiles, index, mempool, chain, *explorerURL)
socketIoServer, err = server.NewSocketIoServer(*socketIoBinding, *certFiles, index, mempool, chain, txCache, *explorerURL)
if err != nil {
glog.Error("socketio: ", err)
return

View File

@ -539,21 +539,27 @@ func (d *RocksDB) GetTx(txid string) (*bchain.Tx, uint32, error) {
return nil, 0, err
}
defer val.Free()
return unpackTx(val.Data())
data := val.Data()
if len(data) > 4 {
return unpackTx(data)
}
return nil, 0, nil
}
func (d *RocksDB) PutTx(tx *bchain.Tx, height uint32) error {
// PutTx stores transactions in db
func (d *RocksDB) PutTx(tx *bchain.Tx, height uint32, blockTime int64) error {
key, err := packTxid(tx.Txid)
if err != nil {
return nil
}
buf, err := packTx(tx, height)
buf, err := packTx(tx, height, blockTime)
if err != nil {
return err
}
return d.db.PutCF(d.wo, d.cfh[cfTransactions], key, buf)
}
// DeleteTx removes transactions from db
func (d *RocksDB) DeleteTx(txid string) error {
key, err := packTxid(txid)
if err != nil {
@ -634,15 +640,22 @@ func unpackOutputScript(buf []byte) string {
return hex.EncodeToString(buf)
}
func packTx(tx *bchain.Tx, height uint32) ([]byte, error) {
buf := make([]byte, 4+len(tx.Hex)/2)
func packTx(tx *bchain.Tx, height uint32, blockTime int64) ([]byte, error) {
bt := packVarint64(blockTime)
buf := make([]byte, 4+len(bt)+len(tx.Hex)/2)
binary.BigEndian.PutUint32(buf[0:4], height)
_, err := hex.Decode(buf[4:], []byte(tx.Hex))
copy(buf[4:], bt)
_, err := hex.Decode(buf[4+len(bt):], []byte(tx.Hex))
return buf, err
}
func unpackTx(buf []byte) (*bchain.Tx, uint32, error) {
height := unpackUint(buf)
tx, err := bchain.ParseTx(buf[4:])
return tx, height, err
bt, l := unpackVarint64(buf[4:])
tx, err := bchain.ParseTx(buf[4+l:])
if err != nil {
return nil, 0, err
}
tx.Blocktime = bt
return tx, height, nil
}

View File

@ -7,12 +7,11 @@ import (
"testing"
)
var testTx = bchain.Tx{
// Blocktime: 1520253021,
Hex: "01000000017f9a22c9cbf54bd902400df746f138f37bcf5b4d93eb755820e974ba43ed5f42040000006a4730440220037f4ed5427cde81d55b9b6a2fd08c8a25090c2c2fff3a75c1a57625ca8a7118022076c702fe55969fa08137f71afd4851c48e31082dd3c40c919c92cdbc826758d30121029f6da5623c9f9b68a9baf9c1bc7511df88fa34c6c2f71f7c62f2f03ff48dca80feffffff019c9700000000000017a9146144d57c8aff48492c9dfb914e120b20bad72d6f8773d00700",
Txid: "056e3d82e5ffd0e915fb9b62797d76263508c34fe3e5dbed30dd3e943930f204",
LockTime: 512115,
// Time: 1520253022,
var testTx1 = bchain.Tx{
Hex: "01000000017f9a22c9cbf54bd902400df746f138f37bcf5b4d93eb755820e974ba43ed5f42040000006a4730440220037f4ed5427cde81d55b9b6a2fd08c8a25090c2c2fff3a75c1a57625ca8a7118022076c702fe55969fa08137f71afd4851c48e31082dd3c40c919c92cdbc826758d30121029f6da5623c9f9b68a9baf9c1bc7511df88fa34c6c2f71f7c62f2f03ff48dca80feffffff019c9700000000000017a9146144d57c8aff48492c9dfb914e120b20bad72d6f8773d00700",
Blocktime: 1519053802,
Txid: "056e3d82e5ffd0e915fb9b62797d76263508c34fe3e5dbed30dd3e943930f204",
LockTime: 512115,
Vin: []bchain.Vin{
{
ScriptSig: bchain.ScriptSig{
@ -36,13 +35,53 @@ var testTx = bchain.Tx{
},
},
}
var testTxPacked1 = "0001e2408ba8d7af5401000000017f9a22c9cbf54bd902400df746f138f37bcf5b4d93eb755820e974ba43ed5f42040000006a4730440220037f4ed5427cde81d55b9b6a2fd08c8a25090c2c2fff3a75c1a57625ca8a7118022076c702fe55969fa08137f71afd4851c48e31082dd3c40c919c92cdbc826758d30121029f6da5623c9f9b68a9baf9c1bc7511df88fa34c6c2f71f7c62f2f03ff48dca80feffffff019c9700000000000017a9146144d57c8aff48492c9dfb914e120b20bad72d6f8773d00700"
var testTxPacked = "0001e24001000000017f9a22c9cbf54bd902400df746f138f37bcf5b4d93eb755820e974ba43ed5f42040000006a4730440220037f4ed5427cde81d55b9b6a2fd08c8a25090c2c2fff3a75c1a57625ca8a7118022076c702fe55969fa08137f71afd4851c48e31082dd3c40c919c92cdbc826758d30121029f6da5623c9f9b68a9baf9c1bc7511df88fa34c6c2f71f7c62f2f03ff48dca80feffffff019c9700000000000017a9146144d57c8aff48492c9dfb914e120b20bad72d6f8773d00700"
var testTx2 = bchain.Tx{
Hex: "010000000001019d64f0c72a0d206001decbffaa722eb1044534c74eee7a5df8318e42a4323ec10000000017160014550da1f5d25a9dae2eafd6902b4194c4c6500af6ffffffff02809698000000000017a914cd668d781ece600efa4b2404dc91fd26b8b8aed8870553d7360000000017a914246655bdbd54c7e477d0ea2375e86e0db2b8f80a8702473044022076aba4ad559616905fa51d4ddd357fc1fdb428d40cb388e042cdd1da4a1b7357022011916f90c712ead9a66d5f058252efd280439ad8956a967e95d437d246710bc9012102a80a5964c5612bb769ef73147b2cf3c149bc0fd4ecb02f8097629c94ab013ffd00000000",
Blocktime: 1235678901,
Txid: "474e6795760ebe81cb4023dc227e5a0efe340e1771c89a0035276361ed733de7",
LockTime: 0,
Vin: []bchain.Vin{
{
ScriptSig: bchain.ScriptSig{
Hex: "160014550da1f5d25a9dae2eafd6902b4194c4c6500af6",
},
Txid: "c13e32a4428e31f85d7aee4ec7344504b12e72aaffcbde0160200d2ac7f0649d",
Vout: 0,
Sequence: 4294967295,
},
},
Vout: []bchain.Vout{
{
Value: .1,
N: 0,
ScriptPubKey: bchain.ScriptPubKey{
Hex: "a914cd668d781ece600efa4b2404dc91fd26b8b8aed887",
Addresses: []string{
"2NByHN6A8QYkBATzxf4pRGbCSHD5CEN2TRu",
},
},
},
{
Value: 9.20081157,
N: 1,
ScriptPubKey: bchain.ScriptPubKey{
Hex: "a914246655bdbd54c7e477d0ea2375e86e0db2b8f80a87",
Addresses: []string{
"2MvZguYaGjM7JihBgNqgLF2Ca2Enb76Hj9D",
},
},
},
},
}
var testTxPacked2 = "0007c91a899ab7da6a010000000001019d64f0c72a0d206001decbffaa722eb1044534c74eee7a5df8318e42a4323ec10000000017160014550da1f5d25a9dae2eafd6902b4194c4c6500af6ffffffff02809698000000000017a914cd668d781ece600efa4b2404dc91fd26b8b8aed8870553d7360000000017a914246655bdbd54c7e477d0ea2375e86e0db2b8f80a8702473044022076aba4ad559616905fa51d4ddd357fc1fdb428d40cb388e042cdd1da4a1b7357022011916f90c712ead9a66d5f058252efd280439ad8956a967e95d437d246710bc9012102a80a5964c5612bb769ef73147b2cf3c149bc0fd4ecb02f8097629c94ab013ffd00000000"
func Test_packTx(t *testing.T) {
type args struct {
tx bchain.Tx
height uint32
tx bchain.Tx
height uint32
blockTime int64
}
tests := []struct {
name string
@ -51,15 +90,21 @@ func Test_packTx(t *testing.T) {
wantErr bool
}{
{
name: "1",
args: args{testTx, 123456},
want: testTxPacked,
name: "btc-1",
args: args{testTx1, 123456, 1519053802},
want: testTxPacked1,
wantErr: false,
},
{
name: "testnet-1",
args: args{testTx2, 510234, 1235678901},
want: testTxPacked2,
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := packTx(&tt.args.tx, tt.args.height)
got, err := packTx(&tt.args.tx, tt.args.height, tt.args.blockTime)
if (err != nil) != tt.wantErr {
t.Errorf("packTx() error = %v, wantErr %v", err, tt.wantErr)
return
@ -84,12 +129,20 @@ func Test_unpackTx(t *testing.T) {
wantErr bool
}{
{
name: "1",
args: args{packedTx: testTxPacked},
want: &testTx,
name: "btc-1",
args: args{packedTx: testTxPacked1},
want: &testTx1,
want1: 123456,
wantErr: false,
},
// this test fails now, needs testnet chaincfg.TestNet3Params
{
name: "testnet-1",
args: args{packedTx: testTxPacked2},
want: &testTx2,
want1: 510234,
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {

47
db/txcache.go 100644
View File

@ -0,0 +1,47 @@
package db
import (
"blockbook/bchain"
"github.com/golang/glog"
)
// TxCache is handle to TxCacheServer
type TxCache struct {
db *RocksDB
chain *bchain.BitcoinRPC
}
// NewTxCache creates new TxCache interface and returns its handle
func NewTxCache(db *RocksDB, chain *bchain.BitcoinRPC) (*TxCache, error) {
return &TxCache{
db: db,
chain: chain,
}, nil
}
// GetTransaction returns transaction either from RocksDB or if not present from blockchain
// it the transaction is confirmed, it is stored in the RocksDB
func (c *TxCache) GetTransaction(txid string, bestheight uint32) (*bchain.Tx, error) {
tx, h, err := c.db.GetTx(txid)
if err != nil {
return nil, err
}
if tx != nil {
tx.Confirmations = bestheight - h
return tx, nil
}
tx, err = c.chain.GetTransaction(txid)
if err != nil {
return nil, err
}
// do not cache mempool transactions
if tx.Confirmations > 0 {
err = c.db.PutTx(tx, bestheight-tx.Confirmations, tx.Blocktime)
// do not return caching error, only log it
if err != nil {
glog.Error("PutTx error ", err)
}
}
return tx, nil
}

View File

@ -23,13 +23,14 @@ type SocketIoServer struct {
server *gosocketio.Server
https *http.Server
db *db.RocksDB
txCache *db.TxCache
mempool *bchain.Mempool
chain *bchain.BitcoinRPC
explorerURL string
}
// NewSocketIoServer creates new SocketIo interface to blockbook and returns its handle
func NewSocketIoServer(binding string, certFiles string, db *db.RocksDB, mempool *bchain.Mempool, chain *bchain.BitcoinRPC, explorerURL string) (*SocketIoServer, error) {
func NewSocketIoServer(binding string, certFiles string, db *db.RocksDB, mempool *bchain.Mempool, chain *bchain.BitcoinRPC, txCache *db.TxCache, explorerURL string) (*SocketIoServer, error) {
server := gosocketio.NewServer(transport.GetDefaultWebsocketTransport())
server.On(gosocketio.OnConnection, func(c *gosocketio.Channel) {
@ -62,6 +63,7 @@ func NewSocketIoServer(binding string, certFiles string, db *db.RocksDB, mempool
https: https,
server: server,
db: db,
txCache: txCache,
mempool: mempool,
chain: chain,
explorerURL: explorerURL,
@ -292,7 +294,7 @@ type txOutputs struct {
Script *string `json:"script"`
// ScriptAsm *string `json:"scriptAsm"`
SpentTxID *string `json:"spentTxId,omitempty"`
SpentIndex int `json:"spentIndex,omitempty"`
SpentIndex int `json:"spentIndex"`
SpentHeight int `json:"spentHeight,omitempty"`
Address *string `json:"address"`
}
@ -365,16 +367,16 @@ func (s *SocketIoServer) getAddressHistory(addr []string, rr *reqRange) (res res
txids := txr.Result
res.Result.TotalCount = len(txids)
res.Result.Items = make([]addressHistoryItem, 0)
txCache := make(map[string]*bchain.Tx, len(txids))
localCache := make(map[string]*bchain.Tx, len(txids))
for i, txid := range txids {
if i >= rr.From && i < rr.To {
tx, ok := txCache[txid]
tx, ok := localCache[txid]
if !ok {
tx, err = s.chain.GetTransaction(txid)
tx, err = s.txCache.GetTransaction(txid, bestheight)
if err != nil {
return res, err
}
txCache[txid] = tx
localCache[txid] = tx
}
ads := make(map[string]addressHistoryIndexes)
hi := make([]txInputs, 0)
@ -386,13 +388,13 @@ func (s *SocketIoServer) getAddressHistory(addr []string, rr *reqRange) (res res
OutputIndex: int(vin.Vout),
}
if vin.Txid != "" {
otx, ok := txCache[vin.Txid]
otx, ok := localCache[vin.Txid]
if !ok {
otx, err = s.chain.GetTransaction(vin.Txid)
otx, err = s.txCache.GetTransaction(vin.Txid, bestheight)
if err != nil {
return res, err
}
txCache[vin.Txid] = otx
localCache[vin.Txid] = otx
}
if len(otx.Vout) > int(vin.Vout) {
vout := otx.Vout[vin.Vout]
@ -604,7 +606,7 @@ func (s *SocketIoServer) getDetailedTransaction(txid string) (res resultGetDetai
if err != nil {
return
}
tx, err := s.chain.GetTransaction(txid)
tx, err := s.txCache.GetTransaction(txid, bestheight)
if err != nil {
return res, err
}
@ -617,7 +619,7 @@ func (s *SocketIoServer) getDetailedTransaction(txid string) (res resultGetDetai
OutputIndex: int(vin.Vout),
}
if vin.Txid != "" {
otx, err := s.chain.GetTransaction(vin.Txid)
otx, err := s.txCache.GetTransaction(vin.Txid, bestheight)
if err != nil {
return res, err
}