# openpilot tools/lib/framereader.py
import os
import sys
import glob
import json
import time
import struct
import tempfile
import threading
import xml.etree.ElementTree as ET
import numpy as np
if sys.version_info >= (3,0):
import queue
import pickle
from io import BytesIO as StringIO
else:
import Queue as queue
import cPickle as pickle
from cStringIO import StringIO
import subprocess
from aenum import Enum
from lru import LRU
from functools import wraps
from tools.lib.cache import cache_path_for_file_path
from tools.lib.exceptions import DataUnreadableError
try:
from xx.chffr.lib.filereader import FileReader
except ImportError:
from tools.lib.filereader import FileReader
from tools.lib.file_helpers import atomic_write_in_dir
from tools.lib.mkvparse import mkvindex
from tools.lib.route import Route
H264_SLICE_P = 0
H264_SLICE_B = 1
H264_SLICE_I = 2
HEVC_SLICE_B = 0
HEVC_SLICE_P = 1
HEVC_SLICE_I = 2
SLICE_I = 2 # hevc and h264 are the same :)
class FrameType(Enum):
  """Video formats recognized by fingerprint_video()."""
  raw = 1            # length-prefixed raw bayer camera dump (see RawData/RawFrameReader)
  h265_stream = 2    # raw Annex-B hevc stream
  h264_mp4 = 3       # h264 in an mp4 container
  h264_pstream = 4   # chffr "camera"/"acamera" raw h264 stream
  ffv1_mkv = 5       # ffv1 in matroska
  ffvhuff_mkv = 6    # ffvhuff in matroska
def fingerprint_video(fn):
  """Identify the video format of *fn* from its first four bytes.

  Raises DataUnreadableError for an empty file and NotImplementedError for
  an unrecognized magic number.
  """
  with FileReader(fn) as f:
    magic = f.read(4)

  if len(magic) == 0:
    raise DataUnreadableError("%s is empty" % fn)

  if magic == b"\x00\xc0\x12\x00":
    return FrameType.raw
  if magic == b"\x00\x00\x00\x01":
    # Annex-B start code: tell hevc streams and chffr h264 pstreams apart by filename
    if 'hevc' in fn:
      return FrameType.h265_stream
    if os.path.basename(fn) in ("camera", "acamera"):
      return FrameType.h264_pstream
    raise NotImplementedError(fn)
  if magic == b"\x00\x00\x00\x1c":
    return FrameType.h264_mp4
  if magic == b"\x1a\x45\xdf\xa3":
    return FrameType.ffv1_mkv
  raise NotImplementedError(fn)
def ffprobe(fn, fmt=None):
  """Run ffprobe on *fn* (optionally forcing input format *fmt*) and return
  its JSON metadata as a dict. Raises DataUnreadableError on probe failure."""
  cmd = ["ffprobe",
         "-v", "quiet",
         "-print_format", "json",
         "-show_format", "-show_streams"]
  if fmt:
    cmd.extend(["-format", fmt])
  cmd.append(fn)

  try:
    out = subprocess.check_output(cmd)
  except subprocess.CalledProcessError:
    raise DataUnreadableError(fn)
  return json.loads(out)
def vidindex(fn, typ):
  """Build a (frame_index, global_prefix) pair for a raw h264/hevc stream.

  Runs the bundled 'vidindex' C tool (built on demand via make). The index is
  an (N, 2) uint32 array of (slice_type, file_offset) rows, terminated by a
  (0xFFFFFFFF, file_size) sentinel row; the prefix is the stream header data
  needed to decode a GOP standalone.
  """
  vidindex_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "vidindex")
  vidindex = os.path.join(vidindex_dir, "vidindex")
  # build the tool if needed; make is a no-op when up to date
  subprocess.check_call(["make"], cwd=vidindex_dir, stdout=open("/dev/null","w"))

  with tempfile.NamedTemporaryFile() as prefix_f, \
       tempfile.NamedTemporaryFile() as index_f:
    try:
      subprocess.check_call([vidindex, typ, fn, prefix_f.name, index_f.name])
    except subprocess.CalledProcessError as e:
      raise DataUnreadableError("vidindex failed on file %s" % fn)
    with open(index_f.name, "rb") as f:
      index = f.read()
    with open(prefix_f.name, "rb") as f:
      prefix = f.read()

  index = np.frombuffer(index, np.uint32).reshape(-1, 2)

  # sanity-check the sentinel row written by vidindex
  assert index[-1, 0] == 0xFFFFFFFF
  assert index[-1, 1] == os.path.getsize(fn)

  return index, prefix
def cache_fn(func):
  """Decorator: memoize func(fn, ...) in an on-disk pickle keyed by fn.

  The wrapped function gains an optional 'cache_prefix' kwarg selecting the
  cache location; with no cache path available the call just passes through.
  """
  @wraps(func)
  def wrapper(fn, *args, **kwargs):
    prefix = kwargs.pop('cache_prefix', None)
    path = cache_path_for_file_path(fn, prefix)

    if path and os.path.exists(path):
      # cache hit: return the previously pickled result
      with open(path, "rb") as f:
        return pickle.load(f)

    value = func(fn, *args, **kwargs)
    if path:
      with atomic_write_in_dir(path, mode="wb", overwrite=True) as f:
        pickle.dump(value, f, -1)
    return value

  return wrapper
@cache_fn
def index_stream(fn, typ):
  """Index a raw h264/hevc stream; results are cached on disk via @cache_fn.

  Returns {'index', 'global_prefix', 'probe'} from vidindex() and ffprobe().
  """
  assert typ in ("hevc", "h264")

  with FileReader(fn) as f:
    assert os.path.exists(f.name), fn
    index, prefix = vidindex(f.name, typ)
    probe = ffprobe(f.name, typ)

  return {
    'index': index,
    'global_prefix': prefix,
    'probe': probe
  }
@cache_fn
def index_mp4(fn):
  """Index an h264 mp4 via vidindex_mp4/MP4Box; cached on disk via @cache_fn."""
  with FileReader(fn) as f:
    return vidindex_mp4(f.name)
