2020-02-29 07:26:05 -07:00
# include "nmmsender.hh"
# include "comboaddress.hh"
# include "swrappers.hh"
# include "sclasses.hh"
# include <random>
# include "navmon.hh"
# include <algorithm>
2020-04-26 08:12:19 -06:00
# include "zstdwrap.hh"
2020-04-27 15:30:06 -06:00
# include <netinet/tcp.h>
2020-02-29 07:26:05 -07:00
using namespace std ;
void NMMSender : : sendTCPThread ( Destination * d )
{
struct NameError { } ;
for ( ; ; ) {
ComboAddress chosen ;
2020-04-26 08:12:19 -06:00
map < uint32_t , string > unacked ;
2020-02-29 07:26:05 -07:00
try {
vector < ComboAddress > addrs ;
for ( ; ; ) {
addrs = resolveName ( d - > dst , true , true ) ;
if ( ! addrs . empty ( ) )
break ;
cerr < < humanTimeNow ( ) < < " Unable to resolve " < < d - > dst < < " , sleeping and trying again later " < < endl ;
throw NameError ( ) ;
}
std : : random_device rng ;
std : : mt19937 urng ( rng ( ) ) ;
std : : shuffle ( addrs . begin ( ) , addrs . end ( ) , urng ) ;
for ( auto & addr : addrs ) {
if ( ! addr . sin4 . sin_port )
addr . sin4 . sin_port = ntohs ( 29603 ) ;
chosen = addr ;
Socket s ( addr . sin4 . sin_family , SOCK_STREAM ) ;
SocketCommunicator sc ( s ) ;
sc . setTimeout ( 3 ) ;
sc . connect ( addr ) ;
2020-04-28 08:07:49 -06:00
# if !defined(TCP_CORK) && defined(TCP_NOPUSH)
/* start off "buffering" */
SSetsockopt ( s , IPPROTO_TCP , TCP_NOPUSH , 1 ) ;
# endif
2020-04-26 08:12:19 -06:00
time_t connStartTime = time ( 0 ) ;
2020-02-29 07:26:05 -07:00
if ( d_debug ) { cerr < < humanTimeNow ( ) < < " Connected to " < < d - > dst < < " on " < < addr . toStringWithPort ( ) < < endl ; }
2020-04-26 08:12:19 -06:00
auto emit = [ & sc ] ( const char * buf , uint32_t len ) {
sc . writen ( string ( buf , len ) ) ;
} ;
std : : unique_ptr < ZStdCompressor > zsc ;
if ( d_compress ) {
2020-04-27 15:30:06 -06:00
sc . writen ( " RNIE00000000 " ) ; // the other magic value is "bert". hence.
// the 00000000 is a placeholder for a "secret" we might implement later
2020-04-28 13:19:52 -06:00
zsc = std : : make_unique < ZStdCompressor > ( emit , 9 ) ;
2020-04-26 08:12:19 -06:00
}
bool hadMessage = false ;
int msgnum = 0 ;
2020-05-05 13:40:40 -06:00
for ( ; ; ) {
2020-04-26 08:12:19 -06:00
uint32_t num ;
// read acks
2020-05-05 13:40:40 -06:00
for ( ; zsc ; ) { // only do this for compressed protocol
try {
readn2 ( s , & num , 4 ) ; // this will give us 4, or throw
num = ntohl ( num ) ;
unacked . erase ( num ) ;
}
catch ( EofException & ee ) {
throw std : : runtime_error ( " EOF while reading acks " ) ;
}
catch ( std : : exception & e ) {
2020-04-26 08:12:19 -06:00
if ( errno ! = EAGAIN )
unixDie ( " Reading acknowledgements in nmmsender " ) ;
break ;
}
}
2020-02-29 07:26:05 -07:00
std : : string msg ;
{
std : : lock_guard < std : : mutex > mut ( d - > mut ) ;
if ( ! d - > queue . empty ( ) ) {
msg = d - > queue . front ( ) ;
}
}
if ( ! msg . empty ( ) ) {
2020-04-26 08:12:19 -06:00
hadMessage = true ;
if ( zsc ) {
uint32_t num = htonl ( msgnum ) ;
string encap ( ( const char * ) & num , 4 ) ;
encap + = msg ;
zsc - > give ( encap . c_str ( ) , encap . size ( ) ) ;
unacked [ msgnum ] = msg ;
msgnum + + ;
}
else
sc . writen ( msg ) ;
2020-02-29 07:26:05 -07:00
std : : lock_guard < std : : mutex > mut ( d - > mut ) ;
d - > queue . pop_front ( ) ;
2020-04-26 08:12:19 -06:00
}
else {
if ( zsc & & hadMessage ) {
2020-04-28 03:17:55 -06:00
// cerr << "Compressed to: "<< 100.0*zsc->d_outputBytes/zsc->d_inputBytes<<"%, buffered compressed: "<<zsc->outputBufferBytes()<<" out of " <<zsc->outputBufferCapacity()<<" bytes. Unacked: "<<unacked.size()<<endl;
2020-04-26 08:12:19 -06:00
zsc - > flush ( ) ;
if ( time ( 0 ) - connStartTime > 10 & & unacked . size ( ) > 1000 )
2020-05-05 13:40:40 -06:00
throw std : : runtime_error ( " Too many messages unacked ( " + to_string ( unacked . size ( ) ) + " ), recycling connection " ) ;
2020-04-26 08:12:19 -06:00
}
hadMessage = false ;
2020-05-05 13:40:40 -06:00
if ( d_pleaseQuit )
return ;
2020-04-26 08:12:19 -06:00
usleep ( 100000 ) ;
2020-04-28 08:07:49 -06:00
# if defined(TCP_CORK)
/* linux-only: has an implied 200ms timeout */
2020-04-27 15:30:06 -06:00
SSetsockopt ( s , IPPROTO_TCP , TCP_CORK , 1 ) ;
2020-04-28 08:07:49 -06:00
# elif defined(TCP_NOPUSH)
/*
* freebsd / osx : buffers until buffer full / connection closed , so
* we toggle it every other loop through
*/
static bool push_toggle ;
if ( push_toggle ) {
SSetsockopt ( s , IPPROTO_TCP , TCP_NOPUSH , 0 ) ;
SSetsockopt ( s , IPPROTO_TCP , TCP_NOPUSH , 1 ) ;
}
push_toggle = ! push_toggle ;
2020-04-27 15:30:06 -06:00
# endif
2020-04-26 08:12:19 -06:00
2020-02-29 07:26:05 -07:00
}
}
}
}
catch ( NameError & ) {
{
std : : lock_guard < std : : mutex > mut ( d - > mut ) ;
2020-04-26 08:12:19 -06:00
if ( d_debug ) { cerr < < humanTimeNow ( ) < < " There are now " < < d - > queue . size ( ) < < " messages queued for " < < d - > dst < < " , and " < < unacked . size ( ) < < " unacknowledged " < < endl ; }
2020-02-29 07:26:05 -07:00
}
sleep ( 30 ) ;
}
catch ( std : : exception & e ) {
if ( d_debug ) { cerr < < humanTimeNow ( ) < < " Sending thread for " < < d - > dst < < " via " < < chosen . toStringWithPort ( ) < < " had error: " < < e . what ( ) < < endl ; }
{
std : : lock_guard < std : : mutex > mut ( d - > mut ) ;
2020-04-26 08:12:19 -06:00
if ( d_debug ) { cerr < < humanTimeNow ( ) < < " There are now " < < d - > queue . size ( ) < < " messages queued for " < < d - > dst < < " , and " < < unacked . size ( ) < < " unacknowledged " < < endl ; }
2020-02-29 07:26:05 -07:00
}
sleep ( 1 ) ;
}
catch ( . . . ) {
if ( d_debug ) { cerr < < humanTimeNow ( ) < < " Sending thread for " < < d - > dst < < " via " < < chosen . toStringWithPort ( ) < < " had error " ; }
{
std : : lock_guard < std : : mutex > mut ( d - > mut ) ;
2020-04-26 08:12:19 -06:00
if ( d_debug ) { cerr < < " There are now " < < d - > queue . size ( ) < < " messages queued for " < < d - > dst < < " , and " < < unacked . size ( ) < < " unacknowledge via " < < chosen . toStringWithPort ( ) < < endl ; }
2020-02-29 07:26:05 -07:00
}
sleep ( 1 ) ;
}
2020-04-26 08:12:19 -06:00
std : : lock_guard < std : : mutex > mut ( d - > mut ) ;
if ( ! unacked . empty ( ) ) {
cerr < < humanTimeNow ( ) < < " Stuffing " < < unacked . size ( ) < < " messages back into the queue " < < endl ;
for ( auto iter = unacked . rbegin ( ) ; iter ! = unacked . rend ( ) ; + + iter ) {
d - > queue . push_front ( iter - > second ) ;
}
unacked . clear ( ) ;
}
2020-02-29 07:26:05 -07:00
}
}
void NMMSender : : emitNMM ( const NavMonMessage & nmm )
2020-08-03 15:20:17 -06:00
{
string out ;
nmm . SerializeToString ( & out ) ;
emitNMM ( out ) ;
}
void NMMSender : : emitNMM ( const std : : string & out )
2020-02-29 07:26:05 -07:00
{
for ( auto & d : d_dests ) {
2020-08-03 15:20:17 -06:00
d - > emitNMM ( out , d_compress ) ;
2020-02-29 07:26:05 -07:00
}
}
2020-08-03 15:20:17 -06:00
void NMMSender : : Destination : : emitNMM ( const std : : string & out , bool compressed )
2020-02-29 07:26:05 -07:00
{
2020-04-26 08:12:19 -06:00
string msg ;
2020-04-28 09:21:23 -06:00
if ( dst . empty ( ) | | ! compressed )
2020-04-26 08:12:19 -06:00
msg = " bert " ;
2020-02-29 07:26:05 -07:00
uint16_t len = htons ( out . size ( ) ) ;
msg . append ( ( char * ) & len , 2 ) ;
msg . append ( out ) ;
if ( ! dst . empty ( ) ) {
std : : lock_guard < std : : mutex > l ( mut ) ;
queue . push_back ( msg ) ;
}
else
writen2 ( fd , msg . c_str ( ) , msg . size ( ) ) ;
}