// InfluxPusher member definitions. Each add* call formats one text record of
// the shape "name,tag=value,... field=value,... timestamp\n", hands it to
// queueValue(), and batches are POSTed through MiniCurl to a /write?db= URL.
// (Presumably InfluxDB line protocol, going by the names and the URL - confirm.)
//
// NOTE(review): this copy of the file looks damaged and cannot compile as stored:
//   * everything sits on two physical lines, so each line comment hides the
//     rest of its physical line;
//   * template argument lists are absent (e.g. "std::optional satid",
//     "vector> tags", "std::decay_t;", "set buffer;");
//   * three stretches are cut off mid-statement: right after
//     "Not sending data to influxdb" and after each "Unable to store item ".
//   The code below is left untouched on purpose; restore it from a clean copy
//   rather than patching the gaps by hand.
//
// What is visible, in order of appearance:
//
// InfluxPusher(dbname)
//   Copies dbname into d_dbname. For dbname == "null" it sets d_mute and starts
//   printing "Not sending data to influxdb" on cout (the statement is cut off).
//
// (name not visible)(..., values, double t, std::optional satid)
//   Only the tail of the parameter list survives. Visible behaviour: returns at
//   once when d_mute is set; for t > 2200000000 or t < 0 it starts a cerr
//   message "Unable to store item " (the rest is cut off); appends ",sv=" and
//   ",sigid=" built from satid; field values are formatted with
//   to_string(v.second); the record is finished as described under
//   "Record tail" below.
//
// addValue(const SatID& id, name, values, t, src, tag)
//   Builds a tag list with keys "sv", "gnssid", "sigid" from id, appends
//   {*tag, *src} when src holds a value, and forwards to the tag-list overload.
//   NOTE(review): tag is dereferenced after testing only src - confirm callers
//   always supply tag whenever they pass src.
//
// addValue(tags, name, values, t)
//   Same d_mute guard and the same start of a time-range check. A surviving
//   fragment returns early when isnan(*ptr) is true for a value obtained from
//   &p.second. The record starts with name, followed by ",key=" plus the value
//   for every tag: a std::visit lambda appends the value as-is in one
//   "if constexpr" branch and via to_string in the other (the is_same_v
//   arguments are not visible). Field values go through a second std::visit:
//   one branch wraps the value in double quotes, the other uses to_string and
//   then appends "i" when a further is_same_v test (arguments missing) is false.
//
// Record tail (both formatters)
//   Entries whose key pointer (v.first) is null are skipped; the remaining
//   "key=value" pairs are comma separated and d_numvalues is incremented per
//   emitted field. The record ends with " " + (uint64_t)(t * 1000000000) + "\n";
//   then d_nummsmts and d_msmtmap[name] are incremented and the record is
//   passed to queueValue() (whose definition is not visible here).
//
// checkSend()
//   Acts when d_buffer holds more than 10000 entries or time(0) - d_lastsent
//   exceeds 10: swaps d_buffer with a fresh local set (leaving d_buffer empty),
//   calls doSend() on the local set unless d_mute is set, and stores time(0) in
//   d_lastsent. The thread/detach statements are commented out, so - as the
//   code was presumably laid out originally - the send runs inline.
//
// doSend(buffer)
//   Does nothing for an empty buffer. Otherwise concatenates all entries into
//   one string and posts it to "http://127.0.0.1:8086/write?db=" + d_dbname.
//   An exception whose what() contains "retention" is swallowed; any other
//   exception is rethrown.
//
// ~InfluxPusher()
//   Calls doSend(d_buffer) unless d_dbname == "null".
//   NOTE(review): doSend() can rethrow, so an exception may escape this
//   destructor - confirm that is acceptable.
#include "influxpush.hh" #include "minicurl.hh" using namespace std; InfluxPusher::InfluxPusher(std::string_view dbname) : d_dbname(dbname) { if(dbname=="null") { d_mute = true; cout<<"Not sending data to influxdb"<>& values, double t, std::optional satid) { if(d_mute) return; if(t > 2200000000 || t < 0) { cerr<<"Unable to store item "<gnss)+ +",sv=" +to_string(satid->sv)+",sigid="+to_string(satid->sigid); } buffer+= " "; bool lefirst=true; for(const auto& v : values) { if(!v.first) // trick to null out certain fields continue; d_numvalues++; if(!lefirst) { buffer +=","; } lefirst=false; buffer += string(v.first) + "="+to_string(v.second); } buffer += " " + to_string((uint64_t)(t* 1000000000))+"\n"; d_nummsmts++; d_msmtmap[(string)name]++; queueValue(buffer); } void InfluxPusher::addValue(const SatID& id, string_view name, const initializer_list>& values, double t, std::optional src, std::optional tag) { vector> tags{{"sv", id.sv}, {"gnssid", id.gnss}, {"sigid", id.sigid}}; if(src) { tags.push_back({*tag, *src}); } addValue(tags, name, values, t); } void InfluxPusher::addValue(const vector>& tags, string_view name, const initializer_list>& values, double t) { if(d_mute) return; if(t > 2200000000 || t < 0) { cerr<<"Unable to store item "<(&p.second)) if(isnan(*ptr)) return; } string buffer = string(name); for(const auto& t : tags) { buffer += ","+t.first + "="; std::visit([&buffer](auto&& arg) { using T = std::decay_t; if constexpr (std::is_same_v) { // string tags really don't need a " buffer += arg; } else { // resist the urge to do integer tags, it sucks buffer += to_string(arg); } }, t.second); } buffer+= " "; bool lefirst=true; for(const auto& v : values) { if(!v.first) // trick to null out certain fields continue; d_numvalues++; if(!lefirst) { buffer +=","; } lefirst=false; buffer += string(v.first) + "="; std::visit([&buffer](auto&& arg) { using T = std::decay_t; if constexpr (std::is_same_v) buffer += "\""+arg+"\""; else { buffer += to_string(arg); if 
constexpr (!std::is_same_v) buffer+="i"; } }, v.second); } buffer += " " + to_string((uint64_t)(t*1000000000))+"\n"; d_nummsmts++; d_msmtmap[(string)name]++; queueValue(buffer); } void InfluxPusher::checkSend() { if(d_buffer.size() > 10000 || (time(0) - d_lastsent) > 10) { set buffer; buffer.swap(d_buffer); // thread t([buffer,this]() { if(!d_mute) doSend(buffer); // }); // t.detach(); d_lastsent=time(0); } } void InfluxPusher::doSend(const set& buffer) { MiniCurl mc; MiniCurl::MiniCurlHeaders mch; if(!buffer.empty()) { string newout; for(const auto& nl: buffer) newout.append(nl); /* ofstream infl; infl.open ("infl.txt", std::ofstream::out | std::ofstream::app); infl << newout; */ try { mc.postURL("http://127.0.0.1:8086/write?db="+d_dbname, newout, mch); } catch(std::exception& e) { if(strstr(e.what(), "retention")) return; throw; } } } InfluxPusher::~InfluxPusher() { if(d_dbname != "null") doSend(d_buffer); }