Search code examples
pythonparallel-processingluigi

Using luigi to run straight batch Tasks


I have 500 links to download and want to process them in batches of, for example, 10 items.

What would this pseudo code look like as real luigi code?

class BatchJobTask(luigi.Task)
    items = luigi.Parameter()
    def run(self):
        listURLs = []
        with ('urls_chunk', 'r') as urls
            for line in urls:
                listURLs.append('http://ggg'+line+'.org')
            10_urls = listURLs[0:items] #10 items here
            for i in 10_urls:
                req = request.get(url)
                req.contents
    def output(self):
        return self.LocalTarger("downloaded_filelist.txt")

class BatchWorker(luigi.Task)
    def run(self)
        # Here I should run BatchJobTask from 0 to 10, next 11 - 21 new etc...

How should this be written?


Solution

  • Here is an approach to doing something like what you want, but with the list of strings stored as separate lines in a file.

    import luigi
    import requests
    
    BATCH_SIZE = 10
    
    
    class BatchProcessor(luigi.Task):
        """Download one batch of sites and record completion with a marker file.

        Parameters:
            items: site names belonging to this batch; each one is fetched from
                ``http://www.<item>.org``.
            max: integer identifying the batch; it is embedded in the marker
                file name so that every batch gets its own output target.
        """

        items = luigi.ListParameter()
        # NOTE: this name shadows the builtin ``max``. It is kept because it is
        # part of the task's public parameters; inside methods it must always
        # be read as ``self.max`` (a bare ``max`` is the builtin function).
        max = luigi.IntParameter()

        def requires(self):
            """Return the upstream tasks; a batch has none."""
            return []

        def output(self):
            """Return the marker file whose existence means the batch is done."""
            return luigi.LocalTarget('processed' + str(self.max) + '.txt')

        def run(self):
            """Fetch every item of the batch, then create the marker file."""
            for item in self.items:
                # Items read from a file may carry a trailing newline, which
                # must not end up inside the URL.
                req = requests.get('http://www.' + item.strip() + '.org')
                # do something useful here
                content = req.content
            # Create the (empty) marker through the target itself so the path
            # written here can never drift from the one output() reports.
            with self.output().open('w'):
                pass
    
    
    class BatchCreator(luigi.Task):
        """Split a file of site names into batches and process each batch.

        Parameters:
            file_with_urls: path to a text file holding one site name per line.

        One ``BatchProcessor`` task is required per group of ``BATCH_SIZE``
        names; once all of them are complete, an empty marker file named
        ``<file_with_urls>processed`` is written.
        """

        file_with_urls = luigi.Parameter()

        def requires(self):
            """Return one BatchProcessor per batch of at most BATCH_SIZE names."""
            with open(self.file_with_urls) as f:
                # Strip line endings and skip blank lines so that no empty or
                # newline-terminated name is turned into a URL.
                names = [line.strip() for line in f if line.strip()]

            required_tasks = []
            for start in range(0, len(names), BATCH_SIZE):
                batch = names[start:start + BATCH_SIZE]
                # ``max`` is the 1-based position of the batch's last name in
                # the file, which gives every batch a distinct output file.
                # Slicing also covers the final, possibly shorter, batch.
                required_tasks.append(
                    BatchProcessor(items=batch, max=start + len(batch)))
            return required_tasks

        def output(self):
            """Return the marker file that signals all batches were processed."""
            return luigi.LocalTarget(str(self.file_with_urls) + 'processed')

        def run(self):
            """Create the empty marker file once every batch has completed."""
            # Write through the target so the path always matches output().
            with self.output().open('w'):
                pass