also make string tags possible, keep more statistics

pull/125/head
bert hubert 2020-07-03 22:33:34 +02:00
parent 2d66645143
commit 852aaddc47
2 changed files with 54 additions and 15 deletions

View File

@ -13,9 +13,8 @@ InfluxPusher::InfluxPusher(std::string_view dbname) : d_dbname(dbname)
void InfluxPusher::queueValue(const std::string& line) void InfluxPusher::queueValue(const std::string& line)
{ {
d_buffer.insert(line); if(!d_buffer.insert(line).second)
// if(d_buffer.insert(line).second) d_numdedupmsmts++;
// cout<<line;
checkSend(); checkSend();
} }
@ -41,6 +40,7 @@ void InfluxPusher::addValueObserver(int src, string_view name, const initializer
buffer+= " "; buffer+= " ";
bool lefirst=true; bool lefirst=true;
for(const auto& v : values) { for(const auto& v : values) {
d_numvalues++;
if(!lefirst) { if(!lefirst) {
buffer +=","; buffer +=",";
} }
@ -48,18 +48,33 @@ void InfluxPusher::addValueObserver(int src, string_view name, const initializer
buffer += string(v.first) + "="+to_string(v.second); buffer += string(v.first) + "="+to_string(v.second);
} }
buffer += " " + to_string((uint64_t)(t* 1000000000))+"\n"; buffer += " " + to_string((uint64_t)(t* 1000000000))+"\n";
d_nummsmts++;
d_msmtmap[(string)name]++;
queueValue(buffer); queueValue(buffer);
} }
void InfluxPusher::addValue(const SatID& id, string_view name, const initializer_list<pair<const char*, var_t>>& values, double t, std::optional<int> src, std::optional<string> tag) void InfluxPusher::addValue(const SatID& id, string_view name, const initializer_list<pair<const char*, var_t>>& values, double t, std::optional<int> src, std::optional<string> tag)
{
vector<pair<string,var_t>> tags{{"sv", id.sv}, {"gnssid", id.gnss}, {"sigid", id.sigid}};
if(src) {
tags.push_back({*tag, *src});
}
addValue(tags, name, values, t);
}
void InfluxPusher::addValue(const vector<pair<string,var_t>>& tags, string_view name, const initializer_list<pair<const char*, var_t>>& values, double t)
{ {
if(d_mute) if(d_mute)
return; return;
if(t > 2200000000 || t < 0) { if(t > 2200000000 || t < 0) {
cerr<<"Unable to store item "<<name<<" for sv "<<id.gnss<<","<<id.sv<<": time out of range "<<t<<endl; cerr<<"Unable to store item "<<name<<" for ";
// for(const auto& t: tags)
// cerr<<t.first<<"="<<t.second<<" ";
cerr<<": time out of range "<<t<<endl;
return; return;
} }
for(const auto& p : values) { for(const auto& p : values) {
@ -68,13 +83,26 @@ void InfluxPusher::addValue(const SatID& id, string_view name, const initializer
return; return;
} }
string buffer = string(name) +",gnssid="+to_string(id.gnss)+",sv=" +to_string(id.sv)+",sigid="+to_string(id.sigid); string buffer = string(name);
if(src) for(const auto& t : tags) {
buffer += ","+*tag+"="+to_string(*src); buffer += ","+t.first + "=";
std::visit([&buffer](auto&& arg) {
using T = std::decay_t<decltype(arg)>;
if constexpr (std::is_same_v<T, string>) {
// string tags really don't need a "
buffer += arg;
}
else {
// resist the urge to do integer tags, it sucks
buffer += to_string(arg);
}
}, t.second);
}
buffer+= " "; buffer+= " ";
bool lefirst=true; bool lefirst=true;
for(const auto& v : values) { for(const auto& v : values) {
d_numvalues++;
if(!lefirst) { if(!lefirst) {
buffer +=","; buffer +=",";
} }
@ -83,14 +111,19 @@ void InfluxPusher::addValue(const SatID& id, string_view name, const initializer
std::visit([&buffer](auto&& arg) { std::visit([&buffer](auto&& arg) {
using T = std::decay_t<decltype(arg)>; using T = std::decay_t<decltype(arg)>;
buffer += to_string(arg); if constexpr (std::is_same_v<T, string>)
if constexpr (!std::is_same_v<T, double>) buffer += "\""+arg+"\"";
buffer+="i"; else {
buffer += to_string(arg);
if constexpr (!std::is_same_v<T, double>)
buffer+="i";
}
}, v.second); }, v.second);
} }
buffer += " " + to_string((uint64_t)(t*1000000000))+"\n"; buffer += " " + to_string((uint64_t)(t*1000000000))+"\n";
d_nummsmts++;
d_msmtmap[(string)name]++;
queueValue(buffer); queueValue(buffer);
} }

View File

@ -8,11 +8,13 @@
struct InfluxPusher struct InfluxPusher
{ {
typedef std::variant<double, int32_t, uint32_t, int64_t> var_t; typedef std::variant<double, int32_t, uint32_t, int64_t, string> var_t;
explicit InfluxPusher(std::string_view dbname); explicit InfluxPusher(std::string_view dbname);
void addValueObserver(int src, std::string_view name, const std::initializer_list<std::pair<const char*, double>>& values, double t, std::optional<SatID> satid=std::optional<SatID>()); void addValueObserver(int src, std::string_view name, const std::initializer_list<std::pair<const char*, double>>& values, double t, std::optional<SatID> satid=std::optional<SatID>());
void addValue(const SatID& id, std::string_view name, const std::initializer_list<std::pair<const char*, var_t>>& values, double t, std::optional<int> src = std::optional<int>(), std::optional<string> tag = std::optional<string>("src")); void addValue(const SatID& id, std::string_view name, const std::initializer_list<std::pair<const char*, var_t>>& values, double t, std::optional<int> src = std::optional<int>(), std::optional<string> tag = std::optional<string>("src"));
void addValue(const vector<pair<string, var_t>>& tags, string_view name, const initializer_list<pair<const char*, var_t>>& values, double t);
void checkSend(); void checkSend();
void doSend(const std::set<std::string>& buffer); void doSend(const std::set<std::string>& buffer);
~InfluxPusher(); ~InfluxPusher();
@ -22,4 +24,8 @@ struct InfluxPusher
time_t d_lastsent{0}; time_t d_lastsent{0};
string d_dbname; string d_dbname;
bool d_mute{false}; bool d_mute{false};
int64_t d_nummsmts{0};
int64_t d_numvalues{0};
int64_t d_numdedupmsmts{0};
map<std::string, int64_t> d_msmtmap;
}; };