@cache_fn
def index_mkv(fn):
  """Index a matroska file; cached on disk via @cache_fn.

  Returns {'probe', 'config_record', 'index'} where config_record/index come
  from the project's mkv parser.
  """
  with FileReader(fn) as f:
    probe = ffprobe(f.name, "matroska")
    with open(f.name, "rb") as d_f:
      config_record, index = mkvindex.mkvindex(d_f)
  return {
    'probe': probe,
    'config_record': config_record,
    'index': index
  }
def index_videos(camera_paths, cache_prefix=None):
  """Requires that paths in camera_paths are contiguous and of the same type."""
  if not camera_paths:
    raise ValueError("must provide at least one video to index")

  frame_type = fingerprint_video(camera_paths[0])
  if frame_type == FrameType.h264_pstream:
    # pstream segments must be indexed together as one contiguous stream
    index_pstream(camera_paths, "h264", cache_prefix)
    return

  for path in camera_paths:
    index_video(path, frame_type, cache_prefix)
def index_video(fn, frame_type=None, cache_prefix=None):
  """Index a single video file into its cache path (no-op if already cached).

  h264 pstream segments can only be indexed as a contiguous route, so for
  those the whole route containing fn is indexed.
  """
  cache_path = cache_path_for_file_path(fn, cache_prefix)

  if os.path.exists(cache_path):
    return

  if frame_type is None:
    # fix: fingerprint the file itself; fn[0] fingerprinted the path's first character
    frame_type = fingerprint_video(fn)

  if frame_type == FrameType.h264_pstream:
    #hack: try to index the whole route now
    route = Route.from_file_path(fn)

    camera_paths = route.camera_paths()
    if fn not in camera_paths:
      raise DataUnreadableError("Not a contiguous route camera file: {}".format(fn))

    print("no pstream cache for %s, indexing route %s now" % (fn, route.name))
    index_pstream(route.camera_paths(), "h264", cache_prefix)
  elif frame_type == FrameType.h265_stream:
    index_stream(fn, "hevc", cache_prefix=cache_prefix)
  elif frame_type == FrameType.h264_mp4:
    index_mp4(fn, cache_prefix=cache_prefix)
def get_video_index(fn, frame_type, cache_prefix=None):
  """Return the pickled index for fn, building it first if needed.

  Returns None when indexing did not produce a cache file.
  """
  cache_path = cache_path_for_file_path(fn, cache_prefix)

  if not os.path.exists(cache_path):
    index_video(fn, frame_type, cache_prefix)
  if not os.path.exists(cache_path):
    return None

  with open(cache_path, "rb") as f:
    return pickle.load(f)
def pstream_predecompress(fns, probe, indexes, global_prefix, cache_prefix, multithreaded=False):
  """Re-encode unseekable h264 pstream segments into per-segment ffv1 mkvs.

  Streams all segments through one ffmpeg h264 decoder, splits the raw
  yuv420p frames back out per segment, compresses each segment to an
  ffv1 mkv next to its cache path, and writes an index pickle whose
  'predecom' key names that mkv.
  """
  assert len(fns) == len(indexes)
  out_fns = [cache_path_for_file_path(fn, cache_prefix, extension=".predecom.mkv") for fn in fns]
  # fix: materialize the existence checks. A py3 map() iterator would be
  # exhausted by all() and then come up empty in the zip() below.
  out_exists = [os.path.exists(out_fn) for out_fn in out_fns]
  if all(out_exists):
    return

  w = probe['streams'][0]['width']
  h = probe['streams'][0]['height']
  # fix: integer frame size; "/" yields a float on py3 and breaks read()
  frame_size = w*h*3//2  # yuv420p

  decompress_proc = subprocess.Popen(
    ["ffmpeg",
     "-threads", "0" if multithreaded else "1",
     "-vsync", "0",
     "-f", "h264",
     "-i", "pipe:0",
     "-threads", "0" if multithreaded else "1",
     "-f", "rawvideo",
     "-pix_fmt", "yuv420p",
     "pipe:1"],
    stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=open("/dev/null", "wb"))

  def write_thread():
    # feed every segment back-to-back into the single decoder
    for fn in fns:
      with FileReader(fn) as f:
        decompress_proc.stdin.write(f.read())
    decompress_proc.stdin.close()

  def read_frame():
    # read exactly one raw yuv420p frame from the decoder
    frame = None
    try:
      frame = decompress_proc.stdout.read(frame_size)
    except (IOError, ValueError):
      pass
    if frame is None or frame == "" or len(frame) != frame_size:
      raise DataUnreadableError("pre-decompression failed for %s" % fn)
    return frame

  t = threading.Thread(target=write_thread)
  t.daemon = True
  t.start()

  try:
    for fn, out_fn, out_exist, index in zip(fns, out_fns, out_exists, indexes):
      if out_exist:
        # already converted: drain this segment's frames and move on
        for fi in range(index.shape[0]-1):
          read_frame()
        continue

      with atomic_write_in_dir(out_fn, mode="w+b", overwrite=True) as out_tmp:
        compress_proc = subprocess.Popen(
          ["ffmpeg",
           "-threads", "0" if multithreaded else "1",
           "-y",
           "-vsync", "0",
           "-f", "rawvideo",
           "-pix_fmt", "yuv420p",
           "-s", "%dx%d" % (w, h),
           "-i", "pipe:0",
           "-threads", "0" if multithreaded else "1",
           "-f", "matroska",
           "-vcodec", "ffv1",
           "-g", "0",
           out_tmp.name],
          stdin=subprocess.PIPE, stderr=open("/dev/null", "wb"))
        try:
          for fi in range(index.shape[0]-1):
            frame = read_frame()
            compress_proc.stdin.write(frame)
          compress_proc.stdin.close()
        except:
          compress_proc.kill()
          raise

        assert compress_proc.wait() == 0

      cache_path = cache_path_for_file_path(fn, cache_prefix)
      with atomic_write_in_dir(cache_path, mode="wb", overwrite=True) as cache_file:
        pickle.dump({
          'predecom': os.path.basename(out_fn),
          'index': index,
          'probe': probe,
          'global_prefix': global_prefix,
        }, cache_file, -1)

  except:
    decompress_proc.kill()
    raise
  finally:
    t.join()

  rc = decompress_proc.wait()
  if rc != 0:
    raise DataUnreadableError(fns[0])
