nopenpilot/selfdrive/hardware/tici/agnos.py

255 lines
7.9 KiB
Python
Executable File

#!/usr/bin/env python3
import json
import lzma
import hashlib
import requests
import struct
import subprocess
import time
import os
from typing import Generator
SPARSE_CHUNK_FMT = struct.Struct('H2xI4x')
class StreamingDecompressor:
def __init__(self, url: str) -> None:
self.buf = b""
self.req = requests.get(url, stream=True, headers={'Accept-Encoding': None})
self.it = self.req.iter_content(chunk_size=1024 * 1024)
self.decompressor = lzma.LZMADecompressor(format=lzma.FORMAT_AUTO)
self.eof = False
self.sha256 = hashlib.sha256()
def read(self, length: int) -> bytes:
while len(self.buf) < length:
self.req.raise_for_status()
try:
compressed = next(self.it)
except StopIteration:
self.eof = True
break
out = self.decompressor.decompress(compressed)
self.buf += out
result = self.buf[:length]
self.buf = self.buf[length:]
self.sha256.update(result)
return result
def unsparsify(f: StreamingDecompressor) -> Generator[bytes, None, None]:
# https://source.android.com/devices/bootloader/images#sparse-format
magic = struct.unpack("I", f.read(4))[0]
assert(magic == 0xed26ff3a)
# Version
major = struct.unpack("H", f.read(2))[0]
minor = struct.unpack("H", f.read(2))[0]
assert(major == 1 and minor == 0)
f.read(2) # file header size
f.read(2) # chunk header size
block_sz = struct.unpack("I", f.read(4))[0]
f.read(4) # total blocks
num_chunks = struct.unpack("I", f.read(4))[0]
f.read(4) # crc checksum
for _ in range(num_chunks):
chunk_type, out_blocks = SPARSE_CHUNK_FMT.unpack(f.read(12))
if chunk_type == 0xcac1: # Raw
# TODO: yield in smaller chunks. Yielding only block_sz is too slow. Largest observed data chunk is 252 MB.
yield f.read(out_blocks * block_sz)
elif chunk_type == 0xcac2: # Fill
filler = f.read(4) * (block_sz // 4)
for _ in range(out_blocks):
yield filler
elif chunk_type == 0xcac3: # Don't care
yield b""
else:
raise Exception("Unhandled sparse chunk type")
# noop wrapper with same API as unsparsify() for non sparse images
def noop(f: StreamingDecompressor) -> Generator[bytes, None, None]:
while not f.eof:
yield f.read(1024 * 1024)
def get_target_slot_number() -> int:
current_slot = subprocess.check_output(["abctl", "--boot_slot"], encoding='utf-8').strip()
return 1 if current_slot == "_a" else 0
def slot_number_to_suffix(slot_number: int) -> str:
assert slot_number in (0, 1)
return '_a' if slot_number == 0 else '_b'
def get_partition_path(target_slot_number: int, partition: dict) -> str:
path = f"/dev/disk/by-partlabel/{partition['name']}"
if partition.get('has_ab', True):
path += slot_number_to_suffix(target_slot_number)
return path
def verify_partition(target_slot_number: int, partition: dict) -> bool:
full_check = partition['full_check']
path = get_partition_path(target_slot_number, partition)
partition_size = partition['size']
with open(path, 'rb+') as out:
if full_check:
raw_hash = hashlib.sha256()
pos = 0
chunk_size = 1024 * 1024
while pos < partition_size:
n = min(chunk_size, partition_size - pos)
raw_hash.update(out.read(n))
pos += n
return raw_hash.hexdigest().lower() == partition['hash_raw'].lower()
else:
out.seek(partition_size)
return out.read(64) == partition['hash_raw'].lower().encode()
def clear_partition_hash(target_slot_number: int, partition: dict) -> None:
path = get_partition_path(target_slot_number, partition)
with open(path, 'wb+') as out:
partition_size = partition['size']
out.seek(partition_size)
out.write(b"\x00" * 64)
os.sync()
def flash_partition(target_slot_number: int, partition: dict, cloudlog):
cloudlog.info(f"Downloading and writing {partition['name']}")
if verify_partition(target_slot_number, partition):
cloudlog.info(f"Already flashed {partition['name']}")
return
downloader = StreamingDecompressor(partition['url'])
# Clear hash before flashing in case we get interrupted
full_check = partition['full_check']
if not full_check:
clear_partition_hash(target_slot_number, partition)
path = get_partition_path(target_slot_number, partition)
with open(path, 'wb+') as out:
# Flash partition
last_p = 0
raw_hash = hashlib.sha256()
f = unsparsify if partition['sparse'] else noop
for chunk in f(downloader):
raw_hash.update(chunk)
out.write(chunk)
p = int(out.tell() / partition['size'] * 100)
if p != last_p:
last_p = p
print(f"Installing {partition['name']}: {p}", flush=True)
if raw_hash.hexdigest().lower() != partition['hash_raw'].lower():
raise Exception(f"Raw hash mismatch '{raw_hash.hexdigest().lower()}'")
if downloader.sha256.hexdigest().lower() != partition['hash'].lower():
raise Exception("Uncompressed hash mismatch")
if out.tell() != partition['size']:
raise Exception("Uncompressed size mismatch")
# Write hash after successfull flash
os.sync()
if not full_check:
out.write(partition['hash_raw'].lower().encode())
def swap(manifest_path: str, target_slot_number: int, cloudlog) -> None:
update = json.load(open(manifest_path))
for partition in update:
if not partition.get('full_check', False):
clear_partition_hash(target_slot_number, partition)
while True:
out = subprocess.check_output(f"abctl --set_active {target_slot_number}", shell=True, stderr=subprocess.STDOUT, encoding='utf8')
if ("No such file or directory" not in out) and ("lun as boot lun" in out):
cloudlog.info(f"Swap successfull {out}")
break
else:
cloudlog.error(f"Swap failed {out}")
def flash_agnos_update(manifest_path: str, target_slot_number: int, cloudlog) -> None:
update = json.load(open(manifest_path))
cloudlog.info(f"Target slot {target_slot_number}")
# set target slot as unbootable
os.system(f"abctl --set_unbootable {target_slot_number}")
for partition in update:
success = False
for retries in range(10):
try:
flash_partition(target_slot_number, partition, cloudlog)
success = True
break
except requests.exceptions.RequestException:
cloudlog.exception("Failed")
cloudlog.info(f"Failed to download {partition['name']}, retrying ({retries})")
time.sleep(10)
if not success:
cloudlog.info(f"Failed to flash {partition['name']}, aborting")
raise Exception("Maximum retries exceeded")
cloudlog.info(f"AGNOS ready on slot {target_slot_number}")
def verify_agnos_update(manifest_path: str, target_slot_number: int) -> bool:
update = json.load(open(manifest_path))
return all(verify_partition(target_slot_number, partition) for partition in update)
if __name__ == "__main__":
import logging
import argparse
parser = argparse.ArgumentParser(description="Flash and verify AGNOS update",
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("--verify", action="store_true", help="Verify and perform swap if update ready")
parser.add_argument("--swap", action="store_true", help="Verify and perform swap, downloads if necessary")
parser.add_argument("manifest", help="Manifest json")
args = parser.parse_args()
logging.basicConfig(level=logging.INFO)
target_slot_number = get_target_slot_number()
if args.verify:
if verify_agnos_update(args.manifest, target_slot_number):
swap(args.manifest, target_slot_number, logging)
exit(0)
exit(1)
elif args.swap:
while not verify_agnos_update(args.manifest, target_slot_number):
logging.error("Verification failed. Flashing AGNOS")
flash_agnos_update(args.manifest, target_slot_number, logging)
logging.warning(f"Verification succeeded. Swapping to slot {target_slot_number}")
swap(args.manifest, target_slot_number, logging)
else:
flash_agnos_update(args.manifest, target_slot_number, logging)