ELM327: Car simulator now simulates LIN ISO 14230-4 (KWP FAST).

master
Jessy Diamond Exum 2017-08-13 00:29:04 -07:00
parent 592729cf4b
commit 47d5627836
3 changed files with 240 additions and 103 deletions

View File

@ -343,7 +343,7 @@ class Panda(object):
return b''.join(ret)
def serial_write(self, port_number, ln):
return self._handle.bulkWrite(2, chr(port_number) + ln)
return self._handle.bulkWrite(2, struct.pack("B", port_number) + ln)
# ******************* kline *******************
@ -370,9 +370,8 @@ class Panda(object):
def kline_send(self, x, bus=2, checksum=True):
def get_checksum(dat):
result = 0
result += sum(map(ord, dat))
result = -result
return chr(result&0xFF)
result += sum(map(ord, dat)) if isinstance(b'dat', str) else sum(dat)
return struct.pack("B", result % 0x100)
self.kline_drain(bus=bus)
if checksum:
@ -391,4 +390,3 @@ class Panda(object):
msg = self.kline_ll_recv(2, bus=bus)
msg += self.kline_ll_recv(ord(msg[1])-2, bus=bus)
return msg

View File

@ -165,7 +165,7 @@ def test_elm_protocol_failure():
def test_elm_protocol_autodetect_ISO15765():
s = elm_connect()
serial = os.getenv("CANSIMSERIAL") if os.getenv("CANSIMSERIAL") else None
sim = elm_car_simulator.ELMCanCarSimulator(serial, silent=True)
sim = elm_car_simulator.ELMCarSimulator(serial, lin=False, silent=True)
sim.start()
try:
@ -203,7 +203,7 @@ def test_elm_protocol_autodetect_ISO15765():
def test_elm_basic_send_can():
s = elm_connect()
serial = os.getenv("CANSIMSERIAL") if os.getenv("CANSIMSERIAL") else None
sim = elm_car_simulator.ELMCanCarSimulator(serial, silent=True)
sim = elm_car_simulator.ELMCarSimulator(serial, lin=False, silent=True)
sim.start()
try:
@ -239,7 +239,7 @@ def test_elm_basic_send_can():
def test_elm_send_can_multimsg():
s = elm_connect()
serial = os.getenv("CANSIMSERIAL") if os.getenv("CANSIMSERIAL") else None
sim = elm_car_simulator.ELMCanCarSimulator(serial)
sim = elm_car_simulator.ELMCarSimulator(serial, lin=False)
sim.start()
try:
@ -249,7 +249,7 @@ def test_elm_send_can_multimsg():
send_compare(s, b'ATH1\r', b'OK\r\r>') # Headers ON
send_compare(s, b'ATSP6\r', b"OK\r\r>") # Set Proto ISO 15765-4 (CAN 11/500)
sim.add_extra_noise(b'\x03\x41\x0D\xFA', addr=0x7E9)# Inject message into the stream
sim.can_add_extra_noise(b'\x03\x41\x0D\xFA', addr=0x7E9)# Inject message into the stream
send_compare(s, b'010D\r',
b"7E8 03 41 0D 53 \r"
"7E9 03 41 0D FA \r\r>") # Check it was ignored.
@ -263,7 +263,7 @@ implemented correctly in the reference device."""
def test_elm_can_check_mode_pid():
s = elm_connect()
serial = os.getenv("CANSIMSERIAL") if os.getenv("CANSIMSERIAL") else None
sim = elm_car_simulator.ELMCanCarSimulator(serial)
sim = elm_car_simulator.ELMCarSimulator(serial, lin=False)
sim.start()
try:
@ -273,7 +273,7 @@ def test_elm_can_check_mode_pid():
send_compare(s, b'ATH0\r', b'OK\r\r>') # Headers OFF
send_compare(s, b'ATSP6\r', b"OK\r\r>") # Set Proto ISO 15765-4 (CAN 11/500)
sim.add_extra_noise(b'\x03\x41\x0E\xFA')# Inject message into the stream
sim.can_add_extra_noise(b'\x03\x41\x0E\xFA')# Inject message into the stream
send_compare(s, b'010D\r', b"410D53\r\r>") # Check it was ignored.
send_compare(s, b'0100\r', b"4100FFFFFFFE\r\r>") # Check it was ignored again.
finally:
@ -284,7 +284,7 @@ def test_elm_can_check_mode_pid():
def test_elm_send_can_multiline_msg():
s = elm_connect()
serial = os.getenv("CANSIMSERIAL") if os.getenv("CANSIMSERIAL") else None
sim = elm_car_simulator.ELMCanCarSimulator(serial)
sim = elm_car_simulator.ELMCarSimulator(serial, lin=False)
sim.start()
try:
@ -326,7 +326,7 @@ def test_elm_send_can_multiline_msg():
def test_elm_send_can_multiline_msg_throughput():
s = elm_connect()
serial = os.getenv("CANSIMSERIAL") if os.getenv("CANSIMSERIAL") else None
sim = elm_car_simulator.ELMCanCarSimulator(serial)
sim = elm_car_simulator.ELMCarSimulator(serial, lin=False)
sim.start()
try:
@ -351,7 +351,7 @@ def test_elm_send_can_multiline_msg_throughput():
def test_elm_interrupted_obd_cmd_resets_state():
s = elm_connect()
serial = os.getenv("CANSIMSERIAL") if os.getenv("CANSIMSERIAL") else None
sim = elm_car_simulator.ELMCanCarSimulator(serial, silent=True)
sim = elm_car_simulator.ELMCarSimulator(serial, lin=False, silent=True)
sim.start()
try:
@ -381,7 +381,7 @@ def test_elm_interrupted_obd_cmd_resets_state():
def test_elm_can_baud():
s = elm_connect()
serial = os.getenv("CANSIMSERIAL") if os.getenv("CANSIMSERIAL") else None
sim = elm_car_simulator.ELMCanCarSimulator(serial)
sim = elm_car_simulator.ELMCarSimulator(serial, lin=False)
sim.start()
try:
@ -417,7 +417,7 @@ def test_elm_panda_safety_mode_ISO15765():
p_elm = Panda("WIFI")
#sim = elm_car_simulator.ELMCanCarSimulator(serial)
#sim = elm_car_simulator.ELMCarSimulator(serial, lin=False)
#sim.start()
def did_send(p, addr, dat, bus):

View File

@ -12,60 +12,155 @@ from collections import deque
sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), ".."))
from panda import Panda
class ELMCanCarSimulator(threading.Thread):
def __init__(self, sn, can_kbaud=500, silent=False,
can11b=True, can29b=True, *args, **kwargs):
super(ELMCanCarSimulator, self).__init__(*args, **kwargs)
self._p = Panda(sn if sn else Panda.list()[0])
self.__stop = False
self._multipart_data = None
self._can_kbaud = can_kbaud
self._extra_noise_msgs = deque()
self.__silent = silent
def lin_checksum(dat):
return sum(dat) % 0x100
class ELMCarSimulator():
def __init__(self, sn, silent=False, can_kbaud=500,
can=True, can11b=True, can29b=True,
lin=True):
self.__p = Panda(sn if sn else Panda.list()[0])
self.__on = True
self.__stop = False
self.__silent = silent
self.__lin_timer = None
self.__lin_active = False
self.__lin_enable = lin
self.__lin_monitor_thread = threading.Thread(target=self.__lin_monitor)
self.__can_multipart_data = None
self.__can_kbaud = can_kbaud
self.__can_extra_noise_msgs = deque()
self.__can_enable = can
self.__can11b = can11b
self.__can29b = can29b
self.__can_monitor_thread = threading.Thread(target=self.__can_monitor)
self._p.can_recv() # Toss whatever was already there
@property
def panda(self):
return self.__p
def stop(self):
if self.__lin_timer:
self.__lin_timer.cancel()
self.__lin_timeout_handler()
self.__stop = True
def _can_send(self, addr, msg):
if not self.__silent:
print(" Reply (%x)" % addr, binascii.hexlify(msg))
self._p.can_send(addr, msg + b'\x00'*(8-len(msg)), 0)
if self._extra_noise_msgs:
noise = self._extra_noise_msgs.popleft()
self._p.can_send(noise[0] if noise[0] is not None else addr,
noise[1] + b'\x00'*(8-len(noise[1])), 0)
def _addr_matches(self, addr):
if self.__can11b and (addr == 0x7DF or (addr & 0x7F8) == 0x7E0):
return True
if self.__can29b and (addr == 0x18db33f1 or (addr & 0x1FFF00FF) == 0x18da00f1):
return True
return False
def run(self):
self._p.set_can_speed_kbps(0, self._can_kbaud)
self._p.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
while not self.__stop:
for address, ts, data, src in self._p.can_recv():
if self.__on and src is 0 and len(data) >= 3:
self._process_msg(data[1], data[2], address, ts, data, src)
def change_can_baud(self, kbaud):
self._can_kbaud = kbaud
self._p.set_can_speed_kbps(0, self._can_kbaud)
def add_extra_noise(self, noise_msg, addr=None):
self._extra_noise_msgs.append((addr, noise_msg))
def join(self):
if self.__lin_monitor_thread.is_alive():
self.__lin_monitor_thread.join()
if self.__can_monitor_thread.is_alive():
self.__can_monitor_thread.join()
def set_enable(self, on):
self.__on = on
def start(self):
self.panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
self.__lin_monitor_thread.start()
self.__can_monitor_thread.start()
#########################
# LIN related functions #
#########################
def __lin_monitor(self):
print("STARTING LIN THREAD")
self.panda.set_uart_baud(2, 10400)
self.panda.kline_drain() # Toss whatever was already there
lin_buff = bytearray()
while not self.__stop:
lin_msg = self.panda.serial_read(2)
if not lin_msg:
continue
lin_buff += lin_msg
if not self.__lin_active:
if lin_buff.endswith(b'\xc1\x33\xf1\x81\x66'):
lin_buff = bytearray()
self.__lin_active = True
print("GOT LIN (KWP FAST) WAKEUP SIGNAL")
self._lin_send(0x10, b'\xC1\x8F\xE9')
self.__reset_lin_timeout()
else:
msglen = lin_buff[0] & 0x7
if lin_buff[0] & 0xF8 not in (0x80, 0xC0):
print("Invalid bytes at start of message")
print(" BUFF", lin_buff)
continue
if len(lin_buff) < msglen + 4: continue
if lin_checksum(lin_buff[:-1]) != lin_buff[-1]: continue
self.__lin_process_msg(lin_buff[0] & 0xF8, #Priority
lin_buff[1], lin_buff[2], lin_buff[3:-1])
lin_buff = bytearray()
def _lin_send(self, to_addr, msg):
print(" LIN Reply (%x)" % to_addr, binascii.hexlify(msg))
PHYS_ADDR = 0x80
FUNC_ADDR = 0xC0
RECV = 0xF1
SEND = 0x33 # Car OBD Functional Address
headers = struct.pack("BBB", PHYS_ADDR | len(msg), RECV, to_addr)
self.panda.kline_send(headers + msg)
def __reset_lin_timeout(self):
if self.__lin_timer:
self.__lin_timer.cancel()
self.__lin_timer = threading.Timer(5, self.__lin_timeout_handler)
self.__lin_timer.start()
def __lin_timeout_handler(self):
print("LIN TIMEOUT")
self.__lin_timer = None
self.__lin_active = False
def __lin_process_msg(self, priority, toaddr, fromaddr, data):
self.__reset_lin_timeout()
if not self.__silent and data != b'\x3E':
print("LIN MSG", "Addr:", hex(toaddr), "obdLen:", len(data),
binascii.hexlify(data))
outmsg = None
#if data == b'\x3E':
# print("KEEP ALIVE")
#el
if len(data) > 1:
outmsg = self._process_obd(data[0], data[1])
if outmsg:
obd_header = struct.pack("BB", 0x40 | data[0], data[1])
if len(outmsg) <= 5:
self._lin_send(0x10, obd_header + outmsg)
else:
first_msg_len = min(4, len(outmsg)%4)
self._lin_send(0x10, obd_header + b'\x01' +
b'\x00'*(4-first_msg_len) +
outmsg[:first_msg_len])
for num, i in enumerate(range(first_msg_len, len(outmsg), 4)):
self._lin_send(0x10, obd_header +
struct.pack('B', (num+2)%0x100) + outmsg[i:i+4])
#########################
# CAN related functions #
#########################
def __can_monitor(self):
print("STARTING CAN THREAD")
self.panda.set_can_speed_kbps(0, self.__can_kbaud)
self.panda.can_recv() # Toss whatever was already there
while not self.__stop:
for address, ts, data, src in self.panda.can_recv():
if self.__on and src is 0 and len(data) >= 3:
self.__can_process_msg(data[1], data[2], address, ts, data, src)
def can_mode_11b(self):
self.__can11b = True
self.__can29b = False
@ -78,59 +173,53 @@ class ELMCanCarSimulator(threading.Thread):
self.__can11b = True
self.__can29b = True
def _process_msg(self, mode, pid, address, ts, data, src):
if not self.__silent:
print("MSG", binascii.hexlify(data[1:1+data[0]]), "Addr:", hex(address),
"Mode:", hex(mode)[2:].zfill(2), "PID:", hex(pid)[2:].zfill(2),
"canLen:", len(data), binascii.hexlify(data))
def change_can_baud(self, kbaud):
self.__can_kbaud = kbaud
self.panda.set_can_speed_kbps(0, self.__can_kbaud)
if self._addr_matches(address) and len(data) == 8:
def can_add_extra_noise(self, noise_msg, addr=None):
self.__can_extra_noise_msgs.append((addr, noise_msg))
def _can_send(self, addr, msg):
if not self.__silent:
print(" CAN Reply (%x)" % addr, binascii.hexlify(msg))
self.panda.can_send(addr, msg + b'\x00'*(8-len(msg)), 0)
if self.__can_extra_noise_msgs:
noise = self.__can_extra_noise_msgs.popleft()
self.panda.can_send(noise[0] if noise[0] is not None else addr,
noise[1] + b'\x00'*(8-len(noise[1])), 0)
def _can_addr_matches(self, addr):
if self.__can11b and (addr == 0x7DF or (addr & 0x7F8) == 0x7E0):
return True
if self.__can29b and (addr == 0x18db33f1 or (addr & 0x1FFF00FF) == 0x18da00f1):
return True
return False
def __can_process_msg(self, mode, pid, address, ts, data, src):
if not self.__silent:
print("CAN MSG", binascii.hexlify(data[1:1+data[0]]),
"Addr:", hex(address), "Mode:", hex(mode)[2:].zfill(2),
"PID:", hex(pid)[2:].zfill(2), "canLen:", len(data),
binascii.hexlify(data))
if self._can_addr_matches(address) and len(data) == 8:
outmsg = None
if data[:3] == b'\x30\x00\x00' and len(self._multipart_data):
if data[:3] == b'\x30\x00\x00' and len(self.__can_multipart_data):
if not self.__silent:
print("Request for more data");
outaddr = 0x7E8 if address == 0x7DF or address == 0x7E0 else 0x18DAF110
msgnum = 1
while(self._multipart_data):
datalen = min(7, len(self._multipart_data))
msgpiece = struct.pack("B", 0x20 | msgnum) + self._multipart_data[:datalen]
while(self.__can_multipart_data):
datalen = min(7, len(self.__can_multipart_data))
msgpiece = struct.pack("B", 0x20 | msgnum) + self.__can_multipart_data[:datalen]
self._can_send(outaddr, msgpiece)
self._multipart_data = self._multipart_data[7:]
self.__can_multipart_data = self.__can_multipart_data[7:]
msgnum = (msgnum+1)%0x10
time.sleep(0.01)
elif mode == 0x01: # Mode: Show current data
if pid == 0x00: #List supported things
outmsg = b"\xff\xff\xff\xfe"#b"\xBE\x1F\xB8\x10" #Bitfield, random features
elif pid == 0x01: # Monitor Status since DTC cleared
outmsg = b"\x00\x00\x00\x00" #Bitfield, random features
elif pid == 0x04: # Calculated engine load
outmsg = b"\x2f"
elif pid == 0x05: # Engine coolant temperature
outmsg = b"\x3c"
elif pid == 0x0B: # Intake manifold absolute pressure
outmsg = b"\x90"
elif pid == 0x0C: # Engine RPM
outmsg = b"\x1A\xF8"
elif pid == 0x0D: # Vehicle Speed
outmsg = b"\x53"
elif pid == 0x10: # MAF air flow rate
outmsg = b"\x01\xA0"
elif pid == 0x11: # Throttle Position
outmsg = b"\x90"
elif pid == 0x33: # Absolute Barometric Pressure
outmsg = b"\x90"
elif mode == 0x09: # Mode: Request vehicle information
if pid == 0x02: # Show VIN
outmsg = b"1D4GP00R55B123456"
if pid == 0xFD: # test long multi message
parts = (b'\xAA\xAA\xAA' + struct.pack(">I", num) for num in range(80))
outmsg = b'\xAA\xAA\xAA' + b''.join(parts)
if pid == 0xFE: # test very long multi message
parts = (b'\xAA\xAA\xAA' + struct.pack(">I", num) for num in range(584))
outmsg = b'\xAA\xAA\xAA' + b''.join(parts) + b'\xAA'
if pid == 0xFF:
outmsg = b"\xAA"*(0xFFF-3)
else:
outmsg = self._process_obd(mode, pid)
if outmsg:
outaddr = 0x7E8 if address == 0x7DF or address == 0x7E0 else 0x18DAF110
@ -145,18 +234,68 @@ class ELMCanCarSimulator(threading.Thread):
payload_len&0xFF,
0x40|data[1], pid, 1) + outmsg[:first_msg_len]
self._can_send(outaddr, msgpiece)
self._multipart_data = outmsg[first_msg_len:]
self.__can_multipart_data = outmsg[first_msg_len:]
#########################
# General OBD functions #
#########################
def _process_obd(self, mode, pid):
if mode == 0x01: # Mode: Show current data
if pid == 0x00: #List supported things
return b"\xff\xff\xff\xfe"#b"\xBE\x1F\xB8\x10" #Bitfield, random features
elif pid == 0x01: # Monitor Status since DTC cleared
return b"\x00\x00\x00\x00" #Bitfield, random features
elif pid == 0x04: # Calculated engine load
return b"\x2f"
elif pid == 0x05: # Engine coolant temperature
return b"\x3c"
elif pid == 0x0B: # Intake manifold absolute pressure
return b"\x90"
elif pid == 0x0C: # Engine RPM
return b"\x1A\xF8"
elif pid == 0x0D: # Vehicle Speed
return b"\x53"
elif pid == 0x10: # MAF air flow rate
return b"\x01\xA0"
elif pid == 0x11: # Throttle Position
return b"\x90"
elif pid == 0x33: # Absolute Barometric Pressure
return b"\x90"
elif mode == 0x09: # Mode: Request vehicle information
if pid == 0x02: # Show VIN
return b"1D4GP00R55B123456"
if pid == 0xFD: # test long multi message
parts = (b'\xAA\xAA\xAA' + struct.pack(">I", num) for num in range(80))
return b'\xAA\xAA\xAA' + b''.join(parts)
if pid == 0xFE: # test very long multi message
parts = (b'\xAA\xAA\xAA' + struct.pack(">I", num) for num in range(584))
return b'\xAA\xAA\xAA' + b''.join(parts) + b'\xAA'
if pid == 0xFF:
return b"\xAA"*(0xFFF-3)
if __name__ == "__main__":
serial = os.getenv("SERIAL") if os.getenv("SERIAL") else None
kbaud = int(os.getenv("CANKBAUD")) if os.getenv("CANKBAUD") else 500
bitwidth = int(os.getenv("CANBITWIDTH")) if os.getenv("CANBITWIDTH") else 0
sim = ELMCanCarSimulator(serial, can_kbaud=kbaud)
sim = ELMCarSimulator(serial, can_kbaud=kbaud)
if(bitwidth == 0):
sim.can_mode_11b_29b()
if(bitwidth == 11):
sim.can_mode_11b()
if(bitwidth == 29):
sim.can_mode_29b()
import signal
def signal_handler(signal, frame):
print('\nShutting down simulator')
sim.stop()
sim.join()
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
sim.start()
signal.pause()