diff --git a/Makefile b/Makefile index 62c389e..297cd12 100644 --- a/Makefile +++ b/Makefile @@ -102,8 +102,8 @@ navcat: navcat.o ext/fmt-6.1.2/src/format.o $(SIMPLESOCKETS) ubx.o bits.o navmo $(CXX) -std=gnu++17 $^ -o $@ -L/usr/local/lib -pthread -lprotobuf -navrecv: navrecv.o ext/fmt-6.1.2/src/format.o $(SIMPLESOCKETS) navmon.pb.o storage.o githash.o - $(CXX) -std=gnu++17 $^ -o $@ -L/usr/local/lib -pthread -lprotobuf +navrecv: navrecv.o ext/fmt-6.1.2/src/format.o $(SIMPLESOCKETS) navmon.pb.o storage.o githash.o zstdwrap.o navmon.o + $(CXX) -std=gnu++17 $^ -o $@ -L/usr/local/lib -pthread -lprotobuf -lzstd tlecatch: tlecatch.o $(patsubst %.cc,%.o,$(wildcard ext/sgp4/libsgp4/*.cc)) githash.o $(CXX) -std=gnu++17 $^ -o $@ -L/usr/local/lib -pthread -lprotobuf @@ -111,12 +111,12 @@ tlecatch: tlecatch.o $(patsubst %.cc,%.o,$(wildcard ext/sgp4/libsgp4/*.cc)) gith rinreport: rinreport.o rinex.o githash.o navmon.o ext/fmt-6.1.2/src/format.o ephemeris.o osen.o $(CXX) -std=gnu++17 $^ -o $@ -lz -pthread -rtcmtool: rtcmtool.o navmon.pb.o githash.o ext/fmt-6.1.2/src/format.o bits.o nmmsender.o $(SIMPLESOCKETS) navmon.o rtcm.o - $(CXX) -std=gnu++17 $^ -o $@ -lz -pthread -lprotobuf +rtcmtool: rtcmtool.o navmon.pb.o githash.o ext/fmt-6.1.2/src/format.o bits.o nmmsender.o $(SIMPLESOCKETS) navmon.o rtcm.o zstdwrap.o + $(CXX) -std=gnu++17 $^ -o $@ -lz -pthread -lprotobuf -lzstd -ubxtool: navmon.pb.o ubxtool.o ubx.o bits.o ext/fmt-6.1.2/src/format.o galileo.o gps.o beidou.o navmon.o ephemeris.o $(SIMPLESOCKETS) osen.o githash.o nmmsender.o - $(CXX) -std=gnu++17 $^ -o $@ -L/usr/local/lib -lprotobuf -pthread +ubxtool: navmon.pb.o ubxtool.o ubx.o bits.o ext/fmt-6.1.2/src/format.o galileo.o gps.o beidou.o navmon.o ephemeris.o $(SIMPLESOCKETS) osen.o githash.o nmmsender.o zstdwrap.o + $(CXX) -std=gnu++17 $^ -o $@ -L/usr/local/lib -lprotobuf -pthread -lzstd testrunner: navmon.pb.o testrunner.o ubx.o bits.o ext/fmt-6.1.2/src/format.o galileo.o gps.o beidou.o ephemeris.o sp3.o osen.o navmon.o rinex.o githash.o $(CXX) -std=gnu++17 $^ -o $@ -L/usr/local/lib -lprotobuf -lz diff --git a/navrecv.cc b/navrecv.cc index 8fb7d9a..ef11364 100644 --- a/navrecv.cc +++ b/navrecv.cc @@ -11,7 +11,7 @@ #include #include #include - +#include "zstdwrap.hh" #include "CLI/CLI.hpp" #include "version.hh" @@ -138,10 +138,51 @@ void writeToDisk(time_t s, uint64_t sourceid, std::string_view message) } } +// note that this moves the socket +void recvSession2(Socket&& uns, ComboAddress client) +{ + cerr << "Entering compressed session for "< 30) + // sleep(10); + string num=SRead(s, 4); + if(num.empty()) { + cerr<<"EOF from "< #include "navmon.hh" #include +#include "zstdwrap.hh" using namespace std; @@ -13,6 +14,7 @@ void NMMSender::sendTCPThread(Destination* d) struct NameError{}; for(;;) { ComboAddress chosen; + map unacked; try { vector addrs; for(;;) { @@ -36,8 +38,40 @@ void NMMSender::sendTCPThread(Destination* d) SocketCommunicator sc(s); sc.setTimeout(3); sc.connect(addr); + time_t connStartTime = time(0); if (d_debug) { cerr<dst<<" on "< zsc; + if(d_compress) { + sc.writen("RNIE"); // the other magic value is "bert". hence. + zsc = std::make_unique(emit, 20); + } + bool hadMessage=false; + int msgnum = 0; + for(;;) { + uint32_t num; + // read acks + for(;;) { + int res = read(s, &num, 4); + if(res < 0) { + if(errno != EAGAIN) + unixDie("Reading acknowledgements in nmmsender"); + break; + } + if(res==0) + throw std::runtime_error("EOF while reading acks"); + if(res==4) { + num = ntohl(num); + unacked.erase(num); + } + else + throw std::runtime_error("Partial read of "+to_string(res)+" bytes"); + } + + std::string msg; { std::lock_guard mut(d->mut); @@ -46,18 +80,46 @@ void NMMSender::sendTCPThread(Destination* d) } } if(!msg.empty()) { - sc.writen(msg); + hadMessage=true; + if(zsc) { + + uint32_t num = htonl(msgnum); + string encap((const char*)&num, 4); + encap += msg; + zsc->give(encap.c_str(), encap.size()); + unacked[msgnum] = msg; + msgnum++; + + } + else + sc.writen(msg); std::lock_guard mut(d->mut); d->queue.pop_front(); + + } + else { + if(zsc && hadMessage) { + cerr << "Compressed to: "<< 100.0*zsc->d_outputBytes/zsc->d_inputBytes<<"%, buffered compressed: "<outputBufferBytes()<<" out of " <outputBufferCapacity()<<" bytes. Unacked: "<flush(); + + if(time(0) - connStartTime > 10 && unacked.size() > 1000) + throw std::runtime_error("Too many messages unacked, recycling connection"); + + + + } + hadMessage = false; + usleep(100000); + } - else usleep(100000); } } } catch(NameError&) { { std::lock_guard mut(d->mut); - if (d_debug) { cerr<queue.size()<<" messages queued for "<dst<queue.size()<<" messages queued for "<dst<<", and "<dst<<" via "< mut(d->mut); - if (d_debug) { cerr<queue.size()<<" messages queued for "<dst<queue.size()<<" messages queued for "<dst<<", and "<dst <<" via "< mut(d->mut); - if (d_debug) { cerr<<"There are now "<queue.size()<<" messages queued for "<dst<<" via "<queue.size()<<" messages queued for "<dst<<", and "< mut(d->mut); + if(!unacked.empty()) { + cerr<queue.push_front(iter->second); + } + unacked.clear(); + } } } @@ -84,15 +154,17 @@ void NMMSender::sendTCPThread(Destination* d) void NMMSender::emitNMM(const NavMonMessage& nmm) { for(auto& d : d_dests) { - d->emitNMM(nmm); + d->emitNMM(nmm, d_compress); } } -void NMMSender::Destination::emitNMM(const NavMonMessage& nmm) +void NMMSender::Destination::emitNMM(const NavMonMessage& nmm, bool compressed) { string out; nmm.SerializeToString(& out); - string msg("bert"); + string msg; + if(!compressed) + msg="bert"; uint16_t len = htons(out.size()); msg.append((char*)&len, 2); diff --git a/nmmsender.hh b/nmmsender.hh index de482d8..2b60e4d 100644 --- a/nmmsender.hh +++ b/nmmsender.hh @@ -16,7 +16,7 @@ class NMMSender std::deque queue; std::mutex mut; - void emitNMM(const NavMonMessage& nmm); + void emitNMM(const NavMonMessage& nmm, bool compress); }; public: @@ -47,7 +47,8 @@ public: void emitNMM(const NavMonMessage& nmm); bool d_debug{false}; -private: + bool d_compress{false}; // set BEFORE launch +private: std::vector> d_dests; }; diff --git a/ubxtool.cc b/ubxtool.cc index af31853..53524a6 100644 --- a/ubxtool.cc +++ b/ubxtool.cc @@ -426,9 +426,10 @@ int main(int argc, char** argv) unsigned int fuzzPositionMeters=0; string owner; string remark; - + bool doCompress=false; app.add_option("--destination,-d", destinations, "Send output to this IPv4/v6 address"); app.add_flag("--wait", doWait, "Wait a bit, do not try to read init messages"); + app.add_flag("--compress,-z", doCompress, "Use compressed protocol for network transmission"); app.add_flag("--reset", doReset, "Reset UBX device"); app.add_flag("--beidou,-c", doBeidou, "Enable BeiDou reception"); app.add_flag("--gps,-g", doGPS, "Enable GPS reception"); @@ -878,6 +879,7 @@ int main(int argc, char** argv) */ int curCycleTOW{-1}; // means invalid + ns.d_compress = doCompress; ns.launch(); cerr<