#include "sclasses.hh" #include #include "navmon.hh" #include "navmon.pb.h" #include #include "nmmsender.hh" #include "CLI/CLI.hpp" #include "version.hh" using namespace std; static char program[]="navmerge"; /* ubxtool/rtcmtool/septool deliver data to one of several `navrecv` instances This means we need a 'merge' tool. The merge tool should be able to stream data from multiple `navnexus` instances (that correspond to the `navrecv` instances) Currently, `navnexus` is really simple - it will send you a feed from x hours back, where you don't get to pick x. The simplest navmerge implementation does nothing but connect to a few navnexus instances and it mixes them together. Every message "should" only appear on one of the upstreams, but you never know. On initial connection, the different navnexuses may start up from a different time, currently. Let us state that This Should Not Happen. On initial connect, a navnexus might take dozens of seconds before it starts coughing up data. Initial goal for navmerge is: only make sure realtime works. Every upstream has a thread that loops trying to connect If a new message comes in, it is stored in a shared data structure If a new connect is made, set a "don't send" marker for a whole minute There is a sender thread that periodically polls this data structure Any data that is older than the previous high-water mark gets sent out & removed from structure However, transmission stops 10 seconds before realtime If a "don't send" marker is set, we don't do a thing */ multimap, string> g_buffer; std::mutex g_mut; void recvSession(ComboAddress upstream) { for(;;) { try { Socket sock(upstream.sin4.sin_family, SOCK_STREAM); cerr<<"Connecting to "<< upstream.toStringWithPort()<<" to source data.."; SConnectWithTimeout(sock, upstream, 5); cerr<<" done"< mut(g_mut); g_buffer.insert({{nmm.localutcseconds(), nmm.localutcnanoseconds()}, part}); } } catch(std::exception& e) { cerr<<"Error in receiving thread: "< destinations; vector sources; bool doVERSION{false}, doSTDOUT{false}; CLI::App app(program); app.add_option("--source", sources, "Connect to these IP address:port to source protobuf"); app.add_option("--destination,-d", destinations, "Send output to this IPv4/v6 address"); app.add_flag("--version", doVERSION, "show program version and copyright"); app.add_flag("--stdout", doSTDOUT, "Emit output to stdout"); CLI11_PARSE(app, argc, argv); if(doVERSION) { showVersion(program, g_gitHash); exit(0); } signal(SIGPIPE, SIG_IGN); NMMSender ns; ns.d_debug = true; for(const auto& s : destinations) { auto res=resolveName(s, true, true); if(res.empty()) { cerr<<"Unable to resolve '"< tosend; { std::lock_guard mut(g_mut); time_t now = time(0); if(now - start < 30) { cerr<<"Have "<first.first > (uint64_t)now - 5) break; tosend.push_back(iter->second); iter = g_buffer.erase(iter); } } // cerr<<"Have "<