1
0
Fork 0
tinygrab/extra/helpers.py

73 lines
1.7 KiB
Python
Raw Normal View History

import multiprocessing, subprocess
import cloudpickle
from typing import Any
2023-12-04 22:01:04 -07:00
def _early_exec_process(qin, qout):
    """Worker loop: run commands taken from ``qin`` and report on ``qout``.

    Every request read from ``qin`` is unpacked as a ``(path, inp)`` pair and
    handed to ``subprocess.check_output`` as the command and its ``input``.
    The captured output is put on ``qout``; if anything in that step raises
    an ``Exception``, the exception object is put on ``qout`` instead.

    The loop has no exit condition of its own.
    """
    while True:
        cmd, stdin_data = qin.get()
        try:
            output = subprocess.check_output(cmd, input=stdin_data)
            qout.put(output)
        except Exception as err:
            # Ship the failure back so the requester can re-raise it.
            qout.put(err)
2023-05-06 12:56:09 -06:00
def enable_early_exec():
    """Start a daemon worker process and return a function that talks to it.

    Two ``multiprocessing.Queue`` objects are created (requests and replies)
    and a daemon process running ``_early_exec_process`` is started on them.

    Returns:
        A callable ``early_exec(x)`` that puts ``x`` on the request queue,
        blocks for the next reply, and returns it. If the reply is an
        ``Exception`` instance it is raised instead of returned.
    """
    requests: multiprocessing.Queue = multiprocessing.Queue()
    replies: multiprocessing.Queue = multiprocessing.Queue()
    worker = multiprocessing.Process(
        target=_early_exec_process, args=(requests, replies), daemon=True
    )
    worker.start()

    def early_exec(x):
        requests.put(x)
        reply = replies.get()
        if isinstance(reply, Exception):
            raise reply
        return reply

    return early_exec
def proc(itermaker, q) -> None:
    """Producer entry point: push every item of ``itermaker()`` onto ``q``.

    What ends up on the queue, in order: the items produced by iterating
    ``itermaker()``; then, if that raised an ``Exception``, one exception
    object; then always a final ``None``. The queue is closed afterwards.

    Args:
        itermaker: Zero-argument callable returning an iterable.
        q: Queue-like object providing ``put(item)`` and ``close()``.
    """
    try:
        for item in itermaker():
            q.put(item)
    except Exception as exc:
        # Send something that is guaranteed to cross the process boundary;
        # see _transportable_exception for why the raw exception may not.
        q.put(_transportable_exception(exc))
    finally:
        q.put(None)
        q.close()


def _transportable_exception(exc):
    """Return ``exc`` if it survives a pickle round trip, else a stand-in.

    A ``multiprocessing.Queue`` pickles objects in a background feeder
    thread, so ``put()`` does not raise for an unpicklable object; the object
    just never arrives. For an exception that would make a failed producer
    look like a clean end of stream, so such exceptions are replaced by a
    ``RuntimeError`` carrying the original's ``repr``.
    """
    import pickle  # local import: keeps this block self-contained

    try:
        pickle.loads(pickle.dumps(exc))
    except Exception:
        return RuntimeError(f"unpicklable exception in producer: {exc!r}")
    return exc
class _CloudpickleFunctionWrapper:
    """Callable wrapper whose pickle state is a cloudpickle dump of ``fn``.

    ``__getstate__`` returns ``cloudpickle.dumps(self.fn)`` and
    ``__setstate__`` restores ``self.fn`` with ``cloudpickle.loads``, so the
    wrapped callable is serialized by cloudpickle whenever the wrapper itself
    is pickled. Calling the wrapper forwards to the wrapped callable.
    """

    def __init__(self, fn):
        # fn: the callable to wrap.
        self.fn = fn

    def __getstate__(self):
        # The whole pickled state is the cloudpickle byte string for fn.
        serialized = cloudpickle.dumps(self.fn)
        return serialized

    def __setstate__(self, pfn):
        # pfn is the value previously returned by __getstate__.
        self.fn = cloudpickle.loads(pfn)

    def __call__(self, *args, **kwargs) -> Any:
        wrapped = self.fn
        return wrapped(*args, **kwargs)
2023-06-27 22:23:26 -06:00
def cross_process(itermaker, maxsize=16):
    """Iterate ``itermaker()`` in a child process and yield its items here.

    A child process runs ``proc`` on a bounded ``multiprocessing.Queue``;
    this generator reads from that queue and yields what it receives.

    Queue messages are interpreted as follows, so the iterable itself cannot
    transport these values as ordinary items:
      * an ``Exception`` instance is raised in the consumer;
      * ``None`` marks the end of the stream.

    Args:
        itermaker: Zero-argument callable returning an iterable. It is
            wrapped in ``_CloudpickleFunctionWrapper`` before being passed to
            the child process.
        maxsize: Capacity of the queue between producer and consumer.

    Yields:
        The items received from the child process, in queue order.

    Raises:
        Exception: Any ``Exception`` instance received from the queue.
    """
    q: multiprocessing.Queue = multiprocessing.Queue(maxsize)
    # multiprocessing uses pickle which cannot dump lambdas, so use cloudpickle.
    p = multiprocessing.Process(
        target=proc, args=(_CloudpickleFunctionWrapper(itermaker), q)
    )
    p.start()
    drained = False
    try:
        while True:
            ret = q.get()
            if isinstance(ret, Exception):
                raise ret
            if ret is None:
                drained = True
                break
            yield ret
    finally:
        # If we stop before the end-of-stream marker (consumer closed the
        # generator, or an exception was received), the producer may be
        # blocked in q.put() on a full queue and would never exit by itself.
        if not drained:
            p.terminate()
        # After a full drain the producer has nothing left to flush, so the
        # join returns as soon as the child exits.
        p.join()
        q.close()