2020-01-11 12:33:30 -07:00
|
|
|
#include "influxpush.hh"
|
|
|
|
#include "minicurl.hh"
|
|
|
|
using namespace std;
|
|
|
|
|
|
|
|
|
|
|
|
/** Create a pusher that writes to the database called dbname.
    The special name "null" puts the pusher in mute mode and announces
    on stdout that nothing will be sent to influxdb.
*/
InfluxPusher::InfluxPusher(std::string_view dbname) : d_dbname(dbname)
{
  const bool discardEverything = (dbname == "null");
  if(!discardEverything)
    return;

  d_mute = true;
  cout<<"Not sending data to influxdb"<<endl;
}
|
|
|
|
|
|
|
|
void InfluxPusher::queueValue(const std::string& line)
|
|
|
|
{
|
2020-07-03 14:33:34 -06:00
|
|
|
if(!d_buffer.insert(line).second)
|
|
|
|
d_numdedupmsmts++;
|
2020-01-11 12:33:30 -07:00
|
|
|
checkSend();
|
|
|
|
}
|
|
|
|
|
|
|
|
void InfluxPusher::addValueObserver(int src, string_view name, const initializer_list<pair<const char*, double>>& values, double t, std::optional<SatID> satid)
|
|
|
|
{
|
|
|
|
if(d_mute)
|
|
|
|
return;
|
|
|
|
|
|
|
|
if(t > 2200000000 || t < 0) {
|
|
|
|
cerr<<"Unable to store item "<<name<<" for observer "<<src<<": time out of range "<<t<<endl;
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
for(const auto& p : values) {
|
|
|
|
if(isnan(p.second))
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
string buffer;
|
|
|
|
buffer+= string(name)+",src="+to_string(src);
|
|
|
|
if(satid) {
|
|
|
|
buffer+=",gnssid="+to_string(satid->gnss)+ +",sv=" +to_string(satid->sv)+",sigid="+to_string(satid->sigid);
|
|
|
|
}
|
|
|
|
|
|
|
|
buffer+= " ";
|
|
|
|
bool lefirst=true;
|
|
|
|
for(const auto& v : values) {
|
2020-07-09 13:46:56 -06:00
|
|
|
if(!v.first) // trick to null out certain fields
|
|
|
|
continue;
|
2020-07-03 14:33:34 -06:00
|
|
|
d_numvalues++;
|
2020-01-11 12:33:30 -07:00
|
|
|
if(!lefirst) {
|
|
|
|
buffer +=",";
|
|
|
|
}
|
|
|
|
lefirst=false;
|
|
|
|
buffer += string(v.first) + "="+to_string(v.second);
|
|
|
|
}
|
|
|
|
buffer += " " + to_string((uint64_t)(t* 1000000000))+"\n";
|
2020-07-03 14:33:34 -06:00
|
|
|
d_nummsmts++;
|
|
|
|
d_msmtmap[(string)name]++;
|
2020-01-11 12:33:30 -07:00
|
|
|
queueValue(buffer);
|
|
|
|
}
|
|
|
|
|
2020-07-03 14:33:34 -06:00
|
|
|
|
2020-06-17 12:49:34 -06:00
|
|
|
void InfluxPusher::addValue(const SatID& id, string_view name, const initializer_list<pair<const char*, var_t>>& values, double t, std::optional<int> src, std::optional<string> tag)
|
2020-07-03 14:33:34 -06:00
|
|
|
{
|
|
|
|
|
|
|
|
vector<pair<string,var_t>> tags{{"sv", id.sv}, {"gnssid", id.gnss}, {"sigid", id.sigid}};
|
|
|
|
|
|
|
|
if(src) {
|
|
|
|
tags.push_back({*tag, *src});
|
|
|
|
}
|
|
|
|
addValue(tags, name, values, t);
|
|
|
|
}
|
|
|
|
|
|
|
|
void InfluxPusher::addValue(const vector<pair<string,var_t>>& tags, string_view name, const initializer_list<pair<const char*, var_t>>& values, double t)
|
2020-01-11 12:33:30 -07:00
|
|
|
{
|
|
|
|
if(d_mute)
|
|
|
|
return;
|
|
|
|
|
|
|
|
if(t > 2200000000 || t < 0) {
|
2020-07-03 14:33:34 -06:00
|
|
|
cerr<<"Unable to store item "<<name<<" for ";
|
|
|
|
// for(const auto& t: tags)
|
|
|
|
// cerr<<t.first<<"="<<t.second<<" ";
|
|
|
|
cerr<<": time out of range "<<t<<endl;
|
2020-01-11 12:33:30 -07:00
|
|
|
return;
|
|
|
|
}
|
|
|
|
for(const auto& p : values) {
|
2020-06-17 12:49:34 -06:00
|
|
|
if(auto ptr = std::get_if<double>(&p.second))
|
|
|
|
if(isnan(*ptr))
|
|
|
|
return;
|
2020-01-11 12:33:30 -07:00
|
|
|
}
|
|
|
|
|
2020-07-03 14:33:34 -06:00
|
|
|
string buffer = string(name);
|
|
|
|
for(const auto& t : tags) {
|
|
|
|
buffer += ","+t.first + "=";
|
|
|
|
std::visit([&buffer](auto&& arg) {
|
|
|
|
using T = std::decay_t<decltype(arg)>;
|
|
|
|
if constexpr (std::is_same_v<T, string>) {
|
|
|
|
// string tags really don't need a "
|
|
|
|
buffer += arg;
|
|
|
|
}
|
|
|
|
else {
|
|
|
|
// resist the urge to do integer tags, it sucks
|
|
|
|
buffer += to_string(arg);
|
|
|
|
}
|
|
|
|
}, t.second);
|
|
|
|
}
|
2020-01-11 12:33:30 -07:00
|
|
|
|
|
|
|
buffer+= " ";
|
|
|
|
bool lefirst=true;
|
|
|
|
for(const auto& v : values) {
|
2020-07-09 13:46:56 -06:00
|
|
|
if(!v.first) // trick to null out certain fields
|
|
|
|
continue;
|
|
|
|
|
2020-07-03 14:33:34 -06:00
|
|
|
d_numvalues++;
|
2020-01-11 12:33:30 -07:00
|
|
|
if(!lefirst) {
|
|
|
|
buffer +=",";
|
|
|
|
}
|
|
|
|
lefirst=false;
|
2020-06-17 12:49:34 -06:00
|
|
|
buffer += string(v.first) + "=";
|
|
|
|
|
|
|
|
std::visit([&buffer](auto&& arg) {
|
|
|
|
using T = std::decay_t<decltype(arg)>;
|
2020-07-03 14:33:34 -06:00
|
|
|
if constexpr (std::is_same_v<T, string>)
|
|
|
|
buffer += "\""+arg+"\"";
|
|
|
|
else {
|
|
|
|
buffer += to_string(arg);
|
|
|
|
if constexpr (!std::is_same_v<T, double>)
|
|
|
|
buffer+="i";
|
|
|
|
}
|
2020-06-17 12:49:34 -06:00
|
|
|
}, v.second);
|
2020-01-11 12:33:30 -07:00
|
|
|
}
|
|
|
|
buffer += " " + to_string((uint64_t)(t*1000000000))+"\n";
|
2020-07-03 14:33:34 -06:00
|
|
|
d_nummsmts++;
|
|
|
|
d_msmtmap[(string)name]++;
|
2020-01-11 12:33:30 -07:00
|
|
|
queueValue(buffer);
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
void InfluxPusher::checkSend()
|
|
|
|
{
|
|
|
|
if(d_buffer.size() > 10000 || (time(0) - d_lastsent) > 10) {
|
|
|
|
set<string> buffer;
|
|
|
|
buffer.swap(d_buffer);
|
|
|
|
// thread t([buffer,this]() {
|
|
|
|
if(!d_mute)
|
|
|
|
doSend(buffer);
|
|
|
|
// });
|
|
|
|
// t.detach();
|
|
|
|
d_lastsent=time(0);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
void InfluxPusher::doSend(const set<std::string>& buffer)
|
|
|
|
{
|
|
|
|
MiniCurl mc;
|
|
|
|
MiniCurl::MiniCurlHeaders mch;
|
|
|
|
if(!buffer.empty()) {
|
|
|
|
string newout;
|
|
|
|
for(const auto& nl: buffer)
|
|
|
|
newout.append(nl);
|
|
|
|
|
|
|
|
/*
|
|
|
|
ofstream infl;
|
|
|
|
infl.open ("infl.txt", std::ofstream::out | std::ofstream::app);
|
|
|
|
infl << newout;
|
|
|
|
*/
|
2020-02-25 15:19:09 -07:00
|
|
|
try {
|
|
|
|
mc.postURL("http://127.0.0.1:8086/write?db="+d_dbname, newout, mch);
|
|
|
|
}
|
|
|
|
catch(std::exception& e) {
|
|
|
|
if(strstr(e.what(), "retention"))
|
|
|
|
return;
|
|
|
|
throw;
|
|
|
|
}
|
2020-01-11 12:33:30 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/** Send whatever is still pending, unless the database name is "null".

    doSend() can throw, and an exception escaping a destructor terminates the
    program, so a failed final flush is reported on stderr and otherwise
    ignored.
*/
InfluxPusher::~InfluxPusher()
{
  if(d_dbname == "null")
    return;

  try {
    doSend(d_buffer);
  }
  catch(const std::exception& e) {
    cerr<<"Unable to send remaining data to influxdb: "<<e.what()<<endl;
  }
  catch(...) {
    cerr<<"Unable to send remaining data to influxdb: unknown error"<<endl;
  }
}
|