I am trying to run the spark-nlp library offline in Palantir Foundry. We do not have egress configured to make HTTP calls, so I'm attempting to use spark-nlp in offline mode by downloading explain_document_dl from the spark-nlp models hub. This is just a simple example from the spark-nlp quick start; I don't really have a goal with it other than to get it to work in Foundry.
I'm using spark-nlp version 5.1.2 from PyPI, and I followed these oddly specific instructions in the Palantir public docs for loading the spark-nlp library, so I know this has to be possible.
The method below of downloading and extracting a zip file from the Foundry dataset filesystem worked for loading BERTopic, so I'm trying to apply it here. There are 6 different versions of explain_document_dl on the models hub for spark-nlp 5.1, and it isn't clear to me how they differ other than by version number. I've tried loading all of them without success. I also tried loading the pretrained pipeline directly, to see which version it reaches for over HTTP, with no luck:
pipeline = PretrainedPipeline('explain_document_dl', lang='en')
I'm not sure whether this is a Foundry problem, an issue with how I'm trying to run spark-nlp offline, or both, so I'm including the spark-nlp tag.
from transforms.api import transform, Input, Output
from sparknlp.base import PipelineModel
# from sparknlp.annotator import *
from sparknlp.pretrained import PretrainedPipeline
# import sparknlp
from zipfile import ZipFile
import tempfile
import os
import shutil
import base64
def download_file(filesystem, input_file_path, local_file_path=None, base64_decode=False):
    """
    Download a file from a Foundry dataset to the local filesystem.
    If local_file_path is None, a temporary file is created, which you must delete yourself after using it.
    :param filesystem: an instance of transforms.api.FileSystem
    :param input_file_path: logical path on the Foundry dataset to download from
    :param local_file_path: path of the file to download to on the local file system (default=None)
    :param base64_decode: if set to True, decode data using base64 (default=False)
    :return: str path of the downloaded file on the local file system
    """
    # Check if a different temp directory is specified in the Spark environment, and use it if so
    TEMPDIR_OVERRIDE = os.getenv('FOUNDRY_PYTHON_TEMPDIR')
    tempfile.tempdir = TEMPDIR_OVERRIDE if TEMPDIR_OVERRIDE is not None else tempfile.tempdir
    if local_file_path is None:
        _, local_file_path = tempfile.mkstemp()
    if base64_decode:
        _, download_file_path = tempfile.mkstemp()
    else:
        download_file_path = local_file_path
    try:
        with filesystem.open(input_file_path, 'rb') as f_in, open(download_file_path, 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)
        if base64_decode:
            with open(download_file_path, 'rb') as fin, open(local_file_path, 'wb') as fout:
                base64.decode(fin, fout)
        return local_file_path
    finally:
        if base64_decode:
            os.remove(download_file_path)
@transform(
    raw_model=Input("ri.foundry.main.dataset.6b9128f7-dc4b-4c99-81c8-94fb4b0f9ab4"),
    model_output=Output("ri.foundry.main.dataset.67b42a35-5105-41ef-83f4-b9ccfd347d93"),
)
def compute(raw_model, model_output):
    # Offline mode
    # https://github.com/JohnSnowLabs/spark-nlp#offline
    temp_dir = tempfile.mkdtemp()
    model_package = download_file(
        raw_model.filesystem(),
        "explain_document_dl_en_4.4.2_3.2_1685186531034.zip",
        "{}/explain_document_dl_en_4.4.2_3.2_1685186531034.zip".format(temp_dir)
    )
    with ZipFile(model_package, "r") as zObject:
        # Extract all files from the zip and put them in a temp directory
        explain_document_dl = tempfile.mkdtemp()
        zObject.extractall(path=explain_document_dl)
    pipeline = PipelineModel.load(explain_document_dl)
    # Your testing dataset
    text = "The Mona Lisa is a 16th century oil painting created by Leonardo. It's held at the Louvre in Paris."
    # Annotate your testing dataset
    result = pipeline.annotate(text)
    # What's in the pipeline
    list(result.keys())
    # Check the results
    result['entities']
    model_output.write_dataframe(result)
    zObject.close()
Here is the error from building this transform - I can see it isn't finding a file it expects, but I don't understand what it's looking for, since I assumed the zip files from the spark-nlp models hub contain everything needed:
{ "errorCode": "CUSTOM_CLIENT", "errorName": "Spark:JobAborted", "errorInstanceId": "d0ec47f9-628c-411c-be10-ac3b1c81b941", "safeArgs": { "pythonVersion": "3.10.12", "exceptionClass": "java.io.FileNotFoundException", "message": "ServiceException: CUSTOM_CLIENT (Spark:JobAborted)" }, "unsafeArgs": { "stacktrace": "org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 4) : java.io.FileNotFoundException: File file:/tmp/tmp67p250jr/metadata/part-00000 does not exist\n\tat
The reason you're getting this error is that the PipelineModel.load(...) call requires the full model folder structure to be available on the driver and on all executors, while your logic only unzips the model and makes the path available on the driver. You can fix this either by providing a path that is available to both the driver and the executors (in Foundry this is a shared HDFS path), or by not using any executors at all.
First option:
The simpler approach is to not use any executors, which you can achieve by adding the @configure(profile=["KUBERNETES_NO_EXECUTORS"]) profile to your transform. The downside of this method is that you don't benefit from Spark's distributed execution.
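A minimal sketch of that variant (reusing the download-and-unzip logic from your question; the RIDs and the zip file name are placeholders/assumptions, not anything specific to this model):

from transforms.api import transform, configure, Input, Output
from sparknlp.base import PipelineModel
from zipfile import ZipFile
import os
import shutil
import tempfile

@configure(profile=["KUBERNETES_NO_EXECUTORS"])  # driver-only, so driver-local /tmp paths are enough
@transform(
    raw_model=Input("<rid>"),
    model_output=Output("<rid>"),
)
def compute(raw_model, model_output):
    zip_name = "explain_document_dl_en_4.4.2_3.2_1685186531034.zip"  # assumed file name in the dataset
    temp_dir = tempfile.mkdtemp()
    local_zip = os.path.join(temp_dir, zip_name)
    # Copy the zip out of the Foundry dataset onto the driver's local disk
    with raw_model.filesystem().open(zip_name, 'rb') as f_in, open(local_zip, 'wb') as f_out:
        shutil.copyfileobj(f_in, f_out)
    # Unzip and load from a driver-local folder; with no executors, this path is visible to every task
    extract_dir = tempfile.mkdtemp()
    with ZipFile(local_zip, "r") as z:
        z.extractall(path=extract_dir)
    pipeline = PipelineModel.load(extract_dir)
    # ... annotate and write your results as in your original transform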
Second option:
If you need the Spark distribution, you can use the hadoop path instead, but you'll first need to upload the unzipped model files to the Foundry raw dataset:
# KUBERNETES_NO_EXECUTORS not required for this option
@transform(
    raw_model=Input("<rid>"),
    model_output=Output("<rid>"),
)
def compute(raw_model, model_output):
    path = raw_model.filesystem().hadoop_path + '/<root_for_chosen_model>'
    pipeline = PipelineModel.load(path)
    # Your testing dataset
    text = "The Mona Lisa is a 16th century oil painting created by Leonardo. It's held at the Louvre in Paris."
    # Annotate your testing dataset
    result = pipeline.annotate(text)
    # What's in the pipeline
    list(result.keys())
    # Check the results
    result['entities']
    model_output.write_dataframe(result)
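For that upload step, one approach (a rough sketch with assumed RIDs and file names, not the only way to do it) is a one-off transform that unzips the package on the driver and writes each extracted file into an output dataset; that dataset then becomes raw_model in the transform above, and <root_for_chosen_model> is the top-level folder that came out of the zip:

from transforms.api import transform, Input, Output
from zipfile import ZipFile
import os
import shutil
import tempfile

@transform(
    zipped_model=Input("<rid of dataset containing the model zip>"),
    unzipped_model=Output("<rid of dataset to hold the extracted model folder>"),
)
def unzip_model(zipped_model, unzipped_model):
    zip_name = "explain_document_dl_en_4.4.2_3.2_1685186531034.zip"  # assumed file name
    # Copy the zip out of the input dataset onto the driver's local disk
    local_zip = os.path.join(tempfile.mkdtemp(), zip_name)
    with zipped_model.filesystem().open(zip_name, 'rb') as f_in, open(local_zip, 'wb') as f_out:
        shutil.copyfileobj(f_in, f_out)
    # Extract locally, then write every file back into the output dataset, preserving relative paths,
    # so that unzipped_model's hadoop_path + '/<relative path>' resolves on both the driver and executors
    extract_dir = tempfile.mkdtemp()
    with ZipFile(local_zip, "r") as z:
        z.extractall(path=extract_dir)
    for root, _, files in os.walk(extract_dir):
        for name in files:
            local_path = os.path.join(root, name)
            logical_path = os.path.relpath(local_path, extract_dir)
            with open(local_path, 'rb') as f_in, unzipped_model.filesystem().open(logical_path, 'wb') as f_out:
                shutil.copyfileobj(f_in, f_out)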
Hopefully this helps!