athena: retry unsuccessful upload handler requests. (#23620)

* Retry unsuccessful upload handler requests.

* test both cases

Co-authored-by: Willem Melching <willem.melching@gmail.com>
pull/23243/head
Ryan 2022-01-26 09:23:59 -05:00 committed by GitHub
parent a62e914090
commit 1d4191956b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 52 additions and 20 deletions

View File

@ -14,7 +14,7 @@ import time
import tempfile
from collections import namedtuple
from functools import partial
from typing import Any
from typing import Any, Dict
import requests
from jsonrpc import JSONRPCResponseManager, dispatcher
@ -55,7 +55,7 @@ log_recv_queue: Any = queue.Queue()
cancelled_uploads: Any = set()
UploadItem = namedtuple('UploadItem', ['path', 'url', 'headers', 'created_at', 'id', 'retry_count', 'current', 'progress'], defaults=(0, False, 0))
cur_upload_items = {}
cur_upload_items: Dict[int, Any] = {}
class UploadQueueCache():
@ -128,7 +128,26 @@ def jsonrpc_handler(end_event):
send_queue.put_nowait(json.dumps({"error": str(e)}))
def upload_handler(end_event):
def retry_upload(tid: int, end_event: threading.Event) -> None:
    """Put the upload tracked for thread ``tid`` back on the queue, then pause.

    Nothing happens when the tracked item has already reached
    ``MAX_RETRY_COUNT``. Otherwise a copy of the item with ``retry_count``
    incremented and ``progress``/``current`` reset is queued, the queue is
    cached, the thread's slot in ``cur_upload_items`` is cleared, and the
    call sleeps in one-second steps for up to ``RETRY_DELAY`` steps.

    Args:
        tid: Key into ``cur_upload_items`` identifying the calling thread.
        end_event: When set, the delay is cut short after the current step.
    """
    current = cur_upload_items[tid]
    if current.retry_count >= MAX_RETRY_COUNT:
        return

    requeued = current._replace(
        retry_count=current.retry_count + 1,
        progress=0,
        current=False,
    )
    upload_queue.put_nowait(requeued)
    UploadQueueCache.cache(upload_queue)

    # The item now lives in the queue again, so this thread no longer owns it.
    cur_upload_items[tid] = None

    # Sleep in short steps so a shutdown request is noticed within a second.
    for _ in range(RETRY_DELAY):
        time.sleep(1)
        if end_event.is_set():
            return
def upload_handler(end_event: threading.Event) -> None:
tid = threading.get_ident()
while not end_event.is_set():
@ -145,27 +164,15 @@ def upload_handler(end_event):
def cb(sz, cur):
cur_upload_items[tid] = cur_upload_items[tid]._replace(progress=cur / sz if sz else 1)
_do_upload(cur_upload_items[tid], cb)
response = _do_upload(cur_upload_items[tid], cb)
if response.status_code not in (200, 201, 403, 412):
cloudlog.warning(f"athena.upload_handler.retry {response.status_code} {cur_upload_items[tid]}")
retry_upload(tid, end_event)
UploadQueueCache.cache(upload_queue)
except (requests.exceptions.Timeout, requests.exceptions.ConnectionError, requests.exceptions.SSLError) as e:
cloudlog.warning(f"athena.upload_handler.retry {e} {cur_upload_items[tid]}")
if cur_upload_items[tid].retry_count < MAX_RETRY_COUNT:
item = cur_upload_items[tid]
item = item._replace(
retry_count=item.retry_count + 1,
progress=0,
current=False
)
upload_queue.put_nowait(item)
UploadQueueCache.cache(upload_queue)
cur_upload_items[tid] = None
for _ in range(RETRY_DELAY):
time.sleep(1)
if end_event.is_set():
break
retry_upload(tid, end_event)
except queue.Empty:
pass

View File

@ -166,6 +166,31 @@ class TestAthenadMethods(unittest.TestCase):
finally:
end_event.set()
@with_http_server
@mock.patch('requests.put')
def test_upload_handler_retry(self, host, mock_put):
    """Check which mocked PUT status codes leave the item queued for retry.

    With status 500 the queue is expected to hold one item whose
    ``retry_count`` is 1; with status 412 the queue is expected to be empty.
    """
    # (status code returned by the mocked PUT, whether a retry is expected)
    cases = ((500, True), (412, False))

    for status, retry in cases:
        mock_put.return_value.status_code = status

        fn = os.path.join(athenad.ROOT, 'qlog.bz2')
        Path(fn).touch()
        created_at = int(time.time()*1000)
        item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=created_at, id='')

        end_event = threading.Event()
        worker = threading.Thread(target=athenad.upload_handler, args=(end_event,))
        worker.start()

        athenad.upload_queue.put_nowait(item)
        try:
            self.wait_for_upload()
            time.sleep(0.1)

            expected_qsize = 1 if retry else 0
            self.assertEqual(athenad.upload_queue.qsize(), expected_qsize)
        finally:
            # Always signal the handler thread to stop, even on assertion failure.
            end_event.set()

        if retry:
            requeued = athenad.upload_queue.get()
            self.assertEqual(requeued.retry_count, 1)
def test_upload_handler_timeout(self):
"""When an upload times out or fails to connect it should be placed back in the queue"""
fn = os.path.join(athenad.ROOT, 'qlog.bz2')