openpilot/selfdrive/camerad/snapshot/snapshot.py

121 lines
3.5 KiB
Python
Raw Normal View History

2020-01-17 11:52:42 -07:00
#!/usr/bin/env python3
import os
import subprocess
import time
import numpy as np
2020-01-17 11:52:42 -07:00
from PIL import Image
from typing import List
import cereal.messaging as messaging
2020-01-17 11:52:42 -07:00
from common.params import Params
from common.realtime import DT_MDL
from common.transformations.camera import eon_f_frame_size, eon_d_frame_size, tici_f_frame_size
from selfdrive.hardware import TICI
from selfdrive.controls.lib.alertmanager import set_offroad_alert
from selfdrive.manager.process_config import managed_processes
LM_THRESH = 120 # defined in selfdrive/camerad/imgproc/utils.h
2020-01-17 11:52:42 -07:00
def jpeg_write(fn, dat):
img = Image.fromarray(dat)
img.save(fn, "JPEG")
def extract_image(dat, frame_sizes):
img = np.frombuffer(dat, dtype=np.uint8)
w, h = frame_sizes[len(img) // 3]
b = img[::3].reshape(h, w)
g = img[1::3].reshape(h, w)
r = img[2::3].reshape(h, w)
return np.dstack([r, g, b])
def rois_in_focus(lapres: List[float]) -> float:
sz = len(lapres)
return sum([1. / sz for sharpness in
lapres if sharpness >= LM_THRESH])
def get_snapshots(frame="roadCameraState", front_frame="driverCameraState", focus_perc_threshold=0.):
frame_sizes = [eon_f_frame_size, eon_d_frame_size, tici_f_frame_size]
frame_sizes = {w * h: (w, h) for (w, h) in frame_sizes}
sockets = []
if frame is not None:
sockets.append(frame)
if front_frame is not None:
sockets.append(front_frame)
# wait 4 sec from camerad startup for focus and exposure
sm = messaging.SubMaster(sockets)
while sm[sockets[0]].frameId < int(4. / DT_MDL):
sm.update()
start_t = time.monotonic()
while time.monotonic() - start_t < 10:
sm.update()
if min(sm.rcv_frame.values()) > 1 and rois_in_focus(sm[frame].sharpnessScore) >= focus_perc_threshold:
break
rear = extract_image(sm[frame].image, frame_sizes) if frame is not None else None
front = extract_image(sm[front_frame].image, frame_sizes) if front_frame is not None else None
return rear, front
2020-01-17 11:52:42 -07:00
def snapshot():
params = Params()
front_camera_allowed = params.get_bool("RecordFront")
2020-01-17 11:52:42 -07:00
if (not params.get_bool("IsOffroad")) or params.get_bool("IsTakingSnapshot"):
print("Already taking snapshot")
return None, None
params.put_bool("IsTakingSnapshot", True)
set_offroad_alert("Offroad_IsTakingSnapshot", True)
2020-01-17 11:52:42 -07:00
time.sleep(2.0) # Give thermald time to read the param, or if just started give camerad time to start
# Check if camerad is already started
try:
subprocess.check_call(["pgrep", "camerad"])
print("Camerad already running")
params.put_bool("IsTakingSnapshot", False)
2020-01-17 11:52:42 -07:00
params.delete("Offroad_IsTakingSnapshot")
return None, None
except subprocess.CalledProcessError:
pass
2020-01-17 11:52:42 -07:00
os.environ["SEND_ROAD"] = "1"
os.environ["SEND_WIDE_ROAD"] = "1"
if front_camera_allowed:
os.environ["SEND_DRIVER"] = "1"
2020-01-17 11:52:42 -07:00
managed_processes['camerad'].start()
frame = "wideRoadCameraState" if TICI else "roadCameraState"
front_frame = "driverCameraState" if front_camera_allowed else None
focus_perc_threshold = 0. if TICI else 10 / 12.
2020-01-17 11:52:42 -07:00
rear, front = get_snapshots(frame, front_frame, focus_perc_threshold)
managed_processes['camerad'].stop()
2020-01-17 11:52:42 -07:00
params.put_bool("IsTakingSnapshot", False)
set_offroad_alert("Offroad_IsTakingSnapshot", False)
if not front_camera_allowed:
front = None
return rear, front
2020-01-17 11:52:42 -07:00
if __name__ == "__main__":
pic, fpic = snapshot()
if pic is not None:
print(pic.shape)
jpeg_write("/tmp/back.jpg", pic)
if fpic is not None:
jpeg_write("/tmp/front.jpg", fpic)
else:
print("Error taking snapshot")