Tags: palantir-foundry, johnsnowlabs-spark-nlp

Palantir Foundry - run Spark-NLP library offline


I am trying to run the spark-nlp library offline in Palantir Foundry. We do not have egress configured for HTTP calls, so I'm attempting to use spark-nlp in offline mode by downloading explain_document_dl from the spark-nlp Models Hub. This is just a simple example I found in the spark-nlp quick start; I don't have a goal with it other than getting it to work in Foundry.

I'm using spark-nlp version 5.1.2 from PyPI, and I followed the oddly specific instructions in the Palantir public docs for loading the spark-nlp library, so I know this has to be possible.

The method below, which loads and extracts a zip file from the Foundry dataset filesystem, worked for loading BERTopic, so I'm trying to apply it here. There are 6 different versions of explain_document_dl on the Models Hub for spark-nlp 5.1, and it's not clear to me how they differ other than in their version numbers. I've tried loading all of them without success. I also tried loading the pretrained model to see which version it was reaching for over HTTP, with no luck:

pipeline = PretrainedPipeline('explain_document_dl', lang='en') 

I'm not sure if this is a Foundry problem or an issue with how I'm trying to run spark-nlp offline, or both, so I'm including the spark-nlp tag.

from transforms.api import transform, Input, Output
from sparknlp.base import PipelineModel
# from sparknlp.annotator import *
from sparknlp.pretrained import PretrainedPipeline
# import sparknlp
from zipfile import ZipFile
import tempfile
import os
import shutil
import base64


def download_file(filesystem, input_file_path, local_file_path=None, base64_decode=False):
    """
    Download a file from a Foundry dataset to the local filesystem.
    If local_file_path is None, a temporary file is created, which you must delete yourself after using it.
    :param filesystem: an instance of transforms.api.FileSystem
    :param input_file_path: logical path on the Foundry dataset to download from
    :param local_file_path: path of the file to download to on the local file system (default=None)
    :param base64_decode: if set to True, decode data using base64 (default=False)
    :return: str path of the downloaded file on the local file system
    """

    # Check if a different temp directory is specified in the Spark environment, and use it if so
    TEMPDIR_OVERRIDE = os.getenv('FOUNDRY_PYTHON_TEMPDIR')
    tempfile.tempdir = TEMPDIR_OVERRIDE if TEMPDIR_OVERRIDE is not None else tempfile.tempdir

    if local_file_path is None:
        _, local_file_path = tempfile.mkstemp()

    if base64_decode:
        _, download_file_path = tempfile.mkstemp()
    else:
        download_file_path = local_file_path

    try:
        with filesystem.open(input_file_path, 'rb') as f_in, open(download_file_path, 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)

        if base64_decode:
            with open(download_file_path, 'rb') as fin, open(local_file_path, 'wb') as fout:
                base64.decode(fin, fout)

        return local_file_path

    finally:
        if base64_decode:
            os.remove(download_file_path)


@transform(
    raw_model=Input("ri.foundry.main.dataset.6b9128f7-dc4b-4c99-81c8-94fb4b0f9ab4"),
    model_output=Output("ri.foundry.main.dataset.67b42a35-5105-41ef-83f4-b9ccfd347d93"),
)
def compute(raw_model, model_output):
    # Offline mode
    # https://github.com/JohnSnowLabs/spark-nlp#offline
    temp_dir = tempfile.mkdtemp()
    model_package = download_file(
        raw_model.filesystem(),
        "explain_document_dl_en_4.4.2_3.2_1685186531034.zip",
        "{}/explain_document_dl_en_4.4.2_3.2_1685186531034.zip".format(temp_dir)
    )
    with ZipFile(model_package, "r") as zObject:
        # Extract all files from zip and put them in temp directory
        explain_document_dl = tempfile.mkdtemp()
        zObject.extractall(path=explain_document_dl)

        pipeline = PipelineModel.load(explain_document_dl)

    # Your testing dataset
    text = "The Mona Lisa is a 16th century oil painting created by Leonardo. It's held at the Louvre in Paris."

    # Annotate your testing dataset
    result = pipeline.annotate(text)

    # What's in the pipeline
    list(result.keys())

    # Check the results
    result['entities']
    model_output.write_dataframe(result)
    zObject.close()

Building this transform fails with the error below. I can see it's not finding a file it expects, but I don't understand what it's looking for, since I'm just loading zip files from spark-nlp that I assume contain everything needed.

{ "errorCode": "CUSTOM_CLIENT", "errorName": "Spark:JobAborted", "errorInstanceId": "d0ec47f9-628c-411c-be10-ac3b1c81b941", "safeArgs": { "pythonVersion": "3.10.12", "exceptionClass": "java.io.FileNotFoundException", "message": "ServiceException: CUSTOM_CLIENT (Spark:JobAborted)" }, "unsafeArgs": { "stacktrace": "org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 4) : java.io.FileNotFoundException: File file:/tmp/tmp67p250jr/metadata/part-00000 does not exist\n\tat


Solution

  • You're hitting this because the PipelineModel.load(...) call requires the full model folder structure to be available on the driver and on all executors, whereas your code unzips the model and makes the path available only on the driver. You can fix this either by providing a path that both the driver and the executors can read (in Foundry, this is a shared HDFS path) or by not using any executors at all.
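
One way to confirm this diagnosis is to check, on the driver, that the extracted folder really contains what Spark is asking for. The sketch below uses only the standard library. It assumes that a saved Spark ML pipeline keeps a metadata/part-00000 file and a stages/ directory at its root (the error names the first of these), and missing_model_entries is a hypothetical helper name, not part of spark-nlp or Foundry.

```python
import os

# Assumption: these two entries sit at the root of a saved Spark ML pipeline.
EXPECTED_ENTRIES = ("metadata/part-00000", "stages")


def missing_model_entries(model_dir):
    """Return the expected entries that are absent under model_dir."""
    return [
        entry
        for entry in EXPECTED_ENTRIES
        if not os.path.exists(os.path.join(model_dir, *entry.split("/")))
    ]
```

If missing_model_entries(explain_document_dl) returns an empty list on the driver and the build still fails with FileNotFoundException, the files are present locally and the executors are the ones that cannot see them.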

    First option:

    The simpler approach is to not use any executors, which you can achieve by adding the @configure(profile=["KUBERNETES_NO_EXECUTORS"]) decorator to your transform (configure is imported from transforms.api, alongside transform). The downside is that the build no longer benefits from Spark's distributed execution.

    Second option:

    If you need Spark's distributed execution, you can use the Hadoop path instead, but you'll need to upload the unzipped files to the raw Foundry dataset:

    1. Unzip the files locally
    2. Upload the directory as is into the Foundry dataset (it is critical that you preserve the folder structure, as Spark ML requires it).
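
For step 1, it can help to find out which folder inside the archive is the actual model root, since that is the folder whose structure must be preserved on upload. A minimal standard-library sketch follows. It assumes the model root is the first directory that directly contains a metadata/ folder, and find_model_root is a hypothetical helper name.

```python
import os
import tempfile
from zipfile import ZipFile


def find_model_root(zip_path, extract_to=None):
    """Extract zip_path and return (extract_dir, model root relative to it)."""
    extract_to = extract_to or tempfile.mkdtemp()
    with ZipFile(zip_path) as archive:
        archive.extractall(extract_to)

    # os.walk is top-down, so the pipeline root is reached before the
    # per-stage folders, which carry metadata/ directories of their own.
    for dirpath, dirnames, _ in os.walk(extract_to):
        dirnames.sort()  # make the traversal order deterministic
        if "metadata" in dirnames:
            return extract_to, os.path.relpath(dirpath, extract_to)

    raise FileNotFoundError("no metadata/ directory found in " + zip_path)
```

A result of "." means the archive's top level is already the model root. Any other value is the subfolder to upload.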
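
    With the files uploaded, load the pipeline directly from the dataset's Hadoop path:

    from pyspark.ml import PipelineModel
    from sparknlp.base import LightPipeline
    from transforms.api import transform, Input, Output

    # KUBERNETES_NO_EXECUTORS not required for this option
    @transform(
        raw_model=Input("<rid>"),
        model_output=Output("<rid>"),
    )
    def compute(ctx, raw_model, model_output):
        # hadoop_path is readable by the driver and by the executors
        path = raw_model.filesystem().hadoop_path + '/<root_for_chosen_model>'
        pipeline = PipelineModel.load(path)

        # Your testing dataset
        text = "The Mona Lisa is a 16th century oil painting created by Leonardo. It's held at the Louvre in Paris."

        # annotate() belongs to LightPipeline, so wrap the loaded PipelineModel
        result = LightPipeline(pipeline).annotate(text)

        # result is a dict of lists (e.g. result['entities']); write_dataframe
        # needs a Spark DataFrame, so flatten it into (annotation, value) rows
        rows = [(key, str(value)) for key, values in result.items() for value in values]
        df = ctx.spark_session.createDataFrame(rows, "annotation string, value string")
        model_output.write_dataframe(df)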
    

    Hopefully this helps!