move to dumb storage

pull/1/head
bert hubert 2019-08-11 10:43:29 +02:00
parent 2da36dac45
commit 7eeb94c8c5
8 changed files with 264 additions and 93 deletions

View File

@ -1,6 +1,6 @@
CXXFLAGS:= -std=gnu++17 -Wall -O3 -MMD -MP -ggdb -fno-omit-frame-pointer -Iext/fmt-5.2.1/include/ -Iext/powerblog/ext/simplesocket -Iext/powerblog/ext/
PROGRAMS = navparse ubxtool navnexus
PROGRAMS = navparse ubxtool navnexus navrecv
all: $(PROGRAMS)
@ -15,8 +15,13 @@ SIMPLESOCKETS=ext/powerblog/ext/simplesocket/swrappers.o ext/powerblog/ext/simpl
navparse: navparse.o ext/fmt-5.2.1/src/format.o $(H2OPP) $(SIMPLESOCKETS) minicurl.o ubx.o bits.o navmon.pb.o
g++ -std=gnu++17 $^ -o $@ -pthread -L/usr/local/lib -lh2o-evloop -lssl -lcrypto -lz -lcurl -lprotobuf # -lwslay
navnexus: navnexus.o ext/fmt-5.2.1/src/format.o $(H2OPP) $(SIMPLESOCKETS) minicurl.o ubx.o bits.o navmon.pb.o
g++ -std=gnu++17 $^ -o $@ -pthread -L/usr/local/lib -lh2o-evloop -lssl -lcrypto -lz -lcurl -lprotobuf # -lwslay
navnexus: navnexus.o ext/fmt-5.2.1/src/format.o $(SIMPLESOCKETS) ubx.o bits.o navmon.pb.o storage.o
g++ -std=gnu++17 $^ -o $@ -pthread -lprotobuf
navrecv: navrecv.o ext/fmt-5.2.1/src/format.o $(SIMPLESOCKETS) navmon.pb.o storage.o
g++ -std=gnu++17 $^ -o $@ -pthread -lprotobuf
navmon.pb.h: navmon.proto

View File

@ -12,7 +12,7 @@ Tooling:
Performs some best effort buffering & will reconnect if needed.
UPDATE -> will be part of ubxtool
* navrecv: receives GNSS NAV frames and stores them on disk, split out per
sender. UPDATE -> part of navnexus
sender.
* navstore: tails the file stored by navrecv, puts them in LMDB.
UPDATE -> part of navnexus
* navstream: produces a stream of NAV updates from all sources, with a few
@ -37,10 +37,23 @@ rerunning 'navstore' and 'navinflux'.
Big TODO
--------
* Oddity of ephemeris age in doppler graph (possibly wider)
* Dual goals: completeness, liveness, not the same
For forensics, great if the packet is there
For display, not that bad if we missed a message
* Navnexus permanence
lmdb? sqlite?
* It looks like we have some ups and downs in WN/TOW
* In general, consider refeed strategy
Raw serial
Protobuf
Influxdb
".csv files"
* Delivery needs to be bit more stateful (queue)
* Semantics definition for output of Navnexus
* Idempotency and retransmit by ubxtool
"we'll never surprise you with old data"
ubxtool
-------

View File

