diff --git a/common/tests/test_xattr.py b/common/tests/test_xattr.py new file mode 100644 index 000000000..d8f0197d3 --- /dev/null +++ b/common/tests/test_xattr.py @@ -0,0 +1,46 @@ +import os +import tempfile +import shutil +import unittest + +from common.xattr import getxattr, setxattr, listxattr, removexattr + +class TestParams(unittest.TestCase): + def setUp(self): + self.tmpdir = tempfile.mkdtemp() + self.tmpfn = os.path.join(self.tmpdir, 'test.txt') + open(self.tmpfn, 'w').close() + #print("using", self.tmpfn) + + def tearDown(self): + shutil.rmtree(self.tmpdir) + + def test_getxattr_none(self): + a = getxattr(self.tmpfn, 'user.test') + assert a is None + + def test_listxattr_none(self): + l = listxattr(self.tmpfn) + assert l == [] + + def test_setxattr(self): + setxattr(self.tmpfn, 'user.test', b'123') + a = getxattr(self.tmpfn, 'user.test') + assert a == b'123' + + def test_listxattr(self): + setxattr(self.tmpfn, 'user.test1', b'123') + setxattr(self.tmpfn, 'user.test2', b'123') + l = listxattr(self.tmpfn) + assert l == ['user.test1', 'user.test2'] + + def test_removexattr(self): + setxattr(self.tmpfn, 'user.test', b'123') + a = getxattr(self.tmpfn, 'user.test') + assert a == b'123' + removexattr(self.tmpfn, 'user.test') + a = getxattr(self.tmpfn, 'user.test') + assert a is None + +if __name__ == "__main__": + unittest.main() diff --git a/common/xattr.py b/common/xattr.py new file mode 100644 index 000000000..fa61b9e0f --- /dev/null +++ b/common/xattr.py @@ -0,0 +1,45 @@ +import os +from cffi import FFI + +# Workaround for the EON/termux build of Python having os.*xattr removed. +ffi = FFI() +ffi.cdef(""" +int setxattr(const char *path, const char *name, const void *value, size_t size, int flags); +ssize_t getxattr(const char *path, const char *name, void *value, size_t size); +ssize_t listxattr(const char *path, char *list, size_t size); +int removexattr(const char *path, const char *name); +""") +libc = ffi.dlopen(None) + +def setxattr(path, name, value, flags=0): + path = path.encode() + name = name.encode() + if libc.setxattr(path, name, value, len(value), flags) == -1: + raise OSError(ffi.errno, f"{os.strerror(ffi.errno)}: setxattr({path}, {name}, {value}, {flags})") + +def getxattr(path, name, size=128): + path = path.encode() + name = name.encode() + value = ffi.new(f"char[{size}]") + l = libc.getxattr(path, name, value, size) + if l == -1: + # errno 61 means attribute hasn't been set + if ffi.errno == 61: + return None + raise OSError(ffi.errno, f"{os.strerror(ffi.errno)}: getxattr({path}, {name}, {size})") + return ffi.buffer(value)[:l] + +def listxattr(path, size=128): + path = path.encode() + attrs = ffi.new(f"char[{size}]") + l = libc.listxattr(path, attrs, size) + if l == -1: + raise OSError(ffi.errno, f"{os.strerror(ffi.errno)}: listxattr({path}, {size})") + # attrs is b'\0' delimited values (so chop off trailing empty item) + return [a.decode() for a in ffi.buffer(attrs)[:l].split(b"\0")[0:-1]] + +def removexattr(path, name): + path = path.encode() + name = name.encode() + if libc.removexattr(path, name) == -1: + raise OSError(ffi.errno, f"{os.strerror(ffi.errno)}: removexattr({path}, {name})") diff --git a/release/files_common b/release/files_common index 4cda2ac6d..60923e724 100644 --- a/release/files_common +++ b/release/files_common @@ -24,6 +24,7 @@ common/file_helpers.py common/logging_extra.py common/numpy_fast.py common/params.py +common/xattr.py common/profiler.py common/testing.py common/basedir.py diff --git a/selfdrive/loggerd/tests/test_uploader.py b/selfdrive/loggerd/tests/test_uploader.py index ed9a97222..c37816473 100644 --- a/selfdrive/loggerd/tests/test_uploader.py +++ b/selfdrive/loggerd/tests/test_uploader.py @@ -1,13 +1,13 @@ -import os import time import threading +import unittest import logging import json from selfdrive.swaglog import cloudlog import selfdrive.loggerd.uploader as uploader -from common.timeout import Timeout +from common.xattr import getxattr from selfdrive.loggerd.tests.loggerd_tests_common import UploaderTestCase @@ -69,14 +69,14 @@ class TestUploader(UploaderTestCase): f_paths = self.gen_files(lock=False) self.start_thread() - - with Timeout(5, "Timeout waiting for file to be uploaded"): - while len(os.listdir(self.root)): - time.sleep(0.01) + # allow enough time that files could upload twice if there is a bug in the logic + time.sleep(5) self.join_thread() + self.assertFalse(len(log_handler.upload_order) < len(f_paths), "Some files failed to upload") + self.assertFalse(len(log_handler.upload_order) > len(f_paths), "Some files were uploaded twice") for f_path in f_paths: - self.assertFalse(os.path.exists(f_path), "All files not uploaded") + self.assertTrue(getxattr(f_path, uploader.UPLOAD_ATTR_NAME), "All files not uploaded") exp_order = self.gen_order([self.seg_num], []) self.assertTrue(log_handler.upload_order == exp_order, "Files uploaded in wrong order") @@ -92,15 +92,14 @@ class TestUploader(UploaderTestCase): f_paths += self.gen_files() self.start_thread() - - with Timeout(5, "Timeout waiting for file to be upload"): - while len(os.listdir(self.root)): - time.sleep(0.01) - + # allow enough time that files could upload twice if there is a bug in the logic + time.sleep(5) self.join_thread() + self.assertFalse(len(log_handler.upload_order) < len(f_paths), "Some files failed to upload") + self.assertFalse(len(log_handler.upload_order) > len(f_paths), "Some files were uploaded twice") for f_path in f_paths: - self.assertFalse(os.path.exists(f_path), "All files not uploaded") + self.assertTrue(getxattr(f_path, uploader.UPLOAD_ATTR_NAME), "All files not uploaded") exp_order = self.gen_order(seg1_nums, seg2_nums) self.assertTrue(log_handler.upload_order == exp_order, "Files uploaded in wrong order") @@ -113,4 +112,8 @@ class TestUploader(UploaderTestCase): self.join_thread() for f_path in f_paths: - self.assertTrue(os.path.exists(f_path), "File upload when locked") + self.assertFalse(getxattr(f_path, uploader.UPLOAD_ATTR_NAME), "File upload when locked") + + +if __name__ == "__main__": + unittest.main() diff --git a/selfdrive/loggerd/uploader.py b/selfdrive/loggerd/uploader.py index dba2287b8..db1003a33 100644 --- a/selfdrive/loggerd/uploader.py +++ b/selfdrive/loggerd/uploader.py @@ -17,6 +17,10 @@ from selfdrive.loggerd.config import ROOT from common import android from common.params import Params from common.api import Api +from common.xattr import getxattr, setxattr + +UPLOAD_ATTR_NAME = 'user.upload' +UPLOAD_ATTR_VALUE = b'1' fake_upload = os.getenv("FAKEUPLOAD") is not None @@ -72,7 +76,7 @@ def is_on_wifi(): if result is None: return True return 'WIFI' in result - except Exception: + except BaseException: cloudlog.exception("is_on_wifi failed") return False @@ -86,7 +90,7 @@ def is_on_hotspot(): is_entune = result.startswith('10.0.2.') return (is_android or is_ios or is_entune) - except: + except BaseException: return False class Uploader(): @@ -103,16 +107,6 @@ class Uploader(): self.immediate_priority = {"qlog.bz2": 0, "qcamera.ts": 1} self.high_priority = {"rlog.bz2": 0, "fcamera.hevc": 1, "dcamera.hevc": 2} - def clean_dirs(self): - try: - for logname in os.listdir(self.root): - path = os.path.join(self.root, logname) - # remove empty directories - if not os.listdir(path): - os.rmdir(path) - except OSError: - cloudlog.exception("clean_dirs failed") - def get_upload_sort(self, name): if name in self.immediate_priority: return self.immediate_priority[name] @@ -135,6 +129,14 @@ class Uploader(): for name in sorted(names, key=self.get_upload_sort): key = os.path.join(logname, name) fn = os.path.join(path, name) + # skip files already uploaded + try: + is_uploaded = getxattr(fn, UPLOAD_ATTR_NAME) + except OSError: + cloudlog.event("uploader_getxattr_failed", exc=self.last_exc, key=key, fn=fn) + is_uploaded = True # deleter could have deleted + if is_uploaded: + continue yield (name, key, fn) @@ -175,7 +177,7 @@ class Uploader(): else: with open(fn, "rb") as f: self.last_resp = requests.put(url, data=f, headers=headers, timeout=10) - except Exception as e: + except BaseException as e: self.last_exc = (e, traceback.format_exc()) raise @@ -185,7 +187,7 @@ class Uploader(): try: self.do_upload(key, fn) - except Exception: + except BaseException: pass return self.last_resp @@ -202,28 +204,27 @@ class Uploader(): cloudlog.info("checking %r with size %r", key, sz) if sz == 0: - # can't upload files of 0 size - os.unlink(fn) # delete the file + try: + # tag files of 0 size as uploaded + setxattr(fn, UPLOAD_ATTR_NAME, UPLOAD_ATTR_VALUE) + except OSError: + cloudlog.event("uploader_setxattr_failed", exc=self.last_exc, key=key, fn=fn, sz=sz) success = True else: cloudlog.info("uploading %r", fn) stat = self.normal_upload(key, fn) if stat is not None and stat.status_code in (200, 201): cloudlog.event("upload_success", key=key, fn=fn, sz=sz) - - # delete the file try: - os.unlink(fn) + # tag file as uploaded + setxattr(fn, UPLOAD_ATTR_NAME, UPLOAD_ATTR_VALUE) except OSError: - cloudlog.event("delete_failed", stat=stat, exc=self.last_exc, key=key, fn=fn, sz=sz) - + cloudlog.event("uploader_setxattr_failed", exc=self.last_exc, key=key, fn=fn, sz=sz) success = True else: cloudlog.event("upload_failed", stat=stat, exc=self.last_exc, key=key, fn=fn, sz=sz) success = False - self.clean_dirs() - return success def uploader_fn(exit_event):