add client logging for navrecv
parent
27ee8daae2
commit
8179fca981
100
navrecv.cc
100
navrecv.cc
|
@ -16,6 +16,7 @@
|
||||||
#include "version.hh"
|
#include "version.hh"
|
||||||
#include <netinet/tcp.h>
|
#include <netinet/tcp.h>
|
||||||
#include "navmon.hh"
|
#include "navmon.hh"
|
||||||
|
#include <mutex>
|
||||||
|
|
||||||
static char program[]="navrecv";
|
static char program[]="navrecv";
|
||||||
|
|
||||||
|
@ -140,8 +141,94 @@ void writeToDisk(time_t s, uint64_t sourceid, std::string_view message)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// This is used to report clients, so we can log them
|
||||||
|
// The idea is that cleanup runs from the Sentinel which, when destroyed, will remove the entry
|
||||||
|
struct ClientKeeper
|
||||||
|
{
|
||||||
|
struct ClientStatus
|
||||||
|
{
|
||||||
|
bool oldProtocol;
|
||||||
|
time_t lastMessage;
|
||||||
|
int station;
|
||||||
|
uint64_t messages{0};
|
||||||
|
};
|
||||||
|
|
||||||
|
struct Sentinel
|
||||||
|
{
|
||||||
|
Sentinel(ClientKeeper* parent, const ComboAddress& us) : d_parent(parent), d_us(us)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
Sentinel(Sentinel&& s)
|
||||||
|
{
|
||||||
|
// cerr<<"Moved!"<<endl;
|
||||||
|
d_parent = s.d_parent;
|
||||||
|
d_us = s.d_us;
|
||||||
|
s.d_parent=0;
|
||||||
|
}
|
||||||
|
|
||||||
|
~Sentinel()
|
||||||
|
{
|
||||||
|
// cerr<<"Destructor"<<endl;
|
||||||
|
if(d_parent) {
|
||||||
|
d_parent->remove(d_us);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
; //cerr<<" but we were moved already!\n";
|
||||||
|
|
||||||
|
}
|
||||||
|
void update(int station, bool oldProtocol)
|
||||||
|
{
|
||||||
|
time_t now = time(0);
|
||||||
|
std::lock_guard<std::mutex> l(d_parent->d_mut);
|
||||||
|
ClientStatus& cs = d_parent->d_clients[d_us];
|
||||||
|
cs.station = station;
|
||||||
|
cs.lastMessage = now;
|
||||||
|
cs.messages++;
|
||||||
|
cs.oldProtocol = oldProtocol;
|
||||||
|
}
|
||||||
|
ClientKeeper* d_parent;
|
||||||
|
ComboAddress d_us;
|
||||||
|
};
|
||||||
|
|
||||||
|
Sentinel reportClient(const ComboAddress& client)
|
||||||
|
{
|
||||||
|
Sentinel s2(this, client);
|
||||||
|
|
||||||
|
std::lock_guard<std::mutex> l(d_mut);
|
||||||
|
d_clients[client];
|
||||||
|
return s2;
|
||||||
|
}
|
||||||
|
|
||||||
|
void remove(const ComboAddress& client)
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> l(d_mut);
|
||||||
|
d_clients.erase(client);
|
||||||
|
}
|
||||||
|
|
||||||
|
void dump()
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> l(d_mut);
|
||||||
|
string format("{:<50}{:<5}{:<10}{:<10}{:<10}\n");
|
||||||
|
ofstream out("clients.bak");
|
||||||
|
time_t now=time(0);
|
||||||
|
out<< fmt::format(format, "IP Address", "ID", "Protocol", "Messages", "Age");
|
||||||
|
for(const auto& c : d_clients) {
|
||||||
|
out << fmt::format(format, c.first.toStringWithPort(), c.second.station, c.second.oldProtocol ? "Old" : "New", c.second.messages, now-c.second.lastMessage);
|
||||||
|
}
|
||||||
|
out.close();
|
||||||
|
unlink("clients.txt");
|
||||||
|
rename("clients.bak", "clients.txt");
|
||||||
|
}
|
||||||
|
|
||||||
|
map<ComboAddress, ClientStatus> d_clients;
|
||||||
|
std::mutex d_mut;
|
||||||
|
};
|
||||||
|
|
||||||
|
ClientKeeper g_ckeeper;
|
||||||
|
|
||||||
// note that this moves the socket
|
// note that this moves the socket
|
||||||
void recvSession2(Socket&& uns, ComboAddress client)
|
void recvSession2(Socket&& uns, ComboAddress client, ClientKeeper::Sentinel& sentinel)
|
||||||
{
|
{
|
||||||
string secret = SRead(uns, 8); // ignored for now
|
string secret = SRead(uns, 8); // ignored for now
|
||||||
cerr << "Entering compressed session for "<<client.toStringWithPort()<<endl;
|
cerr << "Entering compressed session for "<<client.toStringWithPort()<<endl;
|
||||||
|
@ -177,6 +264,7 @@ void recvSession2(Socket&& uns, ComboAddress client)
|
||||||
memcpy(&denum, num.c_str(), 4);
|
memcpy(&denum, num.c_str(), 4);
|
||||||
denum = htonl(denum);
|
denum = htonl(denum);
|
||||||
// cerr<<"Received message "<<denum<< " "<<nmm.localutcseconds()<<" " << nmm.localutcnanoseconds()/1000000000.0<<endl;
|
// cerr<<"Received message "<<denum<< " "<<nmm.localutcseconds()<<" " << nmm.localutcnanoseconds()/1000000000.0<<endl;
|
||||||
|
sentinel.update(nmm.sourceid(), false);
|
||||||
writeToDisk(nmm.localutcseconds(), nmm.sourceid(), out);
|
writeToDisk(nmm.localutcseconds(), nmm.sourceid(), out);
|
||||||
|
|
||||||
if(first) {
|
if(first) {
|
||||||
|
@ -201,6 +289,9 @@ void recvSession(int s, ComboAddress client)
|
||||||
SSetsockopt(s, SOL_SOCKET, SO_KEEPALIVE, 1); // saves file descriptors
|
SSetsockopt(s, SOL_SOCKET, SO_KEEPALIVE, 1); // saves file descriptors
|
||||||
cerr<<"Receiving messages from "<<client.toStringWithPort()<<endl;
|
cerr<<"Receiving messages from "<<client.toStringWithPort()<<endl;
|
||||||
bool first=true;
|
bool first=true;
|
||||||
|
|
||||||
|
ClientKeeper::Sentinel sentinel=g_ckeeper.reportClient(client);
|
||||||
|
|
||||||
for(int count=0;;++count) {
|
for(int count=0;;++count) {
|
||||||
string part=SRead(sock, 4);
|
string part=SRead(sock, 4);
|
||||||
if(part.empty()) {
|
if(part.empty()) {
|
||||||
|
@ -209,7 +300,7 @@ void recvSession(int s, ComboAddress client)
|
||||||
}
|
}
|
||||||
if(part != "bert") {
|
if(part != "bert") {
|
||||||
if(part == "RNIE")
|
if(part == "RNIE")
|
||||||
return recvSession2(std::move(sock), client); // protocol v2, socket is moved cuz cleanup is special
|
return recvSession2(std::move(sock), client, sentinel); // protocol v2, socket is moved cuz cleanup is special
|
||||||
cerr << "Message "<<count<<", wrong magic from "<<client.toStringWithPort()<<": "<<makeHexDump(part)<<endl;
|
cerr << "Message "<<count<<", wrong magic from "<<client.toStringWithPort()<<": "<<makeHexDump(part)<<endl;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -235,6 +326,7 @@ void recvSession(int s, ComboAddress client)
|
||||||
cerr<<"\tstation: "<<nmm.sourceid() << endl;
|
cerr<<"\tstation: "<<nmm.sourceid() << endl;
|
||||||
first=false;
|
first=false;
|
||||||
}
|
}
|
||||||
|
sentinel.update(nmm.sourceid(), true);
|
||||||
writeToDisk(nmm.localutcseconds(), nmm.sourceid(), out);
|
writeToDisk(nmm.localutcseconds(), nmm.sourceid(), out);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -287,7 +379,9 @@ int main(int argc, char** argv)
|
||||||
thread recvThread(recvListener, std::move(receiver), recvaddr);
|
thread recvThread(recvListener, std::move(receiver), recvaddr);
|
||||||
recvThread.detach();
|
recvThread.detach();
|
||||||
|
|
||||||
|
sleep(5);
|
||||||
for(;;) {
|
for(;;) {
|
||||||
sleep(1);
|
g_ckeeper.dump();
|
||||||
|
sleep(10);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue