cleanup of ubxtool, with buffered writing ability to multiple destination

geo-improv2
bert hubert 2019-09-23 14:23:03 +02:00
parent 79e15e1749
commit 5169dc2ca3
3 changed files with 207 additions and 114 deletions

View File

@ -42,8 +42,8 @@ tlecatch: tlecatch.o $(patsubst %.cc,%.o,$(wildcard ext/sgp4/libsgp4/*.cc))
navmon.pb.cc: navmon.proto
protoc --cpp_out=./ navmon.proto
ubxtool: navmon.pb.o ubxtool.o ubx.o bits.o ext/fmt-5.2.1/src/format.o galileo.o gps.o beidou.o navmon.o ephemeris.o
$(CXX) -std=gnu++17 $^ -o $@ -L/usr/local/lib -lprotobuf
ubxtool: navmon.pb.o ubxtool.o ubx.o bits.o ext/fmt-5.2.1/src/format.o galileo.o gps.o beidou.o navmon.o ephemeris.o $(SIMPLESOCKETS)
$(CXX) -std=gnu++17 $^ -o $@ -L/usr/local/lib -lprotobuf -pthread
testrunner: navmon.pb.o testrunner.o ubx.o bits.o ext/fmt-5.2.1/src/format.o galileo.o gps.o beidou.o ephemeris.o
$(CXX) -std=gnu++17 $^ -o $@ -L/usr/local/lib -lprotobuf

View File

@ -92,10 +92,10 @@ try
tles.parseFile("gps-ops.txt");
tles.parseFile("beidou.txt");
bool skipGPS{true};
bool skipBeidou{true};
bool skipGPS{false};
bool skipBeidou{false};
bool skipGalileo{false};
bool skipGlonass{true};
bool skipGlonass{false};
ofstream almanac("almanac.txt");
ofstream iodstream("iodstream.csv");

View File

@ -26,11 +26,19 @@
#include "glonass.hh"
#include "beidou.hh"
#include "CLI/CLI.hpp"
struct timespec g_gstutc;
uint16_t g_wn;
#include <thread>
#include <mutex>
#include "comboaddress.hh"
#include "swrappers.hh"
#include "sclasses.hh"
struct timespec g_gnssutc;
uint16_t g_galwn;
using namespace std;
uint16_t g_srcid{2};
uint16_t g_srcid{0};
#define BAUDRATE B115200
@ -77,30 +85,6 @@ t n w
28 15: 0 WN/TOW
*/
/*
if(ubxClass == 2 && ubxType == 89) { // SAR
string hexstring;
for(int n = 0; n < 15; ++n)
hexstring+=fmt::format("%x", (int)getbitu(msg.c_str(), 36 + 4*n, 4));
// int sv = (int)msg[2];
// wk.emitLine(sv, "SAR "+hexstring);
// cout<<"SAR: sv = "<< (int)msg[2] <<" ";
// for(int n=4; n < 12; ++n)
// fmt::printf("%02x", (int)msg[n]);
// for(int n = 0; n < 15; ++n)
// fmt::printf("%x", (int)getbitu(msg.c_str(), 36 + 4*n, 4));
// cout << " Type: "<< (int) msg[12] <<"\n";
// cout<<"Parameter: (len = "<<msg.length()<<") ";
// for(unsigned int n = 13; n < msg.length(); ++n)
// fmt::printf("%02x ", (int)msg[n]);
// cout<<"\n";
}
*/
class UBXMessage
{
public:
@ -196,6 +180,7 @@ UBXMessage waitForUBX(int fd, int seconds, uint8_t ubxClass, uint8_t ubxType)
struct TimeoutError{};
// perhaps do retransmit from here?
bool waitForUBXAckNack(int fd, int seconds, int ubxClass, int ubxType)
{
time_t start = time(0);
@ -228,9 +213,103 @@ bool waitForUBXAckNack(int fd, int seconds, int ubxClass, int ubxType)
throw std::runtime_error("Did not get ACK/NACK response for class "+to_string(ubxClass)+" type "+to_string(ubxType)+" on time");
}
void emitNMM(int fd, const NavMonMessage& nmm)
class NMMSender
{
struct Destination
{
int fd{-1};
ComboAddress dst{"0.0.0.0:0"};
std::string fname;
deque<string> queue;
std::mutex mut;
void emitNMM(const NavMonMessage& nmm);
};
public:
void addDestination(int fd)
{
auto d = std::make_unique<Destination>();
d->fd = fd;
d_dests.push_back(std::move(d));
}
void addDestination(const ComboAddress& dest)
{
auto d = std::make_unique<Destination>();
d->dst = dest;
d_dests.push_back(std::move(d));
}
void launch()
{
for(auto& d : d_dests) {
if(d->dst.sin4.sin_port != 0) {
thread t(&NMMSender::sendTCPThread, this, d.get());
t.detach();
}
}
}
void sendTCPThread(Destination* d)
{
for(;;) {
try {
Socket s(d->dst.sin4.sin_family, SOCK_STREAM);
SocketCommunicator sc(s);
sc.setTimeout(3);
sc.connect(d->dst);
cerr<<"Connected to "<<d->dst.toStringWithPort()<<endl;
for(;;) {
std::string msg;
{
std::lock_guard<std::mutex> mut(d->mut);
if(!d->queue.empty()) {
msg = d->queue.front();
}
}
if(!msg.empty()) {
sc.writen(msg);
d->queue.pop_front();
}
else usleep(100000);
}
}
catch(std::exception& e) {
cerr<<"Sending thread for "<<d->dst.toStringWithPort()<<" had error: "<<e.what()<<endl;
{
std::lock_guard<std::mutex> mut(d->mut);
cerr<<"There are now "<<d->queue.size()<<" messages queued for "<<d->dst.toStringWithPort()<<endl;
}
sleep(1);
}
catch(...) {
cerr<<"Sending thread for "<<d->dst.toStringWithPort()<<" had error";
{
std::lock_guard<std::mutex> mut(d->mut);
cerr<<"There are now "<<d->queue.size()<<" messages queued for "<<d->dst.toStringWithPort()<<endl;
}
sleep(1);
}
}
}
void emitNMM(const NavMonMessage& nmm);
private:
vector<unique_ptr<Destination>> d_dests;
};
void NMMSender::emitNMM(const NavMonMessage& nmm)
{
for(auto& d : d_dests) {
d->emitNMM(nmm);
}
}
void NMMSender::Destination::emitNMM(const NavMonMessage& nmm)
{
string out;
nmm.SerializeToString(& out);
string msg("bert");
@ -238,10 +317,16 @@ void emitNMM(int fd, const NavMonMessage& nmm)
uint16_t len = htons(out.size());
msg.append((char*)&len, 2);
msg.append(out);
writen2(fd, msg.c_str(), msg.size());
if(dst.sin4.sin_port) {
std::lock_guard<std::mutex> l(mut);
queue.push_back(msg);
}
else
writen2(fd, msg.c_str(), msg.size());
}
void enableUBXMessageUSB(int fd, uint8_t ubxClass, uint8_t ubxType, uint8_t rate=1)
{
for(int n=0 ; n < 5; ++n) {
@ -335,15 +420,16 @@ int main(int argc, char** argv)
CLI::App app("ubxtool");
vector<std::string> serial;
bool doGPS{true}, doGalileo{true}, doGlonass{false}, doBeidou{true}, doReset{false}, doWait{false}, doRTSCTS{true}, doSBAS{false};
bool doGPS{true}, doGalileo{true}, doGlonass{false}, doBeidou{true}, doReset{false}, doWait{true}, doRTSCTS{true}, doSBAS{false};
bool doSTDOUT=false;
#ifdef OpenBSD
doRTSCTS = false;
#endif
app.add_option("serial", serial, "Serial");
vector<string> destinations;
string portName;
app.add_option("--destination,-d", destinations, "Send output to this IPv4/v6 address");
app.add_flag("--wait", doWait, "Wait a bit, do not try to read init messages");
app.add_flag("--reset", doReset, "Reset UBX device");
app.add_flag("--beidou,-c", doBeidou, "Enable BeiDou reception");
@ -352,22 +438,18 @@ int main(int argc, char** argv)
app.add_flag("--galileo,-e", doGalileo, "Enable Galileo reception");
app.add_flag("--sbas,-s", doSBAS, "Enable SBAS (EGNOS/WAAS/etc) reception");
app.add_option("--rtscts", doRTSCTS, "Set hardware handshaking");
app.add_flag("--stdout", doSTDOUT, "Emit output to stdout");
app.add_option("--port,-p", portName, "Device or file to read serial from");
app.add_option("--station", g_srcid, "Station id");
try {
app.parse(argc, argv);
} catch(const CLI::Error &e) {
return app.exit(e);
}
if(serial.size() != 2) {
cout<<app.help()<<endl;
return EXIT_FAILURE;
}
int fd = initFD(serial[0].c_str(), doRTSCTS);
int fd = initFD(portName.c_str(), doRTSCTS);
g_srcid = atoi(serial[1].c_str());
bool version9 = false;
if(!g_fromFile) {
bool doInit = true;
@ -387,7 +469,7 @@ int main(int argc, char** argv)
for(int n=0 ; n< 20; ++n) {
cerr<<"Waiting for device to come back"<<endl;
try {
fd = initFD(serial[0].c_str(), doRTSCTS);
fd = initFD(portName.c_str(), doRTSCTS);
readSome(fd);
}
catch(...)
@ -472,9 +554,6 @@ int main(int argc, char** argv)
}
}
cerr<<"Disabling NMEA"<<endl;
msg = buildUbxMessage(0x06, 0x00, {0x03,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x00,0x03,0x00,0x01,0x00,0x00,0x00,0x00,0x00});
@ -526,7 +605,7 @@ int main(int argc, char** argv)
}
cerr<<"Enabling UBX-NAV-PVT"<<endl; // position, velocity, time fix
enableUBXMessageUSB(fd, 0x01, 0x07, 1);
enableUBXMessageUSB(fd, 0x01, 0x07, 1); // we use this to get timing
}
}
@ -542,16 +621,23 @@ int main(int argc, char** argv)
what timestamp it claims to have, so that we can set subsequent timestamps correctly.
*/
std::map<pair<int,int>, struct timeval> lasttv, tv;
int curCycleTOW{-1}; // means invalid
signal(SIGPIPE, SIG_IGN);
NMMSender ns;
for(const auto& s : destinations)
ns.addDestination(ComboAddress(s, 29603));
if(doSTDOUT)
ns.addDestination(1);
ns.launch();
cerr<<"Entering main loop"<<endl;
for(;;) {
try {
auto [msg, timestamp] = getUBXMessage(fd);
(void)timestamp;
auto payload = msg.getPayload();
// should turn this into protobuf
if(msg.getClass() == 0x01 && msg.getType() == 0x07) { // UBX-NAV-PVT
struct PVT
{
@ -586,26 +672,19 @@ int main(int argc, char** argv)
satt--;
pvt.nano += 1000000000;
}
g_gstutc.tv_sec = satt;
g_gstutc.tv_nsec = pvt.nano;
// double seconds= pvt.sec + pvt.nano/1000000000.0;
// fmt::fprintf(stderr, "Satellite UTC: %02d:%02d:%06.4f -> %.4f or %d:%f\n", tm.tm_hour, tm.tm_min, seconds, satutc, timegm(&tm), pvt.nano/1000.0);
if(!g_fromFile) {
// struct tm ourtime;
// time_t ourt = timestamp.tv_sec;
// gmtime_r(&ourt, &ourtime);
//
//double ourutc = ourt + timestamp.tv_usec/1000000.0;
// seconds = ourtime.tm_sec + timestamp.tv_usec/1000000.0;
// fmt::fprintf(stderr, "Our UTC : %02d:%02d:%06.4f -> %.4f or %d:%f -> delta = %.4fs\n", tm.tm_hour, tm.tm_min, seconds, ourutc, timestamp.tv_sec, 1.0*timestamp.tv_usec, ourutc - satutc);
}
if(!g_gnssutc.tv_sec)
cerr<<"Got initial timestamp"<<endl;
g_gnssutc.tv_sec = satt;
g_gnssutc.tv_nsec = pvt.nano;
continue;
}
else if(msg.getClass() == 0x02 && msg.getType() == 0x15) { // RAWX, the doppler stuff
if(!g_gnssutc.tv_sec) {
cerr<<"Ignoring message with class "<<msg.getClass()<< " and type "<< msg.getType()<<": have not yet received a timestamp"<<endl;
continue;
}
if(msg.getClass() == 0x02 && msg.getType() == 0x15) { // RAWX, the doppler stuff
// cerr<<"Got "<<(int)payload[11] <<" measurements "<<endl;
double rcvTow;
memcpy(&rcvTow, &payload[0], 8);
@ -642,8 +721,8 @@ int main(int argc, char** argv)
NavMonMessage nmm;
nmm.set_type(NavMonMessage::RFDataType);
nmm.set_localutcseconds(g_gstutc.tv_sec);
nmm.set_localutcnanoseconds(g_gstutc.tv_nsec);
nmm.set_localutcseconds(g_gnssutc.tv_sec);
nmm.set_localutcnanoseconds(g_gnssutc.tv_nsec);
nmm.set_sourceid(g_srcid);
nmm.mutable_rfd()->set_gnssid(gnssid);
@ -659,7 +738,7 @@ int main(int argc, char** argv)
nmm.mutable_rfd()->set_dostd(ldexp(0.002, doStddev));
nmm.mutable_rfd()->set_cpstd(cpStddev*0.4);
nmm.mutable_rfd()->set_locktimems(locktimems);
emitNMM(1, nmm);
ns.emitNMM( nmm);
}
}
else if(msg.getClass() == 0x01 && msg.getType() == 0x01) { // POSECF
@ -681,14 +760,14 @@ int main(int argc, char** argv)
NavMonMessage nmm;
nmm.set_type(NavMonMessage::ObserverPositionType);
nmm.set_localutcseconds(g_gstutc.tv_sec);
nmm.set_localutcnanoseconds(g_gstutc.tv_nsec);
nmm.set_localutcseconds(g_gnssutc.tv_sec);
nmm.set_localutcnanoseconds(g_gnssutc.tv_nsec);
nmm.set_sourceid(g_srcid);
nmm.mutable_op()->set_x(p.ecefX /100.0);
nmm.mutable_op()->set_y(p.ecefY /100.0);
nmm.mutable_op()->set_z(p.ecefZ /100.0);
nmm.mutable_op()->set_acc(p.pAcc /100.0);
emitNMM(1, nmm);
ns.emitNMM( nmm);
}
else if(msg.getClass() == 2 && msg.getType() == 0x13) { // SFRBX
@ -716,8 +795,8 @@ int main(int argc, char** argv)
if(id.first == 0 && !sigid) { // can only parse the old stuff
NavMonMessage nmm;
nmm.set_type(NavMonMessage::GPSInavType);
nmm.set_localutcseconds(g_gstutc.tv_sec);
nmm.set_localutcnanoseconds(g_gstutc.tv_nsec);
nmm.set_localutcseconds(g_gnssutc.tv_sec);
nmm.set_localutcnanoseconds(g_gnssutc.tv_nsec);
nmm.set_sourceid(g_srcid);
// cerr<<"GPS frame, numwords: "<<(int)payload[4]<<", version: "<<(int)payload[6]<<endl;
static int wn, tow;
@ -736,7 +815,7 @@ int main(int argc, char** argv)
nmm.mutable_gpsi()->set_gnssid(id.first);
nmm.mutable_gpsi()->set_gnsssv(id.second);
nmm.mutable_gpsi()->set_contents(string((char*)gpsframe.c_str(), gpsframe.size()));
emitNMM(1, nmm);
ns.emitNMM( nmm);
continue;
}
else if(id.first ==2) {
@ -744,12 +823,12 @@ int main(int argc, char** argv)
// cerr<<" res1 "<<(int)payload[2]<<" numwords "<<(int)payload[4] << " channel "<< (int)payload[5] << " version " << (int)payload[6]<<" res2 "<<(int)payload[7]<<endl;
auto inav = getInavFromSFRBXMsg(payload);
unsigned int wtype = getbitu(&inav[0], 0, 6);
tv[id] = timestamp;
// cerr<<"gnssid "<<id.first<<" sv "<<id.second<<" " << wtype << endl;
uint32_t satTOW;
int msgTOW{0};
if(getTOWFromInav(inav, &satTOW, &g_wn)) { // 0, 6, 5
if(getTOWFromInav(inav, &satTOW, &g_galwn)) { // 0, 6, 5
// cerr<<" "<<wtype<<" sv "<<id.second<<" tow "<<satTOW << " % 30 = "<< satTOW % 30<<", implied start of cycle: "<<(satTOW - (satTOW %30)) <<endl;
msgTOW = satTOW;
curCycleTOW = satTOW - (satTOW %30);
@ -786,10 +865,10 @@ int main(int argc, char** argv)
NavMonMessage nmm;
nmm.set_sourceid(g_srcid);
nmm.set_type(NavMonMessage::GalileoInavType);
nmm.set_localutcseconds(g_gstutc.tv_sec);
nmm.set_localutcnanoseconds(g_gstutc.tv_nsec);
nmm.set_localutcseconds(g_gnssutc.tv_sec);
nmm.set_localutcnanoseconds(g_gnssutc.tv_nsec);
nmm.mutable_gi()->set_gnsswn(g_wn);
nmm.mutable_gi()->set_gnsswn(g_galwn);
nmm.mutable_gi()->set_gnsstow(msgTOW);
nmm.mutable_gi()->set_gnssid(id.first);
@ -797,7 +876,7 @@ int main(int argc, char** argv)
nmm.mutable_gi()->set_sigid(sigid);
nmm.mutable_gi()->set_contents((const char*)&inav[0], inav.size());
emitNMM(1, nmm);
ns.emitNMM( nmm);
}
else if(id.first==3) {
auto gstr = getGlonassFromSFRBXMsg(payload);
@ -813,20 +892,23 @@ int main(int argc, char** argv)
continue;
}
NavMonMessage nmm;
nmm.set_localutcseconds(g_gstutc.tv_sec);
nmm.set_localutcnanoseconds(g_gstutc.tv_nsec);
nmm.set_localutcseconds(g_gnssutc.tv_sec);
nmm.set_localutcnanoseconds(g_gnssutc.tv_nsec);
nmm.set_sourceid(g_srcid);
if(id.second > 5) {
// this **HARDCODES** that C01,02,03,04,05 emit D2 messages!
nmm.set_type(NavMonMessage::BeidouInavTypeD1);
nmm.mutable_bid1()->set_gnsswn(bm.wn);
nmm.mutable_bid1()->set_gnsswn(bm.wn); // only sent in word 1!!
nmm.mutable_bid1()->set_gnsstow(bm.sow);
nmm.mutable_bid1()->set_gnssid(id.first);
nmm.mutable_bid1()->set_gnsssv(id.second);
nmm.mutable_bid1()->set_sigid(sigid);
nmm.mutable_bid1()->set_contents(string((char*)gstr.c_str(), gstr.size()));
ns.emitNMM( nmm);
}
else {
// not sending this: we can't even get the week number right!
/*
nmm.set_type(NavMonMessage::BeidouInavTypeD2);
nmm.mutable_bid2()->set_gnsswn(bm.wn);
nmm.mutable_bid2()->set_gnsstow(bm.sow);
@ -834,8 +916,10 @@ int main(int argc, char** argv)
nmm.mutable_bid2()->set_gnsssv(id.second);
nmm.mutable_bid2()->set_sigid(sigid);
nmm.mutable_bid2()->set_contents(string((char*)gstr.c_str(), gstr.size()));
*/
}
emitNMM(1, nmm);
continue;
}
else if(id.first==6) {
@ -848,8 +932,8 @@ int main(int argc, char** argv)
*/
if(id.second != 255) {
NavMonMessage nmm;
nmm.set_localutcseconds(g_gstutc.tv_sec);
nmm.set_localutcnanoseconds(g_gstutc.tv_nsec);
nmm.set_localutcseconds(g_gnssutc.tv_sec);
nmm.set_localutcnanoseconds(g_gnssutc.tv_nsec);
nmm.set_sourceid(g_srcid);
nmm.set_type(NavMonMessage::GlonassInavType);
nmm.mutable_gloi()->set_freq(payload[3]);
@ -858,21 +942,14 @@ int main(int argc, char** argv)
nmm.mutable_gloi()->set_sigid(sigid);
nmm.mutable_gloi()->set_contents(string((char*)gstr.c_str(), gstr.size()));
emitNMM(1, nmm);
ns.emitNMM( nmm);
}
}
else if(id.first == 1) {// SBAS
}
else
cerr<<"SFRBX from unsupported GNSSID/sigid combination "<<id.first<<", sv "<<id.second<<", sigid "<<sigid<<", "<<payload.size()<<" bytes"<<endl;
#if 0
if(0 && lasttv.count(id)) {
fmt::fprintf(stderr, "gnssid %d sv %d wtype %d, %d:%d -> %d:%d, delta=%d\n",
payload[0], payload[1], wtype, lasttv[id].tv_sec, lasttv[id].tv_usec, tv[id].tv_sec, tv[id].tv_usec, tv[id].tv_usec - lasttv[id].tv_usec);
}
#endif
lasttv[id]=tv[id];
}
catch(CRCMismatch& cm) {
cerr<<"Had CRC mismatch!"<<endl;
@ -892,8 +969,8 @@ int main(int argc, char** argv)
// cerr <<"gnssid "<<gnssid<<" sv "<<sv<<" el "<<el<<endl;
NavMonMessage nmm;
nmm.set_sourceid(g_srcid);
nmm.set_localutcseconds(g_gstutc.tv_sec);
nmm.set_localutcnanoseconds(g_gstutc.tv_nsec);
nmm.set_localutcseconds(g_gnssutc.tv_sec);
nmm.set_localutcnanoseconds(g_gnssutc.tv_nsec);
nmm.set_type(NavMonMessage::ReceptionDataType);
nmm.mutable_rd()->set_gnssid(gnssid);
@ -902,7 +979,23 @@ int main(int argc, char** argv)
nmm.mutable_rd()->set_el(el);
nmm.mutable_rd()->set_azi(azi);
nmm.mutable_rd()->set_prres(*((int16_t*)(payload.c_str()+ 14 +12*n)) *0.1);
emitNMM(1, nmm);
/*
uint32_t status;
memcpy(&status, &payload[16+12*n], 4);
cerr<<gnssid<<","<<sv<<":";
if(status & (1<<3))
cerr<<" used";
cerr<< " qualityind "<<(status & 7);
cerr<<" db "<<db<<" el "<< el;
cerr<<" health " << (status & (1<<5));
cerr<<" sbasCorr " << (status & (1<<16));
cerr<<" prRes "<<nmm.rd().prres();
cerr<<" orbsrc " << ((status >> 8)&7);
cerr<<" eph-avail " << !!(status & (1<<11));
cerr<<" alm-avail " << !!(status & (1<<12));
cerr<<endl;
*/
ns.emitNMM( nmm);
}
}
else if(msg.getClass() == 1 && msg.getType() == 0x43) { // UBX-NAV-SIG
@ -926,8 +1019,8 @@ int main(int argc, char** argv)
// cerr <<"gnssid "<<gnssid<<" sv "<<sv<<" el "<<el<<endl;
NavMonMessage nmm;
nmm.set_sourceid(g_srcid);
nmm.set_localutcseconds(g_gstutc.tv_sec);
nmm.set_localutcnanoseconds(g_gstutc.tv_nsec);
nmm.set_localutcseconds(g_gnssutc.tv_sec);
nmm.set_localutcnanoseconds(g_gnssutc.tv_nsec);
nmm.set_type(NavMonMessage::ReceptionDataType);
nmm.mutable_rd()->set_gnssid(gnssid);
@ -938,7 +1031,7 @@ int main(int argc, char** argv)
nmm.mutable_rd()->set_el(0);
nmm.mutable_rd()->set_azi(0);
emitNMM(1, nmm);
ns.emitNMM( nmm);
}
}
else if(msg.getClass() == 1 && msg.getType() == 0x30) { // UBX-NAV-SVINFO
@ -965,8 +1058,8 @@ int main(int argc, char** argv)
NavMonMessage nmm;
nmm.set_sourceid(g_srcid);
nmm.set_localutcseconds(g_gstutc.tv_sec);
nmm.set_localutcnanoseconds(g_gstutc.tv_nsec);
nmm.set_localutcseconds(g_gnssutc.tv_sec);
nmm.set_localutcnanoseconds(g_gnssutc.tv_nsec);
nmm.set_type(NavMonMessage::SARResponseType);
nmm.mutable_sr()->set_gnssid(2); // Galileo only for now