Updater tests (#1974)
* refactor exit handling * test update * more reliable? * better * init git in CI * testy tester * CI should work * test overlay reinit * only one * still need to fix loop test * more patience * more patience in CI * no ping in CI * this is cleaner * need to run these in jenkins * clean up * run in jenkins * fix test file path * it's a git repo now * no commit * reinit * remove duplicate * why not git * la * git status * pythonpath * fix * no CI fro now * check overlay consistent * more tests * make more changes in the update commit * sample * no kpull/1991/head
parent
0fd763f902
commit
fe18a014c7
|
@ -0,0 +1,256 @@
|
|||
#!/usr/bin/env python3
|
||||
import datetime
|
||||
import os
|
||||
import time
|
||||
import tempfile
|
||||
import unittest
|
||||
import shutil
|
||||
import signal
|
||||
import subprocess
|
||||
import random
|
||||
|
||||
from common.basedir import BASEDIR
|
||||
from common.params import Params
|
||||
|
||||
|
||||
class TestUpdater(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.updated_proc = None
|
||||
|
||||
self.tmp_dir = tempfile.TemporaryDirectory()
|
||||
org_dir = os.path.join(self.tmp_dir.name, "commaai")
|
||||
|
||||
self.basedir = os.path.join(org_dir, "openpilot")
|
||||
self.git_remote_dir = os.path.join(org_dir, "openpilot_remote")
|
||||
self.staging_dir = os.path.join(org_dir, "safe_staging")
|
||||
for d in [org_dir, self.basedir, self.git_remote_dir, self.staging_dir]:
|
||||
os.mkdir(d)
|
||||
|
||||
self.upper_dir = os.path.join(self.staging_dir, "upper")
|
||||
self.merged_dir = os.path.join(self.staging_dir, "merged")
|
||||
self.finalized_dir = os.path.join(self.staging_dir, "finalized")
|
||||
|
||||
# setup local submodule remotes
|
||||
submodules = subprocess.check_output("git submodule --quiet foreach 'echo $name'",
|
||||
shell=True, cwd=BASEDIR, encoding='utf8').split()
|
||||
for s in submodules:
|
||||
sub_path = os.path.join(org_dir, s.split("_repo")[0])
|
||||
self._run(f"git clone {s} {sub_path}.git", cwd=BASEDIR)
|
||||
|
||||
# setup two git repos, a remote and one we'll run updated in
|
||||
self._run([
|
||||
f"git clone {BASEDIR} {self.git_remote_dir}",
|
||||
f"git clone {self.git_remote_dir} {self.basedir}",
|
||||
f"cd {self.basedir} && git submodule init && git submodule update",
|
||||
f"cd {self.basedir} && scons -j{os.cpu_count()} cereal"
|
||||
])
|
||||
|
||||
self.params = Params(db=os.path.join(self.basedir, "persist/params"))
|
||||
self.params.clear_all()
|
||||
os.sync()
|
||||
|
||||
def tearDown(self):
|
||||
try:
|
||||
if self.updated_proc is not None:
|
||||
self.updated_proc.terminate()
|
||||
self.updated_proc.wait(30)
|
||||
except Exception as e:
|
||||
print(e)
|
||||
self.tmp_dir.cleanup()
|
||||
|
||||
|
||||
# *** test helpers ***
|
||||
|
||||
|
||||
def _run(self, cmd, cwd=None):
|
||||
if not isinstance(cmd, list):
|
||||
cmd = (cmd,)
|
||||
|
||||
for c in cmd:
|
||||
subprocess.check_output(c, cwd=cwd, shell=True)
|
||||
|
||||
def _get_updated_proc(self):
|
||||
os.environ["PYTHONPATH"] = self.basedir
|
||||
os.environ["GIT_AUTHOR_NAME"] = "testy tester"
|
||||
os.environ["GIT_COMMITTER_NAME"] = "testy tester"
|
||||
os.environ["GIT_AUTHOR_EMAIL"] = "testy@tester.test"
|
||||
os.environ["GIT_COMMITTER_EMAIL"] = "testy@tester.test"
|
||||
os.environ["UPDATER_TEST_IP"] = "localhost"
|
||||
os.environ["UPDATER_LOCK_FILE"] = os.path.join(self.tmp_dir.name, "updater.lock")
|
||||
os.environ["UPDATER_STAGING_ROOT"] = self.staging_dir
|
||||
updated_path = os.path.join(self.basedir, "selfdrive/updated.py")
|
||||
return subprocess.Popen(updated_path, env=os.environ)
|
||||
|
||||
def _start_updater(self, offroad=True, nosleep=False):
|
||||
self.params.put("IsOffroad", "1" if offroad else "0")
|
||||
self.updated_proc = self._get_updated_proc()
|
||||
if not nosleep:
|
||||
time.sleep(1)
|
||||
|
||||
def _update_now(self):
|
||||
self.updated_proc.send_signal(signal.SIGHUP)
|
||||
|
||||
# TODO: this should be implemented in params
|
||||
def _read_param(self, key, timeout=1):
|
||||
ret = None
|
||||
start_time = time.monotonic()
|
||||
while ret is None:
|
||||
ret = self.params.get(key, encoding='utf8')
|
||||
if time.monotonic() - start_time > timeout:
|
||||
break
|
||||
time.sleep(0.01)
|
||||
return ret
|
||||
|
||||
def _wait_for_update(self, timeout=30, clear_param=False):
|
||||
if clear_param:
|
||||
self.params.delete("LastUpdateTime")
|
||||
|
||||
self._update_now()
|
||||
t = self._read_param("LastUpdateTime", timeout=timeout)
|
||||
if t is None:
|
||||
raise Exception("timed out waiting for update to complate")
|
||||
|
||||
def _make_commit(self):
|
||||
all_dirs, all_files = [], []
|
||||
for root, dirs, files in os.walk(self.git_remote_dir):
|
||||
if ".git" in root:
|
||||
continue
|
||||
for d in dirs:
|
||||
all_dirs.append(os.path.join(root, d))
|
||||
for f in files:
|
||||
all_files.append(os.path.join(root, f))
|
||||
|
||||
# make a new dir and some new files
|
||||
new_dir = os.path.join(self.git_remote_dir, "this_is_a_new_dir")
|
||||
os.mkdir(new_dir)
|
||||
for _ in range(random.randrange(5, 30)):
|
||||
for d in (new_dir, random.choice(all_dirs)):
|
||||
with tempfile.NamedTemporaryFile(dir=d, delete=False) as f:
|
||||
f.write(os.urandom(random.randrange(1, 1000000)))
|
||||
|
||||
# modify some files
|
||||
for f in random.sample(all_files, random.randrange(5, 50)):
|
||||
with open(f, "w+") as ff:
|
||||
txt = ff.readlines()
|
||||
ff.seek(0)
|
||||
for line in txt:
|
||||
ff.write(line[::-1])
|
||||
|
||||
# remove some files
|
||||
for f in random.sample(all_files, random.randrange(5, 50)):
|
||||
os.remove(f)
|
||||
|
||||
# remove some dirs
|
||||
for d in random.sample(all_dirs, random.randrange(1, 10)):
|
||||
shutil.rmtree(d)
|
||||
|
||||
# commit the changes
|
||||
self._run([
|
||||
"git add -A",
|
||||
"git commit -m 'an update'",
|
||||
], cwd=self.git_remote_dir)
|
||||
|
||||
def _check_update_state(self, update_available):
|
||||
# make sure LastUpdateTime is recent
|
||||
t = self._read_param("LastUpdateTime")
|
||||
last_update_time = datetime.datetime.fromisoformat(t)
|
||||
td = datetime.datetime.utcnow() - last_update_time
|
||||
self.assertLess(td.total_seconds(), 10)
|
||||
self.params.delete("LastUpdateTime")
|
||||
|
||||
# wait a bit for the rest of the params to be written
|
||||
time.sleep(0.1)
|
||||
|
||||
# check params
|
||||
update = self._read_param("UpdateAvailable")
|
||||
self.assertEqual(update == "1", update_available, f"UpdateAvailable: {repr(update)}")
|
||||
self.assertEqual(self._read_param("UpdateFailedCount"), "0")
|
||||
|
||||
# TODO: check that the finalized update actually matches remote
|
||||
# check the .overlay_init and .overlay_consistent flags
|
||||
self.assertTrue(os.path.isfile(os.path.join(self.basedir, ".overlay_init")))
|
||||
self.assertEqual(os.path.isfile(os.path.join(self.finalized_dir, ".overlay_consistent")), update_available)
|
||||
|
||||
|
||||
# *** test cases ***
|
||||
|
||||
|
||||
# Run updated for 100 cycles with no update
|
||||
def test_no_update(self):
|
||||
self._start_updater()
|
||||
for _ in range(100):
|
||||
self._wait_for_update(clear_param=True)
|
||||
self._check_update_state(False)
|
||||
|
||||
# Let the updater run with no update for a cycle, then write an update
|
||||
def test_update(self):
|
||||
self._start_updater()
|
||||
|
||||
# run for a cycle with no update
|
||||
self._wait_for_update(clear_param=True)
|
||||
self._check_update_state(False)
|
||||
|
||||
# write an update to our remote
|
||||
self._make_commit()
|
||||
|
||||
# run for a cycle to get the update
|
||||
self._wait_for_update(timeout=60, clear_param=True)
|
||||
self._check_update_state(True)
|
||||
|
||||
# run another cycle with no update
|
||||
self._wait_for_update(clear_param=True)
|
||||
self._check_update_state(True)
|
||||
|
||||
# Let the updater run for 10 cycles, and write an update every cycle
|
||||
@unittest.skip("need to make this faster")
|
||||
def test_update_loop(self):
|
||||
self._start_updater()
|
||||
|
||||
# run for a cycle with no update
|
||||
self._wait_for_update(clear_param=True)
|
||||
for _ in range(10):
|
||||
time.sleep(0.5)
|
||||
self._make_commit()
|
||||
self._wait_for_update(timeout=90, clear_param=True)
|
||||
self._check_update_state(True)
|
||||
|
||||
# Test overlay re-creation after tracking a new file in basedir's git
|
||||
def test_overlay_reinit(self):
|
||||
self._start_updater()
|
||||
|
||||
overlay_init_fn = os.path.join(self.basedir, ".overlay_init")
|
||||
|
||||
# run for a cycle with no update
|
||||
self._wait_for_update(clear_param=True)
|
||||
self.params.delete("LastUpdateTime")
|
||||
first_mtime = os.path.getmtime(overlay_init_fn)
|
||||
|
||||
# touch a file in the basedir
|
||||
self._run("touch new_file && git add new_file", cwd=self.basedir)
|
||||
|
||||
# run another cycle, should have a new mtime
|
||||
self._wait_for_update(clear_param=True)
|
||||
second_mtime = os.path.getmtime(overlay_init_fn)
|
||||
self.assertTrue(first_mtime != second_mtime)
|
||||
|
||||
# run another cycle, mtime should be same as last cycle
|
||||
self._wait_for_update(clear_param=True)
|
||||
new_mtime = os.path.getmtime(overlay_init_fn)
|
||||
self.assertTrue(second_mtime == new_mtime)
|
||||
|
||||
# Make sure updated exits if another instance is running
|
||||
def test_multiple_instances(self):
|
||||
# start updated and let it run for a cycle
|
||||
self._start_updater()
|
||||
time.sleep(1)
|
||||
self._wait_for_update(clear_param=True)
|
||||
|
||||
# start another instance
|
||||
second_updated = self._get_updated_proc()
|
||||
ret_code = second_updated.wait(timeout=5)
|
||||
self.assertTrue(ret_code is not None)
|
||||
|
||||
|
||||
# Run the test suite when this file is executed directly as a script.
if __name__ == "__main__":
  unittest.main()
|
|
@ -28,16 +28,18 @@ import subprocess
|
|||
import psutil
|
||||
import shutil
|
||||
import signal
|
||||
from pathlib import Path
|
||||
import fcntl
|
||||
import threading
|
||||
from cffi import FFI
|
||||
from pathlib import Path
|
||||
|
||||
from common.basedir import BASEDIR
|
||||
from common.params import Params
|
||||
from selfdrive.swaglog import cloudlog
|
||||
|
||||
STAGING_ROOT = "/data/safe_staging"
|
||||
TEST_IP = os.getenv("UPDATER_TEST_IP", "8.8.8.8")
|
||||
LOCK_FILE = os.getenv("UPDATER_LOCK_FILE", "/tmp/safe_staging_overlay.lock")
|
||||
STAGING_ROOT = os.getenv("UPDATER_STAGING_ROOT", "/data/safe_staging")
|
||||
|
||||
OVERLAY_UPPER = os.path.join(STAGING_ROOT, "upper")
|
||||
OVERLAY_METADATA = os.path.join(STAGING_ROOT, "metadata")
|
||||
|
@ -81,20 +83,13 @@ def run(cmd, cwd=None):
|
|||
return subprocess.check_output(cmd, cwd=cwd, stderr=subprocess.STDOUT, encoding='utf8')
|
||||
|
||||
|
||||
def remove_consistent_flag():
|
||||
def set_consistent_flag(consistent):
|
||||
os.system("sync")
|
||||
consistent_file = Path(os.path.join(FINALIZED, ".overlay_consistent"))
|
||||
try:
|
||||
if consistent:
|
||||
consistent_file.touch()
|
||||
elif not consistent and consistent_file.exists():
|
||||
consistent_file.unlink()
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
os.system("sync")
|
||||
|
||||
|
||||
def set_consistent_flag():
|
||||
consistent_file = Path(os.path.join(FINALIZED, ".overlay_consistent"))
|
||||
os.system("sync")
|
||||
consistent_file.touch()
|
||||
os.system("sync")
|
||||
|
||||
|
||||
|
@ -150,7 +145,7 @@ def init_ovfs():
|
|||
cloudlog.info("preparing new safe staging area")
|
||||
Params().put("UpdateAvailable", "0")
|
||||
|
||||
remove_consistent_flag()
|
||||
set_consistent_flag(False)
|
||||
|
||||
dismount_ovfs()
|
||||
if os.path.isdir(STAGING_ROOT):
|
||||
|
@ -158,6 +153,7 @@ def init_ovfs():
|
|||
|
||||
for dirname in [STAGING_ROOT, OVERLAY_UPPER, OVERLAY_METADATA, OVERLAY_MERGED, FINALIZED]:
|
||||
os.mkdir(dirname, 0o755)
|
||||
|
||||
if not os.lstat(BASEDIR).st_dev == os.lstat(OVERLAY_MERGED).st_dev:
|
||||
raise RuntimeError("base and overlay merge directories are on different filesystems; not valid for overlay FS!")
|
||||
|
||||
|
@ -216,13 +212,13 @@ def attempt_update():
|
|||
|
||||
# Un-set the validity flag to prevent the finalized tree from being
|
||||
# activated later if the finalize step is interrupted
|
||||
remove_consistent_flag()
|
||||
set_consistent_flag(False)
|
||||
|
||||
finalize_from_ovfs()
|
||||
|
||||
# Make sure the validity flag lands on disk LAST, only when the local git
|
||||
# repo and OP install are in a consistent state.
|
||||
set_consistent_flag()
|
||||
set_consistent_flag(True)
|
||||
|
||||
cloudlog.info("update successful!")
|
||||
else:
|
||||
|
@ -232,8 +228,6 @@ def attempt_update():
|
|||
|
||||
|
||||
def main():
|
||||
update_failed_count = 0
|
||||
overlay_init_done = False
|
||||
params = Params()
|
||||
|
||||
if params.get("DisableUpdates") == b"1":
|
||||
|
@ -247,7 +241,7 @@ def main():
|
|||
if psutil.LINUX:
|
||||
p.ionice(psutil.IOPRIO_CLASS_BE, value=7)
|
||||
|
||||
ov_lock_fd = open('/tmp/safe_staging_overlay.lock', 'w')
|
||||
ov_lock_fd = open(LOCK_FILE, 'w')
|
||||
try:
|
||||
fcntl.flock(ov_lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
|
||||
except IOError:
|
||||
|
@ -257,33 +251,34 @@ def main():
|
|||
wait_helper = WaitTimeHelper()
|
||||
wait_helper.sleep(30)
|
||||
|
||||
update_failed_count = 0
|
||||
overlay_initialized = False
|
||||
while not wait_helper.shutdown:
|
||||
update_failed_count += 1
|
||||
wait_helper.ready_event.clear()
|
||||
|
||||
# Check for internet every 30s
|
||||
time_wrong = datetime.datetime.utcnow().year < 2019
|
||||
ping_failed = os.system("git ls-remote --tags --quiet") != 0
|
||||
ping_failed = os.system(f"ping -W 4 -c 1 {TEST_IP}") != 0
|
||||
if ping_failed or time_wrong:
|
||||
wait_helper.sleep(30)
|
||||
continue
|
||||
|
||||
# Attempt an update
|
||||
try:
|
||||
# If the git directory has modifcations after we created the overlay
|
||||
# we need to recreate the overlay
|
||||
if overlay_init_done:
|
||||
# Re-create the overlay if BASEDIR/.git has changed since we created the overlay
|
||||
if overlay_initialized:
|
||||
overlay_init_fn = os.path.join(BASEDIR, ".overlay_init")
|
||||
git_dir_path = os.path.join(BASEDIR, ".git")
|
||||
new_files = run(["find", git_dir_path, "-newer", overlay_init_fn])
|
||||
|
||||
if len(new_files.splitlines()):
|
||||
cloudlog.info(".git directory changed, recreating overlay")
|
||||
overlay_init_done = False
|
||||
overlay_initialized = False
|
||||
|
||||
if not overlay_init_done:
|
||||
if not overlay_initialized:
|
||||
init_ovfs()
|
||||
overlay_init_done = True
|
||||
overlay_initialized = True
|
||||
|
||||
if params.get("IsOffroad") == b"1":
|
||||
attempt_update()
|
||||
|
@ -298,11 +293,13 @@ def main():
|
|||
output=e.output,
|
||||
returncode=e.returncode
|
||||
)
|
||||
overlay_init_done = False
|
||||
overlay_initialized = False
|
||||
except Exception:
|
||||
cloudlog.exception("uncaught updated exception, shouldn't happen")
|
||||
|
||||
params.put("UpdateFailedCount", str(update_failed_count))
|
||||
|
||||
# Wait 10 minutes between update attempts
|
||||
wait_helper.sleep(60*10)
|
||||
|
||||
# We've been signaled to shut down
|
||||
|
|
Loading…
Reference in New Issue