I am attempting to perform entity extraction using a custom spaCy NER model. The extraction runs over a Spark DataFrame, and everything is orchestrated in a Dataproc cluster (using a Jupyter Notebook, available in the "Workbench"). The code I am using is as follows:
# IMPORTANT: NOTICE THIS CODE WAS RUN FROM A JUPYTER NOTEBOOK (!)
import pandas as pd
import numpy as np
import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, pandas_udf
from pyspark.sql.types import ArrayType, StringType
spark = SparkSession.builder.appName('SpacyOverPySpark') \
    .getOrCreate()
# FUNCTIONS DEFINITION
def load_spacy_model():
    import spacy
    print("Loading spacy model...")
    return spacy.load("./spacy_model")  # This model exists locally
@pandas_udf(ArrayType(StringType()))
def entities(list_of_text: pd.Series) -> pd.Series:
    # retrieving the shared nlp object
    nlp = broadcasted_nlp.value
    # batch processing our list of text
    docs = nlp.pipe(list_of_text)
    # entity extraction (`ents` is a list[list[str]])
    ents = [
        [ent.text for ent in doc.ents]
        for doc in docs
    ]
    return pd.Series(ents)
# DUMMY DATA FOR THIS TEST
pdf = pd.DataFrame(
    [
        "Pyhton and Pandas are very important for Automation",
        "Tony Stark is a Electrical Engineer",
        "Pipe welding is a very dangerous task in Oil mining",
        "Nursing is often underwhelmed, but it's very interesting",
        "Software Engineering now opens a lot of doors for you",
        "Civil Engineering can get exiting, as you travel very often",
        "I am a Java Programmer, and I think I'm quite good at what I do",
        "Diane is never bored of doing the same thing all day",
        "My father is a Doctor, and he supports people in condition of poverty",
        "A janitor is required as soon as possible"
    ],
    columns=['postings']
)
sdf = spark.createDataFrame(pdf)
# MAIN CODE
# loading spaCy model and broadcasting it
broadcasted_nlp = spark.sparkContext.broadcast(load_spacy_model())
# Extracting entities
df_new = sdf.withColumn('skills',entities('postings'))
# Displaying results
df_new.show(10, truncate=20)
The error I am getting looks similar to this, but that answer does not apply to my case, because it deals with "executing a PySpark job in Yarn", which is different (or so I think, feel free to correct me). I have also found this, but the answer is rather vague (to be honest, the only thing I have done to "restart the Spark session" is to run spark.stop() in the last cell of my Jupyter Notebook and then run the cells above again; feel free to correct me here too).
The code used was heavily inspired by "Answer 2 of 2" in this forum, which makes me wonder if some missing setting is still eluding me (BTW, "Answer 1 of 2" was already tested but did not work). And regarding my specific software versions, they can be found here.
Thank you.
CLARIFICATIONS:
Because some queries or hints generated in the comment section can be lengthy, I have decided to include them here:
In my case, the Dataproc cluster creation code (automatically generated by GCP) is:
gcloud dataproc clusters create my-cluster \
    --enable-component-gateway \
    --region us-central1 \
    --zone us-central1-c \
    --master-machine-type n1-standard-4 \
    --master-boot-disk-size 500 \
    --num-workers 2 \
    --worker-machine-type n1-standard-4 \
    --worker-boot-disk-size 500 \
    --image-version 2.0-debian10 \
    --optional-components JUPYTER \
    --metadata PIP_PACKAGES=spacy==3.2.1 \
    --project hidden-project-name
Notice how spaCy is installed via the metadata (following these recommendations); however, running the pip freeze | grep spacy command right after the Dataproc cluster creation does not display any result (i.e., spaCy does NOT get installed successfully). To enable it, the official method is used afterwards.
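As a complement to the pip freeze | grep spacy check, the same question can be asked from inside the notebook itself, which also shows which interpreter the kernel is really using. This is only a minimal sketch using the standard library; module_status is a hypothetical helper name I made up for illustration, and it only inspects the interpreter running the notebook (the driver), not the Spark executors.

```python
import importlib.util
import sys
from importlib import metadata


def module_status(name: str) -> dict:
    """Report whether `name` is importable by the interpreter running this code.

    NOTE: `module_status` is a hypothetical helper, not part of spaCy or PySpark.
    """
    # find_spec returns None when the top-level module cannot be located
    found = importlib.util.find_spec(name) is not None
    try:
        # Version of the installed distribution, if pip/conda knows about it
        version = metadata.version(name)
    except metadata.PackageNotFoundError:
        version = None
    return {"interpreter": sys.executable, "found": found, "version": version}


# Checks only the interpreter behind this notebook kernel
print(module_status("spacy"))
```

If "found" is False here while which spacy still prints a path, the kernel is probably running under a different Python environment than the shell.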
which python shows /opt/conda/miniconda3/bin/python as result.
which spacy (read "Clarification No. 1") shows /opt/conda/miniconda3/bin/spacy as result.
I managed to solve this issue by combining 2 pieces of information:
Specifically, during the Dataproc cluster setup via the Google Console, I "installed" spaCy by doing:
Once the cluster was created, I ran the code mentioned in my original post (NO modifications) with the following result:
That solves my original question. I am planning to apply my solution to a larger dataset, but I think whatever happens there is the subject of a different thread.