def index_pstream(fns, typ, cache_prefix=None):
  """Index a contiguous list of chffr h264 pstream segments.

  Writes one index pickle per segment. Segments generally don't start with
  an I-frame, so each segment's pickle also carries the tail GOP of the
  previous segment ('prefix_frame_data') so it can be decoded standalone.
  Unseekable streams (<=1 I-frame) are handed to pstream_predecompress().
  """
  if typ != "h264":
    raise NotImplementedError(typ)

  if not fns:
    raise DataUnreadableError("chffr h264 requires contiguous files")

  out_fns = [cache_path_for_file_path(fn, cache_prefix) for fn in fns]
  # fix: materialize the existence checks. A py3 map() iterator would be
  # exhausted by all() and then come up empty in the zip() below.
  out_exists = [os.path.exists(out_fn) for out_fn in out_fns]
  if all(out_exists):
    return

  # load existing index files to avoid re-doing work
  existing_indexes = []
  for out_fn, exists in zip(out_fns, out_exists):
    existing = None
    if exists:
      with open(out_fn, "rb") as cache_file:
        existing = pickle.load(cache_file)
    existing_indexes.append(existing)

  # probe the first file
  if existing_indexes[0]:
    probe = existing_indexes[0]['probe']
  else:
    with FileReader(fns[0]) as f:
      probe = ffprobe(f.name, typ)

  global_prefix = None

  # get the video index of all the segments in this stream
  indexes = []
  for i, fn in enumerate(fns):
    if existing_indexes[i]:
      index = existing_indexes[i]['index']
      prefix = existing_indexes[i]['global_prefix']
    else:
      with FileReader(fn) as f:
        index, prefix = vidindex(f.name, typ)
    if i == 0:
      # assert prefix
      if not prefix:
        raise DataUnreadableError("vidindex failed for %s" % fn)
      global_prefix = prefix
    indexes.append(index)

  assert global_prefix

  if np.sum(indexes[0][:, 0] == H264_SLICE_I) <= 1:
    print("pstream %s is unseekable. pre-decompressing all the segments..." % (fns[0]))
    pstream_predecompress(fns, probe, indexes, global_prefix, cache_prefix)
    return

  # generate what's required to make each segment self-contained
  # (the partial GOP from the end of each segments are put asside to add
  # to the start of the following segment)
  # fix: bytes, not str — these are concatenated with bytes read from the file
  prefix_data = [b"" for _ in fns]
  prefix_index = [[] for _ in fns]
  for i in range(len(fns)-1):
    if indexes[i+1][0, 0] == H264_SLICE_I and indexes[i+1][0, 1] <= 1:
      # next file happens to start with a i-frame, dont need use this file's end
      continue

    index = indexes[i]
    if i == 0 and np.sum(index[:, 0] == H264_SLICE_I) <= 1:
      raise NotImplementedError("No I-frames in pstream.")

    # find the last GOP in the index
    frame_b = len(index)-1
    while frame_b > 0 and index[frame_b, 0] != H264_SLICE_I:
      frame_b -= 1

    assert frame_b >= 0
    assert index[frame_b, 0] == H264_SLICE_I

    end_len = len(index)-frame_b

    with FileReader(fns[i]) as vid:
      vid.seek(index[frame_b, 1])
      end_data = vid.read()

    prefix_data[i+1] = end_data
    prefix_index[i+1] = index[frame_b:-1]
    # indexes[i] = index[:frame_b]

  for i, fn in enumerate(fns):
    cache_path = out_fns[i]

    if os.path.exists(cache_path):
      continue

    segment_index = {
      'index': indexes[i],
      'global_prefix': global_prefix,
      'probe': probe,
      'prefix_frame_data': prefix_data[i],        # data to prefix the first GOP with
      'num_prefix_frames': len(prefix_index[i]),  # number of frames to skip in the first GOP
    }

    with atomic_write_in_dir(cache_path, mode="wb", overwrite=True) as cache_file:
      pickle.dump(segment_index, cache_file, -1)
def read_file_check_size(f, sz, cookie):
  """Read exactly *sz* bytes from *f* into a fresh bytearray.

  Asserts on a short read; *cookie* is accepted for caller convenience and
  unused.
  """
  buf = bytearray(sz)
  got = f.readinto(buf)
  assert got == sz, (got, sz)
  return buf
import signal
import ctypes
def _set_pdeathsig(sig=signal.SIGTERM):
  """Return a callable (for use as a subprocess preexec_fn) that asks the
  kernel to deliver *sig* to the calling process when its parent dies.

  Linux-only: 1 is PR_SET_PDEATHSIG for prctl(2).
  """
  def f():
    libc = ctypes.CDLL('libc.so.6')
    return libc.prctl(1, sig)
  return f
