casual 10-fold speedup of navnexus
parent
9776a0882d
commit
95f4502928
47
navnexus.cc
47
navnexus.cc
|
@ -60,14 +60,14 @@ try
|
||||||
{
|
{
|
||||||
cerr<<"New downstream client "<<client.toStringWithPort() << endl;
|
cerr<<"New downstream client "<<client.toStringWithPort() << endl;
|
||||||
|
|
||||||
pair<uint64_t, uint64_t> start = {startTime, 0};
|
pair<int64_t, int64_t> start(startTime, 0);
|
||||||
|
|
||||||
// so we have a ton of files, and internally these are not ordered
|
// so we have a ton of files, and internally these are not ordered
|
||||||
map<string,uint32_t> fpos;
|
map<string,uint32_t> fpos;
|
||||||
vector<NavMonMessage> nmms;
|
vector<pair<timespec,string> > rnmms;
|
||||||
for(;;) {
|
for(;;) {
|
||||||
auto srcs = getSources();
|
auto srcs = getSources();
|
||||||
nmms.clear();
|
rnmms.clear();
|
||||||
for(const auto& src: srcs) {
|
for(const auto& src: srcs) {
|
||||||
string fname = getPath(g_storage, start.first, src);
|
string fname = getPath(g_storage, start.first, src);
|
||||||
int fd = open(fname.c_str(), O_RDONLY);
|
int fd = open(fname.c_str(), O_RDONLY);
|
||||||
|
@ -83,43 +83,42 @@ try
|
||||||
NavMonMessage nmm;
|
NavMonMessage nmm;
|
||||||
|
|
||||||
uint32_t looked=0;
|
uint32_t looked=0;
|
||||||
while(getNMM(fd, nmm, offset)) {
|
string msg;
|
||||||
|
struct timespec ts;
|
||||||
|
|
||||||
|
while(getRawNMM(fd, ts, msg, offset)) {
|
||||||
// don't drop data that is only 5 seconds too old
|
// don't drop data that is only 5 seconds too old
|
||||||
if(make_pair(nmm.localutcseconds() + 5, nmm.localutcnanoseconds()) >= start) {
|
if(make_pair(ts.tv_sec + 5, ts.tv_nsec) >= start) {
|
||||||
nmms.push_back(nmm);
|
rnmms.push_back({ts, msg});
|
||||||
}
|
}
|
||||||
++looked;
|
++looked;
|
||||||
}
|
}
|
||||||
cout<<"Harvested "<<nmms.size()<<" events out of "<<looked<<endl;
|
// cout<<"Harvested "<<rnmms.size()<<" events out of "<<looked<<endl;
|
||||||
fpos[fname]=offset;
|
fpos[fname]=offset;
|
||||||
close(fd);
|
close(fd);
|
||||||
}
|
}
|
||||||
sort(nmms.begin(), nmms.end(), [](const auto& a, const auto& b)
|
cout<<"Sorting.. ";
|
||||||
|
cout.flush();
|
||||||
|
sort(rnmms.begin(), rnmms.end(), [](const auto& a, const auto& b)
|
||||||
{
|
{
|
||||||
return make_pair(a.localutcseconds(), a.localutcnanoseconds()) <
|
return std::tie(a.first.tv_sec, a.first.tv_nsec)
|
||||||
make_pair(b.localutcseconds(), b.localutcnanoseconds());
|
< std::tie(b.first.tv_sec, b.first.tv_nsec);
|
||||||
});
|
});
|
||||||
|
cout<<"Sending.. ";
|
||||||
for(const auto& nmm: nmms) {
|
cout.flush();
|
||||||
std::string out;
|
for(const auto& nmm: rnmms) {
|
||||||
try {
|
|
||||||
nmm.SerializeToString(&out);
|
|
||||||
}
|
|
||||||
catch(std::exception& e) {
|
|
||||||
cerr<<"Something went wrong during serialization, skipping: "<<e.what()<<endl;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
std::string buf="bert";
|
std::string buf="bert";
|
||||||
uint16_t len = htons(out.size());
|
uint16_t len = htons(nmm.second.size());
|
||||||
buf.append((char*)(&len), 2);
|
buf.append((char*)(&len), 2);
|
||||||
buf+=out;
|
buf += nmm.second;
|
||||||
SWriten(clientfd, buf);
|
SWriten(clientfd, buf);
|
||||||
}
|
}
|
||||||
|
cout<<"Done"<<endl;
|
||||||
if(3600 + start.first - (start.first%3600) < time(0))
|
if(3600 + start.first - (start.first%3600) < time(0))
|
||||||
start.first = 3600 + start.first - (start.first%3600);
|
start.first = 3600 + start.first - (start.first%3600);
|
||||||
else {
|
else {
|
||||||
if(!nmms.empty())
|
if(!rnmms.empty())
|
||||||
start = {nmms.rbegin()->localutcseconds(), nmms.rbegin()->localutcnanoseconds()};
|
start = {rnmms.rbegin()->first.tv_sec, rnmms.rbegin()->first.tv_nsec};
|
||||||
sleep(1);
|
sleep(1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
26
storage.cc
26
storage.cc
|
@ -79,6 +79,32 @@ bool getNMM(FILE* fp, NavMonMessage& nmm, uint32_t& offset)
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool getRawNMM(int fd, timespec& t, string& raw, uint32_t& offset)
|
||||||
|
{
|
||||||
|
char bert[4];
|
||||||
|
if(read(fd, bert, 4) != 4 || bert[0]!='b' || bert[1]!='e' || bert[2] !='r' || bert[3]!='t') {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
uint16_t len;
|
||||||
|
if(read(fd, &len, 2) != 2)
|
||||||
|
return false;
|
||||||
|
len = htons(len);
|
||||||
|
char buffer[len];
|
||||||
|
if(read(fd, buffer, len) != len)
|
||||||
|
return false;
|
||||||
|
|
||||||
|
NavMonMessage nmm;
|
||||||
|
raw.assign(buffer, len);
|
||||||
|
nmm.ParseFromString(raw);
|
||||||
|
t.tv_sec = nmm.localutcseconds();
|
||||||
|
t.tv_nsec = nmm.localutcnanoseconds();
|
||||||
|
offset += 4 + 2 + len;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
bool getRawNMM(FILE* fp, timespec& t, string& raw, uint32_t& offset)
|
bool getRawNMM(FILE* fp, timespec& t, string& raw, uint32_t& offset)
|
||||||
{
|
{
|
||||||
|
|
|
@ -6,6 +6,9 @@
|
||||||
|
|
||||||
std::vector<std::string> getPathComponents(std::string_view root, time_t s, uint64_t sourceid);
|
std::vector<std::string> getPathComponents(std::string_view root, time_t s, uint64_t sourceid);
|
||||||
std::string getPath(std::string_view root, time_t s, uint64_t sourceid, bool create=false);
|
std::string getPath(std::string_view root, time_t s, uint64_t sourceid, bool create=false);
|
||||||
|
/*
|
||||||
bool getNMM(int fd, NavMonMessage& nmm, uint32_t& offset);
|
bool getNMM(int fd, NavMonMessage& nmm, uint32_t& offset);
|
||||||
bool getNMM(FILE* fp, NavMonMessage& nmm, uint32_t& offset);
|
bool getNMM(FILE* fp, NavMonMessage& nmm, uint32_t& offset);
|
||||||
|
*/
|
||||||
|
bool getRawNMM(int fd, timespec& t, std::string& raw, uint32_t& offset);
|
||||||
bool getRawNMM(FILE* fp, timespec& t, std::string& raw, uint32_t& offset);
|
bool getRawNMM(FILE* fp, timespec& t, std::string& raw, uint32_t& offset);
|
||||||
|
|
Loading…
Reference in New Issue