Tags: python, python-2.7, subprocess, scheduler, execfile

Custom Scheduler to have sequential + semi-sequential scripts with timeouts/kill switches?


Below is a large section of my code. If you scroll down to the execute_subscripts() function, you can see I've got two scripts running via execfile. They work beautifully: they show prints and they save traceback errors to an error file.

I'm trying to make the second script run without the main script waiting for it to finish before moving on to the next script.

As you can see, I have attempted to use subprocess with Popen to launch a silent, hidden window. However, it doesn't seem to run, and I have no idea how to use p.communicate() correctly to retrieve tracebacks and/or prints.

I also need help creating some sort of timeout/kill switch, so that if a subscript (run via either Popen or execfile) doesn't complete within 5 minutes, the scheduler skips it for that loop, or retries it and then skips it if it immediately fails again.

I understand that I probably shouldn't be using strftime for the times. However, that part works fine for me, so I don't see the need to change it.

from datetime import date, timedelta
from sched import scheduler
from time import time, sleep, strftime
import random
import traceback
import subprocess

s = scheduler(time, sleep)
random.seed()

def periodically(runtime, intsmall, intlarge, function):

    ## Get current time
    currenttime = strftime('%H:%M:%S')

    ## If currenttime is anywhere between 23:40 and 23:50 then...
    if currenttime > '23:40:00' and currenttime < '23:50:00':

        ## Open the error logging file as the variable "errors"
        errors = open('MISC/ERROR(S).txt', 'a')

        ## Try to...
        try:
            ## Call the clear subscript.
            execfile("SUBSCRIPTS/CLEAR.py", {})
        ## On exception (fail)...
        except Exception:
            ## Write the entire traceback error to file...
            errors.write(traceback.format_exc() + '\n')
            errors.write("\n\n")

        ## Close and exit the error logging file. 
        errors.close()

        ## Update time
        currenttime = strftime('%H:%M:%S')

    ## Idle time
    while currenttime >= '23:40:00' and currenttime <= '23:59:59' or currenttime >= '00:00:00' and currenttime <= '11:30:00':

        ## Update time
        currenttime = strftime('%H:%M:%S')
        print currenttime, "Idling..."
        sleep(10)

        ## Update time
        currenttime = strftime('%H:%M:%S')

    ## Initiate the scheduler.
    runtime += random.randrange(intsmall, intlarge)
    s.enter(runtime, 1, function, ())
    s.run()

def execute_subscripts():

    st = time()
    print "Running..."
    errors = open('MISC/ERROR(S).txt', 'a')

    try: 
        execfile("SUBSCRIPTS/TESTSCRIPT.py", {})
    except Exception:
        errors.write(traceback.format_exc() + '\n')
        errors.write("\n\n")

    try: 
        execfile("SUBSCRIPTS/TEST.py", {})
    except Exception:
        errors.write(traceback.format_exc() + '\n')
        errors.write("\n\n")
##    subprocess.Popen(["pythonw", "SUBSCRIPTS/TEST.py", "0"], shell=True)

    try: 
        execfile("SUBSCRIPTS/TESTSCRIPTTest.py", {})
    except Exception:
        errors.write(traceback.format_exc() + '\n')
        errors.write("\n\n")

    try: 
        execfile("SUBSCRIPTS/TESTTESTTEST.py", {})
    except Exception:
        errors.write(traceback.format_exc() + '\n')
        errors.write("\n\n")

    errors.close()
    print """The whole routine took %.3f seconds""" % (time() - st)

while True:
    periodically(50, -25, +90, execute_subscripts)

Any ideas would be much appreciated

Added a bounty, hopefully someone knows how to achieve this.

Thanks in advance
Hyflex

Example of what I want the script to be able to do...

  1. Subscript 1 - Run in background, send prints and errors from subscript1.py to main.py, don't wait for it to finish, go to subscript 2, timeout after 10 seconds (or as close to 10 seconds as we can, or timeout after all subscripts have been called.)

  2. Subscript 2 - Run in background, send prints and errors from subscript2.py to main.py, wait for it to finish before going onto subscript 3, timeout after 10 seconds (or as close to 10 seconds as we can, or timeout after all subscripts have been called.)

  3. Subscript 3 - Run in background, send prints and errors from subscript3.py to main.py, wait for it to finish before going onto subscript 4, timeout after 10 seconds (or as close to 10 seconds as we can, or timeout after all subscripts have been called.)

  4. Subscript 4 - Run in background, send prints and errors from subscript4.py to main.py, don't wait for it to finish, go to subscript 5, timeout after 10 seconds (or as close to 10 seconds as we can, or timeout after all subscripts have been called.)

  5. Subscript 5 - Run in background, send prints and errors from subscript5.py to main.py, wait for it to finish before going onto next subscript (or in this case, end of loop), timeout after 10 seconds (or as close to 10 seconds as we can, or timeout after all subscripts have been called.)

Prints and Traceback for shx2

[pid=9940] main running command: C:\Python27\python.exe SUB/subscript1.py (is_bg=False)
[pid=9940] main running command: C:\Python27\python.exe SUB/subscript1.py (is_bg=True)

Traceback (most recent call last):
  File "C:\Test\main.py", line 21, in <module>
    bg_proc1 = run_subscript(cmd, is_bg = True)
  File "C:\Test\main.py", line 10, in run_subscript
    return (cmd > sys.stdout) & BG  # run in background
  File "C:\Python27\lib\site-packages\plumbum\commands\modifiers.py", line 81, in __rand__
    return Future(cmd.popen(), self.retcode)
  File "C:\Python27\lib\site-packages\plumbum\commands\base.py", line 317, in popen
    return self.cmd.popen(args, **kwargs)
  File "C:\Python27\lib\site-packages\plumbum\commands\base.py", line 233, in popen
    return self.cmd.popen(self.args + list(args), **kwargs)
  File "C:\Python27\lib\site-packages\plumbum\machines\local.py", line 104, in popen
    **kwargs)
  File "C:\Python27\lib\site-packages\plumbum\machines\local.py", line 253, in _popen
    stderr = stderr, cwd = str(cwd), env = env, **kwargs)  # bufsize = 4096
  File "C:\Python27\lib\subprocess.py", line 703, in __init__
    errread, errwrite) = self._get_handles(stdin, stdout, stderr)
  File "C:\Python27\lib\subprocess.py", line 851, in _get_handles
    c2pwrite = msvcrt.get_osfhandle(stdout.fileno())
UnsupportedOperation: fileno

EDIT: http://i.imgur.com/rmXtrOq.png

             | --> # Sub 1.py # --> Sequential with timeout --> Started: 11:30.00 --> Estimated Completion: 11:30.01 (1 Second) --> Timeout at 11:30:10 (10 Seconds) --> # Sub 2.py # --> Sequential with timeout --> Started: 11:30.02 (or after time Sub 1.py's timeout) --> Estimated Completion: 11:30.03 (1 Second) --> Timeout at 11:30:13 (10 Seconds) --> # Sub 3.py # --> Sequential with timeout --> Started: 11:30.04 (or after time Sub 2.py's timeout) --> Estimated Completion: 11:30.08 (3 Seconds) --> Timeout at 11:30:18 (10 Seconds)
             |                                                                                                                                                  ^                                                                                                                                                                             ^
             |                                                                                                                                                  |                                                                                                                                                                             |
             | --------------------------------------------------------------------------------------------------------------------------------------------------                                                                                                                                                                             |
             | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
             |
Scheduler -->|
             | --> Sub 4.py --> Nonsequential with timeout --> Started: 11:30.00 --> Estimated Completion: 11:30.05 (5 Seconds) --> Timeout at 11:30:10 (15 Seconds)
             |
             | --> Sub 5.py --> Nonsequential with timeout --> Started: 11:30.00 --> Estimated Completion: 11:30.02 (2 Seconds) --> Timeout at 11:30:10 (10 Seconds)
             |
             | --> Sub 6.py --> Nonsequential with timeout --> Started: 11:30.00 --> Estimated Completion: 11:30.10 (10 Seconds) --> Timeout at 11:30:10 (25 Seconds)

Hopefully this helps as a visual representation of what I'm trying to achieve.


Solution

  • If I understand correctly what you are trying to do, subprocess.Popen() is the way to go. Here's a simple class which I think provides all the functionality you want:

    from time import sleep
    import subprocess
    import datetime
    import os
    
    class Worker:
    
        def __init__(self, cmd):
    
            print datetime.datetime.now(), ":: starting subprocess :: %s"%cmd
            self.cmd = cmd
            self.log = "[running :: %s]\n"%cmd
            self.subp = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            self.start_time = datetime.datetime.now()
    
        def wait_to_finish(self, timeout_seconds = None):
    
            while True:
                retcode = self.subp.poll()
                if retcode is not None:
                    self.get_process_output()
                    self.log += "\n[subprocess finished, return code: %d]\n"%retcode
                    print datetime.datetime.now(), ":: subprocess %s exited, retcode=%d"%(self.cmd, retcode)
                    return
                else:
                    # process hasn't finished yet
                    sleep(1)
                    if timeout_seconds is not None:
                        cur_time = datetime.datetime.now()
                        if (cur_time - self.start_time).seconds > timeout_seconds:
                            print datetime.datetime.now(), ":: subprocess %s :: killing after %d seconds"%(self.cmd, timeout_seconds)
                            self.kill()
                            return
    
        def still_running(self):
            return (self.subp.poll() is None)
    
        def kill(self):
            self.subp.terminate()
            self.get_process_output()
            self.log += "\n[subprocess killed by explicit request]\n"
            return
    
        def get_process_output(self):
            out, err = self.subp.communicate()
            self.log += out
            self.log += err
    

    You pass in the command and the class starts it in the background. You can then wait for it to finish, with an optional timeout (counted from the time the process was started). You can get the process output and, if needed, explicitly kill the process.

    Here's just a quick example showing its functionality:

    # Start two subprocesses in the background
    worker1 = Worker([r'c:\python26\python.exe', 'sub1.py'])
    worker2 = Worker([r'c:\python26\python.exe', 'sub2.py'])
    
    # Wait for both to finish, kill after 10 seconds timeout
    worker1.wait_to_finish(timeout_seconds = 10)
    worker2.wait_to_finish(timeout_seconds = 10)
    
    # Start another subprocess giving it 5 seconds to finish
    worker3 = Worker([r'c:\python26\python.exe', 'sub3.py'])
    worker3.wait_to_finish(timeout_seconds = 5)
    
    print "----LOG1----\n" + worker1.log
    print "----LOG2----\n" + worker2.log
    print "----LOG3----\n" + worker3.log
    

    sub1.py:

    from time import sleep
    print "sub1 output: start"
    sleep(5)
    print "sub1 output: finish"
    

    sub2.py:

    print "sub2 output: start"
    erroneous_command()
    

    sub3.py:

    from time import sleep
    import sys
    print "sub3 output: start, sleeping 15 sec"
    sys.stdout.flush()
    sleep(15)
    print "sub3 output: finish"
    

    Here's the output:

    2013-11-06 15:31:17.296000 :: starting subprocess :: ['c:\\python26\\python.exe', 'sub1.py']
    2013-11-06 15:31:17.300000 :: starting subprocess :: ['c:\\python26\\python.exe', 'sub2.py']
    2013-11-06 15:31:23.306000 :: subprocess ['c:\\python26\\python.exe', 'sub1.py'] exited, retcode=0
    2013-11-06 15:31:23.309000 :: subprocess ['c:\\python26\\python.exe', 'sub2.py'] exited, retcode=1
    2013-11-06 15:31:23.310000 :: starting subprocess :: ['c:\\python26\\python.exe', 'sub3.py']
    2013-11-06 15:31:29.314000 :: subprocess ['c:\\python26\\python.exe', 'sub3.py'] :: killing after 5 seconds
    ----LOG1----
    [running :: ['c:\\python26\\python.exe', 'sub1.py']]
    sub1 output: start
    sub1 output: finish
    
    [subprocess finished, return code: 0]
    
    ----LOG2----
    [running :: ['c:\\python26\\python.exe', 'sub2.py']]
    sub2 output: start
    Traceback (most recent call last):
      File "sub2.py", line 2, in <module>
        erroneous_command()
    NameError: name 'erroneous_command' is not defined
    
    [subprocess finished, return code: 1]
    
    ----LOG3----
    [running :: ['c:\\python26\\python.exe', 'sub3.py']]
    sub3 output: start, sleeping 15 sec
    
    [subprocess killed by explicit request]
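
One caveat with combining stdout=subprocess.PIPE and a poll() loop: nothing reads the pipes until the process has exited, so a subscript that prints more than the OS pipe buffer can hold will block on its write and end up being killed by the timeout. Below is a minimal sketch of one possible workaround, assuming it is acceptable to buffer the output in a temporary file. run_with_timeout is a hypothetical helper name, not part of the class above.

```python
from __future__ import print_function
import subprocess
import sys
import tempfile
import time


def run_with_timeout(cmd, timeout_seconds, poll_interval=0.1):
    """Run cmd, sending stdout and stderr to a temp file instead of a pipe.

    Hypothetical helper for illustration.
    Returns (return code, captured output, timed_out flag).
    """
    with tempfile.TemporaryFile() as out:
        proc = subprocess.Popen(cmd, stdout=out, stderr=subprocess.STDOUT)
        deadline = time.time() + timeout_seconds
        timed_out = False
        while proc.poll() is None:
            if time.time() > deadline:
                proc.kill()   # past the deadline: stop the subscript
                proc.wait()   # reap it so returncode is set
                timed_out = True
                break
            time.sleep(poll_interval)
        # The child wrote through the same file handle; rewind and read it all.
        out.seek(0)
        output = out.read().decode("utf-8", "replace")
    return proc.returncode, output, timed_out


if __name__ == "__main__":
    # The child prints far more than a typical pipe buffer holds.
    code, text, timed_out = run_with_timeout(
        [sys.executable, "-c", "print('x' * 200000)"], timeout_seconds=10)
    print(code, len(text), timed_out)
```

Because the file accepts writes of any size, the child never stalls waiting for a reader, and whatever it printed before being killed is still available afterwards.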
    

    As far as implementing the scheduling goes, I can suggest a couple of options, but the choice really depends on what your task is:

    1) If you can specify the precise scheduling at any point in time, then you can implement a fully synchronous scheduler:

    while True:
        # check time
        # check currently running processes :: workerX.still_running()
        #   -> if some are past their timeout, kill them workerX.kill()
        # start new subprocesses according to your scheduling logic
        sleep(1)
    

    2) If you have several well-defined sequences of scripts which you just want to "fire-and-forget" every 10 seconds, then put each sequence in its own .py script (with 'import Worker'), and start all sequences every 10 seconds, also periodically checking which sequences have exited to collect their logs.

    3) If your sequences are defined dynamically and you prefer the "fire-and-forget" approach, then threads would be the best approach.
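
To make option 1 more concrete, here is a rough sketch of a synchronous polling loop that mixes background and sequential subscripts, each with its own timeout. It is a sketch under assumptions rather than a drop-in scheduler: run_schedule and the (name, cmd, timeout, wait) job tuple are hypothetical, and the subscripts simply inherit the parent's stdout and stderr instead of having their output captured.

```python
from __future__ import print_function
import subprocess
import sys
import time


def run_schedule(jobs, poll_interval=0.1):
    """Start jobs in order; jobs is a list of (name, cmd, timeout, wait).

    wait=True  -> hold the queue until this job exits or times out.
    wait=False -> start it and move straight on to the next job.
    Returns {name: return code, or "timeout" if the job was killed}.
    """
    pending = list(jobs)
    running = {}    # name -> (Popen object, deadline as a timestamp)
    results = {}
    blocker = None  # name of the sequential job the queue is waiting on

    while pending or running:
        # Start jobs until we reach one that must finish first.
        while pending and blocker is None:
            name, cmd, timeout, wait = pending.pop(0)
            running[name] = (subprocess.Popen(cmd), time.time() + timeout)
            if wait:
                blocker = name

        # Kill jobs that are past their deadline and collect finished ones.
        for name, (proc, deadline) in list(running.items()):
            if proc.poll() is None and time.time() > deadline:
                proc.kill()
                proc.wait()
                results[name] = "timeout"
            elif proc.poll() is not None:
                results[name] = proc.returncode
            else:
                continue  # still running and within its timeout
            del running[name]
            if name == blocker:
                blocker = None

        time.sleep(poll_interval)
    return results


if __name__ == "__main__":
    py = sys.executable
    print(run_schedule([
        ("sub1", [py, "-c", "import time; time.sleep(30)"], 1, False),
        ("sub2", [py, "-c", "pass"], 10, True),
    ]))
```

Each timeout here is counted from the moment that particular subscript is started, and a background job keeps being checked on every pass of the loop while later sequential jobs run.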