def vidindex_mp4(fn):
  """Parse an mp4's sample tables (via an MP4Box XML dump) into a dict index.

  Returns width/height, per-sample file offsets, sizes, and dependency flags,
  sorted presentation times, and the raw SPS/PPS NAL payloads.
  Raises DataUnreadableError if MP4Box fails.
  """
  import binascii  # local: only needed here, avoids touching the module header

  try:
    xmls = subprocess.check_output(["MP4Box", fn, "-diso", "-out", "/dev/stdout"])
  except subprocess.CalledProcessError as e:
    raise DataUnreadableError(fn)

  tree = ET.fromstring(xmls)

  def parse_content(s):
    assert s.startswith("data:application/octet-string,")
    # fix: unhexlify works on py2 and py3; str.decode("hex") was py2-only
    return binascii.unhexlify(s[len("data:application/octet-string,"):])

  avc_element = tree.find(".//AVCSampleEntryBox")
  width = int(avc_element.attrib['Width'])
  height = int(avc_element.attrib['Height'])

  sps_element = avc_element.find(".//AVCDecoderConfigurationRecord/SequenceParameterSet")
  pps_element = avc_element.find(".//AVCDecoderConfigurationRecord/PictureParameterSet")

  sps = parse_content(sps_element.attrib['content'])
  pps = parse_content(pps_element.attrib['content'])

  media_header = tree.find("MovieBox/TrackBox/MediaBox/MediaHeaderBox")
  time_scale = int(media_header.attrib['TimeScale'])

  sample_sizes = [
    int(entry.attrib['Size']) for entry in tree.findall(
      "MovieBox/TrackBox/MediaBox/MediaInformationBox/SampleTableBox/SampleSizeBox/SampleSizeEntry")
  ]

  sample_dependency = [
    entry.attrib['dependsOnOther'] == "yes" for entry in tree.findall(
      "MovieBox/TrackBox/MediaBox/MediaInformationBox/SampleTableBox/SampleDependencyTypeBox/SampleDependencyEntry")
  ]

  assert len(sample_sizes) == len(sample_dependency)

  chunk_offsets = [
    int(entry.attrib['offset']) for entry in tree.findall(
      "MovieBox/TrackBox/MediaBox/MediaInformationBox/SampleTableBox/ChunkOffsetBox/ChunkEntry")
  ]

  sample_chunk_table = [
    (int(entry.attrib['FirstChunk'])-1, int(entry.attrib['SamplesPerChunk'])) for entry in tree.findall(
      "MovieBox/TrackBox/MediaBox/MediaInformationBox/SampleTableBox/SampleToChunkBox/SampleToChunkEntry")
  ]

  # expand the chunk tables into a per-sample file offset
  sample_offsets = [None for _ in sample_sizes]

  sample_i = 0
  for i, (first_chunk, samples_per_chunk) in enumerate(sample_chunk_table):
    if i == len(sample_chunk_table)-1:
      last_chunk = len(chunk_offsets)-1
    else:
      last_chunk = sample_chunk_table[i+1][0]-1
    for k in range(first_chunk, last_chunk+1):
      sample_offset = chunk_offsets[k]
      for _ in range(samples_per_chunk):
        sample_offsets[sample_i] = sample_offset
        sample_offset += sample_sizes[sample_i]
        sample_i += 1

  assert sample_i == len(sample_sizes)

  pts_offset_table = [
    ( int(entry.attrib['CompositionOffset']), int(entry.attrib['SampleCount']) ) for entry in tree.findall(
      "MovieBox/TrackBox/MediaBox/MediaInformationBox/SampleTableBox/CompositionOffsetBox/CompositionOffsetEntry")
  ]
  sample_pts_offset = [0 for _ in sample_sizes]
  sample_i = 0
  for dt, count in pts_offset_table:
    for _ in range(count):
      sample_pts_offset[sample_i] = dt
      sample_i += 1

  sample_time_table = [
    ( int(entry.attrib['SampleDelta']), int(entry.attrib['SampleCount']) ) for entry in tree.findall(
      "MovieBox/TrackBox/MediaBox/MediaInformationBox/SampleTableBox/TimeToSampleBox/TimeToSampleEntry")
  ]
  sample_time = [None for _ in sample_sizes]
  cur_ts = 0
  sample_i = 0
  for dt, count in sample_time_table:
    for _ in range(count):
      # integer milliseconds; "//" keeps the py2 integer-division behavior on py3
      sample_time[sample_i] = (cur_ts + sample_pts_offset[sample_i]) * 1000 // time_scale
      cur_ts += dt
      sample_i += 1

  sample_time.sort()  # because we ony decode GOPs in PTS order

  return {
    'width': width,
    'height': height,
    'sample_offsets': sample_offsets,
    'sample_sizes': sample_sizes,
    'sample_dependency': sample_dependency,
    'sample_time': sample_time,
    'sps': sps,
    'pps': pps
  }
class BaseFrameReader(object):
  """Context-manager base for frame readers."""
  # properties: frame_type, frame_count, w, h

  def __enter__(self):
    return self

  def __exit__(self, *args):
    self.close()

  def close(self):
    # subclasses with resources to release override this
    pass

  def get(self, num, count=1, pix_fmt="yuv420p"):
    """Return `count` decoded frames starting at frame `num` in `pix_fmt`."""
    raise NotImplementedError
def FrameReader(fn, cache_prefix=None, readahead=False, readbehind=False, multithreaded=True):
  """Factory: fingerprint *fn* and construct the matching frame reader."""
  frame_type = fingerprint_video(fn)

  if frame_type == FrameType.raw:
    return RawFrameReader(fn)

  if frame_type in (FrameType.h265_stream, FrameType.h264_pstream):
    index_data = get_video_index(fn, frame_type, cache_prefix)
    if index_data is not None and "predecom" in index_data:
      # segment was pre-decompressed to a seekable mkv next to its cache file
      cache_path = cache_path_for_file_path(fn, cache_prefix)
      predecom_path = os.path.join(os.path.dirname(cache_path), index_data["predecom"])
      return MKVFrameReader(predecom_path)
    return StreamFrameReader(fn, frame_type, index_data,
                             readahead=readahead, readbehind=readbehind,
                             multithreaded=multithreaded)

  if frame_type == FrameType.h264_mp4:
    return MP4FrameReader(fn, readahead=readahead)

  if frame_type == FrameType.ffv1_mkv:
    return MKVFrameReader(fn)

  raise NotImplementedError(frame_type)
