pyspark, batch-processing

How to send a bunch of dataframe records to an API using PySpark


How can I send DataFrame records to an API in batches? Right now I post a single JSON payload like this:

import requests

headers = {
    'Content-Type': 'application/json',
    'Accept': '*/*'
}

data = {"some_key": "some_value", "another_key": "another_value"}
r = requests.post('https://api.somewhere/batch', params={}, headers=headers, json=data)

If the JSON payload comes from a PySpark DataFrame, how can I take advantage of Spark to batch-process this currently single-threaded approach?
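
For context, a minimal sketch of what that single-threaded approach might look like when the payload comes from a DataFrame; this assumes the df and headers above and is not code from the original question:

import requests

# Collect every row to the driver and post them one by one (no parallelism).
for row in df.collect():
    payload = row.asDict()  # Row -> plain dict so requests can serialize it
    r = requests.post('https://api.somewhere/batch', params={}, headers=headers, json=payload)
    print(r.status_code)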


Solution

  • You can convert the DataFrame to JSON and send each record (or each partition) from the executors:

    import requests

    def batch_json(row):
        # Runs on the executors for every record; "row" is a JSON string
        # produced by df.toJSON(), so send it as the raw request body
        # (the Content-Type header is already application/json).
        r = requests.post('https://api.somewhere/batch', params={}, headers=headers, data=row)
        print(r.status_code)
    
    df.toJSON().foreach(batch_json)
    
    # OR
    # "batch_json" cannot be used as-is with foreachPartition: the function then
    # receives an iterator over the partition's records, so adapt it to your
    # needs (see the sketch below).
    
    df.toJSON().foreachPartition(batch_json)
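
    One way that adaptation could look is sketched below; the chunk size, the helper name post_partition, and the batching strategy are assumptions for illustration, not part of the original answer:

    import json
    import requests

    def post_partition(rows):
        # "rows" is an iterator of JSON strings for one partition; group them
        # into chunks and post each chunk as a single JSON array.
        chunk_size = 100
        buffer = []
        for row in rows:
            buffer.append(json.loads(row))
            if len(buffer) >= chunk_size:
                r = requests.post('https://api.somewhere/batch', headers=headers, json=buffer)
                print(r.status_code)
                buffer = []
        if buffer:  # send any remaining records
            r = requests.post('https://api.somewhere/batch', headers=headers, json=buffer)
            print(r.status_code)

    df.toJSON().foreachPartition(post_partition)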
    

    Quick testing code:

    def batch(row):
        print(row)
    
    df.toJSON().foreach(batch)
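
    To try that quick test end to end, a toy setup along these lines should work; the SparkSession creation and the sample rows are assumptions for illustration, not part of the original answer:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[*]").getOrCreate()

    # Tiny sample DataFrame mirroring the payload keys from the question.
    df = spark.createDataFrame(
        [("some_value", "another_value")],
        ["some_key", "another_key"],
    )

    df.toJSON().foreach(batch)   # prints each record as a JSON string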