From 7eeb94c8c568cbf43ec79cf6cb239290d0705c03 Mon Sep 17 00:00:00 2001 From: bert hubert Date: Sun, 11 Aug 2019 10:43:29 +0200 Subject: [PATCH] move to dumb storage --- Makefile | 11 +++-- README.md | 19 ++++++-- navnexus.cc | 134 +++++++++++++++++++++------------------------------- navparse.cc | 27 ++++++++--- navrecv.cc | 115 ++++++++++++++++++++++++++++++++++++++++++++ storage.cc | 40 ++++++++++++++++ storage.hh | 7 +++ ubxtool.cc | 4 +- 8 files changed, 264 insertions(+), 93 deletions(-) create mode 100644 navrecv.cc create mode 100644 storage.cc create mode 100644 storage.hh diff --git a/Makefile b/Makefile index 59409ed..b356d15 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ CXXFLAGS:= -std=gnu++17 -Wall -O3 -MMD -MP -ggdb -fno-omit-frame-pointer -Iext/fmt-5.2.1/include/ -Iext/powerblog/ext/simplesocket -Iext/powerblog/ext/ -PROGRAMS = navparse ubxtool navnexus +PROGRAMS = navparse ubxtool navnexus navrecv all: $(PROGRAMS) @@ -15,8 +15,13 @@ SIMPLESOCKETS=ext/powerblog/ext/simplesocket/swrappers.o ext/powerblog/ext/simpl navparse: navparse.o ext/fmt-5.2.1/src/format.o $(H2OPP) $(SIMPLESOCKETS) minicurl.o ubx.o bits.o navmon.pb.o g++ -std=gnu++17 $^ -o $@ -pthread -L/usr/local/lib -lh2o-evloop -lssl -lcrypto -lz -lcurl -lprotobuf # -lwslay -navnexus: navnexus.o ext/fmt-5.2.1/src/format.o $(H2OPP) $(SIMPLESOCKETS) minicurl.o ubx.o bits.o navmon.pb.o - g++ -std=gnu++17 $^ -o $@ -pthread -L/usr/local/lib -lh2o-evloop -lssl -lcrypto -lz -lcurl -lprotobuf # -lwslay +navnexus: navnexus.o ext/fmt-5.2.1/src/format.o $(SIMPLESOCKETS) ubx.o bits.o navmon.pb.o storage.o + g++ -std=gnu++17 $^ -o $@ -pthread -lprotobuf + +navrecv: navrecv.o ext/fmt-5.2.1/src/format.o $(SIMPLESOCKETS) navmon.pb.o storage.o + g++ -std=gnu++17 $^ -o $@ -pthread -lprotobuf + + navmon.pb.h: navmon.proto diff --git a/README.md b/README.md index baf839a..b86f082 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,7 @@ Tooling: Performs some best effort buffering & will reconnect if needed. UPDATE -> will be part of ubxtool * navrecv: receives GNSS NAV frames and stores them on disk, split out per - sender. UPDATE -> part of navnexus + sender. * navstore: tails the file stored by navrecv, puts them in LMDB. UPDATE -> part of navnexus * navstream: produces a stream of NAV updates from all sources, with a few @@ -37,10 +37,23 @@ rerunning 'navstore' and 'navinflux'. Big TODO -------- - * Oddity of ephemeris age in doppler graph (possibly wider) + * Dual goals: completeness, liveness, not the same + For forensics, great if the packet is there + For display, not that bad if we missed a message * Navnexus permanence + lmdb? sqlite? + * It looks like we have some ups and downs in WN/TOW + * In general, consider refeed strategy + Raw serial + Protobuf + Influxdb + ".csv files" + * Delivery needs to be bit more stateful (queue) + * Semantics definition for output of Navnexus - * Idempotency and retransmit by ubxtool + "we'll never surprise you with old data" + + ubxtool ------- diff --git a/navnexus.cc b/navnexus.cc index 2cc9a4d..477bd6c 100644 --- a/navnexus.cc +++ b/navnexus.cc @@ -6,11 +6,15 @@ #include "fmt/format.h" #include "fmt/printf.h" #include - - #include #include #include +#include +#include +#include +#include "storage.hh" +#include + using namespace std; std::mutex g_clientmut; @@ -20,67 +24,10 @@ std::mutex g_dedupmut; set> g_dedup; -int g_store; +std::string g_storage; std::multimap, string> g_history; -void recvSession(int s, ComboAddress client) -{ - cerr<<"Receiving messages from "< lg(g_dedupmut); - if(g_dedup.count({nmm.gi().gnssid(), nmm.gi().gnsssv(), nmm.gi().gnsswn(), nmm.gi().gnsstow()})) { - // cerr<<"Dedupped message from "<< nmm.sourceid()<<" "<< fmt::format("{0} {1} {2} {3}", nmm.gi().gnssid(), nmm.gi().gnsssv(), nmm.gi().gnsswn(), nmm.gi().gnsstow()) << endl; - continue; - } - // cerr<<"New message from "<< nmm.sourceid()<<" "<< fmt::format("{0} {1} {2} {3}", nmm.gi().gnssid(), nmm.gi().gnsssv(), nmm.gi().gnsswn(), nmm.gi().gnsstow()) << endl; - g_dedup.insert({nmm.gi().gnssid(), nmm.gi().gnsssv(), nmm.gi().gnsswn(), nmm.gi().gnsstow()}); - } - else - ; // cerr<<"Not an inav message "<< (int) nmm.type()< getSources() +{ + DIR *dir = opendir(g_storage.c_str()); + if(!dir) + unixDie("Listing metrics from statistics storage "+g_storage); + struct dirent *result=0; + vector ret; + for(;;) { + errno=0; + if(!(result = readdir(dir))) { + closedir(dir); + if(errno) + unixDie("Reading directory entry "+g_storage); + else + break; + } + if(result->d_name[0] != '.') { + uint64_t src; + if(sscanf(result->d_name, "%08lx", &src)==1) + ret.push_back(src); + } + } + + sort(ret.begin(), ret.end()); + return ret; +} + int main(int argc, char** argv) { signal(SIGPIPE, SIG_IGN); - - g_store = open("permanent", O_CREAT | O_APPEND | O_WRONLY, 0666); - if(g_store < 0) { - cerr<<"Unable to open permanent storage file"< "< page) if(wtype == 0) { if(getbitu(&page[0], 6,2) == 2) { wn = getbitu(&page[0], 96, 12); - tow = getbitu(&page[0], 108, 20); + if(tow != getbitu(&page[0], 108, 20)) { + cerr<<"wtype "<=1 && wtype <= 4) { // ephemeris @@ -219,7 +222,10 @@ void SVStat::addWord(std::basic_string_view page) e5bdvs = getbitu(&page[0], 71, 1); e1bdvs = getbitu(&page[0], 72, 1); wn = getbitu(&page[0], 73, 12); - tow = getbitu(&page[0], 85, 20); + if(tow != getbitu(&page[0], 85, 20)) + { + cerr<<"wtype "< page) dtLSF = getbits(&page[0], 97, 8); // cout<<(int) dtLS << " " <<(int) wnLSF<<" " <<(int) dn <<" " <<(int) dtLSF< inav((uint8_t*)nmm.gi().contents().c_str(), nmm.gi().contents().size()); int sv = nmm.gi().gnsssv(); g_svstats[sv].wn = nmm.gi().gnsswn(); + unsigned int wtype = getbitu(&inav[0], 0, 6); + if(1) { + // cout< nmm.gi().gnsstow()) { + cout<<" wtype "< "< +#include +#include "navmon.pb.h" +#include "fmt/format.h" +#include "fmt/printf.h" +#include +#include "storage.hh" + +#include +#include +#include +using namespace std; + +/* Goals in life: + + 1) NEVER EVER GO DOWN + 2) Receive all frames without thinking (see 1) + 3) Put the frames in a scalable directory structure +*/ + +string g_storagedir="./storage/"; + +struct FatalException : public std::runtime_error +{ + FatalException(const std::string& str) : std::runtime_error(str){} +}; + +void writeToDisk(time_t s, uint64_t sourceid, std::string_view message) +{ + auto path = getPath(g_storagedir, s, sourceid, true); + int fd = open(path.c_str(), O_CREAT | O_WRONLY | O_APPEND, 0600); + if(fd < 0) { + throw FatalException("Unable to open file for storage: "+string(strerror(errno))); + } + int res = write(fd, &message[0], message.size()); + if(res < 0) { + close(fd); + throw FatalException("Unable to open file for storage: "+string(strerror(errno))); + } + if((unsigned int)res != message.size()) { + close(fd); + throw FatalException("Partial write to storage"); + } + close(fd); +} + +void recvSession(int s, ComboAddress client) +{ + Socket sock(s); + cerr<<"Receiving messages from "< +#include +#include + +using namespace std; + + +std::string getPath(std::string_view root, time_t s, uint64_t sourceid, bool create) +{ + auto comps = getPathComponents(root, s, sourceid); + std::string path; + for(unsigned int pos = 0; pos < comps.size() - 1 ; ++pos) { + path += comps[pos] +"/"; + if(create) + mkdir(path.c_str(), 0770); + } + path += comps[comps.size()-1]+".gnss"; + return path; +} + + +vector getPathComponents(std::string_view root, time_t s, uint64_t sourceid) +{ + // path: source/year/month/day/hour.pb + vector ret; + ret.push_back((string)root); + ret.push_back(fmt::sprintf("%08x", sourceid)); + + struct tm tm; + gmtime_r(&s, &tm); + + ret.push_back(to_string(tm.tm_year+1900)); + ret.push_back(to_string(tm.tm_mon+1)); + ret.push_back(to_string(tm.tm_mday+1)); + ret.push_back(to_string(tm.tm_hour)+".pb"); + return ret; +} diff --git a/storage.hh b/storage.hh new file mode 100644 index 0000000..87f89d5 --- /dev/null +++ b/storage.hh @@ -0,0 +1,7 @@ +#pragma once +#include +#include +#include + +std::vector getPathComponents(std::string_view root, time_t s, uint64_t sourceid); +std::string getPath(std::string_view root, time_t s, uint64_t sourceid, bool create=false); diff --git a/ubxtool.cc b/ubxtool.cc index 7251000..3939812 100644 --- a/ubxtool.cc +++ b/ubxtool.cc @@ -359,7 +359,7 @@ int main(int argc, char** argv) */ std::map, struct timeval> lasttv, tv; - unsigned int curCycleTOW{0}; + int curCycleTOW{-1}; // means invalid for(;;) { try { @@ -513,6 +513,8 @@ int main(int argc, char** argv) curCycleTOW = satTOW - (satTOW %30); } else { + if(curCycleTOW < 0) // did not yet have a start of cycle + continue; cerr<<" "<