Speedup URLFile (#1888)

* add parallel download support to URLFile

* make python 3.8 happy

* Fix chunk size

* Automatic number of threads

* No daemon threads in unlogger

* Cache length

* Don't touch old filereader

* Remove debug info

* remove debug script

* Ignore type
Willem Melching 2020-07-20 17:10:08 +02:00 committed by GitHub
parent ebadb39e42
commit c70700758d
4 changed files with 88 additions and 4 deletions


@@ -1,4 +1,10 @@
-from common.url_file import URLFile
+import os
+
+if "COMMA_PARALLEL_DOWNLOADS" in os.environ:
+  from tools.lib.url_file_parallel import URLFileParallel as URLFile
+else:
+  from tools.lib.url_file import URLFile  # type: ignore
+
 
 def FileReader(fn, debug=False):
   if fn.startswith("http://") or fn.startswith("https://"):
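With this change, setting the COMMA_PARALLEL_DOWNLOADS environment variable both switches the reader over to URLFileParallel and sets the number of parallel workers used per read. A minimal usage sketch (not part of the commit; the URL is a made-up placeholder):

import os
os.environ["COMMA_PARALLEL_DOWNLOADS"] = "4"  # also read as the worker count in read()

from tools.lib.url_file_parallel import URLFileParallel

f = URLFileParallel("https://example.com/segment/rlog.bz2")
print(f.get_length())   # single NOBODY (HEAD-style) request to size the file
data = f.read()         # ranged requests, fanned out over up to 4 workers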


@@ -0,0 +1,81 @@
+# pylint: skip-file
+import os
+import pycurl
+from tools.lib.url_file import URLFile
+from io import BytesIO
+from tenacity import retry, wait_random_exponential, stop_after_attempt
+from multiprocessing import Pool
+
+
+class URLFileParallel(URLFile):
+  def __init__(self, url, debug=False):
+    self._length = None
+    self._url = url
+    self._pos = 0
+    self._local_file = None
+
+  def get_curl(self):
+    curl = pycurl.Curl()
+    curl.setopt(pycurl.NOSIGNAL, 1)
+    curl.setopt(pycurl.TIMEOUT_MS, 500000)
+    curl.setopt(pycurl.FOLLOWLOCATION, True)
+    return curl
+
+  @retry(wait=wait_random_exponential(multiplier=1, max=5), stop=stop_after_attempt(3), reraise=True)
+  def get_length(self):
+    if self._length is not None:
+      return self._length
+
+    c = self.get_curl()
+    c.setopt(pycurl.URL, self._url)
+    c.setopt(c.NOBODY, 1)
+    c.perform()
+
+    length = int(c.getinfo(c.CONTENT_LENGTH_DOWNLOAD))
+    self._length = length
+    return length
+
+  @retry(wait=wait_random_exponential(multiplier=1, max=5), stop=stop_after_attempt(3), reraise=True)
+  def download_chunk(self, start, end=None):
+    if end is None:
+      trange = f'bytes={start}-'
+    else:
+      trange = f'bytes={start}-{end}'
+
+    dats = BytesIO()
+    c = self.get_curl()
+    c.setopt(pycurl.URL, self._url)
+    c.setopt(pycurl.WRITEDATA, dats)
+    c.setopt(pycurl.HTTPHEADER, ["Range: " + trange, "Connection: keep-alive"])
+    c.perform()
+
+    response_code = c.getinfo(pycurl.RESPONSE_CODE)
+    if response_code != 206 and response_code != 200:
+      raise Exception("Error {} ({}): {}".format(response_code, self._url, repr(dats.getvalue())[:500]))
+
+    return dats.getvalue()
+
+  def read(self, ll=None):
+    start = self._pos
+    end = None if ll is None else self._pos + ll - 1
+    max_threads = int(os.environ.get("COMMA_PARALLEL_DOWNLOADS", "0"))
+
+    end = self.get_length() if end is None else end
+    threads = min((end - start) // (512 * 1024), max_threads)  # At least 512k per thread
+
+    if threads > 1:
+      chunk_size = (end - start) // threads
+      chunks = [
+        (start + chunk_size * i,
+         start + chunk_size * (i + 1) - 1 if i != threads - 1 else end)
+        for i in range(threads)]
+
+      with Pool(threads) as pool:
+        ret = b"".join(pool.starmap(self.download_chunk, chunks))
+    else:
+      ret = self.download_chunk(start, end)
+
+    self._pos += len(ret)
+    return ret
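For a sense of how read() splits a request, here is a standalone sketch (not from the commit; the sizes are made up) reproducing the chunk boundaries the Pool workers would fetch for a 4 MiB range with COMMA_PARALLEL_DOWNLOADS=4:

start, end = 0, 4 * 1024 * 1024                              # 4 MiB range
max_threads = 4
threads = min((end - start) // (512 * 1024), max_threads)    # 8 possible, capped at 4

chunk_size = (end - start) // threads
chunks = [(start + chunk_size * i,
           start + chunk_size * (i + 1) - 1 if i != threads - 1 else end)
          for i in range(threads)]

print(chunks)
# [(0, 1048575), (1048576, 2097151), (2097152, 3145727), (3145728, 4194304)]

Each tuple is an inclusive byte range for one Range header; the last chunk runs to end so nothing is dropped when the division isn't exact.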


@@ -413,9 +413,6 @@ def main(argv):
                                 args=(command_address, forward_commands_address, data_address, args.realtime,
                                       _get_address_mapping(args), args.publish_time_length, args.bind_early, args.no_loop))
 
-  for p in subprocesses.values():
-    p.daemon = True
-
   subprocesses["data"].start()
   subprocesses["control"].start()