ELM327: Tests for ISO 14230-4 (KWP FAST).
parent
47d5627836
commit
47bcf40f7e
|
@ -29,11 +29,13 @@ def send_compare(s, dat, ret, timeout=4):
|
|||
s.send(dat)
|
||||
res = b''
|
||||
while ret.startswith(res) and ret != res:
|
||||
print("Waiting")
|
||||
ready = select.select([s], [], [], timeout)
|
||||
if not ready[0]:
|
||||
print("current recv data:", repr(res))
|
||||
break;
|
||||
res += s.recv(1000)
|
||||
print("final recv data: '%s'" % repr(res))
|
||||
assert ret == res, "Data does not agree (%s) (%s)"%(repr(ret), repr(res))
|
||||
|
||||
def sync_reset(s):
|
||||
|
@ -41,7 +43,7 @@ def sync_reset(s):
|
|||
res = b''
|
||||
while not res.endswith("ELM327 v1.5\r\r>"):
|
||||
res += read_or_fail(s)
|
||||
print("RES IS", repr(res))
|
||||
print("Reset response is '%s'" % repr(res))
|
||||
|
||||
def test_reset():
|
||||
s = socket.create_connection(("192.168.0.10", 35000))
|
||||
|
@ -162,6 +164,110 @@ def test_elm_protocol_failure():
|
|||
finally:
|
||||
s.close()
|
||||
|
||||
def test_elm_protocol_autodetect_ISO14230_KWP_FAST():
|
||||
s = elm_connect()
|
||||
serial = os.getenv("CANSIMSERIAL") if os.getenv("CANSIMSERIAL") else None
|
||||
sim = elm_car_simulator.ELMCarSimulator(serial, can=False)#, silent=True)
|
||||
sim.start()
|
||||
|
||||
try:
|
||||
sync_reset(s)
|
||||
send_compare(s, b'ATE0\r', b'ATE0\rOK\r\r>') # Echo OFF
|
||||
send_compare(s, b'ATH0\r', b'OK\r\r>') # Headers ON
|
||||
send_compare(s, b'ATS0\r', b"OK\r\r>")
|
||||
|
||||
send_compare(s, b'ATSP0\r', b"OK\r\r>")
|
||||
send_compare(s, b'010D\r', b"SEARCHING...\r410D53\r\r>", timeout=10)
|
||||
send_compare(s, b'ATDPN\r', b"A5\r\r>")
|
||||
finally:
|
||||
sim.stop()
|
||||
sim.join()
|
||||
s.close()
|
||||
|
||||
def test_elm_basic_send_lin():
|
||||
s = elm_connect()
|
||||
serial = os.getenv("CANSIMSERIAL") if os.getenv("CANSIMSERIAL") else None
|
||||
sim = elm_car_simulator.ELMCarSimulator(serial, can=False)#, silent=True)
|
||||
sim.start()
|
||||
|
||||
try:
|
||||
sync_reset(s)
|
||||
send_compare(s, b'ATSP5\r', b"ATSP5\rOK\r\r>") # Set Proto
|
||||
|
||||
send_compare(s, b'ATE0\r', b'ATE0\rOK\r\r>') # Echo OFF
|
||||
send_compare(s, b'0100\r', b"BUS INIT: OK\r41 00 FF FF FF FE \r\r>")
|
||||
send_compare(s, b'010D\r', b"41 0D 53 \r\r>")
|
||||
|
||||
send_compare(s, b'ATS0\r', b'OK\r\r>') # Spaces Off
|
||||
send_compare(s, b'0100\r', b"4100FFFFFFFE\r\r>")
|
||||
send_compare(s, b'010D\r', b"410D53\r\r>")
|
||||
|
||||
send_compare(s, b'ATH1\r', b'OK\r\r>') # Spaces Off Headers On
|
||||
send_compare(s, b'0100\r', b"86F1104100FFFFFFFEC3\r\r>")
|
||||
send_compare(s, b'010D\r', b"83F110410D5325\r\r>")
|
||||
|
||||
send_compare(s, b'ATS1\r', b'OK\r\r>') # Spaces On Headers On
|
||||
send_compare(s, b'0100\r', b"86 F1 10 41 00 FF FF FF FE C3 \r\r>")
|
||||
send_compare(s, b'010D\r', b"83 F1 10 41 0D 53 25 \r\r>")
|
||||
|
||||
send_compare(s, b'1F00\r', b"NO DATA\r\r>") # Unhandled msg, no response.
|
||||
|
||||
# Repeat last check to see if it still works after NO DATA was received
|
||||
send_compare(s, b'0100\r', b"86 F1 10 41 00 FF FF FF FE C3 \r\r>")
|
||||
send_compare(s, b'010D\r', b"83 F1 10 41 0D 53 25 \r\r>")
|
||||
finally:
|
||||
sim.stop()
|
||||
sim.join()
|
||||
s.close()
|
||||
|
||||
def test_elm_send_lin_multiline_msg():
|
||||
s = elm_connect()
|
||||
serial = os.getenv("CANSIMSERIAL") if os.getenv("CANSIMSERIAL") else None
|
||||
sim = elm_car_simulator.ELMCarSimulator(serial, can=False)
|
||||
sim.start()
|
||||
|
||||
try:
|
||||
sync_reset(s)
|
||||
send_compare(s, b'ATE0\r', b'ATE0\rOK\r\r>') # Echo OFF
|
||||
send_compare(s, b'ATSP5\r', b"OK\r\r>") # Set Proto
|
||||
|
||||
send_compare(s, b'0902\r', # headers OFF, Spaces ON
|
||||
b"BUS INIT: OK\r"
|
||||
"49 02 01 00 00 00 31 \r"
|
||||
"49 02 02 44 34 47 50 \r"
|
||||
"49 02 03 30 30 52 35 \r"
|
||||
"49 02 04 35 42 31 32 \r"
|
||||
"49 02 05 33 34 35 36 \r\r>")
|
||||
|
||||
send_compare(s, b'ATS0\r', b'OK\r\r>') # Spaces OFF
|
||||
send_compare(s, b'0902\r', # Headers OFF, Spaces OFF
|
||||
b"49020100000031\r"
|
||||
"49020244344750\r"
|
||||
"49020330305235\r"
|
||||
"49020435423132\r"
|
||||
"49020533343536\r\r>")
|
||||
|
||||
send_compare(s, b'ATH1\r', b'OK\r\r>') # Headers ON
|
||||
send_compare(s, b'0902\r', # Headers ON, Spaces OFF
|
||||
b"87F1104902010000003105\r"
|
||||
"87F11049020244344750E4\r"
|
||||
"87F11049020330305235BD\r"
|
||||
"87F11049020435423132B1\r"
|
||||
"87F11049020533343536AA\r\r>")
|
||||
|
||||
send_compare(s, b'ATS1\r', b'OK\r\r>') # Spaces ON
|
||||
send_compare(s, b'0902\r', # Headers ON, Spaces ON
|
||||
b"87 F1 10 49 02 01 00 00 00 31 05 \r"
|
||||
"87 F1 10 49 02 02 44 34 47 50 E4 \r"
|
||||
"87 F1 10 49 02 03 30 30 52 35 BD \r"
|
||||
"87 F1 10 49 02 04 35 42 31 32 B1 \r"
|
||||
"87 F1 10 49 02 05 33 34 35 36 AA \r\r>")
|
||||
finally:
|
||||
sim.stop()
|
||||
sim.join()
|
||||
s.close()
|
||||
|
||||
#////////////
|
||||
def test_elm_protocol_autodetect_ISO15765():
|
||||
s = elm_connect()
|
||||
serial = os.getenv("CANSIMSERIAL") if os.getenv("CANSIMSERIAL") else None
|
||||
|
|
|
@ -53,14 +53,19 @@ class ELMCarSimulator():
|
|||
self.__lin_monitor_thread.join()
|
||||
if self.__can_monitor_thread.is_alive():
|
||||
self.__can_monitor_thread.join()
|
||||
if self.__p:
|
||||
print("closing handle")
|
||||
self.__p.close()
|
||||
|
||||
def set_enable(self, on):
|
||||
self.__on = on
|
||||
|
||||
def start(self):
|
||||
self.panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
|
||||
self.__lin_monitor_thread.start()
|
||||
self.__can_monitor_thread.start()
|
||||
if self.__lin_enable:
|
||||
self.__lin_monitor_thread.start()
|
||||
if self.__can_enable:
|
||||
self.__can_monitor_thread.start()
|
||||
|
||||
#########################
|
||||
# LIN related functions #
|
||||
|
@ -79,14 +84,14 @@ class ELMCarSimulator():
|
|||
continue
|
||||
|
||||
lin_buff += lin_msg
|
||||
if not self.__lin_active:
|
||||
if lin_buff.endswith(b'\xc1\x33\xf1\x81\x66'):
|
||||
lin_buff = bytearray()
|
||||
self.__lin_active = True
|
||||
print("GOT LIN (KWP FAST) WAKEUP SIGNAL")
|
||||
self._lin_send(0x10, b'\xC1\x8F\xE9')
|
||||
self.__reset_lin_timeout()
|
||||
else:
|
||||
if lin_buff.endswith(b'\xc1\x33\xf1\x81\x66'):
|
||||
lin_buff = bytearray()
|
||||
self.__lin_active = True
|
||||
print("GOT LIN (KWP FAST) WAKEUP SIGNAL")
|
||||
self._lin_send(0x10, b'\xC1\x8F\xE9')
|
||||
self.__reset_lin_timeout()
|
||||
continue
|
||||
if self.__lin_active:
|
||||
msglen = lin_buff[0] & 0x7
|
||||
if lin_buff[0] & 0xF8 not in (0x80, 0xC0):
|
||||
print("Invalid bytes at start of message")
|
||||
|
|
Loading…
Reference in New Issue