Tags: pyspark, user-defined-functions, google-cloud-dataproc, named-entity-recognition, spacy-3

No module named 'spacy' in PySpark


I am attempting to perform some entity extraction using a custom spaCy NER model. The extraction is done over a Spark DataFrame, and everything is orchestrated in a Dataproc cluster (using a Jupyter Notebook, available in the "Workbench"). The code I am using looks as follows:

# IMPORTANT: NOTICE THIS CODE WAS RUN FROM A JUPYTER NOTEBOOK (!)

import pandas as pd
import numpy as np
import time

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, pandas_udf
from pyspark.sql.types import ArrayType, StringType

spark = SparkSession.builder.appName('SpacyOverPySpark') \
                    .getOrCreate()


# FUNCTIONS DEFINITION

def load_spacy_model():
    import spacy
    print("Loading spacy model...")
    return spacy.load("./spacy_model")  # This model exists locally


@pandas_udf(ArrayType(StringType()))
def entities(list_of_text: pd.Series) -> pd.Series:
    # retrieving the shared nlp object
    nlp = broadcasted_nlp.value
    # batch processing our list of text
    docs = nlp.pipe(list_of_text)
    # entity extraction (`ents` is a list[list[str]])
    ents = [
        [ent.text for ent in doc.ents]
        for doc in docs
    ]
    return pd.Series(ents)


# DUMMY DATA FOR THIS TEST

pdf = pd.DataFrame(
    [
        "Pyhton and Pandas are very important for Automation",
        "Tony Stark is a Electrical Engineer",
        "Pipe welding is a very dangerous task in Oil mining",
        "Nursing is often underwhelmed, but it's very interesting",
        "Software Engineering now opens a lot of doors for you",
        "Civil Engineering can get exiting, as you travel very often",
        "I am a Java Programmer, and I think I'm quite good at what I do",
        "Diane is never bored of doing the same thing all day",
        "My father is a Doctor, and he supports people in condition of poverty",
        "A janitor is required as soon as possible"
    ],
    columns=['postings']
)
sdf = spark.createDataFrame(pdf)


# MAIN CODE

# loading spaCy model and broadcasting it
broadcasted_nlp = spark.sparkContext.broadcast(load_spacy_model())
# Extracting entities
df_new = sdf.withColumn('skills', entities('postings'))
# Displaying results
df_new.show(10, truncate=20)

The error I am getting looks similar to this one, but the answer does not apply to my case, because it deals with "executing a Pyspark job in Yarn", which is different (or so I think; feel free to correct me). Plus, I have also found this, but the answer is rather vague (I gotta be honest here: the only thing I have done to "restart the Spark session" is to run spark.stop() in the last cell of my Jupyter Notebook and then re-run the cells above; feel free to correct me here too).
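For reference, the "restart" mentioned above amounted to nothing more than the following (a minimal sketch of the notebook cells involved, not a proper cluster-level restart):

# Last cell of the notebook: stop the current session
spark.stop()

# The cells at the top of the notebook are then re-run,
# which rebuilds the session
spark = SparkSession.builder.appName('SpacyOverPySpark') \
                    .getOrCreate()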

The code used was heavily inspired by "Answer 2 of 2" in this forum, which makes me wonder if some missing setting is still eluding me (BTW, "Answer 1 of 2" was already tested and did not work). Regarding my specific software versions, they can be found here.

Thank you.

CLARIFICATIONS:

Because some of the questions and hints from the comment section can be lengthy, I have decided to include them here:

  • No. 1: "Which command did you use to create your cluster?": I used this method, so the command was not visible in plain sight; however, I have just realized that, when you are about to create the cluster, there is an "EQUIVALENT COMMAND LINE" button that reveals that command:

(screenshot: the "EQUIVALENT COMMAND LINE" button on the cluster creation page)

In my case, the Dataproc cluster creation command (automatically generated by GCP) is:

gcloud dataproc clusters create my-cluster \
--enable-component-gateway \
--region us-central1 \
--zone us-central1-c \
--master-machine-type n1-standard-4 \
--master-boot-disk-size 500 \
--num-workers 2 \
--worker-machine-type n1-standard-4 \
--worker-boot-disk-size 500 \
--image-version 2.0-debian10 \
--optional-components JUPYTER \
--metadata PIP_PACKAGES=spacy==3.2.1 \
--project hidden-project-name

Notice how spaCy is declared in the metadata (following these recommendations); however, running pip freeze | grep spacy right after the Dataproc cluster creation does not display any result (i.e., spaCy does NOT get installed successfully). To enable it, the official installation method is used afterwards.

  • No. 2: "Wrong path as possible cause": Not my case; it actually looks similar to this case (even though I cannot say the root cause is the same for both):
    • Running which python shows /opt/conda/miniconda3/bin/python as the result.
    • Running which spacy (read "Clarification No. 1") shows /opt/conda/miniconda3/bin/spacy as the result.
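Note that both checks above only apply to the driver node (where the notebook runs). A quick diagnostic to verify whether spaCy is importable on the executors as well (a sketch that reuses the spark session from the code above; it was not part of my original attempts) would be:

# Ask each partition (i.e., an executor process) to try importing spaCy,
# and collect the distinct outcomes back on the driver.
def check_spacy(_partition):
    try:
        import spacy
        yield "spacy " + spacy.__version__ + " is available"
    except ImportError as exc:
        yield "spacy is missing: " + str(exc)

print(set(
    spark.sparkContext
         .parallelize(range(8), 4)   # a few dummy partitions
         .mapPartitions(check_spacy)
         .collect()
))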

Solution

  • I managed to solve this issue by combining two pieces of information:

    • "Configure Dataproc Python environment", "Dataproc image version 2.0" (as that is the version I am using): available here (special thanks to @Dagang in the comment section).
    • "Create a (Dataproc) cluster": available here.

    Specifically, during the Dataproc cluster setup via the Google Cloud Console, I "installed" spaCy as follows:

    (screenshot: how spaCy was added during the cluster setup in the Console)
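    In case the screenshot does not display, a roughly equivalent command-line version is sketched below (based on the "Configure Dataproc Python environment" guide linked above; the actual setup was done through the Console, so treat the exact flags as an approximation). The key addition with respect to my original command is the pip-install initialization action, which is what actually installs the packages listed in the PIP_PACKAGES metadata on every node:

    # NOTE: sketch only; the cluster was really created through the Console form
    gcloud dataproc clusters create my-cluster \
        --enable-component-gateway \
        --region us-central1 \
        --zone us-central1-c \
        --master-machine-type n1-standard-4 \
        --master-boot-disk-size 500 \
        --num-workers 2 \
        --worker-machine-type n1-standard-4 \
        --worker-boot-disk-size 500 \
        --image-version 2.0-debian10 \
        --optional-components JUPYTER \
        --initialization-actions gs://goog-dataproc-initialization-actions-us-central1/python/pip-install.sh \
        --metadata PIP_PACKAGES=spacy==3.2.1 \
        --project hidden-project-name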

    Once the cluster was created, I ran the code from my original post (with NO modifications), with the following result:

    (screenshot: the resulting Spark DataFrame, with the extracted entities in the new "skills" column)

    That solves my original question. I am planning to apply this solution to a larger dataset, but I think whatever happens there is the subject of a different thread.