Search code examples
python multiprocessing pickle pathos

Python Watchdog spawn multiprocessing


I am writing files (at a really fast pace) to a specific directory. I would like to monitor the directory for new files and then spawn a process that runs an external script on each one. Right now, I am getting the following pickling error (even though I am using Pathos):

Can't pickle <type 'Struct'>: it's not found as __builtin__.Struct

I need help fixing the pickling error. If fixing it means I have to rethink my approach, that is fine.

Here's what I have so far:

#!/usr/bin/python

import os
import sys
import argparse
import json
import time
import os
from datetime import datetime
#Test for Pathos
from pathos.multiprocessing import ProcessingPool as Pool
from multiprocessing import cpu_count
from subprocess import check_output
import ConfigParser
import logging
#WatchDog
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler, FileSystemMovedEvent

from CodernityDB.database import Database

###
# CONFIGURATION
###

# CONFIG GOES HERE BUT REMOVED (it must define watchPath, which the observer setup uses)

###
# Custom handler for Python Watchdog
# When a file is created, it submits the new path to the worker pool
###
class MyHandler(FileSystemEventHandler):
        """Watchdog handler that hands each newly created file to the worker pool.

        Relies on the module-level names ``pool`` and ``doIt``; both must exist
        before the observer starts delivering events.
        """

        def __init__(self):
                # Open the database at its fixed path; if opening fails for any
                # reason, report the error and create the database instead.
                self.db = Database("/var/db/test.db")
                try:
                        self.db.open()
                except Exception as e:
                        print(str(e))
                        self.db.create()

        def on_created(self, event):
                """Submit the created path to the pool for processing by ``doIt``.

                Errors raised while submitting or running the job are printed
                and otherwise ignored so the observer keeps running.
                """
                # A directory-creation event carries no file for the script.
                if event.is_directory:
                        return
                try:
                        # pathos' map takes one sequence per positional parameter
                        # of the target, so each argument gets its own
                        # one-element list. Passing a single (db, path) tuple
                        # would call doIt(db) and doIt(path) with one argument
                        # each.
                        # NOTE(review): self.db has to be serialized to reach the
                        # worker; the reported "Can't pickle Struct" error
                        # presumably comes from this object -- confirm, and
                        # consider sending only the path to the worker.
                        pool.map(doIt, [self.db], [event.src_path])
                except Exception as e:
                        print(str(e))

def codernityIt(db, json_):
        """Insert one parsed JSON document into the database, best effort.

        db: object exposing ``insert(document)``.
        json_: the document to insert; it is echoed to stdout first.

        Returns None. Any exception is printed and logged, never raised.
        """
        try:
                print(json_)
                db.insert(json_)
        except Exception as e:
                print(str(e))
                # logging.exception logs at ERROR level like logging.error did,
                # and additionally records the traceback.
                logging.exception(str(e))

def doIt(db, file_):
        """Run the external script on one file and store its JSON output.

        db: database object handed on to ``codernityIt``.
        file_: path passed to ``/external/script.py`` as its only argument.

        Returns None. Any exception (failed script, invalid JSON, ...) is
        printed and logged, never raised.
        """
        try:
                # check_output raises if the script exits with a non-zero status.
                raw_output = check_output(['python', '/external/script.py', file_])
                codernityIt(db, json.loads(raw_output))
        except Exception as e:
                print(str(e))
                # Same ERROR-level message as before, now with the traceback.
                logging.exception(str(e))

if __name__ == '__main__':
        ###
        # Pool specific Settings
        ###
        # One worker process per CPU core.
        pool = Pool(processes=cpu_count())
        event_handler = MyHandler()
        ###
        # Watchdog specific settings
        ###
        observer = Observer()
        observer.schedule(event_handler, path=watchPath, recursive=True)
        observer.start()

        ###
        # Idle until a keyboard interrupt arrives, then shut everything down
        ###
        try:
                while True:
                        time.sleep(1)
        except KeyboardInterrupt:
                # Ctrl-C is the expected way to stop; fall through to cleanup.
                pass
        finally:
                # Stop event delivery first so no new jobs reach the pool.
                observer.unschedule_all()
                observer.stop()
                observer.join()
                # Let queued jobs finish before the database is closed.
                pool.close()
                pool.join()
                # The database handle lives on the handler; there is no
                # module-level ``db`` name.
                event_handler.db.close()

Solution

  • Why don't you try inotify? It may help you: https://pypi.python.org/pypi/inotify