teach navcat to combine different directories
parent
386ad8dd7d
commit
5d1cc3d75f
113
navcat.cc
113
navcat.cc
|
@ -26,7 +26,7 @@ using namespace std;
|
||||||
|
|
||||||
extern const char* g_gitHash;
|
extern const char* g_gitHash;
|
||||||
|
|
||||||
|
// get all stations (numerical) from a directory
|
||||||
vector<uint64_t> getSources(string_view dirname)
|
vector<uint64_t> getSources(string_view dirname)
|
||||||
{
|
{
|
||||||
DIR *dir = opendir(&dirname[0]);
|
DIR *dir = opendir(&dirname[0]);
|
||||||
|
@ -54,8 +54,13 @@ vector<uint64_t> getSources(string_view dirname)
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool operator==(const timespec& a, const timespec& b)
|
||||||
|
{
|
||||||
|
return a.tv_sec == b.tv_sec && a.tv_nsec && b.tv_nsec;
|
||||||
|
}
|
||||||
|
|
||||||
void sendProtobuf(string_view dir, vector<uint64_t> stations, time_t startTime, time_t stopTime=0)
|
// send protobuf data from the named directories and stations, between start and stoptime
|
||||||
|
void sendProtobuf(const vector<string>& dirs, vector<uint64_t> stations, time_t startTime, time_t stopTime=0)
|
||||||
{
|
{
|
||||||
timespec start;
|
timespec start;
|
||||||
start.tv_sec = startTime;
|
start.tv_sec = startTime;
|
||||||
|
@ -66,44 +71,54 @@ void sendProtobuf(string_view dir, vector<uint64_t> stations, time_t startTime,
|
||||||
vector<pair<timespec,string> > rnmms;
|
vector<pair<timespec,string> > rnmms;
|
||||||
|
|
||||||
for(;;) {
|
for(;;) {
|
||||||
cerr<<"Gathering data"<<endl;
|
|
||||||
vector<uint64_t> srcs = stations.empty() ? getSources(dir) : stations;
|
|
||||||
rnmms.clear();
|
rnmms.clear();
|
||||||
for(const auto& src: srcs) {
|
for(const auto& dir : dirs) {
|
||||||
string fname = getPath(dir, start.tv_sec, src);
|
cerr<<"Gathering data from "<<humanTime(start.tv_sec)<<" from "<<dir<<".. ";
|
||||||
FILE* fp = fopen(fname.c_str(), "r");
|
|
||||||
if(!fp)
|
vector<uint64_t> srcs = stations.empty() ? getSources(dir) : stations;
|
||||||
continue;
|
int count=0;
|
||||||
uint32_t offset= fpos[fname];
|
for(const auto& src: srcs) {
|
||||||
if(fseek(fp, offset, SEEK_SET) < 0) {
|
string fname = getPath(dir, start.tv_sec, src);
|
||||||
cerr<<"Error seeking: "<<strerror(errno) <<endl;
|
FILE* fp = fopen(fname.c_str(), "r");
|
||||||
fclose(fp);
|
if(!fp)
|
||||||
continue;
|
continue;
|
||||||
}
|
uint32_t offset= fpos[fname];
|
||||||
// cerr <<"Seeked to position "<<fpos[fname]<<" of "<<fname<<endl;
|
if(fseek(fp, offset, SEEK_SET) < 0) {
|
||||||
|
cerr<<"Error seeking: "<<strerror(errno) <<endl;
|
||||||
uint32_t looked=0;
|
fclose(fp);
|
||||||
string msg;
|
continue;
|
||||||
struct timespec ts;
|
|
||||||
while(getRawNMM(fp, ts, msg, offset)) {
|
|
||||||
// don't drop data that is only 5 seconds too old
|
|
||||||
if(make_pair(ts.tv_sec + 5, ts.tv_nsec) >= make_pair(start.tv_sec, start.tv_nsec)) {
|
|
||||||
rnmms.push_back({ts, msg});
|
|
||||||
}
|
}
|
||||||
++looked;
|
// cerr <<"Seeked to position "<<fpos[fname]<<" of "<<fname<<endl;
|
||||||
}
|
|
||||||
// cerr<<"Harvested "<<rnmms.size()<<" events out of "<<looked<<endl;
|
string msg;
|
||||||
fpos[fname]=offset;
|
struct timespec ts;
|
||||||
fclose(fp);
|
while(getRawNMM(fp, ts, msg, offset)) {
|
||||||
}
|
// don't drop data that is only 5 seconds too old
|
||||||
|
if(make_pair(ts.tv_sec + 5, ts.tv_nsec) >= make_pair(start.tv_sec, start.tv_nsec)) {
|
||||||
|
rnmms.push_back({ts, msg});
|
||||||
|
++count;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// cerr<<"Harvested "<<rnmms.size()<<" events out of "<<looked<<endl;
|
||||||
|
|
||||||
cerr<<"Sorting data"<<endl;
|
fpos[fname]=offset;
|
||||||
|
fclose(fp);
|
||||||
|
}
|
||||||
|
cerr<<" added "<<count<<endl;
|
||||||
|
}
|
||||||
|
// cerr<<"Sorting data"<<endl;
|
||||||
sort(rnmms.begin(), rnmms.end(), [](const auto& a, const auto& b)
|
sort(rnmms.begin(), rnmms.end(), [](const auto& a, const auto& b)
|
||||||
{
|
{
|
||||||
return std::tie(a.first.tv_sec, a.first.tv_nsec)
|
return std::tie(a.first.tv_sec, a.first.tv_nsec)
|
||||||
< std::tie(b.first.tv_sec, b.first.tv_nsec);
|
< std::tie(b.first.tv_sec, b.first.tv_nsec);
|
||||||
});
|
});
|
||||||
cerr<<"Sending data"<<endl;
|
|
||||||
|
auto newend=unique(rnmms.begin(), rnmms.end());
|
||||||
|
cerr<<"Removed "<<rnmms.end() - newend <<" duplicates, ";
|
||||||
|
|
||||||
|
rnmms.erase(newend, rnmms.end());
|
||||||
|
cerr<<"sending data"<<endl;
|
||||||
|
unsigned int count=0;
|
||||||
for(const auto& nmm: rnmms) {
|
for(const auto& nmm: rnmms) {
|
||||||
if(nmm.first.tv_sec > stopTime)
|
if(nmm.first.tv_sec > stopTime)
|
||||||
break;
|
break;
|
||||||
|
@ -113,8 +128,9 @@ void sendProtobuf(string_view dir, vector<uint64_t> stations, time_t startTime,
|
||||||
buf += nmm.second;
|
buf += nmm.second;
|
||||||
//fwrite(buf.c_str(), 1, buf.size(), stdout);
|
//fwrite(buf.c_str(), 1, buf.size(), stdout);
|
||||||
writen2(1, buf.c_str(), buf.size());
|
writen2(1, buf.c_str(), buf.size());
|
||||||
|
++count;
|
||||||
}
|
}
|
||||||
cerr<<"Done sending"<<endl;
|
cerr<<"Done sending " << count<<" messages"<<endl;
|
||||||
if(3600 + start.tv_sec - (start.tv_sec%3600) < stopTime)
|
if(3600 + start.tv_sec - (start.tv_sec%3600) < stopTime)
|
||||||
start.tv_sec = 3600 + start.tv_sec - (start.tv_sec%3600);
|
start.tv_sec = 3600 + start.tv_sec - (start.tv_sec%3600);
|
||||||
else {
|
else {
|
||||||
|
@ -129,14 +145,16 @@ int main(int argc, char** argv)
|
||||||
bool doVERSION{false};
|
bool doVERSION{false};
|
||||||
|
|
||||||
CLI::App app(program);
|
CLI::App app(program);
|
||||||
string beginarg, endarg, storage;
|
string beginarg, endarg;
|
||||||
app.add_option("--storage,-s", storage, "Location of storage files");
|
vector<string> storages;
|
||||||
|
int galwn{-1};
|
||||||
|
app.add_option("--storage,-s", storages, "Locations of storage files");
|
||||||
vector<uint64_t> stations;
|
vector<uint64_t> stations;
|
||||||
app.add_flag("--version", doVERSION, "show program version and copyright");
|
app.add_flag("--version", doVERSION, "show program version and copyright");
|
||||||
app.add_option("--begin,-b", beginarg, "Begin time (2020-01-01 00:00, or 12:30)")->required();
|
app.add_option("--begin,-b", beginarg, "Begin time (2020-01-01 00:00, or 12:30)");
|
||||||
app.add_option("--end,-e", endarg, "End time. Now if omitted");
|
app.add_option("--end,-e", endarg, "End time. Now if omitted");
|
||||||
app.add_option("--stations", stations, "only send data from listed stations");
|
app.add_option("--stations", stations, "only send data from listed stations");
|
||||||
|
app.add_option("--gal-wn", galwn, "Galileo week number to report on");
|
||||||
CLI11_PARSE(app, argc, argv);
|
CLI11_PARSE(app, argc, argv);
|
||||||
|
|
||||||
|
|
||||||
|
@ -145,9 +163,19 @@ int main(int argc, char** argv)
|
||||||
exit(0);
|
exit(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
time_t startTime = parseTime(beginarg);
|
time_t startTime, stopTime;
|
||||||
|
if(galwn >=0) {
|
||||||
time_t stopTime = endarg.empty() ? time(0) : parseTime(endarg);
|
startTime=utcFromGST(galwn, 0);
|
||||||
|
stopTime=startTime + 7*86400;
|
||||||
|
}
|
||||||
|
else if(!beginarg.empty()) {
|
||||||
|
startTime = parseTime(beginarg);
|
||||||
|
stopTime = endarg.empty() ? time(0) : parseTime(endarg);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
cerr<<"No time range specified, use -b or --gal-wn"<<endl;
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
cerr<<"Emitting from "<<humanTime(startTime) << " to " << humanTime(stopTime) << endl;
|
cerr<<"Emitting from "<<humanTime(startTime) << " to " << humanTime(stopTime) << endl;
|
||||||
if(!stations.empty()) {
|
if(!stations.empty()) {
|
||||||
|
@ -155,7 +183,6 @@ int main(int argc, char** argv)
|
||||||
for(const auto& s : stations)
|
for(const auto& s : stations)
|
||||||
cerr<<" "<<s;
|
cerr<<" "<<s;
|
||||||
cerr<<endl;
|
cerr<<endl;
|
||||||
}
|
}
|
||||||
sendProtobuf(storage, stations, startTime, stopTime);
|
sendProtobuf(storages, stations, startTime, stopTime);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue