uploader: do not delete files after uploading (#1253)

* cffi xattr function wrapper

* xattr wrapper error handling

* xattr tests

* use xattr for tracking files uploaded

* uploader xattr exception handling

* update uploader tests

* remove unused import

* update release build

* xattrs.py -> xattr.py
albatross
Greg Hogan 2020-03-19 18:09:26 -07:00 committed by GitHub
parent 6f7320a046
commit f21d0f325e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 133 additions and 37 deletions

View File

@ -0,0 +1,46 @@
import os
import tempfile
import shutil
import unittest
from common.xattr import getxattr, setxattr, listxattr, removexattr
class TestParams(unittest.TestCase):
def setUp(self):
self.tmpdir = tempfile.mkdtemp()
self.tmpfn = os.path.join(self.tmpdir, 'test.txt')
open(self.tmpfn, 'w').close()
#print("using", self.tmpfn)
def tearDown(self):
shutil.rmtree(self.tmpdir)
def test_getxattr_none(self):
a = getxattr(self.tmpfn, 'user.test')
assert a is None
def test_listxattr_none(self):
l = listxattr(self.tmpfn)
assert l == []
def test_setxattr(self):
setxattr(self.tmpfn, 'user.test', b'123')
a = getxattr(self.tmpfn, 'user.test')
assert a == b'123'
def test_listxattr(self):
setxattr(self.tmpfn, 'user.test1', b'123')
setxattr(self.tmpfn, 'user.test2', b'123')
l = listxattr(self.tmpfn)
assert l == ['user.test1', 'user.test2']
def test_removexattr(self):
setxattr(self.tmpfn, 'user.test', b'123')
a = getxattr(self.tmpfn, 'user.test')
assert a == b'123'
removexattr(self.tmpfn, 'user.test')
a = getxattr(self.tmpfn, 'user.test')
assert a is None
if __name__ == "__main__":
unittest.main()

45
common/xattr.py 100644
View File

@ -0,0 +1,45 @@
import os
from cffi import FFI
# Workaround for the EON/termux build of Python having os.*xattr removed.
ffi = FFI()
ffi.cdef("""
int setxattr(const char *path, const char *name, const void *value, size_t size, int flags);
ssize_t getxattr(const char *path, const char *name, void *value, size_t size);
ssize_t listxattr(const char *path, char *list, size_t size);
int removexattr(const char *path, const char *name);
""")
libc = ffi.dlopen(None)
def setxattr(path, name, value, flags=0):
path = path.encode()
name = name.encode()
if libc.setxattr(path, name, value, len(value), flags) == -1:
raise OSError(ffi.errno, f"{os.strerror(ffi.errno)}: setxattr({path}, {name}, {value}, {flags})")
def getxattr(path, name, size=128):
path = path.encode()
name = name.encode()
value = ffi.new(f"char[{size}]")
l = libc.getxattr(path, name, value, size)
if l == -1:
# errno 61 means attribute hasn't been set
if ffi.errno == 61:
return None
raise OSError(ffi.errno, f"{os.strerror(ffi.errno)}: getxattr({path}, {name}, {size})")
return ffi.buffer(value)[:l]
def listxattr(path, size=128):
path = path.encode()
attrs = ffi.new(f"char[{size}]")
l = libc.listxattr(path, attrs, size)
if l == -1:
raise OSError(ffi.errno, f"{os.strerror(ffi.errno)}: listxattr({path}, {size})")
# attrs is b'\0' delimited values (so chop off trailing empty item)
return [a.decode() for a in ffi.buffer(attrs)[:l].split(b"\0")[0:-1]]
def removexattr(path, name):
path = path.encode()
name = name.encode()
if libc.removexattr(path, name) == -1:
raise OSError(ffi.errno, f"{os.strerror(ffi.errno)}: removexattr({path}, {name})")

View File

@ -24,6 +24,7 @@ common/file_helpers.py
common/logging_extra.py common/logging_extra.py
common/numpy_fast.py common/numpy_fast.py
common/params.py common/params.py
common/xattr.py
common/profiler.py common/profiler.py
common/testing.py common/testing.py
common/basedir.py common/basedir.py

View File

@ -1,13 +1,13 @@
import os
import time import time
import threading import threading
import unittest
import logging import logging
import json import json
from selfdrive.swaglog import cloudlog from selfdrive.swaglog import cloudlog
import selfdrive.loggerd.uploader as uploader import selfdrive.loggerd.uploader as uploader
from common.timeout import Timeout from common.xattr import getxattr
from selfdrive.loggerd.tests.loggerd_tests_common import UploaderTestCase from selfdrive.loggerd.tests.loggerd_tests_common import UploaderTestCase
@ -69,14 +69,14 @@ class TestUploader(UploaderTestCase):
f_paths = self.gen_files(lock=False) f_paths = self.gen_files(lock=False)
self.start_thread() self.start_thread()
# allow enough time that files could upload twice if there is a bug in the logic
with Timeout(5, "Timeout waiting for file to be uploaded"): time.sleep(5)
while len(os.listdir(self.root)):
time.sleep(0.01)
self.join_thread() self.join_thread()
self.assertFalse(len(log_handler.upload_order) < len(f_paths), "Some files failed to upload")
self.assertFalse(len(log_handler.upload_order) > len(f_paths), "Some files were uploaded twice")
for f_path in f_paths: for f_path in f_paths:
self.assertFalse(os.path.exists(f_path), "All files not uploaded") self.assertTrue(getxattr(f_path, uploader.UPLOAD_ATTR_NAME), "All files not uploaded")
exp_order = self.gen_order([self.seg_num], []) exp_order = self.gen_order([self.seg_num], [])
self.assertTrue(log_handler.upload_order == exp_order, "Files uploaded in wrong order") self.assertTrue(log_handler.upload_order == exp_order, "Files uploaded in wrong order")
@ -92,15 +92,14 @@ class TestUploader(UploaderTestCase):
f_paths += self.gen_files() f_paths += self.gen_files()
self.start_thread() self.start_thread()
# allow enough time that files could upload twice if there is a bug in the logic
with Timeout(5, "Timeout waiting for file to be upload"): time.sleep(5)
while len(os.listdir(self.root)):
time.sleep(0.01)
self.join_thread() self.join_thread()
self.assertFalse(len(log_handler.upload_order) < len(f_paths), "Some files failed to upload")
self.assertFalse(len(log_handler.upload_order) > len(f_paths), "Some files were uploaded twice")
for f_path in f_paths: for f_path in f_paths:
self.assertFalse(os.path.exists(f_path), "All files not uploaded") self.assertTrue(getxattr(f_path, uploader.UPLOAD_ATTR_NAME), "All files not uploaded")
exp_order = self.gen_order(seg1_nums, seg2_nums) exp_order = self.gen_order(seg1_nums, seg2_nums)
self.assertTrue(log_handler.upload_order == exp_order, "Files uploaded in wrong order") self.assertTrue(log_handler.upload_order == exp_order, "Files uploaded in wrong order")
@ -113,4 +112,8 @@ class TestUploader(UploaderTestCase):
self.join_thread() self.join_thread()
for f_path in f_paths: for f_path in f_paths:
self.assertTrue(os.path.exists(f_path), "File upload when locked") self.assertFalse(getxattr(f_path, uploader.UPLOAD_ATTR_NAME), "File upload when locked")
if __name__ == "__main__":
unittest.main()

View File

@ -17,6 +17,10 @@ from selfdrive.loggerd.config import ROOT
from common import android from common import android
from common.params import Params from common.params import Params
from common.api import Api from common.api import Api
from common.xattr import getxattr, setxattr
UPLOAD_ATTR_NAME = 'user.upload'
UPLOAD_ATTR_VALUE = b'1'
fake_upload = os.getenv("FAKEUPLOAD") is not None fake_upload = os.getenv("FAKEUPLOAD") is not None
@ -72,7 +76,7 @@ def is_on_wifi():
if result is None: if result is None:
return True return True
return 'WIFI' in result return 'WIFI' in result
except Exception: except BaseException:
cloudlog.exception("is_on_wifi failed") cloudlog.exception("is_on_wifi failed")
return False return False
@ -86,7 +90,7 @@ def is_on_hotspot():
is_entune = result.startswith('10.0.2.') is_entune = result.startswith('10.0.2.')
return (is_android or is_ios or is_entune) return (is_android or is_ios or is_entune)
except: except BaseException:
return False return False
class Uploader(): class Uploader():
@ -103,16 +107,6 @@ class Uploader():
self.immediate_priority = {"qlog.bz2": 0, "qcamera.ts": 1} self.immediate_priority = {"qlog.bz2": 0, "qcamera.ts": 1}
self.high_priority = {"rlog.bz2": 0, "fcamera.hevc": 1, "dcamera.hevc": 2} self.high_priority = {"rlog.bz2": 0, "fcamera.hevc": 1, "dcamera.hevc": 2}
def clean_dirs(self):
try:
for logname in os.listdir(self.root):
path = os.path.join(self.root, logname)
# remove empty directories
if not os.listdir(path):
os.rmdir(path)
except OSError:
cloudlog.exception("clean_dirs failed")
def get_upload_sort(self, name): def get_upload_sort(self, name):
if name in self.immediate_priority: if name in self.immediate_priority:
return self.immediate_priority[name] return self.immediate_priority[name]
@ -135,6 +129,14 @@ class Uploader():
for name in sorted(names, key=self.get_upload_sort): for name in sorted(names, key=self.get_upload_sort):
key = os.path.join(logname, name) key = os.path.join(logname, name)
fn = os.path.join(path, name) fn = os.path.join(path, name)
# skip files already uploaded
try:
is_uploaded = getxattr(fn, UPLOAD_ATTR_NAME)
except OSError:
cloudlog.event("uploader_getxattr_failed", exc=self.last_exc, key=key, fn=fn)
is_uploaded = True # deleter could have deleted
if is_uploaded:
continue
yield (name, key, fn) yield (name, key, fn)
@ -175,7 +177,7 @@ class Uploader():
else: else:
with open(fn, "rb") as f: with open(fn, "rb") as f:
self.last_resp = requests.put(url, data=f, headers=headers, timeout=10) self.last_resp = requests.put(url, data=f, headers=headers, timeout=10)
except Exception as e: except BaseException as e:
self.last_exc = (e, traceback.format_exc()) self.last_exc = (e, traceback.format_exc())
raise raise
@ -185,7 +187,7 @@ class Uploader():
try: try:
self.do_upload(key, fn) self.do_upload(key, fn)
except Exception: except BaseException:
pass pass
return self.last_resp return self.last_resp
@ -202,28 +204,27 @@ class Uploader():
cloudlog.info("checking %r with size %r", key, sz) cloudlog.info("checking %r with size %r", key, sz)
if sz == 0: if sz == 0:
# can't upload files of 0 size try:
os.unlink(fn) # delete the file # tag files of 0 size as uploaded
setxattr(fn, UPLOAD_ATTR_NAME, UPLOAD_ATTR_VALUE)
except OSError:
cloudlog.event("uploader_setxattr_failed", exc=self.last_exc, key=key, fn=fn, sz=sz)
success = True success = True
else: else:
cloudlog.info("uploading %r", fn) cloudlog.info("uploading %r", fn)
stat = self.normal_upload(key, fn) stat = self.normal_upload(key, fn)
if stat is not None and stat.status_code in (200, 201): if stat is not None and stat.status_code in (200, 201):
cloudlog.event("upload_success", key=key, fn=fn, sz=sz) cloudlog.event("upload_success", key=key, fn=fn, sz=sz)
# delete the file
try: try:
os.unlink(fn) # tag file as uploaded
setxattr(fn, UPLOAD_ATTR_NAME, UPLOAD_ATTR_VALUE)
except OSError: except OSError:
cloudlog.event("delete_failed", stat=stat, exc=self.last_exc, key=key, fn=fn, sz=sz) cloudlog.event("uploader_setxattr_failed", exc=self.last_exc, key=key, fn=fn, sz=sz)
success = True success = True
else: else:
cloudlog.event("upload_failed", stat=stat, exc=self.last_exc, key=key, fn=fn, sz=sz) cloudlog.event("upload_failed", stat=stat, exc=self.last_exc, key=key, fn=fn, sz=sz)
success = False success = False
self.clean_dirs()
return success return success
def uploader_fn(exit_event): def uploader_fn(exit_event):