athenad: retry failed and timed out uploads (#21745)

* retry failed uploads

* test cleanup

* update comment

* also catch SSL error

* use defaults

* sleep in chunks
pull/21781/head
Willem Melching 2021-07-29 11:13:59 +02:00 committed by GitHub
parent ab63d65ae5
commit d5b6746ac5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 64 additions and 12 deletions

View File

@ -39,6 +39,9 @@ LOG_ATTR_NAME = 'user.upload'
LOG_ATTR_VALUE_MAX_UNIX_TIME = int.to_bytes(2147483647, 4, sys.byteorder)
RECONNECT_TIMEOUT_S = 70
RETRY_DELAY = 10 # seconds
MAX_RETRY_COUNT = 30 # Try for at most 5 minutes if upload fails immediately
dispatcher["echo"] = lambda s: s
recv_queue: Any = queue.Queue()
send_queue: Any = queue.Queue()
@ -46,7 +49,7 @@ upload_queue: Any = queue.Queue()
log_send_queue: Any = queue.Queue()
log_recv_queue: Any = queue.Queue()
cancelled_uploads: Any = set()
UploadItem = namedtuple('UploadItem', ['path', 'url', 'headers', 'created_at', 'id'])
UploadItem = namedtuple('UploadItem', ['path', 'url', 'headers', 'created_at', 'id', 'retry_count'], defaults=(0,))
def handle_long_poll(ws):
@ -103,7 +106,20 @@ def upload_handler(end_event):
if item.id in cancelled_uploads:
cancelled_uploads.remove(item.id)
continue
_do_upload(item)
try:
_do_upload(item)
except (requests.exceptions.Timeout, requests.exceptions.ConnectionError, requests.exceptions.SSLError) as e:
cloudlog.warning(f"athena.upload_handler.retry {e} {item}")
if item.retry_count < MAX_RETRY_COUNT:
item = item._replace(retry_count=item.retry_count + 1)
upload_queue.put_nowait(item)
for _ in range(RETRY_DELAY):
time.sleep(1)
if end_event.is_set():
break
except queue.Empty:
pass
except Exception:

View File

@ -16,7 +16,7 @@ from websocket._exceptions import WebSocketConnectionClosedException
from selfdrive import swaglog
from selfdrive.athena import athenad
from selfdrive.athena.athenad import dispatcher
from selfdrive.athena.athenad import MAX_RETRY_COUNT, dispatcher
from selfdrive.athena.tests.helpers import MockWebsocket, MockParams, MockApi, EchoSocket, with_http_server
from cereal import messaging
@ -30,6 +30,12 @@ class TestAthenadMethods(unittest.TestCase):
athenad.Api = MockApi
athenad.LOCAL_PORT_WHITELIST = set([cls.SOCKET_PORT])
def wait_for_upload(self):
now = time.time()
while time.time() - now < 5:
if athenad.upload_queue.qsize() == 0:
break
def test_echo(self):
assert dispatcher["echo"]("bob") == "bob"
@ -105,17 +111,48 @@ class TestAthenadMethods(unittest.TestCase):
athenad.upload_queue.put_nowait(item)
try:
time.sleep(1) # give it time to process to prevent shutdown before upload completes
now = time.time()
while time.time() - now < 5:
if athenad.upload_queue.qsize() == 0:
break
self.wait_for_upload()
time.sleep(0.1)
# TODO: verify that upload actually succeeded
self.assertEqual(athenad.upload_queue.qsize(), 0)
finally:
end_event.set()
athenad.upload_queue = queue.Queue()
os.unlink(fn)
def test_upload_handler_timeout(self):
"""When an upload times out or fails to connect it should be placed back in the queue"""
fn = os.path.join(athenad.ROOT, 'qlog.bz2')
Path(fn).touch()
item = athenad.UploadItem(path=fn, url="http://localhost:44444/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='')
item_no_retry = item._replace(retry_count=MAX_RETRY_COUNT)
end_event = threading.Event()
thread = threading.Thread(target=athenad.upload_handler, args=(end_event,))
thread.start()
try:
athenad.upload_queue.put_nowait(item_no_retry)
self.wait_for_upload()
time.sleep(0.1)
# Check that upload with retry count exceeded is not put back
self.assertEqual(athenad.upload_queue.qsize(), 0)
athenad.upload_queue.put_nowait(item)
self.wait_for_upload()
time.sleep(0.1)
# Check that upload item was put back in the queue with incremented retry count
self.assertEqual(athenad.upload_queue.qsize(), 1)
self.assertEqual(athenad.upload_queue.get().retry_count, 1)
finally:
end_event.set()
athenad.upload_queue = queue.Queue()
os.unlink(fn)
def test_cancelUpload(self):
item = athenad.UploadItem(path="qlog.bz2", url="http://localhost:44444/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='id')
athenad.upload_queue.put_nowait(item)
@ -127,10 +164,9 @@ class TestAthenadMethods(unittest.TestCase):
thread = threading.Thread(target=athenad.upload_handler, args=(end_event,))
thread.start()
try:
now = time.time()
while time.time() - now < 5:
if athenad.upload_queue.qsize() == 0 and len(athenad.cancelled_uploads) == 0:
break
self.wait_for_upload()
time.sleep(0.1)
self.assertEqual(athenad.upload_queue.qsize(), 0)
self.assertEqual(len(athenad.cancelled_uploads), 0)
finally: