nopenpilot/common/realtime.py

99 lines
2.6 KiB
Python

"""Utilities for reading real time clocks and keeping soft real time constraints."""
import time
import ctypes
import platform
import subprocess
import multiprocessing
import os
CLOCK_MONOTONIC_RAW = 4 # see <linux/time.h>
CLOCK_BOOTTIME = 7
class timespec(ctypes.Structure):
_fields_ = [
('tv_sec', ctypes.c_long),
('tv_nsec', ctypes.c_long),
]
try:
libc = ctypes.CDLL('libc.so', use_errno=True)
except OSError:
try:
libc = ctypes.CDLL('libc.so.6', use_errno=True)
except OSError:
libc = None
if libc is not None:
libc.clock_gettime.argtypes = [ctypes.c_int, ctypes.POINTER(timespec)]
def clock_gettime(clk_id):
if platform.system().lower() == "darwin":
# TODO: fix this
return time.time()
else:
t = timespec()
if libc.clock_gettime(clk_id, ctypes.pointer(t)) != 0:
errno_ = ctypes.get_errno()
raise OSError(errno_, os.strerror(errno_))
return t.tv_sec + t.tv_nsec * 1e-9
def monotonic_time():
return clock_gettime(CLOCK_MONOTONIC_RAW)
def sec_since_boot():
return clock_gettime(CLOCK_BOOTTIME)
def set_realtime_priority(level):
if os.getuid() != 0:
print("not setting priority, not root")
return
if platform.machine() == "x86_64":
NR_gettid = 186
elif platform.machine() == "aarch64":
NR_gettid = 178
else:
raise NotImplementedError
tid = libc.syscall(NR_gettid)
return subprocess.call(['chrt', '-f', '-p', str(level), str(tid)])
class Ratekeeper(object):
def __init__(self, rate, print_delay_threshold=0.):
"""Rate in Hz for ratekeeping. print_delay_threshold must be nonnegative."""
self._interval = 1. / rate
self._next_frame_time = sec_since_boot() + self._interval
self._print_delay_threshold = print_delay_threshold
self._frame = 0
self._remaining = 0
self._process_name = multiprocessing.current_process().name
@property
def frame(self):
return self._frame
@property
def remaining(self):
return self._remaining
# Maintain loop rate by calling this at the end of each loop
def keep_time(self):
lagged = self.monitor_time()
if self._remaining > 0:
time.sleep(self._remaining)
return lagged
# this only monitor the cumulative lag, but does not enforce a rate
def monitor_time(self):
lagged = False
remaining = self._next_frame_time - sec_since_boot()
self._next_frame_time += self._interval
if remaining < -self._print_delay_threshold:
print(self._process_name, "lagging by", round(-remaining * 1000, 2), "ms")
lagged = True
self._frame += 1
self._remaining = remaining
return lagged