deal with partial reads on ack-numbers, plus make sure ubxtool waits on threads to output their buffers before exiting. Thanks to Jeff Sipek for finding these issues.

pull/118/head
bert hubert 2020-05-05 21:40:40 +02:00
parent 0710e0ad81
commit 69d6d667e0
2 changed files with 26 additions and 16 deletions

View File

@ -58,24 +58,23 @@ void NMMSender::sendTCPThread(Destination* d)
bool hadMessage=false; bool hadMessage=false;
int msgnum = 0; int msgnum = 0;
for(;;) { for(;;) {
uint32_t num; uint32_t num;
// read acks // read acks
for(;;) { for(; zsc ;) { // only do this for compressed protocol
int res = read(s, &num, 4); try {
if(res < 0) { readn2(s, &num, 4); // this will give us 4, or throw
num = ntohl(num);
unacked.erase(num);
}
catch(EofException& ee) {
throw std::runtime_error("EOF while reading acks");
}
catch(std::exception& e) {
if(errno != EAGAIN) if(errno != EAGAIN)
unixDie("Reading acknowledgements in nmmsender"); unixDie("Reading acknowledgements in nmmsender");
break; break;
} }
if(res==0)
throw std::runtime_error("EOF while reading acks");
if(res==4) {
num = ntohl(num);
unacked.erase(num);
}
else
throw std::runtime_error("Partial read of "+to_string(res)+" bytes");
} }
@ -111,12 +110,14 @@ void NMMSender::sendTCPThread(Destination* d)
zsc->flush(); zsc->flush();
if(time(0) - connStartTime > 10 && unacked.size() > 1000) if(time(0) - connStartTime > 10 && unacked.size() > 1000)
throw std::runtime_error("Too many messages unacked, recycling connection"); throw std::runtime_error("Too many messages unacked ("+to_string(unacked.size())+"), recycling connection");
} }
hadMessage = false; hadMessage = false;
if(d_pleaseQuit)
return;
usleep(100000); usleep(100000);
#if defined(TCP_CORK) #if defined(TCP_CORK)
/* linux-only: has an implied 200ms timeout */ /* linux-only: has an implied 200ms timeout */

View File

@ -37,8 +37,7 @@ public:
{ {
for(auto& d : d_dests) { for(auto& d : d_dests) {
if(!d->dst.empty()) { if(!d->dst.empty()) {
std::thread t(&NMMSender::sendTCPThread, this, d.get()); d_thread.emplace_back(std::move(std::make_unique<std::thread>(&NMMSender::sendTCPThread, this, d.get())));
t.detach();
} }
} }
} }
@ -48,7 +47,17 @@ public:
void emitNMM(const NavMonMessage& nmm); void emitNMM(const NavMonMessage& nmm);
bool d_debug{false}; bool d_debug{false};
bool d_compress{false}; // set BEFORE launch bool d_compress{false}; // set BEFORE launch
bool d_pleaseQuit{false};
~NMMSender()
{
if(!d_thread.empty()) {
d_pleaseQuit = true;
for(auto& t : d_thread)
t->join();
}
}
private: private:
std::vector<std::unique_ptr<Destination>> d_dests; std::vector<std::unique_ptr<Destination>> d_dests;
std::vector<std::unique_ptr<std::thread>> d_thread;
}; };