diff --git a/nmmsender.cc b/nmmsender.cc new file mode 100644 index 0000000..5e244d1 --- /dev/null +++ b/nmmsender.cc @@ -0,0 +1,108 @@ +#include "nmmsender.hh" +#include "comboaddress.hh" +#include "swrappers.hh" +#include "sclasses.hh" +#include +#include "navmon.hh" +#include + +using namespace std; + +void NMMSender::sendTCPThread(Destination* d) +{ + struct NameError{}; + for(;;) { + ComboAddress chosen; + try { + vector addrs; + for(;;) { + addrs=resolveName(d->dst, true, true); + if(!addrs.empty()) + break; + + cerr<dst<<", sleeping and trying again later"<dst<<" on "< mut(d->mut); + if(!d->queue.empty()) { + msg = d->queue.front(); + } + } + if(!msg.empty()) { + sc.writen(msg); + std::lock_guard mut(d->mut); + d->queue.pop_front(); + } + else usleep(100000); + } + } + } + catch(NameError&) { + { + std::lock_guard mut(d->mut); + if (d_debug) { cerr<queue.size()<<" messages queued for "<dst<dst<<" via "< mut(d->mut); + if (d_debug) { cerr<queue.size()<<" messages queued for "<dst<dst <<" via "< mut(d->mut); + if (d_debug) { cerr<<"There are now "<queue.size()<<" messages queued for "<dst<<" via "<emitNMM(nmm); + } +} + +void NMMSender::Destination::emitNMM(const NavMonMessage& nmm) +{ + string out; + nmm.SerializeToString(& out); + string msg("bert"); + + uint16_t len = htons(out.size()); + msg.append((char*)&len, 2); + msg.append(out); + + if(!dst.empty()) { + std::lock_guard l(mut); + queue.push_back(msg); + } + else + writen2(fd, msg.c_str(), msg.size()); +} + diff --git a/nmmsender.hh b/nmmsender.hh new file mode 100644 index 0000000..de482d8 --- /dev/null +++ b/nmmsender.hh @@ -0,0 +1,53 @@ +#pragma once +#include +#include +#include +#include "navmon.pb.h" +#include +#include + +class NMMSender +{ + struct Destination + { + int fd{-1}; + std::string dst; + std::string fname; + + std::deque queue; + std::mutex mut; + void emitNMM(const NavMonMessage& nmm); + }; + +public: + void addDestination(int fd) + { + auto d = std::make_unique(); + d->fd = fd; + d_dests.push_back(std::move(d)); + } + void addDestination(const std::string& dest) + { + auto d = std::make_unique(); + d->dst = dest; + d_dests.push_back(std::move(d)); + } + + void launch() + { + for(auto& d : d_dests) { + if(!d->dst.empty()) { + std::thread t(&NMMSender::sendTCPThread, this, d.get()); + t.detach(); + } + } + } + + void sendTCPThread(Destination* d); + + void emitNMM(const NavMonMessage& nmm); + bool d_debug{false}; +private: + + std::vector> d_dests; +};