Squashed 'cereal/' changes from 90e48c54..b8382bbb

b8382bbb steerLimitTimer should be car dependent
9a229687 add pa0 temp to ThermalData
f6f0f60e Add stock Fcw to carState
b608683f no l/r distinction for LDW
555f48d6 Add ldw alert
8e8b4a4a Remove plusFrame socket in favor of UiLayoutState
3410325c log stock AEB events
2219f2bd Add warning about not using cython version of sec_since_boot
8f1a5122 for legacy-testing reasons, better to define the used percent instead of avail
e86d9545 adding low memory event
ad238340 remove TODO
d0962b34 log mem available and cpu perc in thermald
3b753be9 Implement error handling and exceptions (#18)
a7d5bb76 add explicit dependencies on services.h
1ba64677 fix linter
c7d215b6 Added communityFeatureDisallowed event
492140a5 Added communityFeature bit detection to CarParams
266a5fed log Panda fault types
347a8661 Switch from polling on FIFOs to signal (#12)
e25bba77 no need to double build the objects
fe43a994 20Hz for radar time step is very standard
2aabf1ee Added radar time step to car params
e8ae9086 Generate capnp for java
57126a23 cereal_shared
da655cd3 Add uptime to health
f6a8e394 add test with multiple subscribers
84b3af53 comment out the debugging
4b9c942a added power save state to health packet
66be3708 run python unittest in ci
52c6db87 Run scons in CI (#14)
9414615b do need it, but only for arm
2856c37c remove gnustl_shared
7f05ee64 fix apks
e3a6bded Revert "no more makefiles"
487fbd06 don't rely on BASEDIR, and add zmq library
223e37a5 no more makefiles
da2ed115 don't link the wrong one
fe9fe2a2 scons builds the python lib now
2f81135e err, it can't build services.h
57b03f8b now we shouldn't need that yaml crap everywhere
f8e53277 bridge builds with services.h
2b0cb608 noOutput safety mode is now called silent
83880d51 add msgq tests
bcad1848 msgq: dont block when fifo does not exists
b4b26782 Default to zmq
473e2912 fix compilation in docker
30aaaddc msgq: try again when no timeout on poll but also no message
c4f2ad53 msgq: make sure read_fifos is initalized so we dont close random fds
4e513a85 msgq: dont clean up uninitialized sockerts
c008b630 also remove the fifo from disk
ef64eb27 MSGQ stability improvements when opening and closing lots of queues
e147abcc Revert "Revert "deprecate irpwr""
932dc32e Revert "deprecate irpwr"
a6844150 disengage
ec27e18c capnpc also generated the header files
ee52ab9e deprecate irpwr
301c74c8 Merge branch 'master' of github.com:commaai/cereal
6da7d55a add front frame
a5944eb4 add conflate parameter for SubSocket::create
ca8df170 Add fault status to health
ef4ded06 add conflate support in SubSocket constructor
7fd314af update scons build file
93d814e4 add saturated flags to indi and lqr logs
50302fee add steeringRateLimited to car.capnp
05e3513d add msgq readme
a6759a95 faster make
94b73778 Add struct to log FW version
64ce0b5f add scons build
dc9ad18a add debug print statement on SIGINT
4a612698 Merge pull request #10 from commaai/msgq
4873449a use recv one or none after poll
a054864b default to msgq
fbc4a4cf oops bad number
5067cf4c add meta
cbd02865 fix export prefix and make shared library world readable
c2730541 add c exports for jni usage
e77f41ef zmq already sets the errno correctly
3196cf69 Fix service list path in bridge
d35515a2 add all msgq files, but dont use as default
a68a38fa Don't delete context from python side only
bd46c225 Revert "zmq_ctx_term is blocking"
a1fc26b8 zmq_ctx_term is blocking
09021820 remote address support
21a35361 only delete subsocket when created by same object
34df7351 remove extra underscore from __dealloc__
c8748f86 fix internal refs
79b2fbf7 fixups
23ad2563 import messaging and services

git-subtree-dir: cereal
git-subtree-split: b8382bbb2b8156f2f1d7e1c1b42b46c54d85761f
pull/898/head^2
Vehicle Researcher 2019-12-13 13:02:45 -08:00
parent 047924cb35
commit e3b21173a5
35 changed files with 19871 additions and 71 deletions

1
.dockerignore 100644
View File

@ -0,0 +1 @@
.sconsign.dblite

8
.gitignore vendored
View File

@ -3,4 +3,12 @@ node_modules
package-lock.json
*.pyc
__pycache__
.*.swp
.*.swo
libcereal*.a
libmessaging.*
libmessaging_shared.*
services.h
.sconsign.dblite
libcereal_shared.so

19
Dockerfile 100644
View File

@ -0,0 +1,19 @@
from ubuntu:16.04
RUN apt-get update && apt-get install -y libzmq3-dev clang wget git autoconf libtool curl make build-essential libssl-dev zlib1g-dev libbz2-dev libreadline-dev libsqlite3-dev llvm libncurses5-dev libncursesw5-dev xz-utils tk-dev libffi-dev liblzma-dev python-openssl
RUN curl -L https://github.com/pyenv/pyenv-installer/raw/master/bin/pyenv-installer | bash
ENV PATH="/root/.pyenv/bin:/root/.pyenv/shims:${PATH}"
RUN pyenv install 3.7.3
RUN pyenv global 3.7.3
RUN pyenv rehash
RUN pip3 install pyyaml==5.1.2 Cython==0.29.14 scons==3.1.1 pycapnp==0.6.4
WORKDIR /project/cereal
COPY install_capnp.sh .
RUN ./install_capnp.sh
ENV PYTHONPATH=/project
COPY . .
RUN scons -c && scons -j$(nproc)

View File

@ -1,62 +0,0 @@
PWD := $(shell pwd)
SRCS := log.capnp car.capnp
GENS := gen/cpp/car.capnp.c++ gen/cpp/log.capnp.c++
JS := gen/js/car.capnp.js gen/js/log.capnp.js
UNAME_M ?= $(shell uname -m)
GENS += gen/c/car.capnp.c gen/c/log.capnp.c gen/c/include/c++.capnp.h gen/c/include/java.capnp.h
ifeq ($(UNAME_M),x86_64)
ifneq (, $(shell which capnpc-java))
GENS += gen/java/Car.java gen/java/Log.java
else
$(warning capnpc-java not found, skipping java build)
endif
endif
ifeq ($(UNAME_M),aarch64)
CAPNPC=PATH=$(PWD)/../phonelibs/capnp-cpp/aarch64/bin/:$$PATH capnpc
else
CAPNPC=capnpc
endif
.PHONY: all
all: $(GENS)
js: $(JS)
.PHONY: clean
clean:
rm -rf gen
rm -rf node_modules
rm -rf package-lock.json
gen/c/%.capnp.c: %.capnp
@echo "[ CAPNPC C ] $@"
mkdir -p gen/c/
$(CAPNPC) '$<' -o c:gen/c/
gen/js/%.capnp.js: %.capnp
@echo "[ CAPNPC JavaScript ] $@"
mkdir -p gen/js/
sh ./generate_javascript.sh
gen/cpp/%.capnp.c++: %.capnp
@echo "[ CAPNPC C++ ] $@"
mkdir -p gen/cpp/
$(CAPNPC) '$<' -o c++:gen/cpp/
gen/java/Car.java gen/java/Log.java: $(SRCS)
@echo "[ CAPNPC java ] $@"
mkdir -p gen/java/
$(CAPNPC) $^ -o java:gen/java
# c-capnproto needs some empty headers
gen/c/include/c++.capnp.h gen/c/include/java.capnp.h:
mkdir -p gen/c/include
touch '$@'

68
SConscript 100644
View File

@ -0,0 +1,68 @@
Import('env', 'arch', 'zmq')
gen_dir = Dir('gen')
messaging_dir = Dir('messaging')
# TODO: remove src-prefix and cereal from command string. can we set working directory?
env.Command(["gen/c/include/c++.capnp.h", "gen/c/include/java.capnp.h"], [], "mkdir -p " + gen_dir.path + "/c/include && touch $TARGETS")
env.Command(
['gen/c/car.capnp.c', 'gen/c/log.capnp.c', 'gen/c/car.capnp.h', 'gen/c/log.capnp.h'],
['car.capnp', 'log.capnp'],
'capnpc $SOURCES --src-prefix=cereal -o c:' + gen_dir.path + '/c/')
env.Command(
['gen/cpp/car.capnp.c++', 'gen/cpp/log.capnp.c++', 'gen/cpp/car.capnp.h', 'gen/cpp/log.capnp.h'],
['car.capnp', 'log.capnp'],
'capnpc $SOURCES --src-prefix=cereal -o c++:' + gen_dir.path + '/cpp/')
import shutil
if shutil.which('capnpc-java'):
env.Command(
['gen/java/Car.java', 'gen/java/Log.java'],
['car.capnp', 'log.capnp'],
'capnpc $SOURCES --src-prefix=cereal -o java:' + gen_dir.path + '/java/')
# TODO: remove non shared cereal and messaging
cereal_objects = env.SharedObject([
'gen/c/car.capnp.c',
'gen/c/log.capnp.c',
'gen/cpp/car.capnp.c++',
'gen/cpp/log.capnp.c++',
])
env.Library('cereal', cereal_objects)
env.SharedLibrary('cereal_shared', cereal_objects)
cereal_dir = Dir('.')
services_h = env.Command(
['services.h'],
['service_list.yaml', 'services.py'],
'python3 ' + cereal_dir.path + '/services.py > $TARGET')
messaging_objects = env.SharedObject([
'messaging/messaging.cc',
'messaging/impl_zmq.cc',
'messaging/impl_msgq.cc',
'messaging/msgq.cc',
])
messaging_lib = env.Library('messaging', messaging_objects)
Depends('messaging/impl_zmq.cc', services_h)
# note, this rebuilds the deps shared, zmq is statically linked to make APK happy
# TODO: get APK to load system zmq to remove the static link
shared_lib_shared_lib = [zmq, 'm', 'stdc++'] + ["gnustl_shared"] if arch == "aarch64" else []
env.SharedLibrary('messaging_shared', messaging_objects, LIBS=shared_lib_shared_lib)
env.Program('messaging/bridge', ['messaging/bridge.cc'], LIBS=[messaging_lib, 'zmq'])
Depends('messaging/bridge.cc', services_h)
# different target?
#env.Program('messaging/demo', ['messaging/demo.cc'], LIBS=[messaging_lib, 'zmq'])
env.Command(['messaging/messaging_pyx.so'],
[messaging_lib, 'messaging/messaging_pyx_setup.py', 'messaging/messaging_pyx.pyx', 'messaging/messaging.pxd'],
"cd " + messaging_dir.path + " && python3 messaging_pyx_setup.py build_ext --inplace")
if GetOption('test'):
env.Program('messaging/test_runner', ['messaging/test_runner.cc', 'messaging/msgq_tests.cc'], LIBS=[messaging_lib])

49
SConstruct 100644
View File

@ -0,0 +1,49 @@
import os
import subprocess
zmq = 'zmq'
arch = subprocess.check_output(["uname", "-m"], encoding='utf8').rstrip()
cereal_dir = Dir('.')
cpppath = [
cereal_dir,
'/usr/lib/include',
]
AddOption('--test',
action='store_true',
help='build test files')
AddOption('--asan',
action='store_true',
help='turn on ASAN')
ccflags_asan = ["-fsanitize=address", "-fno-omit-frame-pointer"] if GetOption('asan') else []
ldflags_asan = ["-fsanitize=address"] if GetOption('asan') else []
env = Environment(
ENV=os.environ,
CC='clang',
CXX='clang++',
CCFLAGS=[
"-g",
"-fPIC",
"-O2",
"-Werror=implicit-function-declaration",
"-Werror=incompatible-pointer-types",
"-Werror=int-conversion",
"-Werror=return-type",
"-Werror=format-extra-args",
] + ccflags_asan,
LDFLAGS=ldflags_asan,
LINKFLAGS=ldflags_asan,
CFLAGS="-std=gnu11",
CXXFLAGS="-std=c++14",
CPPPATH=cpppath,
)
Export('env', 'zmq', 'arch')
SConscript(['SConscript'])

View File

@ -0,0 +1,14 @@
pr: none
pool:
vmImage: 'ubuntu-16.04'
steps:
- script: |
set -e
docker build -t cereal .
docker run cereal bash -c "scons --test --asan -j$(nproc) && messaging/test_runner"
docker run cereal bash -c "ZMQ=1 python -m unittest discover ."
docker run cereal bash -c "MSGQ=1 python -m unittest discover ."
displayName: 'Run Tests'

View File

@ -84,6 +84,10 @@ struct CarEvent @0x9b1657f34caf3ad3 {
laneChange @59;
invalidGiraffeToyota @60;
internetConnectivityNeeded @61;
communityFeatureDisallowed @62;
lowMemory @63;
stockAeb @64;
ldw @65;
}
}
@ -117,6 +121,9 @@ struct CarState {
steeringTorque @8 :Float32; # TODO: standardize units
steeringTorqueEps @27 :Float32; # TODO: standardize units
steeringPressed @9 :Bool; # if the user is using the steering wheel
steeringRateLimited @29 :Bool; # if the torque is limited by the rate limiter
stockAeb @30 :Bool;
stockFcw @31 :Bool;
# cruise state
cruiseState @10 :CruiseState;
@ -170,7 +177,6 @@ struct CarState {
manumatic @9;
}
# send on change
struct ButtonEvent {
pressed @0 :Bool;
@ -314,7 +320,7 @@ struct CarParams {
minEnableSpeed @7 :Float32;
minSteerSpeed @8 :Float32;
safetyModel @9 :SafetyModel;
safetyModelPassive @42 :SafetyModel = noOutput;
safetyModelPassive @42 :SafetyModel = silent;
safetyParam @10 :Int16;
steerMaxBP @11 :List(Float32);
@ -324,7 +330,6 @@ struct CarParams {
brakeMaxBP @15 :List(Float32);
brakeMaxV @16 :List(Float32);
# things about the car in the manual
mass @17 :Float32; # [kg] running weight
wheelbase @18 :Float32; # [m] distance from rear to front axle
@ -345,6 +350,7 @@ struct CarParams {
}
steerLimitAlert @28 :Bool;
steerLimitTimer @47 :Float32; # time before steerLimitAlert is issued
vEgoStopping @29 :Float32; # Speed at which the car goes into stopping state
directAccelControl @30 :Bool; # Does the car have direct accel control or just gas/brake
@ -360,6 +366,9 @@ struct CarParams {
isPandaBlack @39: Bool;
dashcamOnly @41: Bool;
transmissionType @43 :TransmissionType;
carFw @44 :List(CarFw);
radarTimeStep @45: Float32 = 0.05; # time delta between radar updates, 20Hz is very standard
communityFeature @46: Bool; # true if a community maintained feature is detected
struct LateralPIDTuning {
kpBP @0 :List(Float32);
@ -378,7 +387,6 @@ struct CarParams {
deadzoneV @5 :List(Float32);
}
struct LateralINDITuning {
outerLoopGain @0 :Float32;
innerLoopGain @1 :Float32;
@ -401,7 +409,7 @@ struct CarParams {
}
enum SafetyModel {
noOutput @0;
silent @0;
honda @1;
toyota @2;
elm327 @3;
@ -420,6 +428,7 @@ struct CarParams {
toyotaIpas @16;
allOutput @17;
gmAscm @18;
noOutput @19; # like silent but with silent CAN TXs
}
enum SteerControlType {
@ -432,4 +441,16 @@ struct CarParams {
automatic @1;
manual @2;
}
struct CarFw {
ecu @0 :Ecu;
fwVersion @1 :Text;
}
enum Ecu {
eps @0;
esp @1;
fwdRadar @2;
fwdCamera @3;
}
}

View File

@ -8,7 +8,8 @@ tar xvf capnproto-c++-${VERSION}.tar.gz
cd capnproto-c++-${VERSION}
CXXFLAGS="-fPIC" ./configure
make -j4
make -j$(nproc)
make install
# manually build binaries statically
g++ -std=gnu++11 -I./src -I./src -DKJ_HEADER_WARNINGS -DCAPNP_HEADER_WARNINGS -DCAPNP_INCLUDE_DIR=\"/usr/local/include\" -pthread -O2 -DNDEBUG -pthread -pthread -o .libs/capnp src/capnp/compiler/module-loader.o src/capnp/compiler/capnp.o ./.libs/libcapnpc.a ./.libs/libcapnp.a ./.libs/libkj.a -lpthread -pthread
@ -18,7 +19,6 @@ g++ -std=gnu++11 -I./src -I./src -DKJ_HEADER_WARNINGS -DCAPNP_HEADER_WARNINGS -D
g++ -std=gnu++11 -I./src -I./src -DKJ_HEADER_WARNINGS -DCAPNP_HEADER_WARNINGS -DCAPNP_INCLUDE_DIR=\"/usr/local/include\" -pthread -O2 -DNDEBUG -pthread -pthread -o .libs/capnpc-capnp src/capnp/compiler/capnpc-capnp.o ./.libs/libcapnp.a ./.libs/libkj.a -lpthread -pthread
cp .libs/capnp /usr/local/bin/
ln -s /usr/local/bin/capnp /usr/local/bin/capnpc
cp .libs/capnpc-c++ /usr/local/bin/
cp .libs/capnpc-capnp /usr/local/bin/
cp .libs/*.a /usr/local/lib
@ -30,7 +30,8 @@ cd c-capnproto
git submodule update --init --recursive
autoreconf -f -i -s
CXXFLAGS="-fPIC" ./configure
make -j4
make -j$(nproc)
make install
# manually build binaries statically
gcc -fPIC -o .libs/capnpc-c compiler/capnpc-c.o compiler/schema.capnp.o compiler/str.o ./.libs/libcapnp_c.a

View File

@ -126,6 +126,7 @@ struct FrameData {
lensErr @13 :Float32;
lensTruePos @14 :Float32;
image @6 :Data;
gainFrac @15 :Float32;
frameType @7 :FrameType;
timestampSof @8 :UInt64;
@ -137,6 +138,7 @@ struct FrameData {
unknown @0;
neo @1;
chffrAndroid @2;
front @3;
}
struct AndroidCaptureResult {
@ -268,6 +270,7 @@ struct ThermalData {
mem @4 :UInt16;
gpu @5 :UInt16;
bat @6 :UInt32;
pa0 @21 :UInt16;
# not thermal
freeSpace @7 :Float32;
@ -285,6 +288,9 @@ struct ThermalData {
chargingError @17 :Bool;
chargingDisabled @18 :Bool;
memUsedPercent @19 :Int8;
cpuPerc @20 :Int8;
enum ThermalStatus {
green @0; # all processes run
yellow @1; # critical processes run (kill uploader), engage still allowed
@ -310,6 +316,20 @@ struct HealthData {
usbPowerMode @12 :UsbPowerMode;
ignitionCan @13 :Bool;
safetyModel @14 :Car.CarParams.SafetyModel;
faultStatus @15 :FaultStatus;
powerSaveEnabled @16 :Bool;
uptime @17 :UInt32;
faults @18 :List(FaultType);
enum FaultStatus {
none @0;
faultTemp @1;
faultPerm @2;
}
enum FaultType {
relayMalfunction @0;
}
enum HwType {
unknown @0;
@ -509,6 +529,7 @@ struct ControlsState @0x97ff69c53601abf1 {
delayedOutput @7 :Float32;
delta @8 :Float32;
output @9 :Float32;
saturated @10 :Bool;
}
struct LateralPIDState {
@ -529,6 +550,7 @@ struct ControlsState @0x97ff69c53601abf1 {
i @2 :Float32;
output @3 :Float32;
lqrOutput @4 :Float32;
saturated @5 :Bool;
}
@ -552,6 +574,7 @@ struct ModelData {
settings @5 :ModelSettings;
leadFuture @7 :LeadData;
speed @8 :List(Float32);
meta @10 :MetaData;
struct PathData {
points @0 :List(Float32);
@ -582,6 +605,13 @@ struct ModelData {
yuvCorrection @5 :List(Float32);
inputTransform @6 :List(Float32);
}
struct MetaData {
engagedProb @0 :Float32;
desirePrediction @1 :List(Float32);
brakeDisengageProb @2 :Float32;
gasDisengageProb @3 :Float32;
steerOverrideProb @4 :Float32;
}
}
struct CalibrationFeatures {
@ -1649,6 +1679,7 @@ struct UiLayoutState {
home @0;
music @1;
nav @2;
settings @3;
}
}
@ -1725,7 +1756,7 @@ struct DriverMonitoring {
rightEyeProb @7 :Float32;
leftBlinkProb @8 :Float32;
rightBlinkProb @9 :Float32;
irPwr @10 :Float32;
irPwrDEPRECATED @10 :Float32;
}
struct Boot {
@ -1857,5 +1888,6 @@ struct Event {
thumbnail @66: Thumbnail;
carEvents @68: List(Car.CarEvent);
carParams @69: Car.CarParams;
frontFrame @70: FrameData;
}
}

10
messaging/.gitignore vendored 100644
View File

@ -0,0 +1,10 @@
demo
bridge
test_runner
*.o
*.os
*.d
*.a
*.so
messaging_pyx.cpp
build/

View File

@ -0,0 +1,219 @@
# must be build with scons
from .messaging_pyx import Context, Poller, SubSocket, PubSocket # pylint: disable=no-name-in-module, import-error
from .messaging_pyx import MultiplePublishersError, MessagingError # pylint: disable=no-name-in-module, import-error
assert MultiplePublishersError
assert MessagingError
from cereal import log
from cereal.services import service_list
# sec_since_boot is faster, but allow to run standalone too
try:
from common.realtime import sec_since_boot
except ImportError:
import time
sec_since_boot = time.time
print("Warning, using python time.time() instead of faster sec_since_boot")
context = Context()
def new_message():
dat = log.Event.new_message()
dat.logMonoTime = int(sec_since_boot() * 1e9)
dat.valid = True
return dat
def pub_sock(endpoint):
sock = PubSocket()
sock.connect(context, endpoint)
return sock
def sub_sock(endpoint, poller=None, addr="127.0.0.1", conflate=False, timeout=None):
sock = SubSocket()
addr = addr.encode('utf8')
sock.connect(context, endpoint, addr, conflate)
if timeout is not None:
sock.setTimeout(timeout)
if poller is not None:
poller.registerSocket(sock)
return sock
def drain_sock_raw(sock, wait_for_one=False):
"""Receive all message currently available on the queue"""
ret = []
while 1:
if wait_for_one and len(ret) == 0:
dat = sock.receive()
else:
dat = sock.receive(non_blocking=True)
if dat is None:
break
ret.append(dat)
return ret
def drain_sock(sock, wait_for_one=False):
"""Receive all message currently available on the queue"""
ret = []
while 1:
if wait_for_one and len(ret) == 0:
dat = sock.receive()
else:
dat = sock.receive(non_blocking=True)
if dat is None: # Timeout hit
break
dat = log.Event.from_bytes(dat)
ret.append(dat)
return ret
# TODO: print when we drop packets?
def recv_sock(sock, wait=False):
"""Same as drain sock, but only returns latest message. Consider using conflate instead."""
dat = None
while 1:
if wait and dat is None:
rcv = sock.receive()
else:
rcv = sock.receive(non_blocking=True)
if rcv is None: # Timeout hit
break
dat = rcv
if dat is not None:
dat = log.Event.from_bytes(dat)
return dat
def recv_one(sock):
dat = sock.receive()
if dat is not None:
dat = log.Event.from_bytes(dat)
return dat
def recv_one_or_none(sock):
dat = sock.receive(non_blocking=True)
if dat is not None:
dat = log.Event.from_bytes(dat)
return dat
def recv_one_retry(sock):
"""Keep receiving until we get a message"""
while True:
dat = sock.receive()
if dat is not None:
return log.Event.from_bytes(dat)
def get_one_can(logcan):
while True:
can = recv_one_retry(logcan)
if len(can.can) > 0:
return can
class SubMaster():
def __init__(self, services, ignore_alive=None, addr="127.0.0.1"):
self.poller = Poller()
self.frame = -1
self.updated = {s : False for s in services}
self.rcv_time = {s : 0. for s in services}
self.rcv_frame = {s : 0 for s in services}
self.alive = {s : False for s in services}
self.sock = {}
self.freq = {}
self.data = {}
self.logMonoTime = {}
self.valid = {}
if ignore_alive is not None:
self.ignore_alive = ignore_alive
else:
self.ignore_alive = []
for s in services:
if addr is not None:
self.sock[s] = sub_sock(s, poller=self.poller, addr=addr, conflate=True)
self.freq[s] = service_list[s].frequency
data = new_message()
if s in ['can', 'sensorEvents', 'liveTracks', 'sendCan',
'ethernetData', 'cellInfo', 'wifiScan',
'trafficEvents', 'orbObservation', 'carEvents']:
data.init(s, 0)
else:
data.init(s)
self.data[s] = getattr(data, s)
self.logMonoTime[s] = 0
self.valid[s] = data.valid
def __getitem__(self, s):
return self.data[s]
def update(self, timeout=1000):
msgs = []
for sock in self.poller.poll(timeout):
msgs.append(recv_one_or_none(sock))
self.update_msgs(sec_since_boot(), msgs)
def update_msgs(self, cur_time, msgs):
# TODO: add optional input that specify the service to wait for
self.frame += 1
self.updated = dict.fromkeys(self.updated, False)
for msg in msgs:
if msg is None:
continue
s = msg.which()
self.updated[s] = True
self.rcv_time[s] = cur_time
self.rcv_frame[s] = self.frame
self.data[s] = getattr(msg, s)
self.logMonoTime[s] = msg.logMonoTime
self.valid[s] = msg.valid
for s in self.data:
# arbitrary small number to avoid float comparison. If freq is 0, we can skip the check
if self.freq[s] > 1e-5:
# alive if delay is within 10x the expected frequency
self.alive[s] = (cur_time - self.rcv_time[s]) < (10. / self.freq[s])
else:
self.alive[s] = True
def all_alive(self, service_list=None):
if service_list is None: # check all
service_list = self.alive.keys()
return all(self.alive[s] for s in service_list if s not in self.ignore_alive)
def all_valid(self, service_list=None):
if service_list is None: # check all
service_list = self.valid.keys()
return all(self.valid[s] for s in service_list)
def all_alive_and_valid(self, service_list=None):
if service_list is None: # check all
service_list = self.alive.keys()
return self.all_alive(service_list=service_list) and self.all_valid(service_list=service_list)
class PubMaster():
def __init__(self, services):
self.sock = {}
for s in services:
self.sock[s] = pub_sock(s)
def send(self, s, dat):
# accept either bytes or capnp builder
if not isinstance(dat, bytes):
dat = dat.to_bytes()
self.sock[s].send(dat)

View File

@ -0,0 +1,62 @@
#include <iostream>
#include <string>
#include <cassert>
#include <csignal>
#include <map>
#include "services.h"
#include "impl_msgq.hpp"
#include "impl_zmq.hpp"
void sigpipe_handler(int sig) {
assert(sig == SIGPIPE);
std::cout << "SIGPIPE received" << std::endl;
}
static std::vector<std::string> get_services() {
std::vector<std::string> name_list;
for (const auto& it : services) {
std::string name = it.name;
if (name == "plusFrame" || name == "uiLayoutState") continue;
name_list.push_back(name);
}
return name_list;
}
int main(void){
signal(SIGPIPE, (sighandler_t)sigpipe_handler);
auto endpoints = get_services();
std::map<SubSocket*, PubSocket*> sub2pub;
Context *zmq_context = new ZMQContext();
Context *msgq_context = new MSGQContext();
Poller *poller = new MSGQPoller();
for (auto endpoint: endpoints){
SubSocket * msgq_sock = new MSGQSubSocket();
msgq_sock->connect(msgq_context, endpoint, "127.0.0.1", false);
poller->registerSocket(msgq_sock);
PubSocket * zmq_sock = new ZMQPubSocket();
zmq_sock->connect(zmq_context, endpoint);
sub2pub[msgq_sock] = zmq_sock;
}
while (true){
for (auto sub_sock : poller->poll(100)){
Message * msg = sub_sock->receive();
if (msg == NULL) continue;
sub2pub[sub_sock]->sendMessage(msg);
delete msg;
}
}
return 0;
}

File diff suppressed because it is too large Load Diff

50
messaging/demo.cc 100644
View File

@ -0,0 +1,50 @@
#include <iostream>
#include <cstddef>
#include <chrono>
#include <thread>
#include <cassert>
#include "messaging.hpp"
#include "impl_zmq.hpp"
#define MSGS 1e5
int main() {
Context * c = Context::create();
SubSocket * sub_sock = SubSocket::create(c, "controlsState");
PubSocket * pub_sock = PubSocket::create(c, "controlsState");
char data[8];
Poller * poller = Poller::create({sub_sock});
auto start = std::chrono::steady_clock::now();
for (uint64_t i = 0; i < MSGS; i++){
*(uint64_t*)data = i;
pub_sock->send(data, 8);
auto r = poller->poll(100);
for (auto p : r){
Message * m = p->receive();
uint64_t ii = *(uint64_t*)m->getData();
assert(i == ii);
delete m;
}
}
auto end = std::chrono::steady_clock::now();
double elapsed = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count() / 1e9;
double throughput = ((double) MSGS / (double) elapsed);
std::cout << throughput << " msg/s" << std::endl;
delete poller;
delete sub_sock;
delete pub_sock;
delete c;
return 0;
}

30
messaging/demo.py 100644
View File

@ -0,0 +1,30 @@
import time
from messaging_pyx import Context, Poller, SubSocket, PubSocket # pylint: disable=no-name-in-module, import-error
MSGS = 1e5
if __name__ == "__main__":
c = Context()
sub_sock = SubSocket()
pub_sock = PubSocket()
sub_sock.connect(c, "controlsState")
pub_sock.connect(c, "controlsState")
poller = Poller()
poller.registerSocket(sub_sock)
t = time.time()
for i in range(int(MSGS)):
bts = i.to_bytes(4, 'little')
pub_sock.send(bts)
for s in poller.poll(100):
dat = s.receive()
ii = int.from_bytes(dat, 'little')
assert(i == ii)
dt = time.time() - t
print("%.1f msg/s" % (MSGS / dt))

View File

@ -0,0 +1,190 @@
#include <cassert>
#include <cstring>
#include <iostream>
#include <cstdlib>
#include <csignal>
#include <cerrno>
#include "impl_msgq.hpp"
volatile sig_atomic_t msgq_do_exit = 0;
void sig_handler(int signal) {
assert(signal == SIGINT || signal == SIGTERM);
msgq_do_exit = 1;
}
MSGQContext::MSGQContext() {
}
MSGQContext::~MSGQContext() {
}
void MSGQMessage::init(size_t sz) {
size = sz;
data = new char[size];
}
void MSGQMessage::init(char * d, size_t sz) {
size = sz;
data = new char[size];
memcpy(data, d, size);
}
void MSGQMessage::takeOwnership(char * d, size_t sz) {
size = sz;
data = d;
}
void MSGQMessage::close() {
if (size > 0){
delete[] data;
}
size = 0;
}
MSGQMessage::~MSGQMessage() {
this->close();
}
int MSGQSubSocket::connect(Context *context, std::string endpoint, std::string address, bool conflate){
assert(context);
assert(address == "127.0.0.1");
q = new msgq_queue_t;
int r = msgq_new_queue(q, endpoint.c_str(), DEFAULT_SEGMENT_SIZE);
if (r != 0){
return r;
}
msgq_init_subscriber(q);
if (conflate){
q->read_conflate = true;
}
timeout = -1;
return 0;
}
Message * MSGQSubSocket::receive(bool non_blocking){
msgq_do_exit = 0;
void (*prev_handler_sigint)(int);
void (*prev_handler_sigterm)(int);
if (!non_blocking){
prev_handler_sigint = std::signal(SIGINT, sig_handler);
prev_handler_sigterm = std::signal(SIGTERM, sig_handler);
}
msgq_msg_t msg;
MSGQMessage *r = NULL;
r = NULL;
int rc = msgq_msg_recv(&msg, q);
// Hack to implement blocking read with a poller. Don't use this
while (!non_blocking && rc == 0 && msgq_do_exit == 0){
msgq_pollitem_t items[1];
items[0].q = q;
int t = (timeout != -1) ? timeout : 100;
int n = msgq_poll(items, 1, t);
rc = msgq_msg_recv(&msg, q);
// The poll indicated a message was ready, but the receive failed. Try again
if (n == 1 && rc == 0){
continue;
}
if (timeout != -1){
break;
}
}
if (rc > 0){
r = new MSGQMessage;
r->takeOwnership(msg.data, msg.size);
}
errno = msgq_do_exit ? EINTR : 0;
if (!non_blocking){
std::signal(SIGINT, prev_handler_sigint);
std::signal(SIGTERM, prev_handler_sigterm);
}
return (Message*)r;
}
void MSGQSubSocket::setTimeout(int t){
timeout = t;
}
MSGQSubSocket::~MSGQSubSocket(){
if (q != NULL){
msgq_close_queue(q);
delete q;
}
}
int MSGQPubSocket::connect(Context *context, std::string endpoint){
assert(context);
q = new msgq_queue_t;
msgq_new_queue(q, endpoint.c_str(), DEFAULT_SEGMENT_SIZE);
msgq_init_publisher(q);
return 0;
}
int MSGQPubSocket::sendMessage(Message *message){
msgq_msg_t msg;
msg.data = message->getData();
msg.size = message->getSize();
return msgq_msg_send(&msg, q);
}
int MSGQPubSocket::send(char *data, size_t size){
msgq_msg_t msg;
msg.data = data;
msg.size = size;
return msgq_msg_send(&msg, q);
}
MSGQPubSocket::~MSGQPubSocket(){
if (q != NULL){
msgq_close_queue(q);
delete q;
}
}
void MSGQPoller::registerSocket(SubSocket * socket){
assert(num_polls + 1 < MAX_POLLERS);
polls[num_polls].q = (msgq_queue_t*)socket->getRawSocket();
sockets.push_back(socket);
num_polls++;
}
std::vector<SubSocket*> MSGQPoller::poll(int timeout){
std::vector<SubSocket*> r;
msgq_poll(polls, num_polls, timeout);
for (size_t i = 0; i < num_polls; i++){
if (polls[i].revents){
r.push_back(sockets[i]);
}
}
return r;
}

View File

@ -0,0 +1,64 @@
#pragma once
#include "messaging.hpp"
#include "msgq.hpp"
#include <zmq.h>
#include <string>
#define MAX_POLLERS 128
class MSGQContext : public Context {
private:
void * context = NULL;
public:
MSGQContext();
void * getRawContext() {return context;}
~MSGQContext();
};
class MSGQMessage : public Message {
private:
char * data;
size_t size;
public:
void init(size_t size);
void init(char *data, size_t size);
void takeOwnership(char *data, size_t size);
size_t getSize(){return size;}
char * getData(){return data;}
void close();
~MSGQMessage();
};
class MSGQSubSocket : public SubSocket {
private:
msgq_queue_t * q = NULL;
int timeout;
public:
int connect(Context *context, std::string endpoint, std::string address, bool conflate=false);
void setTimeout(int timeout);
void * getRawSocket() {return (void*)q;}
Message *receive(bool non_blocking=false);
~MSGQSubSocket();
};
class MSGQPubSocket : public PubSocket {
private:
msgq_queue_t * q = NULL;
public:
int connect(Context *context, std::string endpoint);
int sendMessage(Message *message);
int send(char *data, size_t size);
~MSGQPubSocket();
};
class MSGQPoller : public Poller {
private:
std::vector<SubSocket*> sockets;
msgq_pollitem_t polls[MAX_POLLERS];
size_t num_polls = 0;
public:
void registerSocket(SubSocket *socket);
std::vector<SubSocket*> poll(int timeout);
~MSGQPoller(){};
};

View File

@ -0,0 +1,155 @@
#include <cassert>
#include <cstring>
#include <iostream>
#include <cstdlib>
#include <cerrno>
#include <zmq.h>
#include "services.h"
#include "impl_zmq.hpp"
static int get_port(std::string endpoint) {
int port = -1;
for (const auto& it : services) {
std::string name = it.name;
if (name == endpoint) {
port = it.port;
break;
}
}
assert(port >= 0);
return port;
}
ZMQContext::ZMQContext() {
context = zmq_ctx_new();
}
ZMQContext::~ZMQContext() {
zmq_ctx_term(context);
}
void ZMQMessage::init(size_t sz) {
size = sz;
data = new char[size];
}
void ZMQMessage::init(char * d, size_t sz) {
size = sz;
data = new char[size];
memcpy(data, d, size);
}
void ZMQMessage::close() {
if (size > 0){
delete[] data;
}
size = 0;
}
ZMQMessage::~ZMQMessage() {
this->close();
}
int ZMQSubSocket::connect(Context *context, std::string endpoint, std::string address, bool conflate){
sock = zmq_socket(context->getRawContext(), ZMQ_SUB);
if (sock == NULL){
return -1;
}
zmq_setsockopt(sock, ZMQ_SUBSCRIBE, "", 0);
if (conflate){
int arg = 1;
zmq_setsockopt(sock, ZMQ_CONFLATE, &arg, sizeof(int));
}
int reconnect_ivl = 500;
zmq_setsockopt(sock, ZMQ_RECONNECT_IVL_MAX, &reconnect_ivl, sizeof(reconnect_ivl));
full_endpoint = "tcp://" + address + ":";
full_endpoint += std::to_string(get_port(endpoint));
return zmq_connect(sock, full_endpoint.c_str());
}
Message * ZMQSubSocket::receive(bool non_blocking){
zmq_msg_t msg;
assert(zmq_msg_init(&msg) == 0);
int flags = non_blocking ? ZMQ_DONTWAIT : 0;
int rc = zmq_msg_recv(&msg, sock, flags);
Message *r = NULL;
if (rc >= 0){
// Make a copy to ensure the data is aligned
r = new ZMQMessage;
r->init((char*)zmq_msg_data(&msg), zmq_msg_size(&msg));
}
zmq_msg_close(&msg);
return r;
}
void ZMQSubSocket::setTimeout(int timeout){
zmq_setsockopt(sock, ZMQ_RCVTIMEO, &timeout, sizeof(int));
}
ZMQSubSocket::~ZMQSubSocket(){
zmq_close(sock);
}
int ZMQPubSocket::connect(Context *context, std::string endpoint){
sock = zmq_socket(context->getRawContext(), ZMQ_PUB);
if (sock == NULL){
return -1;
}
full_endpoint = "tcp://*:";
full_endpoint += std::to_string(get_port(endpoint));
return zmq_bind(sock, full_endpoint.c_str());
}
int ZMQPubSocket::sendMessage(Message *message){
return zmq_send(sock, message->getData(), message->getSize(), ZMQ_DONTWAIT);
}
int ZMQPubSocket::send(char *data, size_t size){
return zmq_send(sock, data, size, ZMQ_DONTWAIT);
}
ZMQPubSocket::~ZMQPubSocket(){
zmq_close(sock);
}
void ZMQPoller::registerSocket(SubSocket * socket){
assert(num_polls + 1 < MAX_POLLERS);
polls[num_polls].socket = socket->getRawSocket();
polls[num_polls].events = ZMQ_POLLIN;
sockets.push_back(socket);
num_polls++;
}
std::vector<SubSocket*> ZMQPoller::poll(int timeout){
std::vector<SubSocket*> r;
int rc = zmq_poll(polls, num_polls, timeout);
if (rc < 0){
return r;
}
for (size_t i = 0; i < num_polls; i++){
if (polls[i].revents){
r.push_back(sockets[i]);
}
}
return r;
}

View File

@ -0,0 +1,63 @@
#pragma once
#include "messaging.hpp"
#include <zmq.h>
#include <string>
#define MAX_POLLERS 128
class ZMQContext : public Context {
private:
void * context = NULL;
public:
ZMQContext();
void * getRawContext() {return context;}
~ZMQContext();
};
class ZMQMessage : public Message {
private:
char * data;
size_t size;
public:
void init(size_t size);
void init(char *data, size_t size);
size_t getSize(){return size;}
char * getData(){return data;}
void close();
~ZMQMessage();
};
class ZMQSubSocket : public SubSocket {
private:
void * sock;
std::string full_endpoint;
public:
int connect(Context *context, std::string endpoint, std::string address, bool conflate=false);
void setTimeout(int timeout);
void * getRawSocket() {return sock;}
Message *receive(bool non_blocking=false);
~ZMQSubSocket();
};
class ZMQPubSocket : public PubSocket {
private:
void * sock;
std::string full_endpoint;
public:
int connect(Context *context, std::string endpoint);
int sendMessage(Message *message);
int send(char *data, size_t size);
~ZMQPubSocket();
};
class ZMQPoller : public Poller {
private:
std::vector<SubSocket*> sockets;
zmq_pollitem_t polls[MAX_POLLERS];
size_t num_polls = 0;
public:
void registerSocket(SubSocket *socket);
std::vector<SubSocket*> poll(int timeout);
~ZMQPoller(){};
};

View File

@ -0,0 +1,117 @@
#include "messaging.hpp"
#include "impl_zmq.hpp"
#include "impl_msgq.hpp"
Context * Context::create(){
Context * c;
if (std::getenv("MSGQ")){
c = new MSGQContext();
} else {
c = new ZMQContext();
}
return c;
}
SubSocket * SubSocket::create(){
SubSocket * s;
if (std::getenv("MSGQ")){
s = new MSGQSubSocket();
} else {
s = new ZMQSubSocket();
}
return s;
}
SubSocket * SubSocket::create(Context * context, std::string endpoint){
SubSocket *s = SubSocket::create();
int r = s->connect(context, endpoint, "127.0.0.1");
if (r == 0) {
return s;
} else {
delete s;
return NULL;
}
}
SubSocket * SubSocket::create(Context * context, std::string endpoint, std::string address){
SubSocket *s = SubSocket::create();
int r = s->connect(context, endpoint, address);
if (r == 0) {
return s;
} else {
delete s;
return NULL;
}
}
SubSocket * SubSocket::create(Context * context, std::string endpoint, std::string address, bool conflate){
SubSocket *s = SubSocket::create();
int r = s->connect(context, endpoint, address, conflate);
if (r == 0) {
return s;
} else {
delete s;
return NULL;
}
}
PubSocket * PubSocket::create(){
PubSocket * s;
if (std::getenv("MSGQ")){
s = new MSGQPubSocket();
} else {
s = new ZMQPubSocket();
}
return s;
}
PubSocket * PubSocket::create(Context * context, std::string endpoint){
PubSocket *s = PubSocket::create();
int r = s->connect(context, endpoint);
if (r == 0) {
return s;
} else {
delete s;
return NULL;
}
}
Poller * Poller::create(){
Poller * p;
if (std::getenv("MSGQ")){
p = new MSGQPoller();
} else {
p = new ZMQPoller();
}
return p;
}
Poller * Poller::create(std::vector<SubSocket*> sockets){
Poller * p = Poller::create();
for (auto s : sockets){
p->registerSocket(s);
}
return p;
}
extern "C" Context * messaging_context_create() {
return Context::create();
}
extern "C" SubSocket * messaging_subsocket_create(Context* context, const char* endpoint) {
return SubSocket::create(context, std::string(endpoint));
}
extern "C" PubSocket * messaging_pubsocket_create(Context* context, const char* endpoint) {
return PubSocket::create(context, std::string(endpoint));
}
extern "C" Poller * messaging_poller_create(SubSocket** sockets, int size) {
std::vector<SubSocket*> socketsVec(sockets, sockets + size);
return Poller::create(socketsVec);
}

View File

@ -0,0 +1,56 @@
#pragma once
#include <cstddef>
#include <vector>
#include <string>
#define MSG_MULTIPLE_PUBLISHERS 100
class Context {
public:
virtual void * getRawContext() = 0;
static Context * create();
virtual ~Context(){};
};
class Message {
public:
virtual void init(size_t size) = 0;
virtual void init(char * data, size_t size) = 0;
virtual void close() = 0;
virtual size_t getSize() = 0;
virtual char * getData() = 0;
virtual ~Message(){};
};
class SubSocket {
public:
virtual int connect(Context *context, std::string endpoint, std::string address, bool conflate=false) = 0;
virtual void setTimeout(int timeout) = 0;
virtual Message *receive(bool non_blocking=false) = 0;
virtual void * getRawSocket() = 0;
static SubSocket * create();
static SubSocket * create(Context * context, std::string endpoint);
static SubSocket * create(Context * context, std::string endpoint, std::string address);
static SubSocket * create(Context * context, std::string endpoint, std::string address, bool conflate);
virtual ~SubSocket(){};
};
class PubSocket {
public:
virtual int connect(Context *context, std::string endpoint) = 0;
virtual int sendMessage(Message *message) = 0;
virtual int send(char *data, size_t size) = 0;
static PubSocket * create();
static PubSocket * create(Context * context, std::string endpoint);
virtual ~PubSocket(){};
};
class Poller {
public:
virtual void registerSocket(SubSocket *socket) = 0;
virtual std::vector<SubSocket*> poll(int timeout) = 0;
static Poller * create();
static Poller * create(std::vector<SubSocket*> sockets);
virtual ~Poller(){};
};

View File

@ -0,0 +1,39 @@
# distutils: language = c++
#cython: language_level=3
from libcpp.string cimport string
from libcpp.vector cimport vector
from libcpp cimport bool
cdef extern from "messaging.hpp":
cdef cppclass Context:
@staticmethod
Context * create()
cdef cppclass Message:
void init(size_t)
void init(char *, size_t)
void close()
size_t getSize()
char *getData()
cdef cppclass SubSocket:
@staticmethod
SubSocket * create()
int connect(Context *, string, string, bool)
Message * receive(bool)
void setTimeout(int)
cdef cppclass PubSocket:
@staticmethod
PubSocket * create()
int connect(Context *, string)
int sendMessage(Message *)
int send(char *, size_t)
cdef cppclass Poller:
@staticmethod
Poller * create()
void registerSocket(SubSocket *)
vector[SubSocket*] poll(int) nogil

View File

@ -0,0 +1,151 @@
# distutils: language = c++
# cython: c_string_encoding=ascii, language_level=3
import sys
from libcpp.string cimport string
from libcpp cimport bool
from libc cimport errno
from messaging cimport Context as cppContext
from messaging cimport SubSocket as cppSubSocket
from messaging cimport PubSocket as cppPubSocket
from messaging cimport Poller as cppPoller
from messaging cimport Message as cppMessage
class MessagingError(Exception):
pass
class MultiplePublishersError(MessagingError):
pass
cdef class Context:
cdef cppContext * context
def __cinit__(self):
self.context = cppContext.create()
def term(self):
del self.context
self.context = NULL
def __dealloc__(self):
pass
# Deleting the context will hang if sockets are still active
# TODO: Figure out a way to make sure the context is closed last
# del self.context
cdef class Poller:
cdef cppPoller * poller
cdef list sub_sockets
def __cinit__(self):
self.sub_sockets = []
self.poller = cppPoller.create()
def __dealloc__(self):
del self.poller
def registerSocket(self, SubSocket socket):
self.sub_sockets.append(socket)
self.poller.registerSocket(socket.socket)
def poll(self, timeout):
sockets = []
cdef int t = timeout
with nogil:
result = self.poller.poll(t)
for s in result:
socket = SubSocket()
socket.setPtr(s)
sockets.append(socket)
return sockets
cdef class SubSocket:
cdef cppSubSocket * socket
cdef bool is_owner
def __cinit__(self):
self.socket = cppSubSocket.create()
self.is_owner = True
if self.socket == NULL:
raise MessagingError
def __dealloc__(self):
if self.is_owner:
del self.socket
cdef setPtr(self, cppSubSocket * ptr):
if self.is_owner:
del self.socket
self.is_owner = False
self.socket = ptr
def connect(self, Context context, string endpoint, string address=b"127.0.0.1", bool conflate=False):
r = self.socket.connect(context.context, endpoint, address, conflate)
if r != 0:
if errno.errno == errno.EADDRINUSE:
raise MultiplePublishersError
else:
raise MessagingError
def setTimeout(self, int timeout):
self.socket.setTimeout(timeout)
def receive(self, bool non_blocking=False):
msg = self.socket.receive(non_blocking)
if msg == NULL:
# If a blocking read returns no message check errno if SIGINT was caught in the C++ code
if errno.errno == errno.EINTR:
print("SIGINT received, exiting")
sys.exit(1)
return None
else:
sz = msg.getSize()
m = msg.getData()[:sz]
del msg
return m
cdef class PubSocket:
cdef cppPubSocket * socket
def __cinit__(self):
self.socket = cppPubSocket.create()
if self.socket == NULL:
raise MessagingError
def __dealloc__(self):
del self.socket
def connect(self, Context context, string endpoint):
r = self.socket.connect(context.context, endpoint)
if r != 0:
if errno.errno == errno.EADDRINUSE:
raise MultiplePublishersError
else:
raise MessagingError
def send(self, string data):
length = len(data)
r = self.socket.send(<char*>data.c_str(), length)
if r != length:
if errno.errno == errno.EADDRINUSE:
raise MultiplePublishersError
else:
raise MessagingError

View File

@ -0,0 +1,56 @@
import os
import subprocess
import sysconfig
from distutils.core import Extension, setup # pylint: disable=import-error,no-name-in-module
from Cython.Build import cythonize
from Cython.Distutils import build_ext
def get_ext_filename_without_platform_suffix(filename):
name, ext = os.path.splitext(filename)
ext_suffix = sysconfig.get_config_var('EXT_SUFFIX')
if ext_suffix == ext:
return filename
ext_suffix = ext_suffix.replace(ext, '')
idx = name.find(ext_suffix)
if idx == -1:
return filename
else:
return name[:idx] + ext
class BuildExtWithoutPlatformSuffix(build_ext):
def get_ext_filename(self, ext_name):
filename = super().get_ext_filename(ext_name)
return get_ext_filename_without_platform_suffix(filename)
sourcefiles = ['messaging_pyx.pyx']
extra_compile_args = ["-std=c++11"]
libraries = ['zmq']
ARCH = subprocess.check_output(["uname", "-m"], encoding='utf8').rstrip() # pylint: disable=unexpected-keyword-arg
if ARCH == "aarch64":
extra_compile_args += ["-Wno-deprecated-register"]
libraries += ['gnustl_shared']
setup(name='CAN parser',
cmdclass={'build_ext': BuildExtWithoutPlatformSuffix},
ext_modules=cythonize(
Extension(
"messaging_pyx",
language="c++",
sources=sourcefiles,
extra_compile_args=extra_compile_args,
libraries=libraries,
extra_objects=[
os.path.join(os.path.dirname(os.path.realpath(__file__)), '../', 'libmessaging.a'),
]
)
),
nthreads=4,
)

441
messaging/msgq.cc 100644
View File

@ -0,0 +1,441 @@
#include <iostream>
#include <cassert>
#include <cerrno>
#include <cmath>
#include <cstring>
#include <cstdint>
#include <chrono>
#include <algorithm>
#include <cstdlib>
#include <csignal>
#include <random>
#include <poll.h>
#include <sys/ioctl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/syscall.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>
#include "msgq.hpp"
void sigusr1_handler(int signal) {
assert(signal == SIGUSR1);
}
uint64_t msgq_get_uid(void){
std::random_device rd("/dev/urandom");
std::uniform_int_distribution<uint64_t> distribution(0,std::numeric_limits<uint32_t>::max());
uint64_t uid = distribution(rd) << 32 | syscall(SYS_gettid);
return uid;
}
int msgq_msg_init_size(msgq_msg_t * msg, size_t size){
msg->size = size;
msg->data = new(std::nothrow) char[size];
return (msg->data == NULL) ? -1 : 0;
}
int msgq_msg_init_data(msgq_msg_t * msg, char * data, size_t size) {
int r = msgq_msg_init_size(msg, size);
if (r == 0)
memcpy(msg->data, data, size);
return r;
}
int msgq_msg_close(msgq_msg_t * msg){
if (msg->size > 0)
delete[] msg->data;
msg->size = 0;
return 0;
}
void msgq_reset_reader(msgq_queue_t * q){
int id = q->reader_id;
q->read_valids[id]->store(true);
q->read_pointers[id]->store(*q->write_pointer);
}
void msgq_wait_for_subscriber(msgq_queue_t *q){
while (*q->num_readers == 0){
;
}
return;
}
int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size){
assert(size < 0xFFFFFFFF); // Buffer must be smaller than 2^32 bytes
std::signal(SIGUSR1, sigusr1_handler);
const char * prefix = "/dev/shm/";
char * full_path = new char[strlen(path) + strlen(prefix) + 1];
strcpy(full_path, prefix);
strcat(full_path, path);
auto fd = open(full_path, O_RDWR | O_CREAT, 0777);
delete[] full_path;
if (fd < 0)
return -1;
int rc = ftruncate(fd, size + sizeof(msgq_header_t));
if (rc < 0)
return -1;
char * mem = (char*)mmap(NULL, size + sizeof(msgq_header_t), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
close(fd);
if (mem == NULL)
return -1;
q->mmap_p = mem;
msgq_header_t *header = (msgq_header_t *)mem;
// Setup pointers to header segment
q->num_readers = reinterpret_cast<std::atomic<uint64_t>*>(&header->num_readers);
q->write_pointer = reinterpret_cast<std::atomic<uint64_t>*>(&header->write_pointer);
q->write_uid = reinterpret_cast<std::atomic<uint64_t>*>(&header->write_uid);
for (size_t i = 0; i < NUM_READERS; i++){
q->read_pointers[i] = reinterpret_cast<std::atomic<uint64_t>*>(&header->read_pointers[i]);
q->read_valids[i] = reinterpret_cast<std::atomic<uint64_t>*>(&header->read_valids[i]);
q->read_uids[i] = reinterpret_cast<std::atomic<uint64_t>*>(&header->read_uids[i]);
}
q->data = mem + sizeof(msgq_header_t);
q->size = size;
q->reader_id = -1;
q->endpoint = path;
q->read_conflate = false;
return 0;
}
void msgq_close_queue(msgq_queue_t *q){
if (q->mmap_p != NULL){
munmap(q->mmap_p, q->size + sizeof(msgq_header_t));
}
}
void msgq_init_publisher(msgq_queue_t * q) {
std::cout << "Starting publisher" << std::endl;
uint64_t uid = msgq_get_uid();
*q->write_uid = uid;
*q->num_readers = 0;
for (size_t i = 0; i < NUM_READERS; i++){
*q->read_valids[i] = false;
*q->read_uids[i] = 0;
}
q->write_uid_local = uid;
}
void msgq_init_subscriber(msgq_queue_t * q) {
assert(q != NULL);
assert(q->num_readers != NULL);
uint64_t uid = msgq_get_uid();
// Get reader id
while (true){
uint64_t cur_num_readers = *q->num_readers;
uint64_t new_num_readers = cur_num_readers + 1;
// No more slots available. Reset all subscribers to kick out inactive ones
if (new_num_readers > NUM_READERS){
std::cout << "Warning, evicting all subscribers!" << std::endl;
*q->num_readers = 0;
for (size_t i = 0; i < NUM_READERS; i++){
*q->read_valids[i] = false;
uint64_t old_uid = *q->read_uids[i];
*q->read_uids[i] = 0;
// Wake up reader in case they are in a poll
syscall(SYS_tkill, old_uid & 0xFFFFFFFF, SIGUSR1);
}
continue;
}
// Use atomic compare and swap to handle race condition
// where two subscribers start at the same time
if (std::atomic_compare_exchange_strong(q->num_readers,
&cur_num_readers,
new_num_readers)){
q->reader_id = cur_num_readers;
q->read_uid_local = uid;
// We start with read_valid = false,
// on the first read the read pointer will be synchronized with the write pointer
*q->read_valids[cur_num_readers] = false;
*q->read_pointers[cur_num_readers] = 0;
*q->read_uids[cur_num_readers] = uid;
break;
}
}
std::cout << "New subscriber id: " << q->reader_id << " uid: " << q->read_uid_local << " " << q->endpoint << std::endl;
msgq_reset_reader(q);
}
int msgq_msg_send(msgq_msg_t * msg, msgq_queue_t *q){
// Die if we are no longer the active publisher
if (q->write_uid_local != *q->write_uid){
std::cout << "Killing old publisher: " << q->endpoint << std::endl;
errno = EADDRINUSE;
return -1;
}
uint64_t total_msg_size = ALIGN(msg->size + sizeof(int64_t));
// We need to fit at least three messages in the queue,
// then we can always safely access the last message
assert(3 * total_msg_size <= q->size);
uint64_t num_readers = *q->num_readers;
uint32_t write_cycles, write_pointer;
UNPACK64(write_cycles, write_pointer, *q->write_pointer);
char *p = q->data + write_pointer; // add base offset
// Check remaining space
// Always leave space for a wraparound tag for the next message, including alignment
int64_t remaining_space = q->size - write_pointer - total_msg_size - sizeof(int64_t);
if (remaining_space <= 0){
// Write -1 size tag indicating wraparound
*(int64_t*)p = -1;
// Invalidate all readers that are beyond the write pointer
// TODO: should we handle the case where a new reader shows up while this is running?
for (uint64_t i = 0; i < num_readers; i++){
uint64_t read_pointer = *q->read_pointers[i];
uint64_t read_cycles = read_pointer >> 32;
read_pointer &= 0xFFFFFFFF;
if ((read_pointer > write_pointer) && (read_cycles != write_cycles)) {
*q->read_valids[i] = false;
}
}
// Update global and local copies of write pointer and write_cycles
write_pointer = 0;
write_cycles = write_cycles + 1;
PACK64(*q->write_pointer, write_cycles, write_pointer);
// Set actual pointer to the beginning of the data segment
p = q->data;
}
// Invalidate readers that are in the area that will be written
uint64_t start = write_pointer;
uint64_t end = ALIGN(start + sizeof(int64_t) + msg->size);
for (uint64_t i = 0; i < num_readers; i++){
uint32_t read_cycles, read_pointer;
UNPACK64(read_cycles, read_pointer, *q->read_pointers[i]);
if ((read_pointer >= start) && (read_pointer < end) && (read_cycles != write_cycles)) {
*q->read_valids[i] = false;
}
}
// Write size tag
std::atomic<int64_t> *size_p = reinterpret_cast<std::atomic<int64_t>*>(p);
*size_p = msg->size;
// Copy data
memcpy(p + sizeof(int64_t), msg->data, msg->size);
__sync_synchronize();
// Update write pointer
uint32_t new_ptr = ALIGN(write_pointer + msg->size + sizeof(int64_t));
PACK64(*q->write_pointer, write_cycles, new_ptr);
// Notify readers
for (uint64_t i = 0; i < num_readers; i++){
uint64_t reader_uid = *q->read_uids[i];
syscall(SYS_tkill, reader_uid & 0xFFFFFFFF, SIGUSR1);
}
return msg->size;
}
int msgq_msg_ready(msgq_queue_t * q){
start:
int id = q->reader_id;
assert(id >= 0); // Make sure subscriber is initialized
if (q->read_uid_local != *q->read_uids[id]){
std::cout << q->endpoint << ": Reader was evicted, reconnecting" << std::endl;
msgq_init_subscriber(q);
goto start;
}
// Check valid
if (!*q->read_valids[id]){
msgq_reset_reader(q);
goto start;
}
uint32_t read_cycles, read_pointer;
UNPACK64(read_cycles, read_pointer, *q->read_pointers[id]);
uint32_t write_cycles, write_pointer;
UNPACK64(write_cycles, write_pointer, *q->write_pointer);
// Check if new message is available
return (read_pointer != write_pointer);
}
int msgq_msg_recv(msgq_msg_t * msg, msgq_queue_t * q){
start:
int id = q->reader_id;
assert(id >= 0); // Make sure subscriber is initialized
if (q->read_uid_local != *q->read_uids[id]){
std::cout << q->endpoint << ": Reader was evicted, reconnecting" << std::endl;
msgq_init_subscriber(q);
goto start;
}
// Check valid
if (!*q->read_valids[id]){
msgq_reset_reader(q);
goto start;
}
uint32_t read_cycles, read_pointer;
UNPACK64(read_cycles, read_pointer, *q->read_pointers[id]);
uint32_t write_cycles, write_pointer;
UNPACK64(write_cycles, write_pointer, *q->write_pointer);
char * p = q->data + read_pointer;
// Check if new message is available
if (read_pointer == write_pointer) {
msg->size = 0;
return 0;
}
// Read potential message size
std::atomic<int64_t> *size_p = reinterpret_cast<std::atomic<int64_t>*>(p);
std::int64_t size = *size_p;
// Check if the size that was read is valid
if (!*q->read_valids[id]){
msgq_reset_reader(q);
goto start;
}
// If size is -1 the buffer was full, and we need to wrap around
if (size == -1){
read_cycles++;
PACK64(*q->read_pointers[id], read_cycles, 0);
goto start;
}
// crashing is better than passing garbage data to the consumer
// the size will have weird value if it was overwritten by data accidentally
assert((uint64_t)size < q->size);
assert(size > 0);
uint32_t new_read_pointer = ALIGN(read_pointer + sizeof(std::int64_t) + size);
// If conflate is true, check if this is the latest message, else start over
if (q->read_conflate){
if (new_read_pointer != write_pointer){
// Update read pointer
PACK64(*q->read_pointers[id], read_cycles, new_read_pointer);
goto start;
}
}
// Copy message
if (msgq_msg_init_size(msg, size) < 0)
return -1;
__sync_synchronize();
memcpy(msg->data, p + sizeof(int64_t), size);
__sync_synchronize();
// Update read pointer
PACK64(*q->read_pointers[id], read_cycles, new_read_pointer);
// Check if the actual data that was copied is valid
if (!*q->read_valids[id]){
msgq_msg_close(msg);
msgq_reset_reader(q);
goto start;
}
return msg->size;
}
int msgq_poll(msgq_pollitem_t * items, size_t nitems, int timeout){
assert(timeout >= 0);
int num = 0;
// Check if messages ready
for (size_t i = 0; i < nitems; i++) {
items[i].revents = msgq_msg_ready(items[i].q);
if (items[i].revents) num++;
}
int ms = (timeout == -1) ? 100 : timeout;
struct timespec ts;
ts.tv_sec = ms / 1000;
ts.tv_nsec = (ms % 1000) * 1000 * 1000;
while (num == 0) {
int ret;
ret = nanosleep(&ts, &ts);
// Check if messages ready
for (size_t i = 0; i < nitems; i++) {
if (items[i].revents == 0 && msgq_msg_ready(items[i].q)){
num += 1;
items[i].revents = 1;
}
}
// exit if we had a timeout and the sleep finished
if (timeout != -1 && ret == 0){
break;
}
}
return num;
}

66
messaging/msgq.hpp 100644
View File

@ -0,0 +1,66 @@
#pragma once
#include <cstdint>
#include <cstring>
#include <string>
#include <atomic>
#define DEFAULT_SEGMENT_SIZE (10 * 1024 * 1024)
#define NUM_READERS 8
#define ALIGN(n) ((n + (8 - 1)) & -8)
#define UNPACK64(higher, lower, input) do {uint64_t tmp = input; higher = tmp >> 32; lower = tmp & 0xFFFFFFFF;} while (0)
#define PACK64(output, higher, lower) output = ((uint64_t)higher << 32 ) | ((uint64_t)lower & 0xFFFFFFFF)
struct msgq_header_t {
uint64_t num_readers;
uint64_t write_pointer;
uint64_t write_uid;
uint64_t read_pointers[NUM_READERS];
uint64_t read_valids[NUM_READERS];
uint64_t read_uids[NUM_READERS];
};
struct msgq_queue_t {
std::atomic<uint64_t> *num_readers;
std::atomic<uint64_t> *write_pointer;
std::atomic<uint64_t> *write_uid;
std::atomic<uint64_t> *read_pointers[NUM_READERS];
std::atomic<uint64_t> *read_valids[NUM_READERS];
std::atomic<uint64_t> *read_uids[NUM_READERS];
char * mmap_p;
char * data;
size_t size;
int reader_id;
uint64_t read_uid_local;
uint64_t write_uid_local;
bool read_conflate;
std::string endpoint;
};
struct msgq_msg_t {
size_t size;
char * data;
};
struct msgq_pollitem_t {
msgq_queue_t *q;
int revents;
};
void msgq_wait_for_subscriber(msgq_queue_t *q);
void msgq_reset_reader(msgq_queue_t *q);
int msgq_msg_init_size(msgq_msg_t *msg, size_t size);
int msgq_msg_init_data(msgq_msg_t *msg, char * data, size_t size);
int msgq_msg_close(msgq_msg_t *msg);
int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size);
void msgq_close_queue(msgq_queue_t *q);
void msgq_init_publisher(msgq_queue_t * q);
void msgq_init_subscriber(msgq_queue_t * q);
int msgq_msg_send(msgq_msg_t *msg, msgq_queue_t *q);
int msgq_msg_recv(msgq_msg_t *msg, msgq_queue_t *q);
int msgq_msg_ready(msgq_queue_t * q);
int msgq_poll(msgq_pollitem_t * items, size_t nitems, int timeout);

56
messaging/msgq.md 100644
View File

@ -0,0 +1,56 @@
# MSGQ: A lock free single producer multi consumer message queue
[![Build Status](https://dev.azure.com/commaai/default/_apis/build/status/commaai.msgq?branchName=master)](https://dev.azure.com/commaai/default/_build/latest?definitionId=21&branchName=master)
## What is MSGQ?
MSGQ is a system to pass messages from a single producer to multiple consumers. All the consumers need to be able to receive all the messages. It is designed to be a high performance replacement for ZMQ-like SUB/PUB patterns. It uses a ring buffer in shared memory to efficiently read and write data. Each read requires a copy. Writing can be done without a copy, as long as the size of the data is known in advance.
## Storage
The storage for the queue consists of an area of metadata, and the actual buffer. The metadata contains:
1. A counter to the number of readers that are active
2. A pointer to the head of the queue for writing. From now on referred to as *write pointer*
3. A cycle counter for the writer. This counter is incremented when the writer wraps around
4. N pointers, pointing to the current read position for all the readers. From now on referred to as *read pointer*
5. N counters, counting the number of cycles for all the readers
6. N booleans, indicating validity for all the readers. From now on referred to as *validity flag*
The counter and the pointer are both 32 bit values, packed into 64 bit so they can be read and written atomically.
The data buffer is a ring buffer. All messages are prefixed by an 8 byte size field, followed by the data. A size of -1 indicates a wrap-around, and means the next message is stored at the beginning of the buffer.
## Writing
Writing involves the following steps:
1. Check if the area that is to be written overlaps with any of the read pointers, mark those readers as invalid by clearing the validity flag.
2. Write the message
3. Increase the write pointer by the size of the message
In case there is not enough space at the end of the buffer, a special empty message with a prefix of -1 is written. The cycle counter is incremented by one. In this case step 1 will check there are no read pointers pointing to the remainder of the buffer. Then another write cycle will start with the actual message.
There always needs to be 8 bytes of empty space at the end of the buffer. By doing this there is always space to write the -1.
## Reset reader
When the reader is lagging too much behind the read pointer becomes invalid and no longer points to the beginning of a valid message. To reset a reader to the current write pointer, the following steps are performed:
1. Set valid flag
2. Set read cycle counter to that of the writer
3. Set read pointer to write pointer
## Reading
Reading involves the following steps:
1. Read the size field at the current read pointer
2. Read the validity flag
3. Copy the data out of the buffer
4. Increase the read pointer by the size of the message
5. Check the validity flag again
Before starting the copy, the valid flag is checked. This is to prevent a race condition where the size prefix was invalid, and the read could read outside of the buffer. Make sure that step 1 and 2 are not reordered by your compiler or CPU.
If a writer overwrites the data while it's being copied out, the data will be invalid. Therefore the validity flag is also checked after reading it. The order of step 4 and 5 does not matter.
If at steps 2 or 5 the validity flag is not set, the reader is reset. Any data that was already read is discarded. After the reader is reset, the reading starts from the beginning.
If a message with size -1 is encountered, step 3 and 4 are replaced by increasing the cycle counter and setting the read pointer to the beginning of the buffer. After that another read is performed.

View File

@ -0,0 +1,395 @@
#include "catch2/catch.hpp"
#include "msgq.hpp"
TEST_CASE("ALIGN"){
REQUIRE(ALIGN(0) == 0);
REQUIRE(ALIGN(1) == 8);
REQUIRE(ALIGN(7) == 8);
REQUIRE(ALIGN(8) == 8);
REQUIRE(ALIGN(99999) == 100000);
}
TEST_CASE("msgq_msg_init_size"){
const size_t msg_size = 30;
msgq_msg_t msg;
msgq_msg_init_size(&msg, msg_size);
REQUIRE(msg.size == msg_size);
msgq_msg_close(&msg);
}
TEST_CASE("msgq_msg_init_data"){
const size_t msg_size = 30;
char * data = new char[msg_size];
for (size_t i = 0; i < msg_size; i++){
data[i] = i;
}
msgq_msg_t msg;
msgq_msg_init_data(&msg, data, msg_size);
REQUIRE(msg.size == msg_size);
REQUIRE(memcmp(msg.data, data, msg_size) == 0);
delete[] data;
msgq_msg_close(&msg);
}
TEST_CASE("msgq_init_subscriber"){
remove("/dev/shm/test_queue");
msgq_queue_t q;
msgq_new_queue(&q, "test_queue", 1024);
REQUIRE(*q.num_readers == 0);
q.reader_id = 1;
*q.read_valids[0] = false;
*q.read_pointers[0] = ((uint64_t)1 << 32);
*q.write_pointer = 255;
msgq_init_subscriber(&q);
REQUIRE(q.read_conflate == false);
REQUIRE(*q.read_valids[0] == true);
REQUIRE((*q.read_pointers[0] >> 32) == 0);
REQUIRE((*q.read_pointers[0] & 0xFFFFFFFF) == 255);
}
TEST_CASE("msgq_msg_send first message"){
remove("/dev/shm/test_queue");
msgq_queue_t q;
msgq_new_queue(&q, "test_queue", 1024);
msgq_init_publisher(&q);
REQUIRE(*q.write_pointer == 0);
size_t msg_size = 128;
SECTION("Aligned message size"){
}
SECTION("Unaligned message size"){
msg_size--;
}
char * data = new char[msg_size];
for (size_t i = 0; i < msg_size; i++){
data[i] = i;
}
msgq_msg_t msg;
msgq_msg_init_data(&msg, data, msg_size);
msgq_msg_send(&msg, &q);
REQUIRE(*(int64_t*)q.data == msg_size); // Check size tag
REQUIRE(*q.write_pointer == 128 + sizeof(int64_t));
REQUIRE(memcmp(q.data + sizeof(int64_t), data, msg_size) == 0);
delete[] data;
msgq_msg_close(&msg);
}
TEST_CASE("msgq_msg_send test wraparound"){
remove("/dev/shm/test_queue");
msgq_queue_t q;
msgq_new_queue(&q, "test_queue", 1024);
msgq_init_publisher(&q);
REQUIRE((*q.write_pointer & 0xFFFFFFFF) == 0);
REQUIRE((*q.write_pointer >> 32) == 0);
const size_t msg_size = 120;
msgq_msg_t msg;
msgq_msg_init_size(&msg, msg_size);
for (int i = 0; i < 8; i++) {
msgq_msg_send(&msg, &q);
}
// Check 8th message was written at the beginning
REQUIRE((*q.write_pointer & 0xFFFFFFFF) == msg_size + sizeof(int64_t));
// Check cycle count
REQUIRE((*q.write_pointer >> 32) == 1);
// Check wraparound tag
char * tag_location = q.data;
tag_location += 7 * (msg_size + sizeof(int64_t));
REQUIRE(*(int64_t*)tag_location == -1);
msgq_msg_close(&msg);
}
TEST_CASE("msgq_msg_recv test wraparound"){
remove("/dev/shm/test_queue");
msgq_queue_t q_pub, q_sub;
msgq_new_queue(&q_pub, "test_queue", 1024);
msgq_new_queue(&q_sub, "test_queue", 1024);
msgq_init_publisher(&q_pub);
msgq_init_subscriber(&q_sub);
REQUIRE((*q_pub.write_pointer >> 32) == 0);
REQUIRE((*q_sub.read_pointers[0] >> 32) == 0);
const size_t msg_size = 120;
msgq_msg_t msg1;
msgq_msg_init_size(&msg1, msg_size);
SECTION("Check cycle counter after reset") {
for (int i = 0; i < 8; i++) {
msgq_msg_send(&msg1, &q_pub);
}
msgq_msg_t msg2;
msgq_msg_recv(&msg2, &q_sub);
REQUIRE(msg2.size == 0); // Reader had to reset
msgq_msg_close(&msg2);
}
SECTION("Check cycle counter while keeping up with writer") {
for (int i = 0; i < 8; i++) {
msgq_msg_send(&msg1, &q_pub);
msgq_msg_t msg2;
msgq_msg_recv(&msg2, &q_sub);
REQUIRE(msg2.size > 0);
msgq_msg_close(&msg2);
}
}
REQUIRE((*q_sub.read_pointers[0] >> 32) == 1);
msgq_msg_close(&msg1);
}
TEST_CASE("msgq_msg_send test invalidation"){
remove("/dev/shm/test_queue");
msgq_queue_t q_pub, q_sub;
msgq_new_queue(&q_pub, "test_queue", 1024);
msgq_new_queue(&q_sub, "test_queue", 1024);
msgq_init_publisher(&q_pub);
msgq_init_subscriber(&q_sub);
*q_sub.write_pointer = (uint64_t)1 << 32;
REQUIRE(*q_sub.read_valids[0] == true);
SECTION("read pointer in tag"){
*q_sub.read_pointers[0] = 0;
}
SECTION("read pointer in data section"){
*q_sub.read_pointers[0] = 64;
}
SECTION("read pointer in wraparound section"){
*q_pub.write_pointer = ((uint64_t)1 << 32) | 1000; // Writer is one cycle ahead
*q_sub.read_pointers[0] = 1020;
}
msgq_msg_t msg;
msgq_msg_init_size(&msg, 128);
msgq_msg_send(&msg, &q_pub);
REQUIRE(*q_sub.read_valids[0] == false);
msgq_msg_close(&msg);
}
TEST_CASE("msgq_init_subscriber init 2 subscribers"){
remove("/dev/shm/test_queue");
msgq_queue_t q1, q2;
msgq_new_queue(&q1, "test_queue", 1024);
msgq_new_queue(&q2, "test_queue", 1024);
*q1.num_readers = 0;
REQUIRE(*q1.num_readers == 0);
REQUIRE(*q2.num_readers == 0);
msgq_init_subscriber(&q1);
REQUIRE(*q1.num_readers == 1);
REQUIRE(*q2.num_readers == 1);
REQUIRE(q1.reader_id == 0);
msgq_init_subscriber(&q2);
REQUIRE(*q1.num_readers == 2);
REQUIRE(*q2.num_readers == 2);
REQUIRE(q2.reader_id == 1);
}
TEST_CASE("Write 1 msg, read 1 msg", "[integration]"){
remove("/dev/shm/test_queue");
const size_t msg_size = 128;
msgq_queue_t writer, reader;
msgq_new_queue(&writer, "test_queue", 1024);
msgq_new_queue(&reader, "test_queue", 1024);
msgq_init_publisher(&writer);
msgq_init_subscriber(&reader);
// Build 128 byte message
msgq_msg_t outgoing_msg;
msgq_msg_init_size(&outgoing_msg, msg_size);
for (size_t i = 0; i < msg_size; i++){
outgoing_msg.data[i] = i;
}
REQUIRE(msgq_msg_send(&outgoing_msg, &writer) == msg_size);
msgq_msg_t incoming_msg1;
REQUIRE(msgq_msg_recv(&incoming_msg1, &reader) == msg_size);
REQUIRE(memcmp(incoming_msg1.data, outgoing_msg.data, msg_size) == 0);
// Verify that there are no more messages
msgq_msg_t incoming_msg2;
REQUIRE(msgq_msg_recv(&incoming_msg2, &reader) == 0);
msgq_msg_close(&outgoing_msg);
msgq_msg_close(&incoming_msg1);
msgq_msg_close(&incoming_msg2);
}
TEST_CASE("Write 2 msg, read 2 msg - conflate = false", "[integration]"){
remove("/dev/shm/test_queue");
const size_t msg_size = 128;
msgq_queue_t writer, reader;
msgq_new_queue(&writer, "test_queue", 1024);
msgq_new_queue(&reader, "test_queue", 1024);
msgq_init_publisher(&writer);
msgq_init_subscriber(&reader);
// Build 128 byte message
msgq_msg_t outgoing_msg;
msgq_msg_init_size(&outgoing_msg, msg_size);
for (size_t i = 0; i < msg_size; i++){
outgoing_msg.data[i] = i;
}
REQUIRE(msgq_msg_send(&outgoing_msg, &writer) == msg_size);
REQUIRE(msgq_msg_send(&outgoing_msg, &writer) == msg_size);
msgq_msg_t incoming_msg1;
REQUIRE(msgq_msg_recv(&incoming_msg1, &reader) == msg_size);
REQUIRE(memcmp(incoming_msg1.data, outgoing_msg.data, msg_size) == 0);
msgq_msg_t incoming_msg2;
REQUIRE(msgq_msg_recv(&incoming_msg2, &reader) == msg_size);
REQUIRE(memcmp(incoming_msg2.data, outgoing_msg.data, msg_size) == 0);
msgq_msg_close(&outgoing_msg);
msgq_msg_close(&incoming_msg1);
msgq_msg_close(&incoming_msg2);
}
TEST_CASE("Write 2 msg, read 2 msg - conflate = true", "[integration]"){
remove("/dev/shm/test_queue");
const size_t msg_size = 128;
msgq_queue_t writer, reader;
msgq_new_queue(&writer, "test_queue", 1024);
msgq_new_queue(&reader, "test_queue", 1024);
msgq_init_publisher(&writer);
msgq_init_subscriber(&reader);
reader.read_conflate = true;
// Build 128 byte message
msgq_msg_t outgoing_msg;
msgq_msg_init_size(&outgoing_msg, msg_size);
for (size_t i = 0; i < msg_size; i++){
outgoing_msg.data[i] = i;
}
REQUIRE(msgq_msg_send(&outgoing_msg, &writer) == msg_size);
REQUIRE(msgq_msg_send(&outgoing_msg, &writer) == msg_size);
msgq_msg_t incoming_msg1;
REQUIRE(msgq_msg_recv(&incoming_msg1, &reader) == msg_size);
REQUIRE(memcmp(incoming_msg1.data, outgoing_msg.data, msg_size) == 0);
// Verify that there are no more messages
msgq_msg_t incoming_msg2;
REQUIRE(msgq_msg_recv(&incoming_msg2, &reader) == 0);
msgq_msg_close(&outgoing_msg);
msgq_msg_close(&incoming_msg1);
msgq_msg_close(&incoming_msg2);
}
TEST_CASE("1 publisher, 1 slow subscriber", "[integration]"){
remove("/dev/shm/test_queue");
msgq_queue_t writer, reader;
msgq_new_queue(&writer, "test_queue", 1024);
msgq_new_queue(&reader, "test_queue", 1024);
msgq_init_publisher(&writer);
msgq_init_subscriber(&reader);
int n_received = 0;
int n_skipped = 0;
for (uint64_t i = 0; i < 1e5; i++) {
msgq_msg_t outgoing_msg;
msgq_msg_init_data(&outgoing_msg, (char*)&i, sizeof(uint64_t));
msgq_msg_send(&outgoing_msg, &writer);
msgq_msg_close(&outgoing_msg);
if (i % 10 == 0){
msgq_msg_t msg1;
msgq_msg_recv(&msg1, &reader);
if (msg1.size == 0){
n_skipped++;
} else {
n_received++;
}
msgq_msg_close(&msg1);
}
}
// TODO: verify these numbers by hand
REQUIRE(n_received == 8572);
REQUIRE(n_skipped == 1428);
}
TEST_CASE("1 publisher, 2 subscribers", "[integration]"){
remove("/dev/shm/test_queue");
msgq_queue_t writer, reader1, reader2;
msgq_new_queue(&writer, "test_queue", 1024);
msgq_new_queue(&reader1, "test_queue", 1024);
msgq_new_queue(&reader2, "test_queue", 1024);
msgq_init_publisher(&writer);
msgq_init_subscriber(&reader1);
msgq_init_subscriber(&reader2);
for (uint64_t i = 0; i < 1024 * 3; i++) {
msgq_msg_t outgoing_msg;
msgq_msg_init_data(&outgoing_msg, (char*)&i, sizeof(uint64_t));
msgq_msg_send(&outgoing_msg, &writer);
msgq_msg_t msg1, msg2;
msgq_msg_recv(&msg1, &reader1);
msgq_msg_recv(&msg2, &reader2);
REQUIRE(msg1.size == sizeof(uint64_t));
REQUIRE(msg2.size == sizeof(uint64_t));
REQUIRE(*(uint64_t*)msg1.data == i);
REQUIRE(*(uint64_t*)msg2.data == i);
msgq_msg_close(&outgoing_msg);
msgq_msg_close(&msg1);
msgq_msg_close(&msg2);
}
}

View File

@ -0,0 +1,14 @@
from messaging_pyx import Context, SubSocket, PubSocket # pylint: disable=no-name-in-module, import-error
if __name__ == "__main__":
c = Context()
pub_sock = PubSocket()
pub_sock.connect(c, "controlsState")
for i in range(int(1e10)):
print(i)
sub_sock = SubSocket()
sub_sock.connect(c, "controlsState")
pub_sock.send(b'a')
print(sub_sock.receive())

View File

@ -0,0 +1,2 @@
#define CATCH_CONFIG_MAIN
#include "catch2/catch.hpp"

View File

View File

@ -0,0 +1,142 @@
import unittest
import time
import cereal.messaging as messaging
import concurrent.futures
def poller():
context = messaging.Context()
p = messaging.Poller()
sub = messaging.SubSocket()
sub.connect(context, 'controlsState')
p.registerSocket(sub)
socks = p.poll(10000)
r = [s.receive(non_blocking=True) for s in socks]
return r
class TestPoller(unittest.TestCase):
def test_poll_once(self):
context = messaging.Context()
pub = messaging.PubSocket()
pub.connect(context, 'controlsState')
with concurrent.futures.ThreadPoolExecutor() as e:
poll = e.submit(poller)
time.sleep(0.1) # Slow joiner syndrome
# Send message
pub.send("a")
# Wait for poll result
result = poll.result()
del pub
context.term()
self.assertEqual(result, [b"a"])
def test_poll_and_create_many_subscribers(self):
context = messaging.Context()
pub = messaging.PubSocket()
pub.connect(context, 'controlsState')
with concurrent.futures.ThreadPoolExecutor() as e:
poll = e.submit(poller)
time.sleep(0.1) # Slow joiner syndrome
c = messaging.Context()
for _ in range(10):
messaging.SubSocket().connect(c, 'controlsState')
time.sleep(0.1)
# Send message
pub.send("a")
# Wait for poll result
result = poll.result()
del pub
context.term()
self.assertEqual(result, [b"a"])
def test_multiple_publishers_exception(self):
context = messaging.Context()
with self.assertRaises(messaging.MultiplePublishersError):
pub1 = messaging.PubSocket()
pub1.connect(context, 'controlsState')
pub2 = messaging.PubSocket()
pub2.connect(context, 'controlsState')
pub1.send("a")
del pub1
del pub2
context.term()
def test_multiple_messages(self):
context = messaging.Context()
pub = messaging.PubSocket()
pub.connect(context, 'controlsState')
sub = messaging.SubSocket()
sub.connect(context, 'controlsState')
time.sleep(0.1) # Slow joiner
for i in range(100):
pub.send(str(i))
msg_seen = False
i = 0
while True:
r = sub.receive(non_blocking=True)
if r is not None:
self.assertEqual(str(i), r.decode('utf8'))
msg_seen = True
i += 1
if r is None and msg_seen: # ZMQ sometimes receives nothing on the first receive
break
del pub
del sub
context.term()
def test_conflate(self):
context = messaging.Context()
pub = messaging.PubSocket()
pub.connect(context, 'controlsState')
sub = messaging.SubSocket()
sub.connect(context, 'controlsState', conflate=True)
time.sleep(0.1) # Slow joiner
pub.send('a')
pub.send('b')
self.assertEqual(b'b', sub.receive())
del pub
del sub
context.term()
if __name__ == "__main__":
unittest.main()

163
service_list.yaml 100644
View File

@ -0,0 +1,163 @@
# TODO: these port numbers are hardcoded in c, fix this
# LogRotate: 8001 is a PUSH PULL socket between loggerd and visiond
# all ZMQ pub sub: port, should_log, frequency, (qlog_decimation)
# frame syncing packet
frame: [8002, true, 20., 1]
# accel, gyro, and compass
sensorEvents: [8003, true, 100., 100]
# GPS data, also global timestamp
gpsNMEA: [8004, true, 9.] # 9 msgs each sec
# CPU+MEM+GPU+BAT temps
thermal: [8005, true, 2., 1]
# List(CanData), list of can messages
can: [8006, true, 100.]
controlsState: [8007, true, 100., 100]
#liveEvent: [8008, true, 0.]
model: [8009, true, 20., 5]
features: [8010, true, 0.]
health: [8011, true, 2., 1]
radarState: [8012, true, 20.]
#liveUI: [8014, true, 0.]
encodeIdx: [8015, true, 20.]
liveTracks: [8016, true, 20.]
sendcan: [8017, true, 100.]
logMessage: [8018, true, 0.]
liveCalibration: [8019, true, 5.]
androidLog: [8020, true, 0.]
carState: [8021, true, 100., 10]
# 8022 is reserved for sshd
carControl: [8023, true, 100., 10]
plan: [8024, true, 20.]
liveLocation: [8025, true, 0.]
gpsLocation: [8026, true, 1., 1]
ethernetData: [8027, true, 0.]
navUpdate: [8028, true, 0.]
qcomGnss: [8029, true, 0.]
lidarPts: [8030, true, 0.]
procLog: [8031, true, 0.5]
gpsLocationExternal: [8032, true, 10., 1]
ubloxGnss: [8033, true, 10.]
clocks: [8034, true, 1., 1]
liveMpc: [8035, false, 20.]
liveLongitudinalMpc: [8036, false, 20.]
navStatus: [8038, true, 0.]
gpsLocationTrimble: [8039, true, 0.]
trimbleGnss: [8041, true, 0.]
ubloxRaw: [8042, true, 20.]
gpsPlannerPoints: [8043, true, 0.]
gpsPlannerPlan: [8044, true, 0.]
applanixRaw: [8046, true, 0.]
orbLocation: [8047, true, 0.]
trafficEvents: [8048, true, 0.]
liveLocationTiming: [8049, true, 0.]
orbslamCorrection: [8050, true, 0.]
liveLocationCorrected: [8051, true, 0.]
orbObservation: [8052, true, 0.]
applanixLocation: [8053, true, 0.]
liveLocationKalman: [8054, true, 0.]
uiNavigationEvent: [8055, true, 0.]
orbOdometry: [8057, true, 0.]
orbFeatures: [8058, false, 0.]
orbKeyFrame: [8059, true, 0.]
uiLayoutState: [8060, true, 0.]
frontEncodeIdx: [8061, true, 5.]
orbFeaturesSummary: [8062, true, 0.]
driverMonitoring: [8063, true, 5., 1]
liveParameters: [8064, true, 10.]
liveMapData: [8065, true, 0.]
cameraOdometry: [8066, true, 5.]
pathPlan: [8067, true, 20.]
kalmanOdometry: [8068, true, 0.]
thumbnail: [8069, true, 0.2, 1]
carEvents: [8070, true, 1., 1]
carParams: [8071, true, 0.02, 1]
frontFrame: [8072, true, 10.]
testModel: [8040, false, 0.]
testLiveLocation: [8045, false, 0.]
testJoystick: [8056, false, 0.]
# 8080 is reserved for slave testing daemon
# 8762 is reserved for logserver
# manager -- base process to manage starting and stopping of all others
# subscribes: thermal
# **** processes that communicate with the outside world ****
# thermald -- decides when to start and stop onroad
# subscribes: health, location
# publishes: thermal
# boardd -- communicates with the car
# subscribes: sendcan
# publishes: can, health, ubloxRaw
# sensord -- publishes IMU and Magnetometer
# publishes: sensorEvents
# gpsd -- publishes EON's gps
# publishes: gpsNMEA
# visiond -- talks to the cameras, runs the model, saves the videos
# publishes: frame, model, driverMonitoring, cameraOdometry, thumbnail
# **** stateful data transformers ****
# plannerd -- decides where to drive the car
# subscribes: carState, model, radarState, controlsState, liveParameters
# publishes: plan, pathPlan, liveMpc, liveLongitudinalMpc
# controlsd -- drives the car by sending CAN messages to panda
# subscribes: can, thermal, health, plan, pathPlan, driverMonitoring, liveCalibration
# publishes: carState, carControl, sendcan, controlsState, carEvents, carParams
# radard -- processes the radar and vision data
# subscribes: can, controlsState, model, liveParameters
# publishes: radarState, liveTracks
# params_learner -- learns vehicle params by observing the vehicle dynamics
# subscribes: controlsState, sensorEvents, cameraOdometry
# publishes: liveParameters
# calibrationd -- reads posenet and applies a temporal filter on the frame region to look at
# subscribes: cameraOdometry
# publishes: liveCalibration
# ubloxd -- read raw ublox data and converts them in readable format
# subscribes: ubloxRaw
# publishes: ubloxGnss
# **** LOGGING SERVICE ****
# loggerd
# subscribes: EVERYTHING
# **** NON VITAL SERVICES ****
# ui
# subscribes: thermal, model, controlsState, uiLayout, liveCalibration, radarState, liveMpc, plusFrame, liveMapData
# uploader
# communicates through file system with loggerd
# deleter
# communicates through file system with loggerd and uploader
# logmessaged -- central logging service, can log to cloud
# publishes: logMessage
# logcatd -- fetches logcat info from android
# publishes: androidLog
# proclogd -- fetches process information
# publishes: procLog
# tombstoned -- reports native crashes
# athenad -- on request, open a sub socket and return the value
# updated -- waits for network access and tries to update every hour

33
services.py 100755
View File

@ -0,0 +1,33 @@
#!/usr/bin/env python3
import os
import yaml
class Service():
def __init__(self, port, should_log, frequency, decimation=None):
self.port = port
self.should_log = should_log
self.frequency = frequency
self.decimation = decimation
service_list_path = os.path.join(os.path.dirname(__file__), "service_list.yaml")
service_list = {}
with open(service_list_path, "r") as f:
for k, v in yaml.safe_load(f).items():
decimation = None
if len(v) == 4:
decimation = v[3]
service_list[k] = Service(v[0], v[1], v[2], decimation)
if __name__ == "__main__":
print("/* THIS IS AN AUTOGENERATED FILE, PLEASE EDIT service_list.yaml */")
print("#ifndef __SERVICES_H")
print("#define __SERVICES_H")
print("struct service { int port; bool should_log; int frequency; int decimation; char name[0x100]; };")
print("static struct service services[] = {")
for k, v in service_list.items():
print(' { .name = "%s", .port = %d, .should_log = %s, .frequency = %d, .decimation = %d },' % (k, v.port, "true" if v.should_log else "false", v.frequency, -1 if v.decimation is None else v.decimation))
print("};")
print("#endif")