package server import ( "blockbook/api" "blockbook/bchain" "blockbook/common" "blockbook/db" "encoding/json" "math/big" "net/http" "runtime/debug" "strconv" "sync" "sync/atomic" "time" "github.com/golang/glog" "github.com/gorilla/websocket" "github.com/juju/errors" ) const upgradeFailed = "Upgrade failed: " const outChannelSize = 500 const defaultTimeout = 60 * time.Second var ( // ErrorMethodNotAllowed is returned when client tries to upgrade method other than GET ErrorMethodNotAllowed = errors.New("Method not allowed") connectionCounter uint64 ) type websocketReq struct { ID string `json:"id"` Method string `json:"method"` Params json.RawMessage `json:"params"` } type websocketRes struct { ID string `json:"id"` Data interface{} `json:"data"` } type websocketChannel struct { id uint64 conn *websocket.Conn out chan *websocketRes ip string requestHeader http.Header alive bool aliveLock sync.Mutex } // WebsocketServer is a handle to websocket server type WebsocketServer struct { socket *websocket.Conn upgrader *websocket.Upgrader db *db.RocksDB txCache *db.TxCache chain bchain.BlockChain chainParser bchain.BlockChainParser mempool bchain.Mempool metrics *common.Metrics is *common.InternalState api *api.Worker block0hash string newBlockSubscriptions map[*websocketChannel]string newBlockSubscriptionsLock sync.Mutex addressSubscriptions map[string]map[*websocketChannel]string addressSubscriptionsLock sync.Mutex fiatRatesSubscriptions map[string]map[*websocketChannel]string fiatRatesSubscriptionsLock sync.Mutex } // NewWebsocketServer creates new websocket interface to blockbook and returns its handle func NewWebsocketServer(db *db.RocksDB, chain bchain.BlockChain, mempool bchain.Mempool, txCache *db.TxCache, metrics *common.Metrics, is *common.InternalState) (*WebsocketServer, error) { api, err := api.NewWorker(db, chain, mempool, txCache, is) if err != nil { return nil, err } b0, err := db.GetBlockHash(0) if err != nil { return nil, err } s := &WebsocketServer{ upgrader: &websocket.Upgrader{ ReadBufferSize: 1024 * 32, WriteBufferSize: 1024 * 32, CheckOrigin: checkOrigin, }, db: db, txCache: txCache, chain: chain, chainParser: chain.GetChainParser(), mempool: mempool, metrics: metrics, is: is, api: api, block0hash: b0, newBlockSubscriptions: make(map[*websocketChannel]string), addressSubscriptions: make(map[string]map[*websocketChannel]string), fiatRatesSubscriptions: make(map[string]map[*websocketChannel]string), } return s, nil } // allow all origins func checkOrigin(r *http.Request) bool { return true } // ServeHTTP sets up handler of websocket channel func (s *WebsocketServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { if r.Method != "GET" { http.Error(w, upgradeFailed+ErrorMethodNotAllowed.Error(), 503) return } conn, err := s.upgrader.Upgrade(w, r, nil) if err != nil { http.Error(w, upgradeFailed+err.Error(), 503) return } c := &websocketChannel{ id: atomic.AddUint64(&connectionCounter, 1), conn: conn, out: make(chan *websocketRes, outChannelSize), ip: r.RemoteAddr, requestHeader: r.Header, alive: true, } go s.inputLoop(c) go s.outputLoop(c) s.onConnect(c) } // GetHandler returns http handler func (s *WebsocketServer) GetHandler() http.Handler { return s } func (s *WebsocketServer) closeChannel(c *websocketChannel) { c.aliveLock.Lock() defer c.aliveLock.Unlock() if c.alive { c.conn.Close() c.alive = false //clean out close(c.out) for len(c.out) > 0 { <-c.out } s.onDisconnect(c) } } func (c *websocketChannel) IsAlive() bool { c.aliveLock.Lock() defer c.aliveLock.Unlock() return c.alive } func (s *WebsocketServer) inputLoop(c *websocketChannel) { defer func() { if r := recover(); r != nil { glog.Error("recovered from panic: ", r, ", ", c.id) debug.PrintStack() s.closeChannel(c) } }() for { t, d, err := c.conn.ReadMessage() if err != nil { s.closeChannel(c) return } switch t { case websocket.TextMessage: var req websocketReq err := json.Unmarshal(d, &req) if err != nil { glog.Error("Error parsing message from ", c.id, ", ", string(d), ", ", err) s.closeChannel(c) return } go s.onRequest(c, &req) case websocket.BinaryMessage: glog.Error("Binary message received from ", c.id, ", ", c.ip) s.closeChannel(c) return case websocket.PingMessage: c.conn.WriteControl(websocket.PongMessage, nil, time.Now().Add(defaultTimeout)) break case websocket.CloseMessage: s.closeChannel(c) return case websocket.PongMessage: // do nothing } } } func (s *WebsocketServer) outputLoop(c *websocketChannel) { for m := range c.out { err := c.conn.WriteJSON(m) if err != nil { glog.Error("Error sending message to ", c.id, ", ", err) s.closeChannel(c) } } } func (s *WebsocketServer) onConnect(c *websocketChannel) { glog.Info("Client connected ", c.id, ", ", c.ip) s.metrics.WebsocketClients.Inc() } func (s *WebsocketServer) onDisconnect(c *websocketChannel) { s.unsubscribeNewBlock(c) s.unsubscribeAddresses(c) s.unsubscribeFiatRates(c) glog.Info("Client disconnected ", c.id, ", ", c.ip) s.metrics.WebsocketClients.Dec() } var requestHandlers = map[string]func(*WebsocketServer, *websocketChannel, *websocketReq) (interface{}, error){ "getAccountInfo": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) { r, err := unmarshalGetAccountInfoRequest(req.Params) if err == nil { rv, err = s.getAccountInfo(r) } return }, "getInfo": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) { return s.getInfo() }, "getBlockHash": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) { r := struct { Height int `json:"height"` }{} err = json.Unmarshal(req.Params, &r) if err == nil { rv, err = s.getBlockHash(r.Height) } return }, "getAccountUtxo": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) { r := struct { Descriptor string `json:"descriptor"` }{} err = json.Unmarshal(req.Params, &r) if err == nil { rv, err = s.getAccountUtxo(r.Descriptor) } return }, "getTransaction": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) { r := struct { Txid string `json:"txid"` }{} err = json.Unmarshal(req.Params, &r) if err == nil { rv, err = s.getTransaction(r.Txid) } return }, "getTransactionSpecific": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) { r := struct { Txid string `json:"txid"` }{} err = json.Unmarshal(req.Params, &r) if err == nil { rv, err = s.getTransactionSpecific(r.Txid) } return }, "estimateFee": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) { return s.estimateFee(c, req.Params) }, "sendTransaction": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) { r := struct { Hex string `json:"hex"` }{} err = json.Unmarshal(req.Params, &r) if err == nil { rv, err = s.sendTransaction(r.Hex) } return }, "subscribeNewBlock": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) { return s.subscribeNewBlock(c, req) }, "unsubscribeNewBlock": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) { return s.unsubscribeNewBlock(c) }, "subscribeAddresses": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) { ad, err := s.unmarshalAddresses(req.Params) if err == nil { rv, err = s.subscribeAddresses(c, ad, req) } return }, "unsubscribeAddresses": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) { return s.unsubscribeAddresses(c) }, "subscribeFiatRates": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) { r := struct { Currency string `json:"currency"` }{} err = json.Unmarshal(req.Params, &r) if err != nil { return nil, err } return s.subscribeFiatRates(c, r.Currency, req) }, "unsubscribeFiatRates": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) { return s.unsubscribeFiatRates(c) }, "ping": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) { r := struct{}{} return r, nil }, "getCurrentFiatRates": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) { r := struct { Currency string `json:"currency"` }{} err = json.Unmarshal(req.Params, &r) if err == nil { rv, err = s.getCurrentFiatRates(r.Currency) } return }, "getFiatRatesForDates": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) { r := struct { Dates []string `json:"dates"` Currency string `json:"currency"` }{} err = json.Unmarshal(req.Params, &r) if err == nil { rv, err = s.getFiatRatesForDates(r.Dates, r.Currency) } return }, "getFiatRatesTickersList": func(s *WebsocketServer, c *websocketChannel, req *websocketReq) (rv interface{}, err error) { r := struct { Date string `json:"date"` }{} err = json.Unmarshal(req.Params, &r) if err == nil { rv, err = s.getFiatRatesTickersList(r.Date) } return }, } func sendResponse(c *websocketChannel, req *websocketReq, data interface{}) { defer func() { if r := recover(); r != nil { glog.Error("Client ", c.id, ", onRequest ", req.Method, " recovered from panic: ", r) } }() c.out <- &websocketRes{ ID: req.ID, Data: data, } } func (s *WebsocketServer) onRequest(c *websocketChannel, req *websocketReq) { var err error var data interface{} defer func() { if r := recover(); r != nil { glog.Error("Client ", c.id, ", onRequest ", req.Method, " recovered from panic: ", r) debug.PrintStack() e := resultError{} e.Error.Message = "Internal error" data = e } // nil data means no response if data != nil { sendResponse(c, req, data) } }() t := time.Now() defer s.metrics.WebsocketReqDuration.With(common.Labels{"method": req.Method}).Observe(float64(time.Since(t)) / 1e3) // in microseconds f, ok := requestHandlers[req.Method] if ok { data, err = f(s, c, req) } else { err = errors.New("unknown method") } if err == nil { glog.V(1).Info("Client ", c.id, " onRequest ", req.Method, " success") s.metrics.WebsocketRequests.With(common.Labels{"method": req.Method, "status": "success"}).Inc() } else { glog.Error("Client ", c.id, " onMessage ", req.Method, ": ", errors.ErrorStack(err), ", data ", string(req.Params)) s.metrics.WebsocketRequests.With(common.Labels{"method": req.Method, "status": "failure"}).Inc() e := resultError{} e.Error.Message = err.Error() data = e } } type accountInfoReq struct { Descriptor string `json:"descriptor"` Details string `json:"details"` Tokens string `json:"tokens"` PageSize int `json:"pageSize"` Page int `json:"page"` FromHeight int `json:"from"` ToHeight int `json:"to"` ContractFilter string `json:"contractFilter"` Gap int `json:"gap"` } func unmarshalGetAccountInfoRequest(params []byte) (*accountInfoReq, error) { var r accountInfoReq err := json.Unmarshal(params, &r) if err != nil { return nil, err } return &r, nil } func (s *WebsocketServer) getAccountInfo(req *accountInfoReq) (res *api.Address, err error) { var opt api.AccountDetails switch req.Details { case "tokens": opt = api.AccountDetailsTokens case "tokenBalances": opt = api.AccountDetailsTokenBalances case "txids": opt = api.AccountDetailsTxidHistory case "txs": opt = api.AccountDetailsTxHistory default: opt = api.AccountDetailsBasic } var tokensToReturn api.TokensToReturn switch req.Tokens { case "used": tokensToReturn = api.TokensToReturnUsed case "nonzero": tokensToReturn = api.TokensToReturnNonzeroBalance default: tokensToReturn = api.TokensToReturnDerived } filter := api.AddressFilter{ FromHeight: uint32(req.FromHeight), ToHeight: uint32(req.ToHeight), Contract: req.ContractFilter, Vout: api.AddressFilterVoutOff, TokensToReturn: tokensToReturn, } if req.PageSize == 0 { req.PageSize = txsOnPage } a, err := s.api.GetXpubAddress(req.Descriptor, req.Page, req.PageSize, opt, &filter, req.Gap) if err != nil { return s.api.GetAddress(req.Descriptor, req.Page, req.PageSize, opt, &filter) } return a, nil } func (s *WebsocketServer) getAccountUtxo(descriptor string) (interface{}, error) { utxo, err := s.api.GetXpubUtxo(descriptor, false, 0) if err != nil { return s.api.GetAddressUtxo(descriptor, false) } return utxo, nil } func (s *WebsocketServer) getTransaction(txid string) (interface{}, error) { return s.api.GetTransaction(txid, false, false) } func (s *WebsocketServer) getTransactionSpecific(txid string) (interface{}, error) { return s.chain.GetTransactionSpecific(&bchain.Tx{Txid: txid}) } func (s *WebsocketServer) getInfo() (interface{}, error) { vi := common.GetVersionInfo() height, hash, err := s.db.GetBestBlock() if err != nil { return nil, err } type info struct { Name string `json:"name"` Shortcut string `json:"shortcut"` Decimals int `json:"decimals"` Version string `json:"version"` BestHeight int `json:"bestHeight"` BestHash string `json:"bestHash"` Block0Hash string `json:"block0Hash"` Testnet bool `json:"testnet"` } return &info{ Name: s.is.Coin, Shortcut: s.is.CoinShortcut, Decimals: s.chainParser.AmountDecimals(), BestHeight: int(height), BestHash: hash, Version: vi.Version, Block0Hash: s.block0hash, Testnet: s.chain.IsTestnet(), }, nil } func (s *WebsocketServer) getBlockHash(height int) (interface{}, error) { h, err := s.db.GetBlockHash(uint32(height)) if err != nil { return nil, err } type hash struct { Hash string `json:"hash"` } return &hash{ Hash: h, }, nil } func (s *WebsocketServer) estimateFee(c *websocketChannel, params []byte) (interface{}, error) { type estimateFeeReq struct { Blocks []int `json:"blocks"` Specific map[string]interface{} `json:"specific"` } type estimateFeeRes struct { FeePerTx string `json:"feePerTx,omitempty"` FeePerUnit string `json:"feePerUnit,omitempty"` FeeLimit string `json:"feeLimit,omitempty"` } var r estimateFeeReq err := json.Unmarshal(params, &r) if err != nil { return nil, err } res := make([]estimateFeeRes, len(r.Blocks)) if s.chainParser.GetChainType() == bchain.ChainEthereumType { gas, err := s.chain.EthereumTypeEstimateGas(r.Specific) if err != nil { return nil, err } sg := strconv.FormatUint(gas, 10) for i, b := range r.Blocks { fee, err := s.chain.EstimateSmartFee(b, true) if err != nil { return nil, err } res[i].FeePerUnit = fee.String() res[i].FeeLimit = sg fee.Mul(&fee, new(big.Int).SetUint64(gas)) res[i].FeePerTx = fee.String() } } else { conservative := true v, ok := r.Specific["conservative"] if ok { vc, ok := v.(bool) if ok { conservative = vc } } txSize := 0 v, ok = r.Specific["txsize"] if ok { f, ok := v.(float64) if ok { txSize = int(f) } } for i, b := range r.Blocks { fee, err := s.chain.EstimateSmartFee(b, conservative) if err != nil { return nil, err } res[i].FeePerUnit = fee.String() if txSize > 0 { fee.Mul(&fee, big.NewInt(int64(txSize))) fee.Add(&fee, big.NewInt(500)) fee.Div(&fee, big.NewInt(1000)) res[i].FeePerTx = fee.String() } } } return res, nil } func (s *WebsocketServer) sendTransaction(tx string) (res resultSendTransaction, err error) { txid, err := s.chain.SendRawTransaction(tx) if err != nil { return res, err } res.Result = txid return } type subscriptionResponse struct { Subscribed bool `json:"subscribed"` } func (s *WebsocketServer) subscribeNewBlock(c *websocketChannel, req *websocketReq) (res interface{}, err error) { s.newBlockSubscriptionsLock.Lock() defer s.newBlockSubscriptionsLock.Unlock() s.newBlockSubscriptions[c] = req.ID return &subscriptionResponse{true}, nil } func (s *WebsocketServer) unsubscribeNewBlock(c *websocketChannel) (res interface{}, err error) { s.newBlockSubscriptionsLock.Lock() defer s.newBlockSubscriptionsLock.Unlock() delete(s.newBlockSubscriptions, c) return &subscriptionResponse{false}, nil } func (s *WebsocketServer) unmarshalAddresses(params []byte) ([]bchain.AddressDescriptor, error) { r := struct { Addresses []string `json:"addresses"` }{} err := json.Unmarshal(params, &r) if err != nil { return nil, err } rv := make([]bchain.AddressDescriptor, len(r.Addresses)) for i, a := range r.Addresses { ad, err := s.chainParser.GetAddrDescFromAddress(a) if err != nil { return nil, err } rv[i] = ad } return rv, nil } func (s *WebsocketServer) subscribeAddresses(c *websocketChannel, addrDesc []bchain.AddressDescriptor, req *websocketReq) (res interface{}, err error) { // unsubscribe all previous subscriptions s.unsubscribeAddresses(c) s.addressSubscriptionsLock.Lock() defer s.addressSubscriptionsLock.Unlock() for i := range addrDesc { ads := string(addrDesc[i]) as, ok := s.addressSubscriptions[ads] if !ok { as = make(map[*websocketChannel]string) s.addressSubscriptions[ads] = as } as[c] = req.ID } return &subscriptionResponse{true}, nil } // unsubscribeAddresses unsubscribes all address subscriptions by this channel func (s *WebsocketServer) unsubscribeAddresses(c *websocketChannel) (res interface{}, err error) { s.addressSubscriptionsLock.Lock() defer s.addressSubscriptionsLock.Unlock() for _, sa := range s.addressSubscriptions { for sc := range sa { if sc == c { delete(sa, c) } } } return &subscriptionResponse{false}, nil } // subscribeFiatRates subscribes all FiatRates subscriptions by this channel func (s *WebsocketServer) subscribeFiatRates(c *websocketChannel, currency string, req *websocketReq) (res interface{}, err error) { // unsubscribe all previous subscriptions s.unsubscribeFiatRates(c) s.fiatRatesSubscriptionsLock.Lock() defer s.fiatRatesSubscriptionsLock.Unlock() as, ok := s.fiatRatesSubscriptions[currency] if !ok { as = make(map[*websocketChannel]string) s.fiatRatesSubscriptions[currency] = as } as[c] = req.ID return &subscriptionResponse{true}, nil } // unsubscribeFiatRates unsubscribes all FiatRates subscriptions by this channel func (s *WebsocketServer) unsubscribeFiatRates(c *websocketChannel) (res interface{}, err error) { s.fiatRatesSubscriptionsLock.Lock() defer s.fiatRatesSubscriptionsLock.Unlock() for _, sa := range s.fiatRatesSubscriptions { for sc := range sa { if sc == c { delete(sa, c) } } } return &subscriptionResponse{false}, nil } // OnNewBlock is a callback that broadcasts info about new block to subscribed clients func (s *WebsocketServer) OnNewBlock(hash string, height uint32) { s.newBlockSubscriptionsLock.Lock() defer s.newBlockSubscriptionsLock.Unlock() data := struct { Height uint32 `json:"height"` Hash string `json:"hash"` }{ Height: height, Hash: hash, } for c, id := range s.newBlockSubscriptions { if c.IsAlive() { c.out <- &websocketRes{ ID: id, Data: &data, } } } glog.Info("broadcasting new block ", height, " ", hash, " to ", len(s.newBlockSubscriptions), " channels") } // OnNewTxAddr is a callback that broadcasts info about a tx affecting subscribed address func (s *WebsocketServer) OnNewTxAddr(tx *bchain.Tx, addrDesc bchain.AddressDescriptor) { // check if there is any subscription but release the lock immediately, GetTransactionFromBchainTx may take some time s.addressSubscriptionsLock.Lock() as, ok := s.addressSubscriptions[string(addrDesc)] lenAs := len(as) s.addressSubscriptionsLock.Unlock() if ok && lenAs > 0 { addr, _, err := s.chainParser.GetAddressesFromAddrDesc(addrDesc) if err != nil { glog.Error("GetAddressesFromAddrDesc error ", err, " for ", addrDesc) return } if len(addr) == 1 { atx, err := s.api.GetTransactionFromBchainTx(tx, 0, false, false) if err != nil { glog.Error("GetTransactionFromBchainTx error ", err, " for ", tx.Txid) return } data := struct { Address string `json:"address"` Tx *api.Tx `json:"tx"` }{ Address: addr[0], Tx: atx, } // get the list of subscriptions again, this time keep the lock s.addressSubscriptionsLock.Lock() defer s.addressSubscriptionsLock.Unlock() as, ok = s.addressSubscriptions[string(addrDesc)] if ok { for c, id := range as { if c.IsAlive() { c.out <- &websocketRes{ ID: id, Data: &data, } } } glog.Info("broadcasting new tx ", tx.Txid, " for addr ", addr[0], " to ", len(as), " channels") } } } } func (s *WebsocketServer) broadcastTicker(coin string, rate json.Number) { s.fiatRatesSubscriptionsLock.Lock() defer s.fiatRatesSubscriptionsLock.Unlock() as, ok := s.fiatRatesSubscriptions[coin] if ok && len(as) > 0 { data := struct { Rate interface{} `json:"rate"` }{ Rate: rate, } // get the list of subscriptions again, this time keep the lock as, ok = s.fiatRatesSubscriptions[coin] if ok { for c, id := range as { if c.IsAlive() { c.out <- &websocketRes{ ID: id, Data: &data, } } } glog.Info("broadcasting new rate ", rate, " for coin ", coin, " to ", len(as), " channels") } } } // OnNewFiatRatesTicker is a callback that broadcasts info about fiat rates affecting subscribed currency func (s *WebsocketServer) OnNewFiatRatesTicker(ticker *db.CurrencyRatesTicker) { for currency, rate := range ticker.Rates { s.broadcastTicker(currency, rate) } } func (s *WebsocketServer) getCurrentFiatRates(currency string) (interface{}, error) { ret, err := s.api.GetCurrentFiatRates(currency) return ret, err } func (s *WebsocketServer) getFiatRatesForDates(dates []string, currency string) (interface{}, error) { ret, err := s.api.GetFiatRatesForDates(dates, currency) return ret, err } func (s *WebsocketServer) getFiatRatesTickersList(date string) (interface{}, error) { ret, err := s.api.GetFiatRatesTickersList(date) return ret, err }