I need to request data periodically from a configurable number of devices at configurable intervals (per device). All devices are connected to a shared data bus, so only one device can send data at the same time.
The devices have very little memory, so each device can only keep the data for a certain period of time before it is overwritten by the next chunk. This means I need to make sure to request data from any given device while it is still available, or else it will be lost.
I am looking for an algorithm that, given a list of devices and their respective timing properties, finds a feasible schedule in order to achieve minimal data loss.
I guess each device could be formally described using the following properties:
data_interval
: time it takes for the next chunk of data to become available
max_request_interval
: maximum amount of time between requests that will not cause data loss
processing_time
: time it takes to send a request and fully receive the corresponding response containing the requested data
Basically, I need to make sure to request data from every device once its data is ready and not yet expired, while keeping in mind the deadlines for all other devices.
Is there some sort of algorithm for this kind of problem? I highly doubt I'm the first person to ever encounter a situation like this. Searching for existing solutions online didn't yield many useful results, mainly because scheduling algorithms are mostly used for operating systems and such, where scheduled processes can be paused and resumed at will. I can't do this in my case, however, since the process of requesting and receiving a chunk of data is atomic, i.e. it can only be performed in its entirety or not at all.
I solved this problem using non-preemptive deadline monotonic scheduling.
Here's some python code for anyone interested:
"""This module implements non-preemptive deadline monotonic scheduling (NPDMS) to compute a schedule of periodic,
non-preemptable requests to slave devices connected to a shared data bus"""
from math import gcd
from functools import reduce
from typing import List
class Slave:
def __init__(self, name: str, period: int, processing_time: int, offset=0, deadline=None):
self.name = name
self.period = int(period)
self.processing_time = int(processing_time)
self.offset = int(offset)
if self.offset >= self.period:
raise ValueError("Slave %s: offset must be < period" % name)
self.deadline = int(deadline) if deadline else self.period
if self.deadline > self.period:
raise ValueError("Slave %s: deadline must be <= period" % name)
class Request:
def __init__(self, slave: Slave, start_time: int):
self.slave = slave
self.start_time = start_time
self.end_time = start_time + slave.processing_time
self.duration = self.end_time - self.start_time
def overlaps_with(self, other: 'Request'):
min_duration = self.duration + other.duration
start = min(other.start_time, self.start_time)
end = max(other.end_time, self.end_time)
effective_duration = end - start
return effective_duration < min_duration
class Scenario:
def __init__(self, *slaves: Slave):
self.slaves = list(slaves)
self.slaves.sort(key=lambda slave: slave.deadline)
# LCM of all slave periods
self.cycle_period = reduce(lambda a, b: a * b // gcd(a, b), [slave.period for slave in slaves])
def compute_schedule(self, resolution=1) -> 'Schedule':
request_pool = []
for t in range(0, self.cycle_period, resolution):
for slave in self.slaves:
if (t - slave.offset) % slave.period == 0 and t >= slave.offset:
request_pool.append(Request(slave, t))
request_pool.reverse()
scheduled_requests = []
current_request = request_pool.pop()
t = current_request.start_time
while t < self.cycle_period:
ongoing_request = Request(current_request.slave, t)
while ongoing_request.start_time <= t < ongoing_request.end_time:
t += resolution
scheduled_requests.append(ongoing_request)
if len(request_pool):
current_request = request_pool.pop()
t = max(current_request.start_time, t)
else:
current_request = None
break
if current_request:
request_pool.append(current_request)
return Schedule(self, scheduled_requests, request_pool)
class Schedule:
def __init__(self, scenario: Scenario, requests: List[Request], unscheduled: List[Request] = None):
self.scenario = scenario
self.requests = requests
self.unscheduled_requests = unscheduled if unscheduled else []
self._utilization = 0
for slave in self.scenario.slaves:
self._utilization += float(slave.processing_time) / float(slave.period)
self._missed_deadlines_dict = {}
for slave in self.scenario.slaves:
periods = scenario.cycle_period // slave.period
missed_deadlines = []
for period in range(periods):
start = period * slave.period
end = start + slave.period
request = self._find_request(slave, start, end)
if request:
if request.start_time < (start + slave.offset) or request.end_time > start + slave.deadline:
missed_deadlines.append(request)
if missed_deadlines:
self._missed_deadlines_dict[slave] = missed_deadlines
self._overlapping_requests = []
for i in range(0, len(requests)):
if i == 0:
continue
previous_request = requests[i - 1]
current_request = requests[i]
if current_request.overlaps_with(previous_request):
self._overlapping_requests.append((current_request, previous_request))
self._incomplete_requests = []
for request in self.requests:
if request.duration < request.slave.processing_time:
self._incomplete_requests.append(request)
@property
def is_feasible(self) -> bool:
return self.utilization <= 1 \
and not self.has_missed_deadlines \
and not self.has_overlapping_requests \
and not self.has_unscheduled_requests \
and not self.has_incomplete_requests
@property
def utilization(self) -> float:
return self._utilization
@property
def has_missed_deadlines(self) -> bool:
return len(self._missed_deadlines_dict) > 0
@property
def has_overlapping_requests(self) -> bool:
return len(self._overlapping_requests) > 0
@property
def has_unscheduled_requests(self) -> bool:
return len(self.unscheduled_requests) > 0
@property
def has_incomplete_requests(self) -> bool:
return len(self._incomplete_requests) > 0
def _find_request(self, slave, start, end) -> [Request, None]:
for r in self.requests:
if r.slave == slave and r.start_time >= start and r.end_time < end:
return r
return None
def read_scenario(file) -> Scenario:
from csv import DictReader
return Scenario(*[Slave(**row) for row in DictReader(file)])
def write_schedule(schedule: Schedule, file):
from csv import DictWriter
writer = DictWriter(file, fieldnames=["name", "start", "end"])
writer.writeheader()
for request in schedule.requests:
writer.writerow({"name": request.slave.name, "start": request.start_time, "end": request.end_time})
if __name__ == '__main__':
import argparse
import sys
parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter,
description='Use non-preemptive deadline monotonic scheduling (NPDMS) to\n'
'compute a schedule of periodic, non-preemptable requests to\n'
'slave devices connected to a shared data bus.\n\n'
'Prints the computed schedule to stdout as CSV. Returns with\n'
'exit code 0 if the schedule is feasible, else 1.')
parser.add_argument("csv_file", metavar="SCENARIO", type=str,
help="A csv file describing the scenario, i.e. a list\n"
"of slave devices with the following properties:\n"
"* name: name/id of the slave device\n\n"
"* period: duration of the period of time during\n"
" which requests must be dispatched\n\n"
"* processing_time: amount of time it takes to\n"
" fully process a request (worst-case)\n\n"
"* offset: offset for initial phase-shifting\n"
" (default: 0)\n\n"
"* deadline: amount of time during which data is\n"
" available after the start of each period\n"
" (default: <period>)")
parser.add_argument("-r", "--resolution", type=int, default=1,
help="The resolution used to simulate the passage of time (default: 1)")
args = parser.parse_args()
with open(args.csv_file, 'r') as f:
schedule = read_scenario(f).compute_schedule(args.resolution)
write_schedule(schedule, sys.stdout)
exit(0 if schedule.is_feasible else 1)