implement compressed submission protocol with acks

pull/114/head
bert hubert 2020-04-26 16:12:19 +02:00
parent 71203b6bc8
commit e96ceb7ddd
5 changed files with 137 additions and 19 deletions

View File

@ -102,8 +102,8 @@ navcat: navcat.o ext/fmt-6.1.2/src/format.o $(SIMPLESOCKETS) ubx.o bits.o navmo
$(CXX) -std=gnu++17 $^ -o $@ -L/usr/local/lib -pthread -lprotobuf
navrecv: navrecv.o ext/fmt-6.1.2/src/format.o $(SIMPLESOCKETS) navmon.pb.o storage.o githash.o
$(CXX) -std=gnu++17 $^ -o $@ -L/usr/local/lib -pthread -lprotobuf
navrecv: navrecv.o ext/fmt-6.1.2/src/format.o $(SIMPLESOCKETS) navmon.pb.o storage.o githash.o zstdwrap.o navmon.o
$(CXX) -std=gnu++17 $^ -o $@ -L/usr/local/lib -pthread -lprotobuf -lzstd
tlecatch: tlecatch.o $(patsubst %.cc,%.o,$(wildcard ext/sgp4/libsgp4/*.cc)) githash.o
$(CXX) -std=gnu++17 $^ -o $@ -L/usr/local/lib -pthread -lprotobuf
@ -111,12 +111,12 @@ tlecatch: tlecatch.o $(patsubst %.cc,%.o,$(wildcard ext/sgp4/libsgp4/*.cc)) gith
rinreport: rinreport.o rinex.o githash.o navmon.o ext/fmt-6.1.2/src/format.o ephemeris.o osen.o
$(CXX) -std=gnu++17 $^ -o $@ -lz -pthread
rtcmtool: rtcmtool.o navmon.pb.o githash.o ext/fmt-6.1.2/src/format.o bits.o nmmsender.o $(SIMPLESOCKETS) navmon.o rtcm.o
$(CXX) -std=gnu++17 $^ -o $@ -lz -pthread -lprotobuf
rtcmtool: rtcmtool.o navmon.pb.o githash.o ext/fmt-6.1.2/src/format.o bits.o nmmsender.o $(SIMPLESOCKETS) navmon.o rtcm.o zstdwrap.o
$(CXX) -std=gnu++17 $^ -o $@ -lz -pthread -lprotobuf -lzstd
ubxtool: navmon.pb.o ubxtool.o ubx.o bits.o ext/fmt-6.1.2/src/format.o galileo.o gps.o beidou.o navmon.o ephemeris.o $(SIMPLESOCKETS) osen.o githash.o nmmsender.o
$(CXX) -std=gnu++17 $^ -o $@ -L/usr/local/lib -lprotobuf -pthread
ubxtool: navmon.pb.o ubxtool.o ubx.o bits.o ext/fmt-6.1.2/src/format.o galileo.o gps.o beidou.o navmon.o ephemeris.o $(SIMPLESOCKETS) osen.o githash.o nmmsender.o zstdwrap.o
$(CXX) -std=gnu++17 $^ -o $@ -L/usr/local/lib -lprotobuf -pthread -lzstd
testrunner: navmon.pb.o testrunner.o ubx.o bits.o ext/fmt-6.1.2/src/format.o galileo.o gps.o beidou.o ephemeris.o sp3.o osen.o navmon.o rinex.o githash.o
$(CXX) -std=gnu++17 $^ -o $@ -L/usr/local/lib -lprotobuf -lz

View File

@ -11,7 +11,7 @@
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include "zstdwrap.hh"
#include "CLI/CLI.hpp"
#include "version.hh"
@ -138,10 +138,51 @@ void writeToDisk(time_t s, uint64_t sourceid, std::string_view message)
}
}
// note that this moves the socket
void recvSession2(Socket&& uns, ComboAddress client)
{
cerr << "Entering compressed session for "<<client.toStringWithPort()<<endl;
ZStdReader zsr(uns);
int s = zsr.getFD();
// time_t start = time(0);
for(;;) {
// enable this to test ubxtool resilience & buffering
// if(time(0) - start > 30)
// sleep(10);
string num=SRead(s, 4);
if(num.empty()) {
cerr<<"EOF from "<<client.toStringWithPort()<<endl;
break;
}
string out="bert";
string part = SRead(s, 2);
out += part;
uint16_t len;
memcpy(&len, part.c_str(), 2);
len = htons(len);
part = SRead(s, len);
out += part;
NavMonMessage nmm;
nmm.ParseFromString(part);
uint32_t denum;
memcpy(&denum, num.c_str(), 4);
denum = htonl(denum);
// cerr<<"Received message "<<denum<< " "<<nmm.localutcseconds()<<" " << nmm.localutcnanoseconds()/1000000000.0<<endl;
writeToDisk(nmm.localutcseconds(), nmm.sourceid(), out);
SWrite(uns, num);
}
}
void recvSession(int s, ComboAddress client)
{
try {
Socket sock(s);
Socket sock(s); // this closes on destruction
cerr<<"Receiving messages from "<<client.toStringWithPort()<<endl;
for(;;) {
string part=SRead(sock, 4);
@ -150,6 +191,8 @@ void recvSession(int s, ComboAddress client)
break;
}
if(part != "bert") {
if(part == "RNIE")
return recvSession2(std::move(sock), client); // protocol v2, socket is moved cuz cleanup is special
cerr << "Wrong magic from "<<client.toStringWithPort()<<": "<<part<<endl;
break;
}

View File

@ -5,6 +5,7 @@
#include <random>
#include "navmon.hh"
#include <algorithm>
#include "zstdwrap.hh"
using namespace std;
@ -13,6 +14,7 @@ void NMMSender::sendTCPThread(Destination* d)
struct NameError{};
for(;;) {
ComboAddress chosen;
map<uint32_t, string> unacked;
try {
vector<ComboAddress> addrs;
for(;;) {
@ -36,8 +38,40 @@ void NMMSender::sendTCPThread(Destination* d)
SocketCommunicator sc(s);
sc.setTimeout(3);
sc.connect(addr);
time_t connStartTime = time(0);
if (d_debug) { cerr<<humanTimeNow()<<" Connected to "<<d->dst<<" on "<<addr.toStringWithPort()<<endl; }
auto emit = [&sc](const char*buf, uint32_t len) {
sc.writen(string(buf, len));
};
std::unique_ptr<ZStdCompressor> zsc;
if(d_compress) {
sc.writen("RNIE"); // the other magic value is "bert". hence.
zsc = std::make_unique<ZStdCompressor>(emit, 20);
}
bool hadMessage=false;
int msgnum = 0;
for(;;) {
uint32_t num;
// read acks
for(;;) {
int res = read(s, &num, 4);
if(res < 0) {
if(errno != EAGAIN)
unixDie("Reading acknowledgements in nmmsender");
break;
}
if(res==0)
throw std::runtime_error("EOF while reading acks");
if(res==4) {
num = ntohl(num);
unacked.erase(num);
}
else
throw std::runtime_error("Partial read of "+to_string(res)+" bytes");
}
std::string msg;
{
std::lock_guard<std::mutex> mut(d->mut);
@ -46,18 +80,46 @@ void NMMSender::sendTCPThread(Destination* d)
}
}
if(!msg.empty()) {
sc.writen(msg);
hadMessage=true;
if(zsc) {
uint32_t num = htonl(msgnum);
string encap((const char*)&num, 4);
encap += msg;
zsc->give(encap.c_str(), encap.size());
unacked[msgnum] = msg;
msgnum++;
}
else
sc.writen(msg);
std::lock_guard<std::mutex> mut(d->mut);
d->queue.pop_front();
}
else {
if(zsc && hadMessage) {
cerr << "Compressed to: "<< 100.0*zsc->d_outputBytes/zsc->d_inputBytes<<"%, buffered compressed: "<<zsc->outputBufferBytes()<<" out of " <<zsc->outputBufferCapacity()<<" bytes. Unacked: "<<unacked.size()<<endl;
zsc->flush();
if(time(0) - connStartTime > 10 && unacked.size() > 1000)
throw std::runtime_error("Too many messages unacked, recycling connection");
}
hadMessage = false;
usleep(100000);
}
else usleep(100000);
}
}
}
catch(NameError&) {
{
std::lock_guard<std::mutex> mut(d->mut);
if (d_debug) { cerr<<humanTimeNow()<<" There are now "<<d->queue.size()<<" messages queued for "<<d->dst<<endl; }
if (d_debug) { cerr<<humanTimeNow()<<" There are now "<<d->queue.size()<<" messages queued for "<<d->dst<<", and "<<unacked.size()<<" unacknowledged"<<endl; }
}
sleep(30);
}
@ -65,7 +127,7 @@ void NMMSender::sendTCPThread(Destination* d)
if (d_debug) { cerr<<humanTimeNow()<<" Sending thread for "<<d->dst<<" via "<<chosen.toStringWithPort()<<" had error: "<<e.what()<<endl; }
{
std::lock_guard<std::mutex> mut(d->mut);
if (d_debug) { cerr<<humanTimeNow()<<" There are now "<<d->queue.size()<<" messages queued for "<<d->dst<<endl; }
if (d_debug) { cerr<<humanTimeNow()<<" There are now "<<d->queue.size()<<" messages queued for "<<d->dst<<", and "<<unacked.size()<<" unacknowledged"<<endl; }
}
sleep(1);
}
@ -73,10 +135,18 @@ void NMMSender::sendTCPThread(Destination* d)
if (d_debug) { cerr<<humanTimeNow()<<" Sending thread for "<<d->dst <<" via "<<chosen.toStringWithPort()<<" had error"; }
{
std::lock_guard<std::mutex> mut(d->mut);
if (d_debug) { cerr<<"There are now "<<d->queue.size()<<" messages queued for "<<d->dst<<" via "<<chosen.toStringWithPort()<<endl; }
if (d_debug) { cerr<<"There are now "<<d->queue.size()<<" messages queued for "<<d->dst<<", and "<<unacked.size()<<" unacknowledge via "<<chosen.toStringWithPort()<<endl; }
}
sleep(1);
}
std::lock_guard<std::mutex> mut(d->mut);
if(!unacked.empty()) {
cerr<<humanTimeNow()<< " Stuffing "<<unacked.size()<<" messages back into the queue"<<endl;
for(auto iter= unacked.rbegin(); iter != unacked.rend(); ++iter) {
d->queue.push_front(iter->second);
}
unacked.clear();
}
}
}
@ -84,15 +154,17 @@ void NMMSender::sendTCPThread(Destination* d)
void NMMSender::emitNMM(const NavMonMessage& nmm)
{
for(auto& d : d_dests) {
d->emitNMM(nmm);
d->emitNMM(nmm, d_compress);
}
}
void NMMSender::Destination::emitNMM(const NavMonMessage& nmm)
void NMMSender::Destination::emitNMM(const NavMonMessage& nmm, bool compressed)
{
string out;
nmm.SerializeToString(& out);
string msg("bert");
string msg;
if(!compressed)
msg="bert";
uint16_t len = htons(out.size());
msg.append((char*)&len, 2);

View File

@ -16,7 +16,7 @@ class NMMSender
std::deque<std::string> queue;
std::mutex mut;
void emitNMM(const NavMonMessage& nmm);
void emitNMM(const NavMonMessage& nmm, bool compress);
};
public:
@ -47,7 +47,8 @@ public:
void emitNMM(const NavMonMessage& nmm);
bool d_debug{false};
private:
bool d_compress{false}; // set BEFORE launch
private:
std::vector<std::unique_ptr<Destination>> d_dests;
};

View File

@ -426,9 +426,10 @@ int main(int argc, char** argv)
unsigned int fuzzPositionMeters=0;
string owner;
string remark;
bool doCompress=false;
app.add_option("--destination,-d", destinations, "Send output to this IPv4/v6 address");
app.add_flag("--wait", doWait, "Wait a bit, do not try to read init messages");
app.add_flag("--compress,-z", doCompress, "Use compressed protocol for network transmission");
app.add_flag("--reset", doReset, "Reset UBX device");
app.add_flag("--beidou,-c", doBeidou, "Enable BeiDou reception");
app.add_flag("--gps,-g", doGPS, "Enable GPS reception");
@ -878,6 +879,7 @@ int main(int argc, char** argv)
*/
int curCycleTOW{-1}; // means invalid
ns.d_compress = doCompress;
ns.launch();
cerr<<humanTimeNow()<<" Entering main loop"<<endl;