blockbook/fiat/fiat_rates.go

215 lines
7.2 KiB
Go

package fiat
import (
"encoding/json"
"errors"
"fmt"
"reflect"
"time"
"github.com/golang/glog"
"spacecruft.org/spacecruft/blockbook/db"
)
// OnNewFiatRatesTicker is used to send notification about a new FiatRates ticker
type OnNewFiatRatesTicker func(ticker *db.CurrencyRatesTicker)
// RatesDownloaderInterface provides method signatures for specific fiat rates downloaders
type RatesDownloaderInterface interface {
getTicker(timestamp *time.Time) (*db.CurrencyRatesTicker, error)
marketDataExists(timestamp *time.Time) (bool, error)
}
// RatesDownloader stores FiatRates API parameters
type RatesDownloader struct {
periodSeconds time.Duration
db *db.RocksDB
startTime *time.Time // a starting timestamp for tests to be deterministic (time.Now() for production)
timeFormat string
callbackOnNewTicker OnNewFiatRatesTicker
downloader RatesDownloaderInterface
}
// NewFiatRatesDownloader initiallizes the downloader for FiatRates API.
// If the startTime is nil, the downloader will start from the beginning.
func NewFiatRatesDownloader(db *db.RocksDB, apiType string, params string, startTime *time.Time, callback OnNewFiatRatesTicker) (*RatesDownloader, error) {
var rd = &RatesDownloader{}
type fiatRatesParams struct {
URL string `json:"url"`
Coin string `json:"coin"`
PeriodSeconds int `json:"periodSeconds"`
}
rdParams := &fiatRatesParams{}
err := json.Unmarshal([]byte(params), &rdParams)
if err != nil {
return nil, err
}
if rdParams.URL == "" || rdParams.PeriodSeconds == 0 {
return nil, errors.New("Missing parameters")
}
rd.timeFormat = "02-01-2006" // Layout string for FiatRates date formatting (DD-MM-YYYY)
rd.periodSeconds = time.Duration(rdParams.PeriodSeconds) * time.Second // Time period for syncing the latest market data
rd.db = db
rd.callbackOnNewTicker = callback
if startTime == nil {
timeNow := time.Now().UTC()
rd.startTime = &timeNow
} else {
rd.startTime = startTime // If startTime is nil, time.Now() will be used
}
if apiType == "coingecko" {
rd.downloader = NewCoinGeckoDownloader(rdParams.URL, rdParams.Coin, rd.timeFormat)
} else {
return nil, fmt.Errorf("NewFiatRatesDownloader: incorrect API type %q", apiType)
}
return rd, nil
}
// Run starts the FiatRates downloader. If there are tickers available, it continues from the last record.
// If there are no tickers, it finds the earliest market data available on API and downloads historical data.
// When historical data is downloaded, it continues to fetch the latest ticker prices.
func (rd *RatesDownloader) Run() error {
var timestamp *time.Time
// Check if there are any tickers stored in database
glog.Infof("Finding last available ticker...")
ticker, err := rd.db.FiatRatesFindLastTicker()
if err != nil {
glog.Errorf("RatesDownloader FindTicker error: %v", err)
return err
}
if ticker == nil {
// If no tickers found, start downloading from the beginning
glog.Infof("No tickers found! Looking up the earliest market data available on API and downloading from there.")
timestamp, err = rd.findEarliestMarketData()
if err != nil {
glog.Errorf("Error looking up earliest market data: %v", err)
return err
}
} else {
// If found, continue downloading data from the next day of the last available record
glog.Infof("Last available ticker: %v", ticker.Timestamp)
timestamp = ticker.Timestamp
}
err = rd.syncHistorical(timestamp)
if err != nil {
glog.Errorf("RatesDownloader syncHistorical error: %v", err)
return err
}
if err := rd.syncLatest(); err != nil {
glog.Errorf("RatesDownloader syncLatest error: %v", err)
return err
}
return nil
}
// FindEarliestMarketData uses binary search to find the oldest market data available on API.
func (rd *RatesDownloader) findEarliestMarketData() (*time.Time, error) {
minDateString := "03-01-2009"
minDate, err := time.Parse(rd.timeFormat, minDateString)
if err != nil {
glog.Error("Error parsing date: ", err)
return nil, err
}
maxDate := rd.startTime.Add(time.Duration(-24) * time.Hour) // today's historical tickers may not be ready yet, so set to yesterday
currentDate := maxDate
for {
var dataExists bool = false
for {
dataExists, err = rd.downloader.marketDataExists(&currentDate)
if err != nil {
glog.Errorf("Error checking if market data exists for date %v. Error: %v. Retrying in %v seconds.", currentDate, err, rd.periodSeconds)
timer := time.NewTimer(rd.periodSeconds)
<-timer.C
}
break
}
dateDiff := currentDate.Sub(minDate)
if dataExists {
if dateDiff < time.Hour*24 {
maxDate := time.Date(maxDate.Year(), maxDate.Month(), maxDate.Day(), 0, 0, 0, 0, maxDate.Location()) // truncate time to day
return &maxDate, nil
}
maxDate = currentDate
currentDate = currentDate.Add(-1 * dateDiff / 2)
} else {
minDate = currentDate
currentDate = currentDate.Add(maxDate.Sub(currentDate) / 2)
}
}
}
// syncLatest downloads the latest FiatRates data every rd.PeriodSeconds
func (rd *RatesDownloader) syncLatest() error {
timer := time.NewTimer(rd.periodSeconds)
var lastTickerRates map[string]float64
sameTickerCounter := 0
for {
ticker, err := rd.downloader.getTicker(nil)
if err != nil {
// Do not exit on GET error, log it, wait and try again
glog.Errorf("syncLatest GetData error: %v", err)
<-timer.C
timer.Reset(rd.periodSeconds)
continue
}
if sameTickerCounter < 5 && reflect.DeepEqual(ticker.Rates, lastTickerRates) {
// If rates are the same as previous, do not store them
glog.Infof("syncLatest: ticker rates for %v are the same as previous, skipping...", ticker.Timestamp)
<-timer.C
timer.Reset(rd.periodSeconds)
sameTickerCounter++
continue
}
lastTickerRates = ticker.Rates
sameTickerCounter = 0
glog.Infof("syncLatest: storing ticker for %v", ticker.Timestamp)
err = rd.db.FiatRatesStoreTicker(ticker)
if err != nil {
// If there's an error storing ticker (like missing rates), log it, wait and try again
glog.Errorf("syncLatest StoreTicker error: %v", err)
} else if rd.callbackOnNewTicker != nil {
rd.callbackOnNewTicker(ticker)
}
<-timer.C
timer.Reset(rd.periodSeconds)
}
}
// syncHistorical downloads all the historical data since the specified timestamp till today,
// then continues to download the latest rates
func (rd *RatesDownloader) syncHistorical(timestamp *time.Time) error {
period := time.Duration(1) * time.Second
timer := time.NewTimer(period)
for {
if rd.startTime.Sub(*timestamp) < time.Duration(time.Hour*24) {
break
}
ticker, err := rd.downloader.getTicker(timestamp)
if err != nil {
// Do not exit on GET error, log it, wait and try again
glog.Errorf("syncHistorical GetData error: %v", err)
<-timer.C
timer.Reset(rd.periodSeconds)
continue
}
glog.Infof("syncHistorical: storing ticker for %v", ticker.Timestamp)
err = rd.db.FiatRatesStoreTicker(ticker)
if err != nil {
// If there's an error storing ticker (like missing rates), log it and continue to the next day
glog.Errorf("syncHistorical error storing ticker for %v: %v", timestamp, err)
}
*timestamp = timestamp.Add(time.Hour * 24) // go to the next day
<-timer.C
timer.Reset(period)
}
return nil
}