def rgb24toyuv420(rgb):
  """Convert an (H, W, 3) rgb24 image to a flat yuv420p buffer (uint8).

  H and W must both be even so the 2x2 chroma subsampling divides evenly.
  """
  yuv_from_rgb = np.array([[ 0.299     ,  0.587     ,  0.114      ],
                           [-0.14714119, -0.28886916,  0.43601035 ],
                           [ 0.61497538, -0.51496512, -0.10001026 ]])
  img = np.dot(rgb.reshape(-1, 3), yuv_from_rgb.T).reshape(rgb.shape)

  y_len = img.shape[0] * img.shape[1]
  # fix: integer length; "/" yields a float on py3 and breaks np.empty/slicing below
  uv_len = y_len // 4

  ys = img[:, :, 0]
  # chroma: average each 2x2 block, bias to the unsigned 128-centered range
  us = (img[::2, ::2, 1] + img[1::2, ::2, 1] + img[::2, 1::2, 1] + img[1::2, 1::2, 1]) / 4 + 128
  vs = (img[::2, ::2, 2] + img[1::2, ::2, 2] + img[::2, 1::2, 2] + img[1::2, 1::2, 2]) / 4 + 128

  yuv420 = np.empty(y_len + 2 * uv_len, dtype=img.dtype)
  yuv420[:y_len] = ys.reshape(-1)
  yuv420[y_len:y_len + uv_len] = us.reshape(-1)
  yuv420[y_len + uv_len:y_len + 2 * uv_len] = vs.reshape(-1)

  return yuv420.clip(0, 255).astype('uint8')
class RawData(object):
  """Random-access reader for raw camera dumps.

  File layout: a repeating sequence of [4-byte native uint32 record length,
  record payload]; the record length is constant for the whole file.
  """
  def __init__(self, f):
    # fix: was _io.FileIO, but _io is never imported (NameError at runtime)
    self.f = open(f, 'rb')
    self.lenn = struct.unpack("I", self.f.read(4))[0]
    # fix: integer record count; "/" yields a float on py3
    self.count = os.path.getsize(f) // (self.lenn+4)

  def read(self, i):
    """Return the payload of record i."""
    self.f.seek((self.lenn+4)*i + 4)
    return self.f.read(self.lenn)
class RawFrameReader(BaseFrameReader):
  """Reader for raw bayer camera dumps: debayers to RGB, optionally to yuv420p."""
  def __init__(self, fn):
    # raw camera
    self.fn = fn
    self.frame_type = FrameType.raw
    self.rawfile = RawData(self.fn)
    self.frame_count = self.rawfile.count
    self.w, self.h = 640, 480

  def load_and_debayer(self, img):
    """Turn one raw 1280x960 bayer frame into a 640x480 RGB array.

    Green is the average of the two green sites in each 2x2 bayer cell.
    """
    img = np.frombuffer(img, dtype='uint8').reshape(960, 1280)
    cimg = np.dstack([img[0::2, 1::2], (
      (img[0::2, 0::2].astype("uint16") + img[1::2, 1::2].astype("uint16"))
      >> 1).astype("uint8"), img[1::2, 0::2]])
    return cimg

  def get(self, num, count=1, pix_fmt="yuv420p"):
    """Return `count` frames starting at `num` as rgb24 or yuv420p arrays."""
    assert self.frame_count is not None
    assert num+count <= self.frame_count

    if pix_fmt not in ("yuv420p", "rgb24"):
      raise ValueError("Unsupported pixel format %r" % pix_fmt)

    app = []
    for i in range(num, num+count):
      dat = self.rawfile.read(i)
      rgb_dat = self.load_and_debayer(dat)
      if pix_fmt == "rgb24":
        app.append(rgb_dat)
      elif pix_fmt == "yuv420p":
        app.append(rgb24toyuv420(rgb_dat))
      else:
        raise NotImplementedError

    return app
