Tags: json, apache-spark, databricks, databricks-unity-catalog

sc.parallelize function not working with Unity Catalog


We have this code working fine with Databricks Runtime 10:

import requests

request_url = "https://.com/?fct=Get"  # URL redacted in the original post
response_task = requests.get(url=request_url, headers=headers)
db1 = spark.sparkContext.parallelize([response_task.text])
df2 = spark.read.json(db1)

With Runtime 13.3 we get the error below because of a Unity Catalog limitation. Is there another way to serialize [response_task.text] that works with large volumes of data?

An error occurred while calling o398.createRDDFromTrustedPath. Trace:
py4j.security.Py4JSecurityException: Method public org.apache.spark.api.java.JavaRDD org.apache.spark.sql.SparkSession.createRDDFromTrustedPath(java.lang.String,int) is not whitelisted on class class org.apache.spark.sql.SparkSession
    at py4j.security.WhitelistingPy4JSecurityManager.checkCall(WhitelistingPy4JSecurityManager.java:473)
    at py4j.Gateway.invoke(Gateway.java:305)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:195)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:115)
    at java.lang.Thread.run(Thread.java:750)
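On Unity Catalog shared clusters the RDD API is blocked by the Py4J whitelist, which is why sc.parallelize fails here. One possible workaround (a sketch, not from the original post; the "results" key and field names are assumptions based on the schema shown further down) is to put the raw JSON text into a one-row DataFrame and parse it with from_json, avoiding the RDD API entirely:

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, IntegerType, StringType, StructField, StructType

# Assumed payload shape: {"results": [{...}, ...]} -- adjust to the real API.
payload_schema = StructType([
    StructField("results", ArrayType(StructType([
        StructField("PROJECT_ID", IntegerType()),
        StructField("PROJECT_NAME", StringType()),
    ])))
])

# Wrap the raw JSON string in a one-row DataFrame, then parse and explode it.
raw_df = spark.createDataFrame([(response_task.text,)], ["json_str"])
df2 = (raw_df
       .select(F.from_json("json_str", payload_schema).alias("payload"))
       .select(F.explode("payload.results").alias("r"))
       .select("r.*"))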

We also tried passing the response text straight to spark.read.json:

response_task = requests.get(url=request_url_gettasks, headers=headers)
response_text = response_task.text
db1 = spark.read.json([response_text])

but that fails with:

com.databricks.rpc.UnexpectedHttpException: Got invalid response: 502
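Since spark.read.json expects paths (or an RDD) rather than raw JSON strings, another possible route for large payloads (a sketch with placeholder names; the volume path is hypothetical, not from the original post) is to write the response to a Unity Catalog volume and read it back by path:

import requests

# Placeholder Unity Catalog volume path -- volumes are FUSE-mounted under /Volumes.
volume_path = "/Volumes/my_catalog/my_schema/my_volume/tasks.json"

response_task = requests.get(url=request_url_gettasks, headers=headers)
with open(volume_path, "w") as f:
    f.write(response_task.text)

# multiLine=True because the payload is a single JSON document, not JSON Lines.
df2 = spark.read.option("multiLine", True).json(volume_path)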

This code seems to work (still to be tested with large data volumes):

import json
import requests
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

response_task = requests.get(url=request_url_gettasks, headers=headers)
response_text = response_task.text
task_data = json.loads(response_text)["results"]
schema = StructType([
    StructField("PROJECT_ID", IntegerType()),
    StructField("PROJECT_NAME", StringType()),
    StructField("PROJECT_NUMBER", StringType()),
    StructField("PROJECT_TYPE_NAME", StringType()),
    ...  # remaining fields elided in the original post
])
df2 = spark.createDataFrame(task_data, schema)
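If listing every field by hand gets unwieldy, a variant we have not verified (an assumption, not part of the original post) is to let Spark infer the schema from the parsed dictionaries:

# Schema inference from a list of dicts; Spark warns that this inference
# path is deprecated, but it still works and is handy for exploration.
df2 = spark.createDataFrame(task_data)
df2.printSchema()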

Solution

  • This code seems to work (still to be tested with large data volumes):

    import json
    import requests
    from pyspark.sql.types import StructType, StructField, IntegerType, StringType

    response_task = requests.get(url=request_url_gettasks, headers=headers)
    response_text = response_task.text
    task_data = json.loads(response_text)["results"]
    schema = StructType([
        StructField("PROJECT_ID", IntegerType()),
        StructField("PROJECT_NAME", StringType()),
        StructField("PROJECT_NUMBER", StringType()),
        StructField("PROJECT_TYPE_NAME", StringType())
    ])
    df2 = spark.createDataFrame(task_data, schema)
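    This works because it avoids the RDD API entirely: json.loads parses the payload on the driver and spark.createDataFrame distributes the rows, so nothing touches the non-whitelisted createRDDFromTrustedPath method. A quick sanity check (illustrative only):

    # Confirm the declared schema and that rows actually landed.
    df2.printSchema()
    print(df2.count())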