Tags: apache-spark, pyspark, amazon-emr

PySpark UDF optimization challenge


I am trying to optimize the code below. When run with 1000 rows of data, it takes about 12 minutes to complete. Our use case requires data sizes of around 25K - 50K rows, which would make this implementation completely infeasible.

import pyspark.sql.types as Types
import numpy
import spacy
from pyspark.sql.functions import udf

inputPath = "s3://myData/part-*.parquet"
df = spark.read.parquet(inputPath)

test_df = df.select('uid', 'content').limit(1000).repartition(10)

# print(df.rdd.getNumPartitions()) -> 4
# print(test_df.rdd.getNumPartitions()) -> 1

def load_glove(fn):
    vector_dict = {}
    count = 0
    with open(fn) as inf:
        for line in inf:
            count += 1
            eles = line.strip().split()
            token = eles[0]
            try:
                vector_dict[token] = numpy.array([float(x) for x in eles[1:]])
                assert len(vector_dict[token]) == 300
            except:
                print("Exception in load_glove")
                pass
    return vector_dict

# Returning an Array of doubles from the udf
@udf(returnType=Types.ArrayType(Types.FloatType()))
def generateVectorRepresentation(text):
  # TODO: move the load function out if possible, and remove unused modules
  # nlp = spacy.load('en', disable=['parser', 'tagger'])
  nlp = spacy.load('en', max_length=6000000)
  gloveEmbeddingsPath = "/home/hadoop/short_glove_1000.300d.txt"
  glove_embeddings_dict = load_glove(gloveEmbeddingsPath)
  spacy_doc = nlp(text)
  doc_vec = numpy.array([0.0] * 300)
  doc_vec = numpy.float32(doc_vec)
  wordcount = 0
  for sentence_id, sentence in enumerate(spacy_doc.sents):
      for word in sentence:
          if word.text in glove_embeddings_dict:
              # Pre-convert to glove dictionary to float32 representations
              doc_vec += numpy.float32(glove_embeddings_dict[word.text])
              wordcount += 1

  # Document Vector is the average of all word vectors in the document
  doc_vec = doc_vec/(1.0 * wordcount)
  return doc_vec.tolist()

spark.udf.register("generateVectorRepresentation", generateVectorRepresentation)

document_vector_df = test_df.withColumn("Glove Document Vector", generateVectorRepresentation('content'))

spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_document_vector_df = document_vector_df.toPandas()

# print(pandas_document_vector_df)
pandas_document_vector_df.head()

I was wondering if you could help answer the questions below.

Are spacy.load() and load_glove() invoked on every iteration? Is there a way to prepare the load_glove() data once per worker node instead of once per row of data? The load_glove method returns a dictionary object which could be as large as 5GB. Is there a way to prepare that on the master node and then pass it as a parameter to the UDF?

Appreciate your suggestions. Thanks in advance!


Solution

  • Yes. In the current implementation, all of the model-loading code runs every time your function is called, that is, once per row, which is far from optimal. I'm not aware of a way to pass the loaded model from the driver to the worker nodes directly, but there is a similar approach: initialize the model on each worker, but only once. To do that, load it lazily, inside a function that runs only when the result is actually needed, which happens on the workers.

    Try doing something like this:

    # The model is not loaded when this module is loaded. Only the worker code
    # invokes this routine, so each executor loads its own spaCy model once
    # and reuses it on later calls.
    SPACY_MODEL = None
    def get_spacy_model():
        global SPACY_MODEL
        if SPACY_MODEL is None:
            SPACY_MODEL = spacy.load('en', max_length=6000000)
        return SPACY_MODEL
    
    @udf(returnType=Types.ArrayType(Types.FloatType()))
    def generateVectorRepresentation(text):
      # TODO: move the load function out if possible, and remove unused modules
      # nlp = spacy.load('en', disable=['parser', 'tagger'])
      nlp = get_spacy_model()
      # your further processing
    

    I think you can put the GloVe loading code into a similar function.

    You can read more about this here: https://haridas.in/run-spacy-jobs-on-apache-spark.html (this is not my blog; I found it while trying to do the same thing with a spaCy model).
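
The suggestion above to put the GloVe loading into a similar function could look roughly like the sketch below. It is not taken from the linked post: the names `_GLOVE_CACHE` and `get_glove_embeddings` and the `dim` parameter are hypothetical, and it assumes the embeddings file exists at the same local path on every worker node, as the question's `/home/hadoop/...` path implies.

```python
import numpy

# Hypothetical per-process cache: maps a file path to its loaded embeddings.
# It starts empty, so nothing is loaded until a worker first asks for the data.
_GLOVE_CACHE = {}


def load_glove(fn, dim=300):
    """Parse a GloVe text file into {token: float32 vector}.

    Lines with the wrong number of fields or non-numeric values are skipped.
    """
    vector_dict = {}
    with open(fn, encoding="utf-8") as inf:
        for line in inf:
            eles = line.strip().split()
            if len(eles) != dim + 1:
                continue  # wrong number of fields
            try:
                values = [float(x) for x in eles[1:]]
            except ValueError:
                continue  # a field was not a number
            vector_dict[eles[0]] = numpy.array(values, dtype=numpy.float32)
    return vector_dict


def get_glove_embeddings(path, dim=300):
    """Return the embeddings for `path`, reading the file at most once per process."""
    if path not in _GLOVE_CACHE:
        _GLOVE_CACHE[path] = load_glove(path, dim)
    return _GLOVE_CACHE[path]
```

Inside the UDF, `glove_embeddings_dict = get_glove_embeddings(gloveEmbeddingsPath)` would then replace the direct `load_glove` call, in the same way that `get_spacy_model()` replaces `spacy.load`. Each Python worker process keeps its own copy of the dictionary, so with a dictionary as large as 5GB the executor memory settings probably need checking.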