msgq is done, messaging bechmarks can go

This commit is contained in:
Willem Melching 2020-04-24 14:12:41 -07:00
parent ca690e6ff9
commit 2d944e4042
16 changed files with 0 additions and 758 deletions

View file

@ -1,2 +0,0 @@
sender
receiver

View file

@ -1,61 +0,0 @@
CC = clang
CXX = clang++
ARCH := $(shell uname -m)
OS := $(shell uname -o)
BASEDIR = ../../../..
PHONELIBS = ../../../../phonelibs
WARN_FLAGS = -Werror=implicit-function-declaration \
-Werror=incompatible-pointer-types \
-Werror=int-conversion \
-Werror=return-type \
-Werror=format-extra-args
CFLAGS = -std=gnu11 -g -fPIC -O2 $(WARN_FLAGS) -Wall
CXXFLAGS = -std=c++11 -g -fPIC -O2 $(WARN_FLAGS) -Wall
ifeq ($(ARCH),aarch64)
CFLAGS += -mcpu=cortex-a57
CXXFLAGS += -mcpu=cortex-a57
endif
EXTRA_LIBS = -lpthread
ifeq ($(ARCH),x86_64)
BOOST_LIBS = -lboost_system -lboost_locale -lrt
else
EXTRA_LIBS += -llog -luuid
endif
.PHONY: all
all: sender receiver
receiver: receiver.o
@echo "[ LINK ] $@"
$(CXX) -fPIC -o '$@' $^ \
$(CEREAL_LIBS) \
$(BOOST_LIBS) \
$(EXTRA_LIBS)
sender: sender.o
@echo "[ LINK ] $@"
$(CXX) -fPIC -o '$@' $^ \
$(CEREAL_LIBS) \
$(BOOST_LIBS) \
$(EXTRA_LIBS)
%.o: %.cc
@echo "[ CXX ] $@"
$(CXX) $(CXXFLAGS) -MMD \
-Iinclude -I.. -I../.. \
-I../ \
-I../../ \
-c -o '$@' '$<'
.PHONY: clean
clean:
rm -f *.d sender receiver *.o

View file

