#include "comboaddress.hh" #include "sclasses.hh" #include #include #include "navmon.pb.h" #include "fmt/format.h" #include "fmt/printf.h" #include #include #include #include #include #include #include #include "storage.hh" #include #include #include "CLI/CLI.hpp" #include "version.hh" static char program[]="navnexus"; using namespace std; extern const char* g_gitHash; std::mutex g_clientmut; set g_clients; std::string g_storage; void unixDie(const std::string& str) { throw std::runtime_error(str+string(": ")+string(strerror(errno))); } vector getSources() { DIR *dir = opendir(g_storage.c_str()); if(!dir) unixDie("Listing metrics from statistics storage "+g_storage); struct dirent *result=0; vector ret; for(;;) { errno=0; if(!(result = readdir(dir))) { closedir(dir); if(errno) unixDie("Reading directory entry "+g_storage); else break; } if(result->d_name[0] != '.') { uint64_t src; if(sscanf(result->d_name, "%08" PRIx64, &src)==1) ret.push_back(src); } } sort(ret.begin(), ret.end()); return ret; } void sendSession(int clientfd, ComboAddress client, time_t startTime, time_t stopTime=0) try { cerr<<"New downstream client "< fpos; vector > rnmms; for(;;) { auto srcs = getSources(); rnmms.clear(); for(const auto& src: srcs) { string fname = getPath(g_storage, start.tv_sec, src); int fd = open(fname.c_str(), O_RDONLY); if(fd < 0) continue; uint32_t offset= fpos[fname]; if(lseek(fd, offset, SEEK_SET) < 0) { cout<<"Error seeking: "<= make_pair(start.tv_sec, start.tv_nsec)) { rnmms.push_back({ts, msg}); } ++looked; } // cout<<"Harvested "<first.tv_sec, rnmms.rbegin()->first.tv_nsec}; sleep(1); } } } catch(std::exception& e) { cerr<<"Sender thread died: "<