athenad: send log files one at a time (#21092)

pull/21016/head^2
Greg Hogan 2021-06-01 15:49:01 -07:00 committed by GitHub
parent 43fd045a3e
commit 1a7b0dfb3c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed file with 44 additions and 36 deletions

View File

@@ -85,7 +85,7 @@ def jsonrpc_handler(end_event):
cloudlog.debug(f"athena.jsonrpc_handler.call_method {data}")
response = JSONRPCResponseManager.handle(data, dispatcher)
send_queue.put_nowait(response.json)
elif "result" in data and "id" in data:
elif "id" in data and ("result" in data or "error" in data):
log_recv_queue.put_nowait(data)
else:
raise Exception("not a valid request or response")
@@ -311,47 +311,55 @@ def log_handler(end_event):
last_scan = 0
while not end_event.is_set():
try:
try:
result = json.loads(log_recv_queue.get(timeout=1))
log_success = result.get("success")
log_entry = result.get("id")
log_path = os.path.join(SWAGLOG_DIR, log_entry)
if log_entry and log_success:
try:
setxattr(log_path, LOG_ATTR_NAME, LOG_ATTR_VALUE_MAX_UNIX_TIME)
except OSError:
pass # file could be deleted by log rotation
except queue.Empty:
pass
curr_scan = sec_since_boot()
if curr_scan - last_scan > 10:
log_files = get_logs_to_send_sorted()
last_scan = curr_scan
# never send last log file because it is the active log
# and only send one log file at a time (most recent first)
if not len(log_files) or not log_send_queue.empty():
continue
# send one log
curr_log = None
if len(log_files) > 0:
log_entry = log_files.pop()
cloudlog.debug(f"athena.log_handler.forward_request {log_entry}")
try:
curr_time = int(time.time())
log_path = os.path.join(SWAGLOG_DIR, log_entry)
setxattr(log_path, LOG_ATTR_NAME, int.to_bytes(curr_time, 4, sys.byteorder))
with open(log_path, "r") as f:
jsonrpc = {
"method": "forwardLogs",
"params": {
"logs": f.read()
},
"jsonrpc": "2.0",
"id": log_entry
}
log_send_queue.put_nowait(json.dumps(jsonrpc))
curr_log = log_entry
except OSError:
pass # file could be deleted by log rotation
log_entry = log_files.pop()
try:
curr_time = int(time.time())
log_path = os.path.join(SWAGLOG_DIR, log_entry)
setxattr(log_path, LOG_ATTR_NAME, int.to_bytes(curr_time, 4, sys.byteorder))
with open(log_path, "r") as f:
jsonrpc = {
"method": "forwardLogs",
"params": {
"logs": f.read()
},
"jsonrpc": "2.0",
"id": log_entry
}
log_send_queue.put_nowait(json.dumps(jsonrpc))
except OSError:
pass # file could be deleted by log rotation
# wait for response up to ~100 seconds
# always read queue at least once to process any old responses that arrive
for _ in range(100):
if end_event.is_set():
break
try:
log_resp = json.loads(log_recv_queue.get(timeout=1))
log_entry = log_resp.get("id")
log_success = "result" in log_resp and log_resp["result"].get("success")
cloudlog.debug(f"athena.log_handler.forward_response {log_entry} {log_success}")
if log_entry and log_success:
log_path = os.path.join(SWAGLOG_DIR, log_entry)
try:
setxattr(log_path, LOG_ATTR_NAME, LOG_ATTR_VALUE_MAX_UNIX_TIME)
except OSError:
pass # file could be deleted by log rotation
if curr_log == log_entry:
break
except queue.Empty:
if curr_log is None:
break
except Exception:
cloudlog.exception("athena.log_handler.exception")