Refactor overlap method into the pass_predictor module
parent
54d4ee8895
commit
44c37aaff2
|
@ -0,0 +1,34 @@
|
||||||
|
from datetime import timedelta
|
||||||
|
|
||||||
|
|
||||||
|
def overlap(satpass, scheduledpasses, wait_time_seconds):
|
||||||
|
"""Check if this pass overlaps with already scheduled passes"""
|
||||||
|
# No overlap
|
||||||
|
overlap = False
|
||||||
|
|
||||||
|
# Add wait time
|
||||||
|
tr = satpass['tr']
|
||||||
|
ts = satpass['ts'] + timedelta(seconds=wait_time_seconds)
|
||||||
|
|
||||||
|
# Loop over scheduled passes
|
||||||
|
for scheduledpass in scheduledpasses:
|
||||||
|
# Test pass falls within scheduled pass
|
||||||
|
if tr >= scheduledpass['tr'] and ts < scheduledpass['ts'] + timedelta(
|
||||||
|
seconds=wait_time_seconds):
|
||||||
|
overlap = True
|
||||||
|
# Scheduled pass falls within test pass
|
||||||
|
elif scheduledpass['tr'] >= tr and scheduledpass['ts'] + timedelta(
|
||||||
|
seconds=wait_time_seconds) < ts:
|
||||||
|
overlap = True
|
||||||
|
# Pass start falls within pass
|
||||||
|
elif tr >= scheduledpass['tr'] and tr < scheduledpass['ts'] + timedelta(
|
||||||
|
seconds=wait_time_seconds):
|
||||||
|
overlap = True
|
||||||
|
# Pass end falls within end
|
||||||
|
elif ts >= scheduledpass['tr'] and ts < scheduledpass['ts'] + timedelta(
|
||||||
|
seconds=wait_time_seconds):
|
||||||
|
overlap = True
|
||||||
|
if overlap:
|
||||||
|
break
|
||||||
|
|
||||||
|
return overlap
|
35
utils.py
35
utils.py
|
@ -10,6 +10,8 @@ from tqdm import tqdm
|
||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
|
|
||||||
|
from auto_scheduler.pass_predictor import overlap
|
||||||
|
|
||||||
|
|
||||||
def get_paginated_endpoint(url, max_entries=None):
|
def get_paginated_endpoint(url, max_entries=None):
|
||||||
r = requests.get(url=url)
|
r = requests.get(url=url)
|
||||||
|
@ -141,39 +143,6 @@ def get_scheduled_passes_from_network(ground_station, tmin, tmax):
|
||||||
return scheduledpasses
|
return scheduledpasses
|
||||||
|
|
||||||
|
|
||||||
def overlap(satpass, scheduledpasses, wait_time_seconds):
|
|
||||||
"""Check if this pass overlaps with already scheduled passes"""
|
|
||||||
# No overlap
|
|
||||||
overlap = False
|
|
||||||
|
|
||||||
# Add wait time
|
|
||||||
tr = satpass['tr']
|
|
||||||
ts = satpass['ts'] + timedelta(seconds=wait_time_seconds)
|
|
||||||
|
|
||||||
# Loop over scheduled passes
|
|
||||||
for scheduledpass in scheduledpasses:
|
|
||||||
# Test pass falls within scheduled pass
|
|
||||||
if tr >= scheduledpass['tr'] and ts < scheduledpass['ts'] + timedelta(
|
|
||||||
seconds=wait_time_seconds):
|
|
||||||
overlap = True
|
|
||||||
# Scheduled pass falls within test pass
|
|
||||||
elif scheduledpass['tr'] >= tr and scheduledpass['ts'] + timedelta(
|
|
||||||
seconds=wait_time_seconds) < ts:
|
|
||||||
overlap = True
|
|
||||||
# Pass start falls within pass
|
|
||||||
elif tr >= scheduledpass['tr'] and tr < scheduledpass['ts'] + timedelta(
|
|
||||||
seconds=wait_time_seconds):
|
|
||||||
overlap = True
|
|
||||||
# Pass end falls within end
|
|
||||||
elif ts >= scheduledpass['tr'] and ts < scheduledpass['ts'] + timedelta(
|
|
||||||
seconds=wait_time_seconds):
|
|
||||||
overlap = True
|
|
||||||
if overlap:
|
|
||||||
break
|
|
||||||
|
|
||||||
return overlap
|
|
||||||
|
|
||||||
|
|
||||||
def ordered_scheduler(passes, scheduledpasses, wait_time_seconds):
|
def ordered_scheduler(passes, scheduledpasses, wait_time_seconds):
|
||||||
"""Loop through a list of ordered passes and schedule each next one that fits"""
|
"""Loop through a list of ordered passes and schedule each next one that fits"""
|
||||||
# Loop over passes
|
# Loop over passes
|
||||||
|
|
Loading…
Reference in New Issue