@ -6,11 +6,15 @@
#include "fmt/format.h"
#include "fmt/printf.h"
#include <mutex>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdexcept>
#include <sys/types.h>
#include "storage.hh"
#include <dirent.h>
using namespace std;
std::mutex g_clientmut;
@ -20,67 +24,10 @@ std::mutex g_dedupmut;
set<std::tuple<int, int, int, int>> g_dedup;
int g_store;
std::string g_storage;
std::multimap<pair<uint32_t,uint32_t>, string> g_history;
void recvSession(int s, ComboAddress client)
{
cerr<<"Receiving messages from "<<client.toStringWithPort()<<endl;
for(;;) {
string part=SRead(s, 4);
if(part != "bert") {
cerr << "Wrong magic!"<<endl;
break;
}
string out=part;
part = SRead(s, 2);
out += part;
uint16_t len;
memcpy(&len, part.c_str(), 2);
len = htons(len);
part = SRead(s, len);
out += part;
NavMonMessage nmm;
nmm.ParseFromString(part);
// cerr<<nmm.sourceid()<<" ";
if(nmm.type() == NavMonMessage::GalileoInavType) {
std::lock_guard<std::mutex> lg(g_dedupmut);
if(g_dedup.count({nmm.gi().gnssid(), nmm.gi().gnsssv(), nmm.gi().gnsswn(), nmm.gi().gnsstow()})) {
// cerr<<"Dedupped message from "<< nmm.sourceid()<<" "<< fmt::format("{0} {1} {2} {3}", nmm.gi().gnssid(), nmm.gi().gnsssv(), nmm.gi().gnsswn(), nmm.gi().gnsstow()) << endl;
continue;
}
// cerr<<"New message from "<< nmm.sourceid()<<" "<< fmt::format("{0} {1} {2} {3}", nmm.gi().gnssid(), nmm.gi().gnsssv(), nmm.gi().gnsswn(), nmm.gi().gnsstow()) << endl;
g_dedup.insert({nmm.gi().gnssid(), nmm.gi().gnsssv(), nmm.gi().gnsswn(), nmm.gi().gnsstow()});
}
else
; // cerr<<"Not an inav message "<< (int) nmm.type()<<endl;
g_history.insert({{nmm.localutcseconds(), nmm.localutcnanoseconds()}, out});
if(write(g_store, out.c_str(), out.size()) != out.size()) {
cerr<<"Failed to store message in buffer"<<endl;
}
for(const auto& fd : g_clients) {
SWrite(fd, out);
}
}
}
void recvListener(Socket&& s, ComboAddress local)
{
for(;;) {
ComboAddress remote=local;
int fd = SAccept(s, remote);
std::thread t(recvSession, fd, remote);
t.detach();
}
}
void sendSession(int s, ComboAddress client)
{
@ -116,29 +63,59 @@ void sendListener(Socket&& s, ComboAddress local)
}
void unixDie(const std::string& str)
{
throw std::runtime_error(str+string(": ")+string(strerror(errno)));
}
vector<uint64_t> getSources()
{
DIR *dir = opendir(g_storage.c_str());
if(!dir)
unixDie("Listing metrics from statistics storage "+g_storage);
struct dirent *result=0;
vector<uint64_t> ret;
for(;;) {
errno=0;
if(!(result = readdir(dir))) {
closedir(dir);
if(errno)
unixDie("Reading directory entry "+g_storage);
else
break;
}
if(result->d_name[0] != '.') {
uint64_t src;
if(sscanf(result->d_name, "%08lx", &src)==1)
ret.push_back(src);
}
}
sort(ret.begin(), ret.end());
return ret;
}
int main(int argc, char** argv)
{
signal(SIGPIPE, SIG_IGN);
g_store = open("permanent", O_CREAT | O_APPEND | O_WRONLY, 0666);
if(g_store < 0) {
cerr<<"Unable to open permanent storage file"<<endl;
if(argc != 3) {
cout<<"Syntax: navnexus storage listen-address"<<endl;
return(EXIT_FAILURE);
}
g_storage=argv[1];
for(;;) {
auto srcs = getSources();
for(const auto& s: srcs) {
time_t t = time(0);
cout<<s <<" -> "<<getPath(g_storage, t, s) << " & " << getPath(g_storage, t-3600, s) << endl;
}
sleep(5);
}
ComboAddress recvaddr("0.0.0.0", 29600);
Socket receiver(recvaddr.sin4.sin_family, SOCK_STREAM, 0);
SSetsockopt(receiver,SOL_SOCKET, SO_REUSEADDR, 1 );
SBind(receiver, recvaddr);
SListen(receiver, 128);
thread recvThread(recvListener, std::move(receiver), recvaddr);
recvThread.detach();
ComboAddress sendaddr("0.0.0.0", 29601);
#if 0
ComboAddress sendaddr(argv[2], 29601);
Socket sender(sendaddr.sin4.sin_family, SOCK_STREAM);
SSetsockopt(sender, SOL_SOCKET, SO_REUSEADDR, 1 );
SBind(sender, sendaddr);
@ -147,10 +124,7 @@ int main(int argc, char** argv)
thread sendThread(sendListener, std::move(sender), sendaddr);
sendThread.detach();
for(;;) {
sleep(1);
}
#endif
}

View File

@ -184,7 +184,10 @@ void SVStat::addWord(std::basic_string_view<uint8_t> page)
if(wtype == 0) {
if(getbitu(&page[0], 6,2) == 2) {
wn = getbitu(&page[0], 96, 12);
tow = getbitu(&page[0], 108, 20);
if(tow != getbitu(&page[0], 108, 20)) {
cerr<<"wtype "<<wtype<<", was about to mis-set TOW, " <<tow<< " " <<getbitu(&page[0], 108, 20) <<endl;
}
}
}
else if(wtype >=1 && wtype <= 4) { // ephemeris
@ -219,7 +222,10 @@ void SVStat::addWord(std::basic_string_view<uint8_t> page)
e5bdvs = getbitu(&page[0], 71, 1);
e1bdvs = getbitu(&page[0], 72, 1);
wn = getbitu(&page[0], 73, 12);
tow = getbitu(&page[0], 85, 20);
if(tow != getbitu(&page[0], 85, 20))
{
cerr<<"wtype "<<wtype<<", was about to mis-set TOW"<<endl;
}
}
else if(wtype == 6) {
a0 = getbits(&page[0], 6, 32);
@ -233,7 +239,9 @@ void SVStat::addWord(std::basic_string_view<uint8_t> page)
dtLSF = getbits(&page[0], 97, 8);
// cout<<(int) dtLS << " " <<(int) wnLSF<<" " <<(int) dn <<" " <<(int) dtLSF<<endl;
tow = getbitu(&page[0], 105, 20);
if(tow != getbitu(&page[0], 105, 20)) {
cerr<<"wtype "<<wtype<<", was about to mis-set TOW"<<endl;
}
}
else if(wtype == 10) { // GSTT GPS
a0g = getbits(&page[0], 86, 16);
@ -714,6 +722,13 @@ try
basic_string<uint8_t> inav((uint8_t*)nmm.gi().contents().c_str(), nmm.gi().contents().size());
int sv = nmm.gi().gnsssv();
g_svstats[sv].wn = nmm.gi().gnsswn();
unsigned int wtype = getbitu(&inav[0], 0, 6);
if(1) {
// cout<<sv <<"\t" << wtype << "\t" << nmm.gi().gnsstow() << "\t"<< nmm.sourceid() << endl;
if(g_svstats[sv].tow > nmm.gi().gnsstow()) {
cout<<" wtype "<<wtype<<", was about to set tow backwards for "<<sv<<", "<<g_svstats[sv].tow << " > "<<nmm.gi().gnsstow()<<", " << ((signed)g_svstats[sv].tow - (signed)nmm.gi().gnsstow()) << ", source "<<nmm.sourceid()<<endl;
}
}
g_svstats[sv].tow = nmm.gi().gnsstow();
g_svstats[sv].perrecv[nmm.sourceid()].wn = nmm.gi().gnsswn();
@ -723,7 +738,7 @@ try
// for(auto& c : inav)
// fmt::printf("%02x ", c);
unsigned int wtype = getbitu(&inav[0], 0, 6);
g_svstats[sv].addWord(inav);
if(g_svstats[sv].e1bhs || g_svstats[sv].e5bhs || g_svstats[sv].e1bdvs || g_svstats[sv].e5bdvs) {
@ -858,7 +873,7 @@ try
Vector us2sat(us, sat);
Vector speed;
getSpeed(nmm.rfd().rcvwn(), nmm.rfd().rcvtow(), g_svstats[sv].liveIOD(), &speed);
cout<<sv<<" radius: "<<Vector(core, sat).length()<<", distance: "<<us2sat.length()<<", orbital velocity: "<<speed.length()/1000.0<<" km/s, ";
// cout<<sv<<" radius: "<<Vector(core, sat).length()<<", distance: "<<us2sat.length()<<", orbital velocity: "<<speed.length()/1000.0<<" km/s, ";
Vector core2us(core, us);
Vector dx(us, sat); // = x-ourx, dy = y-oury, dz = z-ourz;
@ -874,7 +889,7 @@ try
// be careful with time here -
double ephage = ephAge(nmm.rfd().rcvtow(), g_svstats[sv].liveIOD().t0e*60);
cout<<"Radial velocity: "<< radvel<<", predicted doppler: "<< preddop << ", measured doppler: "<<nmm.rfd().doppler()<<endl;
// cout<<"Radial velocity: "<< radvel<<", predicted doppler: "<< preddop << ", measured doppler: "<<nmm.rfd().doppler()<<endl;
dopplercsv << std::fixed << utcFromGST(g_svstats[sv].wn, nmm.rfd().rcvtow()) <<" " << nmm.rfd().gnssid() <<" " <<sv<<" "<<nmm.rfd().pseudorange()<<" "<< nmm.rfd().carrierphase() <<" " << nmm.rfd().doppler()<<" " << preddop << " " << Vector(us, sat).length() << " " <<radvel <<" " << nmm.rfd().locktimems()<<" " <<ephage << " " << nmm.rfd().prstd() << " " << nmm.rfd().cpstd() <<" " <<
nmm.rfd().dostd() << endl;
}

115
navrecv.cc 100644
View File

@ -0,0 +1,115 @@
#include "comboaddress.hh"
#include "sclasses.hh"
#include <thread>
#include <signal.h>
#include "navmon.pb.h"
#include "fmt/format.h"
#include "fmt/printf.h"
#include <mutex>
#include "storage.hh"
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
using namespace std;
/* Goals in life:
1) NEVER EVER GO DOWN
2) Receive all frames without thinking (see 1)
3) Put the frames in a scalable directory structure
*/
string g_storagedir="./storage/";
struct FatalException : public std::runtime_error
{
FatalException(const std::string& str) : std::runtime_error(str){}
};
void writeToDisk(time_t s, uint64_t sourceid, std::string_view message)
{
auto path = getPath(g_storagedir, s, sourceid, true);
int fd = open(path.c_str(), O_CREAT | O_WRONLY | O_APPEND, 0600);
if(fd < 0) {
throw FatalException("Unable to open file for storage: "+string(strerror(errno)));
}
int res = write(fd, &message[0], message.size());
if(res < 0) {
close(fd);
throw FatalException("Unable to open file for storage: "+string(strerror(errno)));
}
if((unsigned int)res != message.size()) {
close(fd);
throw FatalException("Partial write to storage");
}
close(fd);
}
void recvSession(int s, ComboAddress client)
{
Socket sock(s);
cerr<<"Receiving messages from "<<client.toStringWithPort()<<endl;
for(;;) {
string part=SRead(sock, 4);
if(part != "bert") {
cerr << "Wrong magic!"<<endl;
break;
}
string out=part;
part = SRead(s, 2);
out += part;
uint16_t len;
memcpy(&len, part.c_str(), 2);
len = htons(len);
part = SRead(s, len);
out += part;
NavMonMessage nmm;
nmm.ParseFromString(part);
writeToDisk(nmm.localutcseconds(), nmm.sourceid(), out);
}
}
void recvListener(Socket&& s, ComboAddress local)
{
for(;;) {
ComboAddress remote=local;
int fd = SAccept(s, remote);
std::thread t(recvSession, fd, remote);
t.detach();
}
}
int main(int argc, char** argv)
{
signal(SIGPIPE, SIG_IGN);
if(argc != 3) {
cout<<"Syntax: navrecv listen-address storage"<<endl;
return EXIT_FAILURE;
}
g_storagedir=argv[2];
ComboAddress recvaddr(argv[1], 29603);
Socket receiver(recvaddr.sin4.sin_family, SOCK_STREAM, 0);
SSetsockopt(receiver,SOL_SOCKET, SO_REUSEADDR, 1 );
SBind(receiver, recvaddr);
SListen(receiver, 128);
thread recvThread(recvListener, std::move(receiver), recvaddr);
recvThread.detach();
for(;;) {
sleep(1);
}
}

40
storage.cc 100644
View File

@ -0,0 +1,40 @@
#include "storage.hh"
#include "fmt/format.h"
#include "fmt/printf.h"
#include <unistd.h>
#include <sys/stat.h>
#include <sys/types.h>
using namespace std;
std::string getPath(std::string_view root, time_t s, uint64_t sourceid, bool create)
{
auto comps = getPathComponents(root, s, sourceid);
std::string path;
for(unsigned int pos = 0; pos < comps.size() - 1 ; ++pos) {
path += comps[pos] +"/";
if(create)
mkdir(path.c_str(), 0770);
}
path += comps[comps.size()-1]+".gnss";
return path;
}
vector<string> getPathComponents(std::string_view root, time_t s, uint64_t sourceid)
{
// path: source/year/month/day/hour.pb
vector<string> ret;
ret.push_back((string)root);
ret.push_back(fmt::sprintf("%08x", sourceid));
struct tm tm;
gmtime_r(&s, &tm);
ret.push_back(to_string(tm.tm_year+1900));
ret.push_back(to_string(tm.tm_mon+1));
ret.push_back(to_string(tm.tm_mday+1));
ret.push_back(to_string(tm.tm_hour)+".pb");
return ret;
}

7
storage.hh 100644
View File

@ -0,0 +1,7 @@
#pragma once
#include <time.h>
#include <string>
#include <vector>
std::vector<std::string> getPathComponents(std::string_view root, time_t s, uint64_t sourceid);
std::string getPath(std::string_view root, time_t s, uint64_t sourceid, bool create=false);

View File

@ -359,7 +359,7 @@ int main(int argc, char** argv)
*/
std::map<pair<int,int>, struct timeval> lasttv, tv;
unsigned int curCycleTOW{0};
int curCycleTOW{-1}; // means invalid
for(;;) {
try {
@ -513,6 +513,8 @@ int main(int argc, char** argv)
curCycleTOW = satTOW - (satTOW %30);
}
else {
if(curCycleTOW < 0) // did not yet have a start of cycle
continue;
cerr<<" "<<wtype<<" sv "<<id.second<<" tow ";
if(wtype == 2) {
cerr<<"infered to be 1 "<<curCycleTOW + 31<<endl;