python, apache-spark, lambda, pyspark, rdd

Spark - missing 1 required positional argument (lambda function)


I'm trying to distribute text extraction from PDFs across multiple servers using Spark. It uses a custom Python module I wrote and is an implementation of this question. The 'extractTextFromPdf' function takes 2 arguments: a string with the path to the file, and a configuration file that determines various extraction constraints. In this case the config file is just a simple YAML file sitting in the same folder as the Python script running the extraction, and the files are simply duplicated across the Spark servers.

The main issue I have is being able to call my extract function using the filename as the first argument, rather than the file's content. This is the basic script I have as of now, running it on 2 PDFs in the files folder:

#!/usr/bin/env python3

import ScannedTextExtractor.STE as STE

from pyspark import SparkContext
sc = SparkContext("local", "STE")

input = sc.binaryFiles("/home/ubuntu/files")
processed = input.map(lambda filename, content: (STE.extractTextFromPdf(filename,'ste-config.yaml'), content))

print("Results:")
print(processed.take(2))

This raises the lambda error "missing 1 required positional argument: 'content'". I don't really care about the PDFs' raw content, and since the argument to my extraction function is just the path to the PDF, not the PDF content itself, I tried giving the lambda function only 1 argument, e.g.

processed = input.map(lambda filename: STE.extractTextFromPdf(filename,'ste-config.yaml'))

But then I get issues because, with this setup, Spark seems to pass the PDF content (as a byte stream) in this single argument, while my module expects a string with the path to the PDF as the first argument, not the whole byte content of the PDF.

I printed the RDD produced by the SparkContext's binary file loading, and I can see that it contains both the filename and the file content (a byte stream of the PDF). But how do I use it with my custom Python module, which expects the following syntax:

STE.extractTextFromPdf('/path/to/pdf','/path/to/config-file')

I've tried multiple permutations of the lambda function and triple-checked Spark's RDD and SparkContext APIs, but I can't seem to get it working.


Solution

  • If you only want the path, not the content, then you should not use sc.binaryFiles. In that case you should parallelize the paths and then have the Python code load each file individually, like so:

    paths = ['/path/to/file1', '/path/to/file2']
    input = sc.parallelize(paths)
    processed = input.map(lambda path: (path, processFile(path)))
    

    This assumes, of course, that each executor's Python process can access the files directly. It wouldn't work with HDFS or S3, for instance. Can your library not take binary content directly?
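As a side note on the original error: map calls its function with one argument per record, and for sc.binaryFiles each record is a single (path, content) tuple, so a two-argument lambda fails while a one-argument lambda that indexes the tuple works. The sketch below reproduces this in plain Python without Spark. The sample records, the extract_text stand-in for STE.extractTextFromPdf, and the local_path helper are all hypothetical, and the "file:" prefix on the paths is an assumption about how the keys might look.

```python
from urllib.parse import urlparse

# Stand-in for the (path, content) pairs an RDD of binary files would hold.
# The "file:" prefix is an assumption about the key format.
records = [
    ("file:/home/ubuntu/files/a.pdf", b"%PDF-1.4 ..."),
    ("file:/home/ubuntu/files/b.pdf", b"%PDF-1.4 ..."),
]


def extract_text(path, config):
    """Hypothetical placeholder for STE.extractTextFromPdf."""
    return f"text from {path} using {config}"


def local_path(uri):
    """Drop a URI scheme such as 'file:' and keep the filesystem path."""
    return urlparse(uri).path


# A two-argument lambda fails: each record arrives as ONE tuple argument.
two_arg = lambda filename, content: extract_text(filename, "ste-config.yaml")
try:
    list(map(two_arg, records))
    error_message = ""
except TypeError as err:
    error_message = str(err)

# A one-argument lambda that indexes the pair works: pair[0] is the path,
# pair[1] is the content, which is simply ignored here.
one_arg = lambda pair: (
    pair[0],
    extract_text(local_path(pair[0]), "ste-config.yaml"),
)
results = list(map(one_arg, records))

print(error_message)
for path, text in results:
    print(path, "->", text)
```

With Spark, the same one-argument lambda could presumably be passed to input.map(...), or input.keys() could be used to keep only the paths. Note that sc.binaryFiles would still read every file's content, which is why parallelizing the paths is the cheaper option when the content is not needed.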