Python: Replace more lists with generators (#23116)
* Replace lists with generators v2 * Replace set with {} Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com> * Replace more set() with {} Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com>pull/23128/head
parent
b8c42d01eb
commit
6951b3271d
|
@ -4,7 +4,7 @@ import ast
|
|||
import stat
|
||||
import subprocess
|
||||
|
||||
fouts = set([x.decode('utf-8') for x in subprocess.check_output(['git', 'ls-files']).strip().split()])
|
||||
fouts = {x.decode('utf-8') for x in subprocess.check_output(['git', 'ls-files']).strip().split()}
|
||||
|
||||
pyf = []
|
||||
for d in ["cereal", "common", "scripts", "selfdrive", "tools"]:
|
||||
|
|
|
@ -10,6 +10,6 @@ with open(os.path.join(BASEDIR, "docs/CARS.md")) as f:
|
|||
cars = [l for l in lines if l.strip().startswith("|") and l.strip().endswith("|") and
|
||||
"Make" not in l and any(c.isalpha() for c in l)]
|
||||
|
||||
make_count = Counter([l.split('|')[1].split('|')[0].strip() for l in cars])
|
||||
make_count = Counter(l.split('|')[1].split('|')[0].strip() for l in cars)
|
||||
print("\n", "*"*20, len(cars), "total", "*"*20, "\n")
|
||||
pprint(make_count)
|
||||
|
|
|
@ -37,7 +37,7 @@ def extract_image(buf, w, h, stride):
|
|||
|
||||
|
||||
def rois_in_focus(lapres: List[float]) -> float:
|
||||
return sum([1. / len(lapres) for sharpness in lapres if sharpness >= LM_THRESH])
|
||||
return sum(1. / len(lapres) for sharpness in lapres if sharpness >= LM_THRESH)
|
||||
|
||||
|
||||
def get_snapshots(frame="roadCameraState", front_frame="driverCameraState", focus_perc_threshold=0.):
|
||||
|
|
|
@ -102,7 +102,7 @@ def create_acc_commands(packer, enabled, accel, jerk, idx, lead_visible, set_spe
|
|||
"CR_VSM_Alive": idx % 0xF,
|
||||
}
|
||||
scc12_dat = packer.make_can_msg("SCC12", 0, scc12_values)[2]
|
||||
scc12_values["CR_VSM_ChkSum"] = 0x10 - sum([sum(divmod(i, 16)) for i in scc12_dat]) % 0x10
|
||||
scc12_values["CR_VSM_ChkSum"] = 0x10 - sum(sum(divmod(i, 16)) for i in scc12_dat) % 0x10
|
||||
|
||||
commands.append(packer.make_can_msg("SCC12", 0, scc12_values))
|
||||
|
||||
|
@ -127,7 +127,7 @@ def create_acc_commands(packer, enabled, accel, jerk, idx, lead_visible, set_spe
|
|||
"FCA_Status": 1, # AEB disabled
|
||||
}
|
||||
fca11_dat = packer.make_can_msg("FCA11", 0, fca11_values)[2]
|
||||
fca11_values["CR_FCA_ChkSum"] = 0x10 - sum([sum(divmod(i, 16)) for i in fca11_dat]) % 0x10
|
||||
fca11_values["CR_FCA_ChkSum"] = 0x10 - sum(sum(divmod(i, 16)) for i in fca11_dat) % 0x10
|
||||
commands.append(packer.make_can_msg("FCA11", 0, fca11_values))
|
||||
|
||||
return commands
|
||||
|
|
|
@ -35,7 +35,7 @@ class TestFwFingerprint(unittest.TestCase):
|
|||
passed = True
|
||||
for car_model, ecus in FW_VERSIONS.items():
|
||||
for ecu, ecu_fw in ecus.items():
|
||||
duplicates = set([fw for fw in ecu_fw if ecu_fw.count(fw) > 1])
|
||||
duplicates = {fw for fw in ecu_fw if ecu_fw.count(fw) > 1}
|
||||
if len(duplicates):
|
||||
print(car_model, ECU_NAME[ecu[0]], duplicates)
|
||||
passed = False
|
||||
|
|
|
@ -119,5 +119,5 @@ if __name__ == "__main__":
|
|||
for x in l:
|
||||
print(x[2])
|
||||
print('avg sum: {0:.2%} over {1} samples {2} seconds\n'.format(
|
||||
sum([stat['avg']['total'] for k, stat in stats.items()]), i, i * SLEEP_INTERVAL
|
||||
sum(stat['avg']['total'] for k, stat in stats.items()), i, i * SLEEP_INTERVAL
|
||||
))
|
||||
|
|
|
@ -25,7 +25,7 @@ def make_pie(msgs, typ):
|
|||
print()
|
||||
|
||||
sizes_large = [(k, sz) for (k, sz) in sizes if sz >= total * MIN_SIZE / 100]
|
||||
sizes_large += [('other', sum([sz for (_, sz) in sizes if sz < total * MIN_SIZE / 100]))]
|
||||
sizes_large += [('other', sum(sz for (_, sz) in sizes if sz < total * MIN_SIZE / 100))]
|
||||
|
||||
labels, sizes = zip(*sizes_large)
|
||||
|
||||
|
|
|
@ -22,10 +22,10 @@ class TestLogcatdAndroid(unittest.TestCase):
|
|||
# write some log messages
|
||||
sent_msgs = {}
|
||||
for __ in range(random.randint(5, 50)):
|
||||
msg = ''.join([random.choice(string.ascii_letters) for _ in range(random.randrange(2, 50))])
|
||||
msg = ''.join(random.choice(string.ascii_letters) for _ in range(random.randrange(2, 50)))
|
||||
if msg in sent_msgs:
|
||||
continue
|
||||
sent_msgs[msg] = ''.join([random.choice(string.ascii_letters) for _ in range(random.randrange(2, 20))])
|
||||
sent_msgs[msg] = ''.join(random.choice(string.ascii_letters) for _ in range(random.randrange(2, 20)))
|
||||
os.system(f"log -t '{sent_msgs[msg]}' '{msg}'")
|
||||
|
||||
time.sleep(1)
|
||||
|
|
|
@ -131,13 +131,13 @@ class TestLoggerd(unittest.TestCase):
|
|||
route_path = str(self._get_latest_log_dir()).rsplit("--", 1)[0]
|
||||
for n in range(num_segs):
|
||||
p = Path(f"{route_path}--{n}")
|
||||
logged = set([f.name for f in p.iterdir() if f.is_file()])
|
||||
logged = {f.name for f in p.iterdir() if f.is_file()}
|
||||
diff = logged ^ expected_files
|
||||
self.assertEqual(len(diff), 0, f"didn't get all expected files. run={_} seg={n} {route_path=}, {diff=}\n{logged=} {expected_files=}")
|
||||
|
||||
def test_bootlog(self):
|
||||
# generate bootlog with fake launch log
|
||||
launch_log = ''.join([str(random.choice(string.printable)) for _ in range(100)])
|
||||
launch_log = ''.join(str(random.choice(string.printable)) for _ in range(100))
|
||||
with open("/tmp/launch_log", "w") as f:
|
||||
f.write(launch_log)
|
||||
|
||||
|
|
|
@ -77,7 +77,7 @@ def build(spinner, dirty=False):
|
|||
# Show TextWindow
|
||||
spinner.close()
|
||||
if not os.getenv("CI"):
|
||||
error_s = "\n \n".join(["\n".join(textwrap.wrap(e, 65)) for e in errors])
|
||||
error_s = "\n \n".join("\n".join(textwrap.wrap(e, 65)) for e in errors)
|
||||
with TextWindow("openpilot failed to build\n \n" + error_s) as t:
|
||||
t.wait_for_exit()
|
||||
exit(1)
|
||||
|
|
|
@ -154,8 +154,8 @@ def manager_thread():
|
|||
|
||||
started_prev = started
|
||||
|
||||
running = ' '.join(["%s%s\u001b[0m" % ("\u001b[32m" if p.proc.is_alive() else "\u001b[31m", p.name)
|
||||
for p in managed_processes.values() if p.proc])
|
||||
running = ' '.join("%s%s\u001b[0m" % ("\u001b[32m" if p.proc.is_alive() else "\u001b[31m", p.name)
|
||||
for p in managed_processes.values() if p.proc)
|
||||
print(running)
|
||||
cloudlog.debug(running)
|
||||
|
||||
|
|
|
@ -65,7 +65,7 @@ if use_thneed and arch in ("aarch64", "larch64"):
|
|||
compiler = lenv.Program('thneed/compile', ["thneed/compile.cc"]+common_model, LIBS=libs)
|
||||
cmd = f"cd {Dir('.').abspath} && {compiler[0].abspath} ../../models/supercombo.dlc ../../models/supercombo.thneed --binary"
|
||||
|
||||
lib_paths = ':'.join([Dir(p).abspath for p in lenv["LIBPATH"]])
|
||||
lib_paths = ':'.join(Dir(p).abspath for p in lenv["LIBPATH"])
|
||||
cenv = Environment(ENV={'LD_LIBRARY_PATH': f"{lib_paths}:{lenv['ENV']['LD_LIBRARY_PATH']}"})
|
||||
cenv.Command("../../models/supercombo.thneed", ["../../models/supercombo.dlc", compiler], cmd)
|
||||
|
||||
|
|
|
@ -171,7 +171,7 @@ class TestMonitoring(unittest.TestCase):
|
|||
# - dm should stay quiet when not engaged
|
||||
def test_pure_dashcam_user(self):
|
||||
events, _ = self._run_seq(always_distracted, always_false, always_false, always_false)
|
||||
self.assertTrue(np.sum([len(event) for event in events]) == 0)
|
||||
self.assertTrue(sum(len(event) for event in events) == 0)
|
||||
|
||||
# engaged, car stops at traffic light, down to orange, no action, then car starts moving
|
||||
# - should only reach green when stopped, but continues counting down on launch
|
||||
|
|
|
@ -18,7 +18,7 @@ EPSILON = sys.float_info.epsilon
|
|||
|
||||
|
||||
def save_log(dest, log_msgs, compress=True):
|
||||
dat = b"".join([msg.as_builder().to_bytes() for msg in tqdm(log_msgs)])
|
||||
dat = b"".join(msg.as_builder().to_bytes() for msg in tqdm(log_msgs))
|
||||
|
||||
if compress:
|
||||
dat = bz2.compress(dat)
|
||||
|
@ -61,8 +61,8 @@ def compare_logs(log1, log2, ignore_fields=None, ignore_msgs=None, tolerance=Non
|
|||
log1, log2 = [list(filter(lambda m: m.which() not in ignore_msgs, log)) for log in (log1, log2)]
|
||||
|
||||
if len(log1) != len(log2):
|
||||
cnt1 = Counter([m.which() for m in log1])
|
||||
cnt2 = Counter([m.which() for m in log2])
|
||||
cnt1 = Counter(m.which() for m in log1)
|
||||
cnt2 = Counter(m.which() for m in log2)
|
||||
raise Exception(f"logs are not same length: {len(log1)} VS {len(log2)}\n\t\t{cnt1}\n\t\t{cnt2}")
|
||||
|
||||
diff = []
|
||||
|
|
|
@ -190,7 +190,7 @@ class TestOnroad(unittest.TestCase):
|
|||
total_size = sum(len(m.as_builder().to_bytes()) for m in msgs)
|
||||
self.assertLess(total_size, 3.5e5)
|
||||
|
||||
cnt = Counter([json.loads(m.logMessage)['filename'] for m in msgs])
|
||||
cnt = Counter(json.loads(m.logMessage)['filename'] for m in msgs)
|
||||
big_logs = [f for f, n in cnt.most_common(3) if n / sum(cnt.values()) > 30.]
|
||||
self.assertEqual(len(big_logs), 0, f"Log spam: {big_logs}")
|
||||
|
||||
|
|
|
@ -28,7 +28,7 @@ SOUNDS = {
|
|||
def get_total_writes():
|
||||
audio_flinger = subprocess.check_output('dumpsys media.audio_flinger', shell=True, encoding='utf-8').strip()
|
||||
write_lines = [l for l in audio_flinger.split('\n') if l.strip().startswith('Total writes')]
|
||||
return sum([int(l.split(':')[1]) for l in write_lines])
|
||||
return sum(int(l.split(':')[1]) for l in write_lines)
|
||||
|
||||
class TestSoundd(unittest.TestCase):
|
||||
def test_sound_card_init(self):
|
||||
|
|
|
@ -64,7 +64,7 @@ def joystick_thread(use_keyboard):
|
|||
dat.testJoystick.axes = [joystick.axes_values[a] for a in joystick.axes_values]
|
||||
dat.testJoystick.buttons = [joystick.cancel]
|
||||
joystick_sock.send(dat.to_bytes())
|
||||
print('\n' + ', '.join([f'{name}: {round(v, 3)}' for name, v in joystick.axes_values.items()]))
|
||||
print('\n' + ', '.join(f'{name}: {round(v, 3)}' for name, v in joystick.axes_values.items()))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
|
Loading…
Reference in New Issue