Search code examples
algorithmarchitectureembeddedschedulingchannel

Scheduling periodic requests to multiple devices using a shared channel


I need to request data periodically from a configurable number of devices at configurable intervals (per device). All devices are connected to a shared data bus, so only one device can send data at the same time.

The devices have very little memory, so each device can only keep the data for a certain period of time before it is overwritten by the next chunk. This means I need to make sure to request data from any given device while it is still available, or else it will be lost.

I am looking for an algorithm that, given a list of devices and their respective timing properties, finds a feasible schedule in order to achieve minimal data loss.

I guess each device could be formally described using the following properties:

data_interval: time it takes for the next chunk of data to become available

max_request_interval: maximum amount of time between requests that will not cause data loss

processing_time: time it takes to send a request and fully receive the corresponding response containing the requested data

Basically, I need to make sure to request data from every device once its data is ready and not yet expired, while keeping in mind the deadlines for all other devices.

Is there some sort of algorithm for this kind of problem? I highly doubt I'm the first person to ever encounter a situation like this. Searching for existing solutions online didn't yield many useful results, mainly because scheduling algorithms are mostly used for operating systems and such, where scheduled processes can be paused and resumed at will. I can't do this in my case, however, since the process of requesting and receiving a chunk of data is atomic, i.e. it can only be performed in its entirety or not at all.


