make navrecv report station id for a new connection, make it set KEEPALIVE so we lose connections that changed IP address etc, some improved debugging messages
parent
cea8f539a0
commit
f0269f5660
25
navrecv.cc
25
navrecv.cc
|
@ -15,6 +15,7 @@
|
||||||
#include "CLI/CLI.hpp"
|
#include "CLI/CLI.hpp"
|
||||||
#include "version.hh"
|
#include "version.hh"
|
||||||
#include <netinet/tcp.h>
|
#include <netinet/tcp.h>
|
||||||
|
#include "navmon.hh"
|
||||||
|
|
||||||
static char program[]="navrecv";
|
static char program[]="navrecv";
|
||||||
|
|
||||||
|
@ -147,6 +148,7 @@ void recvSession2(Socket&& uns, ComboAddress client)
|
||||||
ZStdReader zsr(uns);
|
ZStdReader zsr(uns);
|
||||||
int s = zsr.getFD();
|
int s = zsr.getFD();
|
||||||
// time_t start = time(0);
|
// time_t start = time(0);
|
||||||
|
bool first=true;
|
||||||
for(;;) {
|
for(;;) {
|
||||||
// enable this to test ubxtool resilience & buffering
|
// enable this to test ubxtool resilience & buffering
|
||||||
// if(time(0) - start > 30)
|
// if(time(0) - start > 30)
|
||||||
|
@ -176,10 +178,17 @@ void recvSession2(Socket&& uns, ComboAddress client)
|
||||||
denum = htonl(denum);
|
denum = htonl(denum);
|
||||||
// cerr<<"Received message "<<denum<< " "<<nmm.localutcseconds()<<" " << nmm.localutcnanoseconds()/1000000000.0<<endl;
|
// cerr<<"Received message "<<denum<< " "<<nmm.localutcseconds()<<" " << nmm.localutcnanoseconds()/1000000000.0<<endl;
|
||||||
writeToDisk(nmm.localutcseconds(), nmm.sourceid(), out);
|
writeToDisk(nmm.localutcseconds(), nmm.sourceid(), out);
|
||||||
|
|
||||||
|
if(first) {
|
||||||
|
cerr<<"\tstation: "<<nmm.sourceid() << endl;
|
||||||
|
first=false;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
#ifdef __linux__
|
#ifdef __linux__
|
||||||
SSetsockopt(uns, IPPROTO_TCP, TCP_CORK, 1 );
|
SSetsockopt(uns, IPPROTO_TCP, TCP_CORK, 1 );
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
SWrite(uns, num);
|
SWrite(uns, num);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -189,8 +198,10 @@ void recvSession(int s, ComboAddress client)
|
||||||
{
|
{
|
||||||
try {
|
try {
|
||||||
Socket sock(s); // this closes on destruction
|
Socket sock(s); // this closes on destruction
|
||||||
|
SSetsockopt(s, SOL_SOCKET, SO_KEEPALIVE, 1); // saves file descriptors
|
||||||
cerr<<"Receiving messages from "<<client.toStringWithPort()<<endl;
|
cerr<<"Receiving messages from "<<client.toStringWithPort()<<endl;
|
||||||
for(;;) {
|
bool first=true;
|
||||||
|
for(int count=0;;++count) {
|
||||||
string part=SRead(sock, 4);
|
string part=SRead(sock, 4);
|
||||||
if(part.empty()) {
|
if(part.empty()) {
|
||||||
cerr<<"EOF from "<<client.toStringWithPort()<<endl;
|
cerr<<"EOF from "<<client.toStringWithPort()<<endl;
|
||||||
|
@ -199,7 +210,7 @@ void recvSession(int s, ComboAddress client)
|
||||||
if(part != "bert") {
|
if(part != "bert") {
|
||||||
if(part == "RNIE")
|
if(part == "RNIE")
|
||||||
return recvSession2(std::move(sock), client); // protocol v2, socket is moved cuz cleanup is special
|
return recvSession2(std::move(sock), client); // protocol v2, socket is moved cuz cleanup is special
|
||||||
cerr << "Wrong magic from "<<client.toStringWithPort()<<": "<<part<<endl;
|
cerr << "Message "<<count<<", wrong magic from "<<client.toStringWithPort()<<": "<<makeHexDump(part)<<endl;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
string out=part;
|
string out=part;
|
||||||
|
@ -212,11 +223,17 @@ void recvSession(int s, ComboAddress client)
|
||||||
len = htons(len);
|
len = htons(len);
|
||||||
|
|
||||||
part = SRead(s, len);
|
part = SRead(s, len);
|
||||||
|
if(part.size() != len) {
|
||||||
|
cerr<<"Mismatch, "<<part.size()<<", len "<<len<<endl;
|
||||||
|
}
|
||||||
out += part;
|
out += part;
|
||||||
|
|
||||||
NavMonMessage nmm;
|
NavMonMessage nmm;
|
||||||
nmm.ParseFromString(part);
|
nmm.ParseFromString(part);
|
||||||
|
if(first) {
|
||||||
|
cerr<<"\tstation: "<<nmm.sourceid() << endl;
|
||||||
|
first=false;
|
||||||
|
}
|
||||||
writeToDisk(nmm.localutcseconds(), nmm.sourceid(), out);
|
writeToDisk(nmm.localutcseconds(), nmm.sourceid(), out);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue