#include "comboaddress.hh" #include "sclasses.hh" #include #include #include "navmon.pb.h" #include "fmt/format.h" #include "fmt/printf.h" #include #include "storage.hh" #include #include #include #include #include "zstdwrap.hh" #include "CLI/CLI.hpp" #include "version.hh" #include #include "navmon.hh" #include static char program[]="navrecv"; using namespace std; extern const char* g_gitHash; /* Goals in life: 1) NEVER EVER GO DOWN 2) Receive all frames without thinking (see 1) 3) Put the frames in a scalable directory structure */ string g_storagedir="./storage/"; struct FatalException : public std::runtime_error { FatalException(const std::string& str) : std::runtime_error(str){} }; int getfd(const char* path, int mode, int permission) { struct FileID { string path; int mode; int permission; bool operator<(const FileID& rhs) const { return std::tie(path, mode, permission) < std::tie(rhs.path, rhs.mode, rhs. permission); } }; struct FDID { explicit FDID(int fd_) { fd = fd_; lastUsed = time(0); } FDID(const FDID& rhs) = delete; FDID& operator=(const FDID& rhs) = delete; FDID(FDID&& rhs) { fd = rhs.fd; lastUsed = rhs.lastUsed; rhs.fd = -1; } ~FDID() { if(fd >= 0) { cout<<"Closing fd "< lock(mut); static map fds; // do some cleanup time_t now = time(0); for(auto iter = fds.begin(); iter != fds.end(); ) { if(now - iter->second.lastUsed > 60) { cout<<"Found stale entry for "<first.path< 0) { cout<<"Need to erase "<second.lastUsed = time(0); return iter->second.fd; } // cout<<"Did not find it, first doing some cleaning"<remove(d_us); } else ; //cerr<<" but we were moved already!\n"; } void update(int station, bool oldProtocol) { time_t now = time(0); std::lock_guard l(d_parent->d_mut); ClientStatus& cs = d_parent->d_clients[d_us]; cs.station = station; cs.lastMessage = now; cs.messages++; cs.oldProtocol = oldProtocol; } ClientKeeper* d_parent; ComboAddress d_us; }; Sentinel reportClient(const ComboAddress& client) { Sentinel s2(this, client); std::lock_guard l(d_mut); d_clients[client]; return s2; } void remove(const ComboAddress& client) { std::lock_guard l(d_mut); d_clients.erase(client); } void dump() { std::lock_guard l(d_mut); string format("{:<50}{:<5}{:<10}{:<10}{:<10}\n"); ofstream out("clients.bak"); time_t now=time(0); out<< fmt::format(format, "IP Address", "ID", "Protocol", "Messages", "Age"); for(const auto& c : d_clients) { out << fmt::format(format, c.first.toStringWithPort(), c.second.station, c.second.oldProtocol ? "Old" : "New", c.second.messages, now-c.second.lastMessage); } out.close(); unlink("clients.txt"); rename("clients.bak", "clients.txt"); } map d_clients; std::mutex d_mut; }; ClientKeeper g_ckeeper; // note that this moves the socket void recvSession2(Socket&& uns, ComboAddress client, ClientKeeper::Sentinel& sentinel) { string secret = SRead(uns, 8); // ignored for now cerr << "Entering compressed session for "< 30) // sleep(10); string num=SRead(s, 4); if(num.empty()) { cerr<<"EOF from "<