def decompress_video_data(rawdat, vid_fmt, w, h, pix_fmt, multithreaded=False):
  """Decode a complete compressed clip with ffmpeg, returning all frames.

  rawdat must be decodable standalone. Returns an ndarray shaped
  (-1, h, w, 3) for rgb24, (-1, h*w*3//2) flat per frame for yuv420p, or
  (-1, 3, h, w) for yuv444p. Raises DataUnreadableError if ffmpeg fails.
  """
  # using a tempfile is much faster than proc.communicate for some reason
  with tempfile.TemporaryFile() as tmpf:
    tmpf.write(rawdat)
    tmpf.seek(0)

    proc = subprocess.Popen(
      ["ffmpeg",
       "-threads", "0" if multithreaded else "1",
       "-vsync", "0",
       "-f", vid_fmt,
       "-flags2", "showall",
       "-i", "pipe:0",
       "-threads", "0" if multithreaded else "1",
       "-f", "rawvideo",
       "-pix_fmt", pix_fmt,
       "pipe:1"],
      # fix: stderr must be opened for writing; open("/dev/null") was read-only
      stdin=tmpf, stdout=subprocess.PIPE, stderr=open("/dev/null", "wb"))

    # dat = proc.communicate()[0]
    dat = proc.stdout.read()
    if proc.wait() != 0:
      raise DataUnreadableError("ffmpeg failed")

  if pix_fmt == "rgb24":
    ret = np.frombuffer(dat, dtype=np.uint8).reshape(-1, h, w, 3)
  elif pix_fmt == "yuv420p":
    ret = np.frombuffer(dat, dtype=np.uint8).reshape(-1, (h*w*3//2))
  elif pix_fmt == "yuv444p":
    ret = np.frombuffer(dat, dtype=np.uint8).reshape(-1, 3, h, w)
  else:
    raise NotImplementedError

  return ret
class VideoStreamDecompressor(object):
  """Long-lived ffmpeg decoder: write() compressed data, read() decoded frames.

  A daemon thread drains ffmpeg's stdout into a queue so that writes to its
  stdin cannot deadlock against a full stdout pipe.
  """
  def __init__(self, vid_fmt, w, h, pix_fmt, multithreaded=False):
    self.vid_fmt = vid_fmt
    self.w = w
    self.h = h
    self.pix_fmt = pix_fmt

    # bytes per decoded frame, by pixel format
    if pix_fmt == "yuv420p":
      self.out_size = w*h*3//2 # yuv420p
    elif pix_fmt in ("rgb24", "yuv444p"):
      self.out_size = w*h*3
    else:
      raise NotImplementedError

    self.out_q = queue.Queue()

    self.proc = subprocess.Popen(
      ["ffmpeg",
       "-threads", "0" if multithreaded else "1",
       # "-avioflags", "direct",
       "-analyzeduration", "0",
       "-probesize", "32",
       "-flush_packets", "0",
       # "-fflags", "nobuffer",
       "-vsync", "0",
       "-f", vid_fmt,
       "-i", "pipe:0",
       "-threads", "0" if multithreaded else "1",
       "-f", "rawvideo",
       "-pix_fmt", pix_fmt,
       "pipe:1"],
      stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=open("/dev/null", "wb"))

    def read_thread():
      # drain decoded frames into the queue until ffmpeg closes its stdout
      while True:
        r = self.proc.stdout.read(self.out_size)
        if len(r) == 0:
          break
        assert len(r) == self.out_size
        self.out_q.put(r)

    self.t = threading.Thread(target=read_thread)
    self.t.daemon = True
    self.t.start()

  def __enter__(self):
    return self

  def __exit__(self, *args):
    self.close()

  def write(self, rawdat):
    """Feed compressed bitstream data to the decoder."""
    self.proc.stdin.write(rawdat)
    self.proc.stdin.flush()

  def read(self):
    """Block until one decoded frame is available; return it as an ndarray."""
    dat = self.out_q.get(block=True)

    if self.pix_fmt == "rgb24":
      ret = np.frombuffer(dat, dtype=np.uint8).reshape((self.h, self.w, 3))
    elif self.pix_fmt == "yuv420p":
      # yuv420p frames are returned flat
      ret = np.frombuffer(dat, dtype=np.uint8)
    elif self.pix_fmt == "yuv444p":
      ret = np.frombuffer(dat, dtype=np.uint8).reshape((3, self.h, self.w))
    else:
      assert False

    return ret

  def eos(self):
    # closing stdin signals end-of-stream so ffmpeg flushes remaining frames
    self.proc.stdin.close()

  def close(self):
    self.proc.stdin.close()
    self.t.join()
    self.proc.wait()
    assert self.proc.wait() == 0
class MKVFrameReader(BaseFrameReader):
  """Reader for ffv1/ffvhuff matroska files, using the cached mkv frame index."""
  def __init__(self, fn):
    self.fn = fn

    #print("MKVFrameReader", fn)
    index_data = index_mkv(fn)
    stream = index_data['probe']['streams'][0]
    self.w = stream['width']
    self.h = stream['height']

    if stream['codec_name'] == 'ffv1':
      self.frame_type = FrameType.ffv1_mkv
    elif stream['codec_name'] == 'ffvhuff':
      self.frame_type = FrameType.ffvhuff_mkv
    else:
      raise NotImplementedError

    self.config_record = index_data['config_record']
    self.index = index_data['index']

    self.frame_count = len(self.index)

  def get(self, num, count=1, pix_fmt="yuv420p"):
    """Return `count` decoded frames starting at `num`."""
    assert 0 < num+count <= self.frame_count

    # pull the requested frames' compressed payloads out of the file
    frame_dats = []
    with FileReader(self.fn) as f:
      for i in range(num, num+count):
        pos, length, _ = self.index[i]
        f.seek(pos)
        frame_dats.append(f.read(length))

    # wrap the raw frames in a minimal mkv container so ffmpeg can decode them
    of = StringIO()
    mkvindex.simple_gen(of, self.config_record, self.w, self.h, frame_dats)

    r = decompress_video_data(of.getvalue(), "matroska", self.w, self.h, pix_fmt)
    assert len(r) == count

    return r
class GOPReader(object):
  """Interface for readers that can hand back whole compressed GOPs."""
  def get_gop(self, num):
    # returns (start_frame_num, num_frames, frames_to_skip, gop_data)
    raise NotImplementedError
class DoNothingContextManager(object):
  """No-op context manager; stands in for a lock when none is needed."""
  def __enter__(self):
    return self

  def __exit__(self, *exc):
    pass
class GOPFrameReader(BaseFrameReader):
  #FrameReader with caching and readahead for formats that are group-of-picture based

  def __init__(self, readahead=False, readbehind=False, multithreaded=True):
    self.open_ = True
    self.multithreaded = multithreaded
    self.readahead = readahead
    self.readbehind = readbehind
    # (frame_num, pix_fmt) -> decoded frame; bounded so memory stays flat
    self.frame_cache = LRU(64)

    if self.readahead:
      self.cache_lock = threading.RLock()
      self.readahead_last = None
      self.readahead_len = 30  # frames to prefetch beyond the last request
      self.readahead_c = threading.Condition()
      self.readahead_thread = threading.Thread(target=self._readahead_thread)
      self.readahead_thread.daemon = True
      self.readahead_thread.start()
    else:
      # no prefetch thread, so cache access needs no real lock
      self.cache_lock = DoNothingContextManager()

  def close(self):
    if not self.open_:
      return
    self.open_ = False

    if self.readahead:
      # wake the prefetch thread so it observes open_ == False and exits
      self.readahead_c.acquire()
      self.readahead_c.notify()
      self.readahead_c.release()
      self.readahead_thread.join()

  def _readahead_thread(self):
    """Prefetch loop: on each notify, warm the cache around the last request."""
    while True:
      self.readahead_c.acquire()
      try:
        if not self.open_:
          break
        self.readahead_c.wait()
      finally:
        self.readahead_c.release()
      if not self.open_:
        break
      assert self.readahead_last
      num, pix_fmt = self.readahead_last

      if self.readbehind:
        # walk backwards from the request (useful when reading in reverse)
        for k in range(num-1, max(0, num-self.readahead_len), -1):
          self._get_one(k, pix_fmt)
      else:
        for k in range(num, min(self.frame_count, num+self.readahead_len)):
          self._get_one(k, pix_fmt)

  def _get_one(self, num, pix_fmt):
    assert num < self.frame_count

    if (num, pix_fmt) in self.frame_cache:
      return self.frame_cache[(num, pix_fmt)]

    with self.cache_lock:
      # re-check under the lock: the prefetch thread may have filled it meanwhile
      if (num, pix_fmt) in self.frame_cache:
        return self.frame_cache[(num, pix_fmt)]

      frame_b, num_frames, skip_frames, rawdat = self.get_gop(num)

      ret = decompress_video_data(rawdat, self.vid_fmt, self.w, self.h, pix_fmt,
                                  multithreaded=self.multithreaded)
      ret = ret[skip_frames:]
      assert ret.shape[0] == num_frames

      # cache every frame of the decoded GOP, not just the requested one
      for i in range(ret.shape[0]):
        self.frame_cache[(frame_b+i, pix_fmt)] = ret[i]

    return self.frame_cache[(num, pix_fmt)]

  def get(self, num, count=1, pix_fmt="yuv420p"):
    """Return `count` decoded frames starting at `num` in `pix_fmt`."""
    assert self.frame_count is not None

    if num + count > self.frame_count:
      raise ValueError("{} > {}".format(num + count, self.frame_count))
    if pix_fmt not in ("yuv420p", "rgb24", "yuv444p"):
      raise ValueError("Unsupported pixel format %r" % pix_fmt)

    ret = [self._get_one(num + i, pix_fmt) for i in range(count)]

    if self.readahead:
      # kick the prefetch thread to warm the frames following this request
      self.readahead_last = (num+count, pix_fmt)
      self.readahead_c.acquire()
      self.readahead_c.notify()
      self.readahead_c.release()

    return ret
class MP4GOPReader(GOPReader):
  """GOP-level access to an h264 mp4, driven by the MP4Box-derived sample index."""
  def __init__(self, fn):
    self.fn = fn
    self.frame_type = FrameType.h264_mp4
    self.index = index_mp4(fn)

    self.w = self.index['width']
    self.h = self.index['height']
    self.sample_sizes = self.index['sample_sizes']
    self.sample_offsets = self.index['sample_offsets']
    self.sample_dependency = self.index['sample_dependency']

    self.vid_fmt = "h264"

    self.frame_count = len(self.sample_sizes)

    # Annex-B start codes around the stream's SPS and PPS.
    # NOTE(review): str literals concatenated with sps/pps — python2-era code;
    # a python3 port would need bytes throughout this class. TODO confirm.
    self.prefix = "\x00\x00\x00\x01"+self.index['sps']+"\x00\x00\x00\x01"+self.index['pps']

  def _lookup_gop(self, num):
    # walk back to the GOP's first independent sample and forward to the
    # start of the next independent sample
    frame_b = num
    while frame_b > 0 and self.sample_dependency[frame_b]:
      frame_b -= 1

    frame_e = num+1
    while frame_e < (len(self.sample_dependency)-1) and self.sample_dependency[frame_e]:
      frame_e += 1

    return (frame_b, frame_e)

  def get_gop(self, num):
    """Return (start_frame_num, num_frames, 0, annexb_data) for the GOP containing num."""
    frame_b, frame_e = self._lookup_gop(num)
    assert frame_b <= num < frame_e

    num_frames = frame_e-frame_b

    with FileReader(self.fn) as f:
      rawdat = []

      sample_i = frame_b
      while sample_i < frame_e:
        size = self.sample_sizes[sample_i]
        start_offset = self.sample_offsets[sample_i]

        # try to read contiguously because a read could actually be a http request
        sample_i += 1
        while sample_i < frame_e and size < 10000000 and start_offset+size == self.sample_offsets[sample_i]:
          size += self.sample_sizes[sample_i]
          sample_i += 1

        f.seek(start_offset)
        sampledat = f.read(size)

        # read length-prefixed NALUs and output in Annex-B
        i = 0
        while i < len(sampledat):
          nal_len, = struct.unpack(">I", sampledat[i:i+4])
          rawdat.append("\x00\x00\x00\x01"+sampledat[i+4:i+4+nal_len])
          i = i+4+nal_len
        assert i == len(sampledat)

      rawdat = self.prefix+''.join(rawdat)

    return frame_b, num_frames, 0, rawdat
class MP4FrameReader(MP4GOPReader, GOPFrameReader):
  """Frame-level h264 mp4 reader: MP4 GOP lookup plus decoded-frame caching."""
  def __init__(self, fn, readahead=False):
    MP4GOPReader.__init__(self, fn)
    GOPFrameReader.__init__(self, readahead)
class StreamGOPReader(GOPReader):
  """GOP-level access to a raw hevc stream or a chffr h264 pstream segment."""
  def __init__(self, fn, frame_type, index_data):
    self.fn = fn

    self.frame_type = frame_type
    self.frame_count = None
    self.w, self.h = None, None

    self.prefix = None
    self.index = None

    self.index = index_data['index']
    self.prefix = index_data['global_prefix']
    probe = index_data['probe']

    if self.frame_type == FrameType.h265_stream:
      self.prefix_frame_data = None
      self.num_prefix_frames = 0
      self.vid_fmt = "hevc"

    elif self.frame_type == FrameType.h264_pstream:
      # pstream segments may need the previous segment's tail GOP prepended
      self.prefix_frame_data = index_data['prefix_frame_data']
      self.num_prefix_frames = index_data['num_prefix_frames']

      self.vid_fmt = "h264"

    # locate the first I-frame in this segment's index
    i = 0
    while i < self.index.shape[0] and self.index[i, 0] != SLICE_I:
      i += 1
    self.first_iframe = i

    if self.frame_type == FrameType.h265_stream:
      assert self.first_iframe == 0

    # last index row is the EOF sentinel written by vidindex, not a frame
    self.frame_count = len(self.index)-1

    self.w = probe['streams'][0]['width']
    self.h = probe['streams'][0]['height']

  def _lookup_gop(self, num):
    # expand num out to its enclosing [I-frame, next I-frame) range
    frame_b = num
    while frame_b > 0 and self.index[frame_b, 0] != SLICE_I:
      frame_b -= 1

    frame_e = num+1
    while frame_e < (len(self.index)-1) and self.index[frame_e, 0] != SLICE_I:
      frame_e += 1

    offset_b = self.index[frame_b, 1]
    offset_e = self.index[frame_e, 1]

    return (frame_b, frame_e, offset_b, offset_e)

  def get_gop(self, num):
    """Return (start_frame_num, num_frames, frames_to_skip, data) for the GOP containing num."""
    frame_b, frame_e, offset_b, offset_e = self._lookup_gop(num)
    assert frame_b <= num < frame_e

    num_frames = frame_e-frame_b

    with FileReader(self.fn) as f:
      f.seek(offset_b)
      rawdat = f.read(offset_e-offset_b)

    if num < self.first_iframe:
      # before our first I-frame: lean on the previous segment's tail GOP
      assert self.prefix_frame_data
      rawdat = self.prefix_frame_data + rawdat

    rawdat = self.prefix + rawdat

    skip_frames = 0
    if num < self.first_iframe:
      skip_frames = self.num_prefix_frames

    return frame_b, num_frames, skip_frames, rawdat
class StreamFrameReader(StreamGOPReader, GOPFrameReader):
  """Frame-level raw-stream reader: stream GOP lookup plus decoded-frame caching."""
  def __init__(self, fn, frame_type, index_data, readahead=False, readbehind=False, multithreaded=False):
    StreamGOPReader.__init__(self, fn, frame_type, index_data)
    GOPFrameReader.__init__(self, readahead, readbehind, multithreaded)
def GOPFrameIterator(gop_reader, pix_fmt, multithreaded=True):
  """Yield every frame of gop_reader in order, streaming whole GOPs through
  a single long-lived decoder process."""
  # this is really ugly. ill think about how to refactor it when i can think good
  IN_FLIGHT_GOPS = 6 # should be enough that the stream decompressor starts returning data

  with VideoStreamDecompressor(
    gop_reader.vid_fmt, gop_reader.w, gop_reader.h, pix_fmt, multithreaded) as dec:

    # per submitted GOP: [frames left to skip, frames left to yield]
    read_work = []

    def readthing():
      # generator: pull one decoded frame; either discard it (prefix frame to
      # skip) or yield it, retiring the head GOP entry once fully consumed
      # print read_work, dec.out_q.qsize()
      outf = dec.read()
      read_thing = read_work[0]
      if read_thing[0] > 0:
        read_thing[0] -= 1
      else:
        assert read_thing[1] > 0
        yield outf
        read_thing[1] -= 1
        if read_thing[1] == 0:
          read_work.pop(0)

    i = 0
    while i < gop_reader.frame_count:
      frame_b, num_frames, skip_frames, gop_data = gop_reader.get_gop(i)
      dec.write(gop_data)
      i += num_frames
      read_work.append([skip_frames, num_frames])

      # backpressure: drain before queueing more than IN_FLIGHT_GOPS
      while len(read_work) >= IN_FLIGHT_GOPS:
        for v in readthing(): yield v

    # no more input; flush the decoder and drain the remaining frames
    dec.eos()
    while read_work:
      for v in readthing(): yield v
def FrameIterator(fn, pix_fmt, **kwargs):
  """Yield the decoded frames of *fn* one at a time in *pix_fmt*."""
  fr = FrameReader(fn, **kwargs)
  if isinstance(fr, GOPReader):
    # GOP-based formats: stream the whole file through one decoder process
    for frame in GOPFrameIterator(fr, pix_fmt, kwargs.get("multithreaded", True)):
      yield frame
  else:
    for i in range(fr.frame_count):
      yield fr.get(i, pix_fmt=pix_fmt)[0]
def FrameWriter(ofn, frames, vid_fmt=FrameType.ffvhuff_mkv, pix_fmt="rgb24", framerate=20, multithreaded=False):
  """Encode an iterable of frame arrays to an ffv1/ffvhuff mkv at *ofn*.

  Frame geometry is taken from the first frame: (h, w, ...) for rgb24, or
  the packed (h*3//2, w) yuv420p layout. Asserts if ffmpeg exits non-zero.
  """
  if pix_fmt not in ("rgb24", "yuv420p"):
    raise NotImplementedError

  if vid_fmt == FrameType.ffv1_mkv:
    assert ofn.endswith(".mkv")
    vcodec = "ffv1"
  elif vid_fmt == FrameType.ffvhuff_mkv:
    assert ofn.endswith(".mkv")
    vcodec = "ffvhuff"
  else:
    raise NotImplementedError

  # peek at the first frame to learn the geometry before starting ffmpeg
  frame_gen = iter(frames)
  first_frame = next(frame_gen)

  # assert len(frames) > 1

  if pix_fmt == "rgb24":
    h, w = first_frame.shape[:2]
  elif pix_fmt == "yuv420p":
    # packed yuv420p: rows are h (luma) + h//2 (chroma), i.e. shape[0] == h*3//2
    w = first_frame.shape[1]
    h = 2*first_frame.shape[0]//3
  else:
    raise NotImplementedError

  compress_proc = subprocess.Popen(
    ["ffmpeg",
     "-threads", "0" if multithreaded else "1",
     "-y",
     "-framerate", str(framerate),
     "-vsync", "0",
     "-f", "rawvideo",
     "-pix_fmt", pix_fmt,
     "-s", "%dx%d" % (w, h),
     "-i", "pipe:0",
     "-threads", "0" if multithreaded else "1",
     "-f", "matroska",
     "-vcodec", vcodec,
     "-g", "0",
     ofn],
    stdin=subprocess.PIPE, stderr=open("/dev/null", "wb"))
  try:
    compress_proc.stdin.write(first_frame.tobytes())
    for frame in frame_gen:
      compress_proc.stdin.write(frame.tobytes())
    compress_proc.stdin.close()
  except:
    # don't leave a half-fed encoder running if the frame source blows up
    compress_proc.kill()
    raise

  assert compress_proc.wait() == 0
if __name__ == "__main__":
  # smoke test: stream-decode one sample route segment and print each frame
  fn = "cd:/1c79456b0c90f15a/2017-05-10--08-17-00/2/fcamera.hevc"
  f = FrameReader(fn)
  # print f.get(0, 1).shape
  # print f.get(15, 1).shape
  for v in GOPFrameIterator(f, "yuv420p"):
    print(v)