From 69d6d667e0e5c13e4998213eb7f988b27c96fb23 Mon Sep 17 00:00:00 2001 From: bert hubert Date: Tue, 5 May 2020 21:40:40 +0200 Subject: [PATCH] deal with partial reads on ack-numbers, plus make sure ubxtool waits on threads to output their buffers before exiting. Thanks to Jeff Sipek for finding these issues. --- nmmsender.cc | 27 ++++++++++++++------------- nmmsender.hh | 15 ++++++++++++--- 2 files changed, 26 insertions(+), 16 deletions(-) diff --git a/nmmsender.cc b/nmmsender.cc index 2a27b02..c9dc61b 100644 --- a/nmmsender.cc +++ b/nmmsender.cc @@ -58,24 +58,23 @@ void NMMSender::sendTCPThread(Destination* d) bool hadMessage=false; int msgnum = 0; - for(;;) { + for(;;) { uint32_t num; // read acks - for(;;) { - int res = read(s, &num, 4); - if(res < 0) { + for(; zsc ;) { // only do this for compressed protocol + try { + readn2(s, &num, 4); // this will give us 4, or throw + num = ntohl(num); + unacked.erase(num); + } + catch(EofException& ee) { + throw std::runtime_error("EOF while reading acks"); + } + catch(std::exception& e) { if(errno != EAGAIN) unixDie("Reading acknowledgements in nmmsender"); break; } - if(res==0) - throw std::runtime_error("EOF while reading acks"); - if(res==4) { - num = ntohl(num); - unacked.erase(num); - } - else - throw std::runtime_error("Partial read of "+to_string(res)+" bytes"); } @@ -111,12 +110,14 @@ void NMMSender::sendTCPThread(Destination* d) zsc->flush(); if(time(0) - connStartTime > 10 && unacked.size() > 1000) - throw std::runtime_error("Too many messages unacked, recycling connection"); + throw std::runtime_error("Too many messages unacked ("+to_string(unacked.size())+"), recycling connection"); } hadMessage = false; + if(d_pleaseQuit) + return; usleep(100000); #if defined(TCP_CORK) /* linux-only: has an implied 200ms timeout */ diff --git a/nmmsender.hh b/nmmsender.hh index 2b60e4d..8bff2fe 100644 --- a/nmmsender.hh +++ b/nmmsender.hh @@ -37,8 +37,7 @@ public: { for(auto& d : d_dests) { if(!d->dst.empty()) { - std::thread t(&NMMSender::sendTCPThread, this, d.get()); - t.detach(); + d_thread.emplace_back(std::move(std::make_unique(&NMMSender::sendTCPThread, this, d.get()))); } } } @@ -48,7 +47,17 @@ public: void emitNMM(const NavMonMessage& nmm); bool d_debug{false}; bool d_compress{false}; // set BEFORE launch - + bool d_pleaseQuit{false}; + ~NMMSender() + { + if(!d_thread.empty()) { + d_pleaseQuit = true; + for(auto& t : d_thread) + t->join(); + } + } + private: std::vector> d_dests; + std::vector> d_thread; };