Solution

  • I solved this problem using non-preemptive deadline monotonic scheduling.

    Here's some python code for anyone interested:

    """This module implements non-preemptive deadline monotonic scheduling (NPDMS) to compute a schedule of periodic,
    non-preemptable requests to slave devices connected to a shared data bus"""
    
    from math import gcd
    from functools import reduce
    from typing import List
    
    
    class Slave:
    
        def __init__(self, name: str, period: int, processing_time: int, offset=0, deadline=None):
            self.name = name
            self.period = int(period)
            self.processing_time = int(processing_time)
            self.offset = int(offset)
            if self.offset >= self.period:
                raise ValueError("Slave %s: offset must be < period" % name)
            self.deadline = int(deadline) if deadline else self.period
            if self.deadline > self.period:
                raise ValueError("Slave %s: deadline must be <= period" % name)
    
    
    class Request:
    
        def __init__(self, slave: Slave, start_time: int):
            self.slave = slave
            self.start_time = start_time
            self.end_time = start_time + slave.processing_time
            self.duration = self.end_time - self.start_time
    
        def overlaps_with(self, other: 'Request'):
            min_duration = self.duration + other.duration
            start = min(other.start_time, self.start_time)
            end = max(other.end_time, self.end_time)
            effective_duration = end - start
            return effective_duration < min_duration
    
    
    class Scenario:
    
        def __init__(self, *slaves: Slave):
            self.slaves = list(slaves)
            self.slaves.sort(key=lambda slave: slave.deadline)
            # LCM of all slave periods
            self.cycle_period = reduce(lambda a, b: a * b // gcd(a, b), [slave.period for slave in slaves])
    
        def compute_schedule(self, resolution=1) -> 'Schedule':
            request_pool = []
            for t in range(0, self.cycle_period, resolution):
                for slave in self.slaves:
                    if (t - slave.offset) % slave.period == 0 and t >= slave.offset:
                        request_pool.append(Request(slave, t))
            request_pool.reverse()
    
            scheduled_requests = []
            current_request = request_pool.pop()
            t = current_request.start_time
            while t < self.cycle_period:
                ongoing_request = Request(current_request.slave, t)
                while ongoing_request.start_time <= t < ongoing_request.end_time:
                    t += resolution
                scheduled_requests.append(ongoing_request)
                if len(request_pool):
                    current_request = request_pool.pop()
                    t = max(current_request.start_time, t)
                else:
                    current_request = None
                    break
    
            if current_request:
                request_pool.append(current_request)
    
            return Schedule(self, scheduled_requests, request_pool)
    
    
    class Schedule:
    
        def __init__(self, scenario: Scenario, requests: List[Request], unscheduled: List[Request] = None):
            self.scenario = scenario
            self.requests = requests
            self.unscheduled_requests = unscheduled if unscheduled else []
    
            self._utilization = 0
            for slave in self.scenario.slaves:
                self._utilization += float(slave.processing_time) / float(slave.period)
    
            self._missed_deadlines_dict = {}
            for slave in self.scenario.slaves:
                periods = scenario.cycle_period // slave.period
                missed_deadlines = []
                for period in range(periods):
                    start = period * slave.period
                    end = start + slave.period
                    request = self._find_request(slave, start, end)
                    if request:
                        if request.start_time < (start + slave.offset) or request.end_time > start + slave.deadline:
                            missed_deadlines.append(request)
                if missed_deadlines:
                    self._missed_deadlines_dict[slave] = missed_deadlines
    
            self._overlapping_requests = []
            for i in range(0, len(requests)):
                if i == 0:
                    continue
                previous_request = requests[i - 1]
                current_request = requests[i]
                if current_request.overlaps_with(previous_request):
                    self._overlapping_requests.append((current_request, previous_request))
    
            self._incomplete_requests = []
            for request in self.requests:
                if request.duration < request.slave.processing_time:
                    self._incomplete_requests.append(request)
    
        @property
        def is_feasible(self) -> bool:
            return self.utilization <= 1 \
                   and not self.has_missed_deadlines \
                   and not self.has_overlapping_requests \
                   and not self.has_unscheduled_requests \
                   and not self.has_incomplete_requests
    
        @property
        def utilization(self) -> float:
            return self._utilization
    
        @property
        def has_missed_deadlines(self) -> bool:
            return len(self._missed_deadlines_dict) > 0
    
        @property
        def has_overlapping_requests(self) -> bool:
            return len(self._overlapping_requests) > 0
    
        @property
        def has_unscheduled_requests(self) -> bool:
            return len(self.unscheduled_requests) > 0
    
        @property
        def has_incomplete_requests(self) -> bool:
            return len(self._incomplete_requests) > 0
    
        def _find_request(self, slave, start, end) -> [Request, None]:
            for r in self.requests:
                if r.slave == slave and r.start_time >= start and r.end_time < end:
                    return r
            return None
    
    
    def read_scenario(file) -> Scenario:
        from csv import DictReader
        return Scenario(*[Slave(**row) for row in DictReader(file)])
    
    
    def write_schedule(schedule: Schedule, file):
        from csv import DictWriter
        writer = DictWriter(file, fieldnames=["name", "start", "end"])
        writer.writeheader()
        for request in schedule.requests:
            writer.writerow({"name": request.slave.name, "start": request.start_time, "end": request.end_time})
    
    
    if __name__ == '__main__':
        import argparse
        import sys
    
        parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter,
                                         description='Use non-preemptive deadline monotonic scheduling (NPDMS) to\n'
                                                     'compute a schedule of periodic, non-preemptable requests to\n'
                                                     'slave devices connected to a shared data bus.\n\n'
                                                     'Prints the computed schedule to stdout as CSV. Returns with\n'
                                                     'exit code 0 if the schedule is feasible, else 1.')
        parser.add_argument("csv_file", metavar="SCENARIO", type=str,
                            help="A csv file describing the scenario, i.e. a list\n"
                                 "of slave devices with the following properties:\n"
                                 "* name:            name/id of the slave device\n\n"
                                 "* period:          duration of the period of time during\n"
                                 "                   which requests must be dispatched\n\n"
                                 "* processing_time: amount of time it takes to\n"
                                 "                   fully process a request (worst-case)\n\n"
                                 "* offset:          offset for initial phase-shifting\n"
                                 "                   (default: 0)\n\n"
                                 "* deadline:        amount of time during which data is\n"
                                 "                   available after the start of each period\n"
                                 "                   (default: <period>)")
    
        parser.add_argument("-r", "--resolution", type=int, default=1,
                            help="The resolution used to simulate the passage of time (default: 1)")
    
        args = parser.parse_args()
    
        with open(args.csv_file, 'r') as f:
            schedule = read_scenario(f).compute_schedule(args.resolution)
            write_schedule(schedule, sys.stdout)
            exit(0 if schedule.is_feasible else 1)