I am running PySpark code with a Java UDF in Databricks. I have an r6id.xlarge (32g) driver and r6id.4xlarge (128g) workers. I am reading only one file, and my Java UDF just calls an open source X12 Java library to parse the file as a whole. Sample code below; this works for files that are less than 100MB:
from pyspark.sql.functions import expr
import pyspark.sql.types as pst

df = spark.read.format('text').option('wholetext', True).load("s3://xxxx/xxxxxxx")
spark.udf.registerJavaFunction("x12_parser", "com.abc", pst.StringType())
df.select(expr("x12_parser(value)"))
Whenever I parse a big file (still just one file), I get java.lang.OutOfMemoryError: Requested array size exceeds VM limit. When I parse this file locally, it works if I increase my heap size to 20g; otherwise I get the same error. But my worker node is way larger than that. (I am in Databricks, so there is no need to configure executor memory, and setting -Xmx is not permitted.)
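For context, here is a quick way to check what executor and driver memory the cluster actually resolved. This is a minimal sketch using the standard Spark property names; whether Databricks sets these keys explicitly on my cluster is an assumption:

# Print the memory-related settings the running cluster resolved
conf = spark.sparkContext.getConf()
for key in ("spark.executor.memory", "spark.driver.memory", "spark.executor.extraJavaOptions"):
    print(key, "=", conf.get(key, "<not set>"))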
I also tried calling the parser directly on the driver, like below:
import boto3

# read the whole file into a single string on the driver
s3 = boto3.client('s3')
bucket_name = 'xxxx'
key = 'xxxxxxx'
response = s3.get_object(Bucket=bucket_name, Key=key)
contents = response['Body'].read().decode('utf-8')

# instantiate the parser through the Py4J gateway and call it in the driver JVM
parser_class = spark._jvm.com.abc.x12_parser()
output = parser_class.call(contents)
This works fine even though my driver is 4 times smaller than the worker, without touching the Java heap size. I tried playing with some Spark settings like the network timeout and spark.executor.extraJavaOptions -Xms20g -XX:+UseCompressedOops, but none of them worked.
I can't explain why, with a huge worker, I can't process the same file that I can process on a much smaller driver or on my local machine. It seems like there is some internal UDF memory limitation. For now I have stopped using the UDF and changed the code to do the map from the Java side.
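For completeness, the workaround looks roughly like the sketch below. It assumes a hypothetical Java helper com.abc.X12Mapper with a static mapDataset(Dataset<Row>) method that applies the parser entirely inside the JVM; that class and method are placeholders, not the real library API:

from pyspark.sql import DataFrame

df = spark.read.format('text').option('wholetext', True).load("s3://xxxx/xxxxxxx")
# hand the underlying JVM Dataset to the Java side and wrap the result back for Python
jdf = spark._jvm.com.abc.X12Mapper.mapDataset(df._jdf)
parsed = DataFrame(jdf, spark)  # on older Spark versions pass spark._wrapped (an SQLContext) instead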