@ -1,54 +0,0 @@
#include <boost/interprocess/ipc/message_queue.hpp>
#include <iostream>
#include <vector>
#include <thread>
using namespace boost::interprocess;
#define N 1024
message_queue *sub_queue(const char *name){
while (true){
try {
message_queue *mq = new message_queue(open_only, name);
return mq;
}
catch(interprocess_exception &ex){
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
}
message_queue *pub_queue(const char *name){
message_queue::remove(name);
message_queue *mq = new message_queue(create_only, name, 100, N);
return mq;
}
int main ()
{
message_queue::remove("queue_1");
message_queue::remove("queue_2");
message_queue *pq = pub_queue("queue_2");
message_queue *sq = sub_queue("queue_1");
std::cout << "Ready" << std::endl;
unsigned int priority;
std::size_t recvd_size;
char * rcv_msg = new char[N];
while (true){
sq->receive(rcv_msg, N, recvd_size, priority);
assert(N == recvd_size);
pq->send(rcv_msg, N, 0);
}
return 0;
}

View file

@ -1,62 +0,0 @@
#include <boost/interprocess/ipc/message_queue.hpp>
#include <iostream>
#include <vector>
#include <thread>
#include <chrono>
#include <cassert>
#define N 1024
#define MSGS 1e5
using namespace boost::interprocess;
message_queue *sub_queue(const char *name){
while (true){
try {
message_queue *mq = new message_queue(open_only, name);
return mq;
}
catch(interprocess_exception &ex){
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
}
message_queue *pub_queue(const char *name){
message_queue::remove(name);
message_queue *mq = new message_queue(create_only, name, 100, N);
return mq;
}
int main ()
{
message_queue *pq = pub_queue("queue_1");
message_queue *sq = sub_queue("queue_2");
std::cout << "Ready" << std::endl;
auto start = std::chrono::steady_clock::now();
char * rcv_msg = new char[N];
char * snd_msg = new char[N];
unsigned int priority;
std::size_t recvd_size;
for (int i = 0; i < MSGS; i++){
sprintf(snd_msg, "%d", i);
pq->send(snd_msg, N, 0);
sq->receive(rcv_msg, N, recvd_size, priority);
}
auto end = std::chrono::steady_clock::now();
double elapsed = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count() / 1e9;
double throughput = ((double) MSGS / (double) elapsed);
std::cout << "Elapsed: " << elapsed << " s" << std::endl;
std::cout << "Throughput: " << throughput << " msg/s" << std::endl;
return 0;
}

View file

@ -1,2 +0,0 @@
receiver
sender

View file

@ -1,65 +0,0 @@
CC = clang
CXX = clang++
ARCH := $(shell uname -m)
OS := $(shell uname -o)
BASEDIR = ../../../..
PHONELIBS = ../../../../phonelibs
WARN_FLAGS = -Werror=implicit-function-declaration \
-Werror=incompatible-pointer-types \
-Werror=int-conversion \
-Werror=return-type \
-Werror=format-extra-args
CFLAGS = -std=gnu11 -g -fPIC -O2 $(WARN_FLAGS) -Wall
CXXFLAGS = -std=c++11 -g -fPIC -O2 $(WARN_FLAGS) -Wall
# NANOMSG_LIBS = -l:libnanomsg.a
ifeq ($(ARCH),aarch64)
CFLAGS += -mcpu=cortex-a57
CXXFLAGS += -mcpu=cortex-a57
endif
EXTRA_LIBS = -lpthread
ifeq ($(ARCH),x86_64)
NANOMSG_FLAGS = -I$(BASEDIR)/phonelibs/nanomsg/x64/include
NANOMSG_LIBS = -L$(BASEDIR)/phonelibs/nanomsg/x64/lib \
-lnanomsg -Wl,-rpath,$(BASEDIR)/phonelibs/nanomsg/x64/lib
else
EXTRA_LIBS += -llog -luuid
endif
.PHONY: all
all: sender receiver
receiver: receiver.o
@echo "[ LINK ] $@"
$(CXX) -fPIC -o '$@' $^ \
$(NANOMSG_LIBS) \
$(EXTRA_LIBS)
sender: sender.o
@echo "[ LINK ] $@"
$(CXX) -fPIC -o '$@' $^ \
$(NANOMSG_LIBS) \
$(EXTRA_LIBS)
%.o: %.cc
@echo "[ CXX ] $@"
$(CXX) $(CXXFLAGS) -MMD \
-Iinclude -I.. -I../.. \
$(NANOMSG_FLAGS) \
$(JSON11_FLAGS) \
$(JSON_FLAGS) \
-I../ \
-I../../ \
-c -o '$@' '$<'
.PHONY: clean
clean:
rm -f *.d sender receiver

View file

@ -1,48 +0,0 @@
#include <future>
#include <cassert>
#include <iostream>
#include <cstring>
#include <thread>
#include <nanomsg/nn.h>
#include <nanomsg/pubsub.h>
#include <nanomsg/tcp.h>
#define N 1024
int sub_sock(const char *endpoint) {
int sock = nn_socket(AF_SP, NN_SUB);
assert(sock >= 0);
nn_setsockopt(sock, NN_SUB, NN_SUB_SUBSCRIBE, "", 0);
assert(nn_connect(sock, endpoint) >= 0);
return sock;
}
int pub_sock(const char *endpoint){
int sock = nn_socket(AF_SP, NN_PUB);
assert(sock >= 0);
int b = 1;
nn_setsockopt(sock, NN_TCP, NN_TCP_NODELAY, &b, sizeof(b));
assert(nn_bind(sock, endpoint) >= 0);
return sock;
}
int main(int argc, char *argv[]) {
auto p_sock = pub_sock("tcp://*:10011");
auto s_sock = sub_sock("tcp://127.0.0.1:10010");
std::cout << "Ready!" << std::endl;
char * msg = new char[N];
while (true){
int bytes = nn_recv(s_sock, msg, N, 0);
nn_send(p_sock, msg, bytes, 0);
}
return 0;
}

View file

@ -1,73 +0,0 @@
#include <iostream>
#include <cassert>
#include <chrono>
#include <thread>
#include <nanomsg/nn.h>
#include <nanomsg/pubsub.h>
#include <nanomsg/tcp.h>
#define N 1024
#define MSGS 1e5
int sub_sock(const char *endpoint) {
int sock = nn_socket(AF_SP, NN_SUB);
assert(sock >= 0);
nn_setsockopt(sock, NN_SUB, NN_SUB_SUBSCRIBE, "", 0);
int timeout = 100;
nn_setsockopt(sock, NN_SOL_SOCKET, NN_RCVTIMEO, &timeout , sizeof(timeout));
assert(nn_connect(sock, endpoint) >= 0);
return sock;
}
int pub_sock(const char *endpoint){
int sock = nn_socket(AF_SP, NN_PUB);
assert(sock >= 0);
int b = 1;
nn_setsockopt(sock, NN_TCP, NN_TCP_NODELAY, &b, sizeof(b));
assert(nn_bind(sock, endpoint) >= 0);
return sock;
}
int main(int argc, char *argv[]) {
auto p_sock = pub_sock("tcp://*:10010");
auto s_sock = sub_sock("tcp://127.0.0.1:10011");
std::cout << "Ready!" << std::endl;
// auto p_sock = pub_sock("ipc:///tmp/feeds/3");
// auto s_sock = sub_sock("ipc:///tmp/feeds/2");
char * msg = new char[N];
auto start = std::chrono::steady_clock::now();
for (int i = 0; i < MSGS; i++){
sprintf(msg, "%d", i);
nn_send(p_sock, msg, N, 0);
int bytes = nn_recv(s_sock, msg, N, 0);
if (bytes < 0) {
std::cout << "Timeout" << std::endl;
}
}
auto end = std::chrono::steady_clock::now();
double elapsed = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count() / 1e9;
double throughput = ((double) MSGS / (double) elapsed);
std::cout << "Elapsed: " << elapsed << " s" << std::endl;
std::cout << "Throughput: " << throughput << " msg/s" << std::endl;
return 0;
}

View file

@ -1,2 +0,0 @@
receiver
sender

View file

@ -1,68 +0,0 @@
CC = clang
CXX = clang++
ARCH := $(shell uname -m)
OS := $(shell uname -o)
BASEDIR = ../../../..
PHONELIBS = ../../../../phonelibs
WARN_FLAGS = -Werror=implicit-function-declaration \
-Werror=incompatible-pointer-types \
-Werror=int-conversion \
-Werror=return-type \
-Werror=format-extra-args
CFLAGS = -std=gnu11 -g -fPIC -O2 $(WARN_FLAGS) -Wall
CXXFLAGS = -std=c++11 -g -fPIC -O2 $(WARN_FLAGS) -Wall
NNG_LIBS = -l:libnng.a
ifeq ($(ARCH),aarch64)
CFLAGS += -mcpu=cortex-a57
CXXFLAGS += -mcpu=cortex-a57
endif
EXTRA_LIBS = -lpthread
ifeq ($(ARCH),x86_64)
ZMQ_FLAGS = -I$(BASEDIR)/phonelibs/nng/x64/include
NNG_LIBS = -L$(BASEDIR)/phonelibs/nng/x64/lib \
-l:libnng.a
else
EXTRA_LIBS += -llog -luuid
endif
.PHONY: all
all: sender receiver
receiver: receiver.o
@echo "[ LINK ] $@"
$(CXX) -fPIC -o '$@' $^ \
$(CEREAL_LIBS) \
$(NNG_LIBS) \
$(EXTRA_LIBS)
sender: sender.o
@echo "[ LINK ] $@"
$(CXX) -fPIC -o '$@' $^ \
$(CEREAL_LIBS) \
$(NNG_LIBS) \
$(EXTRA_LIBS)
%.o: %.cc
@echo "[ CXX ] $@"
$(CXX) $(CXXFLAGS) -MMD \
-Iinclude -I.. -I../.. \
$(CEREAL_CXXFLAGS) \
$(ZMQ_FLAGS) \
$(JSON11_FLAGS) \
$(JSON_FLAGS) \
-I../ \
-I../../ \
-c -o '$@' '$<'
.PHONY: clean
clean:
rm -f *.d sender receiver

View file

@ -1,56 +0,0 @@
#include <future>
#include <cassert>
#include <iostream>
#include <cstring>
#include <thread>
#include <nng/nng.h>
#include <nng/protocol/pubsub0/pub.h>
#include <nng/protocol/pubsub0/sub.h>
nng_socket sub_sock(const char *endpoint) {
nng_socket sock;
int r;
r = nng_sub0_open(&sock);
assert(r == 0);
nng_setopt(sock, NNG_OPT_SUB_SUBSCRIBE, "", 0);
while (true){
r = nng_dial(sock, endpoint, NULL, 0);
if (r == 0){
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
return sock;
}
nng_socket pub_sock(const char *endpoint){
nng_socket sock;
int r;
r = nng_pub0_open(&sock);
assert(r == 0);
r = nng_listen(sock, endpoint, NULL, 0);
assert(r == 0);
return sock;
}
int main(int argc, char *argv[]) {
// auto p_sock = pub_sock("tcp://*:10004");
// auto s_sock = sub_sock("tcp://127.0.0.1:10003");
auto p_sock = pub_sock("ipc:///tmp/feeds/2");
auto s_sock = sub_sock("ipc:///tmp/feeds/3");
while (true){
nng_msg *msg;
nng_recvmsg(s_sock, &msg, 0);
nng_sendmsg(p_sock, msg, 0);
}
return 0;
}

View file

@ -1,78 +0,0 @@
#include <iostream>
#include <cassert>
#include <chrono>
#include <thread>
#include <nng/nng.h>
#include <nng/protocol/pubsub0/pub.h>
#include <nng/protocol/pubsub0/sub.h>
#define N 1024
#define MSGS 1e5
nng_socket sub_sock(const char *endpoint) {
nng_socket sock;
int r;
r = nng_sub0_open(&sock);
assert(r == 0);
nng_setopt(sock, NNG_OPT_SUB_SUBSCRIBE, "", 0);
nng_setopt_ms(sock, NNG_OPT_RECVTIMEO, 100);
while (true){
r = nng_dial(sock, endpoint, NULL, 0);
if (r == 0){
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
return sock;
}
nng_socket pub_sock(const char *endpoint){
nng_socket sock;
int r;
r = nng_pub0_open(&sock);
assert(r == 0);
r = nng_listen(sock, endpoint, NULL, 0);
assert(r == 0);
return sock;
}
int main(int argc, char *argv[]) {
// auto p_sock = pub_sock("tcp://*:10003");
// auto s_sock = sub_sock("tcp://127.0.0.1:10004");
auto p_sock = pub_sock("ipc:///tmp/feeds/3");
auto s_sock = sub_sock("ipc:///tmp/feeds/2");
auto start = std::chrono::steady_clock::now();
for (int i = 0; i < MSGS; i++){
nng_msg *msg;
nng_msg_alloc(&msg, N);
nng_sendmsg(p_sock, msg, 0);
nng_msg *rmsg;
int r = nng_recvmsg(s_sock, &rmsg, 0);
if (r) {
std::cout << "Timeout" << std::endl;
}
}
auto end = std::chrono::steady_clock::now();
double elapsed = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count() / 1e9;
double throughput = ((double) MSGS / (double) elapsed);
std::cout << "Elapsed: " << elapsed << " s" << std::endl;
std::cout << "Throughput: " << throughput << " msg/s" << std::endl;
return 0;
}

View file

@ -1,2 +0,0 @@
receiver
sender

View file

@ -1,71 +0,0 @@
CC = clang
CXX = clang++
ARCH := $(shell uname -m)
OS := $(shell uname -o)
BASEDIR = ../../../../../
PHONELIBS = ../../../../../phonelibs
WARN_FLAGS = -Werror=implicit-function-declaration \
-Werror=incompatible-pointer-types \
-Werror=int-conversion \
-Werror=return-type \
-Werror=format-extra-args
CFLAGS = -std=gnu11 -g -fPIC -O2 $(WARN_FLAGS) -Wall
CXXFLAGS = -std=c++11 -g -fPIC -O2 $(WARN_FLAGS) -Wall
ZMQ_LIBS = -l:libczmq.a -l:libzmq.a
ifeq ($(ARCH),aarch64)
CFLAGS += -mcpu=cortex-a57
CXXFLAGS += -mcpu=cortex-a57
ZMQ_LIBS += -lgnustl_shared
endif
EXTRA_LIBS = -lpthread
ifeq ($(ARCH),x86_64)
ZMQ_FLAGS = -I$(BASEDIR)/phonelibs/zmq/x64/include
ZMQ_LIBS = -L$(BASEDIR)/external/zmq/lib \
-l:libczmq.a -l:libzmq.a
ZMQ_SHARED_LIBS = -L$(BASEDIR)/external/zmq/lib \
-lczmq -lzmq
else
EXTRA_LIBS += -llog -luuid
endif
.PHONY: all
all: sender receiver
receiver: receiver.o
@echo "[ LINK ] $@"
$(CXX) -fPIC -o '$@' $^ \
$(CEREAL_LIBS) \
$(ZMQ_LIBS) \
$(EXTRA_LIBS)
sender: sender.o
@echo "[ LINK ] $@"
$(CXX) -fPIC -o '$@' $^ \
$(CEREAL_LIBS) \
$(ZMQ_LIBS) \
$(EXTRA_LIBS)
%.o: %.cc
@echo "[ CXX ] $@"
$(CXX) $(CXXFLAGS) -MMD \
-Iinclude -I.. -I../.. \
$(CEREAL_CXXFLAGS) \
$(ZMQ_FLAGS) \
$(JSON11_FLAGS) \
$(JSON_FLAGS) \
-I../ \
-I../../ \
-c -o '$@' '$<'
.PHONY: clean
clean:
rm -f *.d sender receiver

View file

@ -1,49 +0,0 @@
#include <future>
#include <iostream>
#include <cstring>
#include <zmq.h>
// #define IPC
void *sub_sock(void *ctx, const char *endpoint) {
void* sock = zmq_socket(ctx, ZMQ_SUB);
zmq_connect(sock, endpoint);
zmq_setsockopt(sock, ZMQ_SUBSCRIBE, "", 0);
return sock;
}
void *pub_sock(void *ctx, const char *endpoint){
void * sock = zmq_socket(ctx, ZMQ_PUB);
zmq_bind(sock, endpoint);
return sock;
}
int main(int argc, char *argv[]) {
auto ctx = zmq_ctx_new();
#ifdef IPC
auto s_sock = sub_sock(ctx, "ipc:///tmp/q0");
auto p_sock = pub_sock(ctx, "ipc:///tmp/q1");
#else
auto s_sock = sub_sock(ctx, "tcp://localhost:10005");
auto p_sock = pub_sock(ctx, "tcp://*:10004");
#endif
zmq_msg_t msg;
zmq_msg_init(&msg);
while (true){
zmq_msg_recv(&msg, s_sock, 0);
zmq_msg_send(&msg, p_sock, ZMQ_DONTWAIT);
}
zmq_msg_close(&msg);
zmq_close(p_sock);
zmq_close(s_sock);
return 0;
}

View file

@ -1,65 +0,0 @@
#include <iostream>
#include <zmq.h>
#include <chrono>
#define N 1024
#define MSGS 1e5
// #define IPC
void *sub_sock(void *ctx, const char *endpoint) {
void* sock = zmq_socket(ctx, ZMQ_SUB);
zmq_connect(sock, endpoint);
zmq_setsockopt(sock, ZMQ_SUBSCRIBE, "", 0);
int timeout = 100;
zmq_setsockopt(sock, ZMQ_RCVTIMEO, &timeout, sizeof(int));
return sock;
}
void *pub_sock(void *ctx, const char *endpoint){
void * sock = zmq_socket(ctx, ZMQ_PUB);
zmq_bind(sock, endpoint);
return sock;
}
int main(int argc, char *argv[]) {
auto ctx = zmq_ctx_new();
#ifdef IPC
auto s_sock = sub_sock(ctx, "ipc:///tmp/q1");
auto p_sock = pub_sock(ctx, "ipc:///tmp/q0");
#else
auto s_sock = sub_sock(ctx, "tcp://127.0.0.1:10004");
auto p_sock = pub_sock(ctx, "tcp://*:10005");
#endif
zmq_msg_t msg;
zmq_msg_init_size (&msg, N);
auto start = std::chrono::steady_clock::now();
for (int i = 0; i < MSGS; i++){
zmq_msg_send(&msg, p_sock, ZMQ_DONTWAIT);
int r = zmq_msg_recv(&msg, s_sock, 0);
if (r) {
start = std::chrono::steady_clock::now();
std::cout << "Timeout" << std::endl;
}
}
auto end = std::chrono::steady_clock::now();
double elapsed = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count() / 1e9;
double throughput = ((double) MSGS / (double) elapsed);
std::cout << "Elapsed: " << elapsed << " s" << std::endl;
std::cout << "Throughput: " << throughput << " msg/s" << std::endl;
zmq_close(p_sock);
zmq_close(s_sock);
return 0;
}