nopenpilot/selfdrive/loggerd/tests/test_uploader.py

141 lines
4.9 KiB
Python

import time
import threading
import unittest
import logging
import json
from selfdrive.swaglog import cloudlog
import selfdrive.loggerd.uploader as uploader
from common.xattr import getxattr
from selfdrive.loggerd.tests.loggerd_tests_common import UploaderTestCase
class TestLogHandler(logging.Handler):
    """Logging handler that collects upload outcomes from JSON log messages.

    Each record's message is parsed as JSON. The ``"key"`` of every
    ``"upload_success"`` event is appended to ``upload_order`` and the
    ``"key"`` of every ``"upload_ignored"`` event to ``upload_ignored``,
    in the order the records arrive. Any other record is skipped.
    """

    def __init__(self):
        super().__init__()
        self.reset()

    def reset(self):
        """Discard everything collected so far."""
        self.upload_order = []
        self.upload_ignored = []

    def emit(self, record):
        """Record the key carried by an upload event; ignore all other records."""
        # Maps the event name to the list that collects its keys.
        sinks = {
            "upload_success": self.upload_order,
            "upload_ignored": self.upload_ignored,
        }
        try:
            payload = json.loads(record.getMessage())
            sink = sinks.get(payload["event"])
            if sink is not None:
                sink.append(payload["key"])
        except Exception:
            # Best effort: messages that are not JSON, or that lack the
            # expected fields, are simply not upload events.
            pass
