nopenpilot/selfdrive/loggerd/tests/loggerd_tests_common.py

103 lines
2.3 KiB
Python

import os
import errno
import shutil
import random
import tempfile
import unittest
import selfdrive.loggerd.uploader as uploader
def create_random_file(file_path, size_mb, lock=False):
try:
os.mkdir(os.path.dirname(file_path))
except OSError:
pass
lock_path = file_path + ".lock"
if lock:
os.close(os.open(lock_path, os.O_CREAT | os.O_EXCL))
chunks = 128
chunk_bytes = int(size_mb * 1024 * 1024 / chunks)
data = os.urandom(chunk_bytes)
with open(file_path, 'wb') as f:
for _ in range(chunks):
f.write(data)
class MockResponse():
def __init__(self, text, status_code):
self.text = text
self.status_code = status_code
class MockApi():
def __init__(self, dongle_id):
pass
def get(self, *args, **kwargs):
return MockResponse('{"url": "http://localhost/does/not/exist", "headers": {}}', 200)
def get_token(self):
return "fake-token"
class MockApiIgnore():
def __init__(self, dongle_id):
pass
def get(self, *args, **kwargs):
return MockResponse('', 412)
def get_token(self):
return "fake-token"
class MockParams():
def __init__(self):
self.params = {
"DongleId": b"0000000000000000",
"IsOffroad": b"1",
}
def get(self, k, block=False, encoding=None):
val = self.params[k]
if encoding is not None:
return val.decode(encoding)
else:
return val
def get_bool(self, k):
val = self.params[k]
return (val == b'1')
class UploaderTestCase(unittest.TestCase):
f_type = "UNKNOWN"
def set_ignore(self):
uploader.Api = MockApiIgnore
def setUp(self):
self.root = tempfile.mkdtemp()
uploader.ROOT = self.root # Monkey patch root dir
uploader.Api = MockApi
uploader.Params = MockParams
uploader.fake_upload = True
uploader.force_wifi = True
uploader.allow_sleep = False
self.seg_num = random.randint(1, 300)
self.seg_format = "2019-04-18--12-52-54--{}"
self.seg_format2 = "2019-05-18--11-22-33--{}"
self.seg_dir = self.seg_format.format(self.seg_num)
def tearDown(self):
try:
shutil.rmtree(self.root)
except OSError as e:
if e.errno != errno.ENOENT:
raise
def make_file_with_data(self, f_dir, fn, size_mb=.1, lock=False):
file_path = os.path.join(self.root, f_dir, fn)
create_random_file(file_path, size_mb, lock)
return file_path