I have recently been tasked with ingesting JSON responses into Databricks Delta Lake. I have to hit a REST API endpoint URL 6500 times with different parameters and pull the responses.
I have tried two classes from the multiprocessing library, ThreadPool and Pool, to make each execution a little quicker.
ThreadPool:
Right now I've set n_pool = multiprocessing.cpu_count(). Will it make any difference if the cluster auto-scales?
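To make the setup concrete, here is a minimal sketch of what such a ThreadPool run might look like. The `fetch` function and `param_list` are hypothetical placeholders, not the actual code, and `fetch` only echoes its input so the sketch runs without network access. One thing it illustrates: `multiprocessing.cpu_count()` reports the cores of the machine running this Python process, which on a cluster is the driver.

```python
import multiprocessing
from multiprocessing.pool import ThreadPool


def fetch(params: str) -> dict:
    # Hypothetical stand-in for the real REST call; it only echoes its input
    # so the sketch runs without network access.
    return {"params": params, "status": 200}


# Hypothetical parameter list; the real job would have about 6500 entries.
param_list = [f"id={i}" for i in range(20)]

# cpu_count() reports the cores of the machine running this process
# (the driver on a Spark cluster), not the total cores of the cluster.
n_pool = multiprocessing.cpu_count()

with ThreadPool(n_pool) as pool:
    # map() returns the results in the same order as the inputs.
    responses = pool.map(fetch, param_list)
```

Because the work is I/O-bound, the number of threads does not have to match the core count, but every thread in this pattern still lives in the one Python process that created the pool.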
Pool:
Py4JError: SparkConf does not exist in the JVM
**OR**
py4j.protocol.Py4JError: org.apache.spark.api.python.PythonUtils.getEncryptionEnabled does not exist in the JVM
If you're using thread pools, they will run only on the driver node and the executors will sit idle. Instead, you need to use Spark itself to parallelize the requests. This is usually done by creating a DataFrame with the list of URLs (or the URL parameters, if the base URL is the same) and then using a Spark user-defined function to make the actual requests. Something like this:
import urllib.request

from pyspark.sql.functions import col, udf

# one row per request; replace the placeholders with real URLs and parameters
df = spark.createDataFrame([("url1", "params1"), ("url2", "params2")],
                           ("url", "params"))

# the UDF returns a struct with two fields: body and status
@udf("body string, status int")
def do_request(url: str, params: str):
    full_url = url + "?" + params  # adjust this as required
    with urllib.request.urlopen(full_url) as f:
        status = f.status
        body = f.read().decode("utf-8")
    return {'status': status, 'body': body}

res = df.withColumn("result", do_request(col("url"), col("params")))
This will return a DataFrame with a new column called result that has two fields: status and body (the JSON answer as a string).
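With 6500 requests, some calls will probably fail, and an exception raised inside a UDF fails the whole Spark task. Below is a minimal sketch of a more defensive variant, not a definitive implementation. The names `do_request_safe` and `add_result_columns` and the 30-second timeout are assumptions made for illustration; the URL layout and the result schema are the same as above.

```python
import urllib.error
import urllib.request


def do_request_safe(url: str, params: str) -> dict:
    """Fetch url?params and return a dict with 'status' and 'body'."""
    full_url = url + "?" + params  # same URL layout as in the UDF above
    try:
        # timeout=30 is an assumed value; tune it for the actual API
        with urllib.request.urlopen(full_url, timeout=30) as f:
            return {"status": f.status, "body": f.read().decode("utf-8")}
    except urllib.error.HTTPError as e:
        # The server answered with an error code (4xx/5xx); keep code and payload.
        return {"status": e.code,
                "body": e.read().decode("utf-8", errors="replace")}
    except urllib.error.URLError as e:
        # No HTTP response at all (DNS failure, refused connection, bad scheme).
        return {"status": None, "body": str(e.reason)}


def add_result_columns(df):
    """Apply the request function as a UDF and flatten the result struct."""
    # Imported here so do_request_safe stays usable without Spark installed.
    from pyspark.sql.functions import col, udf

    request_udf = udf(do_request_safe, "body string, status int")
    return (
        df.withColumn("result", request_udf(col("url"), col("params")))
          .select("url", "params", "result.status", "result.body")
    )
```

Calling `add_result_columns(df)` on the DataFrame from the example would give flat status and body columns, so rows whose status is null or not 200 can be filtered out and retried separately.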