# Single shared handler, attached to cloudlog once at import time.
# TestUploader.setUp() resets it before each test.
log_handler = TestLogHandler()
cloudlog.addHandler(log_handler)
class TestUploader(UploaderTestCase):
    """Run ``uploader.uploader_fn`` on a background thread and check the outcome.

    Results are observed in two ways: through the module-level ``log_handler``
    (which collects ``upload_success`` / ``upload_ignored`` events) and through
    the ``uploader.UPLOAD_ATTR_NAME`` xattr read back from each generated file.

    NOTE(review): ``seg_dir``, ``seg_num``, ``seg_format``, ``seg_format2``,
    ``make_file_with_data`` and ``set_ignore`` are not defined in this class;
    presumably they come from ``UploaderTestCase`` -- confirm against it.
    """

    def setUp(self):
        """Run the base setup, then drop events captured during earlier tests."""
        super().setUp()
        log_handler.reset()

    def start_thread(self):
        """Start ``uploader.uploader_fn`` on a daemon thread with a fresh ``end_event``."""
        self.end_event = threading.Event()
        self.up_thread = threading.Thread(target=uploader.uploader_fn, args=(self.end_event,))
        self.up_thread.daemon = True
        self.up_thread.start()

    def join_thread(self):
        """Set ``end_event`` and wait for the uploader thread to finish."""
        self.end_event.set()
        self.up_thread.join()

    def _run_uploader(self, duration=5):
        """Start the uploader thread, let it run ``duration`` seconds, then join it."""
        self.start_thread()
        time.sleep(duration)
        self.join_thread()

    def gen_files(self, lock=False):
        """Create one file per log type in ``self.seg_dir``.

        Returns the values produced by ``make_file_with_data`` (used by the
        tests as file paths), in creation order. ``lock`` is forwarded as-is.
        """
        names = ("bootlog.bz2", "qlog.bz2", "rlog.bz2", "dcamera.hevc", "fcamera.hevc")
        return [self.make_file_with_data(self.seg_dir, name, 1, lock=lock) for name in names]

    def gen_order(self, seg1, seg2):
        """Build the expected key order for the given segment numbers.

        ``seg1`` numbers are formatted with ``seg_format`` and ``seg2`` numbers
        with ``seg_format2``; ``seg1`` segments always precede ``seg2`` ones.
        The resulting order is: every qlog, then rlog/fcamera/dcamera segment
        by segment, then every bootlog.
        """
        seg_dirs = [self.seg_format.format(i) for i in seg1]
        seg_dirs += [self.seg_format2.format(i) for i in seg2]

        keys = [f"{seg}/qlog.bz2" for seg in seg_dirs]
        for seg in seg_dirs:
            keys += [f"{seg}/{f}" for f in ('rlog.bz2', 'fcamera.hevc', 'dcamera.hevc')]
        keys += [f"{seg}/bootlog.bz2" for seg in seg_dirs]
        return keys

    def test_upload(self):
        """Every file of one segment is uploaded exactly once, in the expected order."""
        f_paths = self.gen_files(lock=False)

        # allow enough time that files could upload twice if there is a bug in the logic
        self._run_uploader()

        # assertEqual & co. report the offending values on failure, unlike assertTrue(a == b).
        self.assertEqual(len(log_handler.upload_ignored), 0, "Some files were ignored")
        self.assertGreaterEqual(len(log_handler.upload_order), len(f_paths),
                                "Some files failed to upload")
        self.assertLessEqual(len(log_handler.upload_order), len(f_paths),
                             "Some files were uploaded twice")
        for f_path in f_paths:
            self.assertTrue(getxattr(f_path, uploader.UPLOAD_ATTR_NAME), "All files not uploaded")

        exp_order = self.gen_order([self.seg_num], [])
        self.assertEqual(log_handler.upload_order, exp_order, "Files uploaded in wrong order")

    def test_upload_ignored(self):
        """After ``set_ignore()``, every file is reported as ignored exactly once, in order."""
        self.set_ignore()
        f_paths = self.gen_files(lock=False)

        # allow enough time that files could upload twice if there is a bug in the logic
        self._run_uploader()

        self.assertEqual(len(log_handler.upload_order), 0, "Some files were not ignored")
        self.assertGreaterEqual(len(log_handler.upload_ignored), len(f_paths),
                                "Some files failed to ignore")
        self.assertLessEqual(len(log_handler.upload_ignored), len(f_paths),
                             "Some files were ignored twice")
        for f_path in f_paths:
            self.assertTrue(getxattr(f_path, uploader.UPLOAD_ATTR_NAME), "All files not ignored")

        exp_order = self.gen_order([self.seg_num], [])
        self.assertEqual(log_handler.upload_ignored, exp_order, "Files ignored in wrong order")

    def test_upload_files_in_create_order(self):
        """Files spread over several segments are uploaded in the order ``gen_order`` gives."""
        f_paths = []
        seg1_nums = [0, 1, 2, 10, 20]
        for i in seg1_nums:
            self.seg_dir = self.seg_format.format(i)
            f_paths += self.gen_files()
        seg2_nums = [5, 50, 51]
        for i in seg2_nums:
            self.seg_dir = self.seg_format2.format(i)
            f_paths += self.gen_files()

        # allow enough time that files could upload twice if there is a bug in the logic
        self._run_uploader()

        self.assertEqual(len(log_handler.upload_ignored), 0, "Some files were ignored")
        self.assertGreaterEqual(len(log_handler.upload_order), len(f_paths),
                                "Some files failed to upload")
        self.assertLessEqual(len(log_handler.upload_order), len(f_paths),
                             "Some files were uploaded twice")
        for f_path in f_paths:
            self.assertTrue(getxattr(f_path, uploader.UPLOAD_ATTR_NAME), "All files not uploaded")

        exp_order = self.gen_order(seg1_nums, seg2_nums)
        self.assertEqual(log_handler.upload_order, exp_order, "Files uploaded in wrong order")

    def test_no_upload_with_lock_file(self):
        """Files created with ``lock=True`` must not get the upload xattr."""
        f_paths = self.gen_files(lock=True)

        # allow enough time that files should have been uploaded if they would be uploaded
        self._run_uploader()

        for f_path in f_paths:
            self.assertFalse(getxattr(f_path, uploader.UPLOAD_ATTR_NAME), "File upload when locked")
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()