import os import subprocess from cffi import FFI from common.basedir import BASEDIR # Initialize visiontest. Ignore output. _visiond_dir = os.path.dirname(os.path.abspath(__file__)) _libvisiontest = "libvisiontest.so" try: # because this crashes sometimes when running pipeline subprocess.check_output(["make", "-C", _visiond_dir, "-f", os.path.join(_visiond_dir, "visiontest.mk"), _libvisiontest]) except Exception: pass class VisionTest(): """A version of the vision model that can be run on a desktop. WARNING: This class is not thread safe. VisionTest objects cannot be created or used on multiple threads simultaneously. """ ffi = FFI() ffi.cdef(""" typedef unsigned char uint8_t; struct VisionTest; typedef struct VisionTest VisionTest; VisionTest* visiontest_create(int temporal_model, int disable_model, int input_width, int input_height, int model_input_width, int model_input_height); void visiontest_destroy(VisionTest* visiontest); void visiontest_transform(VisionTest* vt, const uint8_t* yuv_data, uint8_t* out_y, uint8_t* out_u, uint8_t* out_v, const float* transform); """) clib = ffi.dlopen(os.path.join(_visiond_dir, _libvisiontest)) def __init__(self, input_size, model_input_size, model): """Create a wrapper around visiond for off-device python code. Inputs: input_size: The size of YUV images passed to transform. model_input_size: The size of YUV images passed to the model. model: The name of the model to use. "temporal", "yuv", or None to disable the model (used to disable OpenCL). """ self._input_size = input_size self._model_input_size = model_input_size if model is None: disable_model = 1 temporal_model = 0 elif model == "yuv": disable_model = 0 temporal_model = 0 elif model == "temporal": disable_model = 0 temporal_model = 1 else: raise ValueError(f"Bad model name: {model}") prevdir = os.getcwd() os.chdir(_visiond_dir) # tmp hack to find kernels os.environ['BASEDIR'] = BASEDIR self._visiontest_c = self.clib.visiontest_create( temporal_model, disable_model, self._input_size[0], self._input_size[1], self._model_input_size[0], self._model_input_size[1]) os.chdir(prevdir) @property def input_size(self): return self._input_size @property def model_input_size(self): return self._model_input_size def transform(self, yuv_data, transform): y_len = self.model_input_size[0] * self.model_input_size[1] t_y_ptr = bytearray(y_len) t_u_ptr = bytearray(y_len // 4) t_v_ptr = bytearray(y_len // 4) self.transform_output_buffer(yuv_data, t_y_ptr, t_u_ptr, t_v_ptr, transform) return t_y_ptr, t_u_ptr, t_v_ptr def transform_contiguous(self, yuv_data, transform): y_ol = self.model_input_size[0] * self.model_input_size[1] uv_ol = y_ol // 4 result = bytearray(y_ol * 3 // 2) result_view = memoryview(result) t_y_ptr = result_view[:y_ol] t_u_ptr = result_view[y_ol:y_ol + uv_ol] t_v_ptr = result_view[y_ol + uv_ol:] self.transform_output_buffer(yuv_data, t_y_ptr, t_u_ptr, t_v_ptr, transform) return result def transform_output_buffer(self, yuv_data, y_out, u_out, v_out, transform): assert len(yuv_data) == self.input_size[0] * self.input_size[1] * 3 / 2 cast = self.ffi.cast from_buffer = self.ffi.from_buffer yuv_ptr = cast("unsigned char*", from_buffer(yuv_data)) transform_ptr = self.ffi.new("float[]", transform) y_out_ptr = cast("unsigned char*", from_buffer(y_out)) u_out_ptr = cast("unsigned char*", from_buffer(u_out)) v_out_ptr = cast("unsigned char*", from_buffer(v_out)) self.clib.visiontest_transform(self._visiontest_c, yuv_ptr, y_out_ptr, u_out_ptr, v_out_ptr, transform_ptr) def close(self): self.clib.visiontest_destroy(self._visiontest_c) self._visiontest_c = None def __enter__(self): return self def __exit__(self, exc_type, exc_value, traceback): self.close() if __name__ == "__main__": VisionTest((560, 304), (320, 160), "temporal")