apache-spark, hadoop, hadoop-yarn, johnsnowlabs-spark-nlp

Loading large sparknlp pipeline into Apache Spark batch job taking too long


I am using Spark NLP from John Snow Labs to extract embeddings from my text data. The pipeline is below. After saving to HDFS, the model is 1.8 GB.

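from pyspark.ml import Pipeline
from sparknlp.base import DocumentAssembler
from sparknlp.annotator import SentenceDetector, BertSentenceEmbeddings
document_assembler = DocumentAssembler().setInputCol("text").setOutputCol("document")
sentence_detector = SentenceDetector().setInputCols(["document"]).setOutputCol("sentence")
embeddings = BertSentenceEmbeddings.pretrained("labse", "xx") \
      .setInputCols("sentence") \
      .setOutputCol("sentence_embeddings")
nlp_pipeline = Pipeline(stages=[document_assembler, sentence_detector, embeddings])
pipeline_model = nlp_pipeline.fit(spark.createDataFrame([[""]]).toDF("text"))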

I saved pipeline_model to HDFS with pipeline_model.save("hdfs:///<path>").

This was done only once.

In another script, I load the stored pipeline from HDFS with pipeline_model = PretrainedPipeline.from_disk("hdfs:///<path>").

This loads the model, but it takes far too long. I first tested it in Spark local mode (no cluster) on a machine with ample resources, 94 GB of RAM and 32 cores, and it was still slow.

Later, I deployed the script on YARN with 12 executors, each with 3 cores and 7 GB of RAM, and 10 GB of driver memory.

Again, the script takes a very long time just to load the saved model from HDFS.


When Spark reaches this point (see the screenshot above), it takes a very long time.

I thought of an approach

Preloading

My idea is to preload the model into memory once, so that whenever the script needs to transform a DataFrame it can use a reference to the already-loaded pretrained pipeline on the fly, without any disk I/O. I searched, but found nothing useful.
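A minimal sketch of the preloading idea, assuming the expensive step can be wrapped in a function: functools.lru_cache memoizes the loader, so the first call in a process pays the load cost and later calls get the same in-memory object back. load_pipeline, embed and the dummy "scale" value are hypothetical stand-ins for the real PretrainedPipeline.from_disk(...) call and the transform. The cache lives inside one Python process only, so it is lost when the script exits, and in a Spark job each executor's Python workers would hold their own copy.

    import functools

    LOAD_CALLS = 0  # counts how often the expensive load really runs


    @functools.lru_cache(maxsize=None)
    def load_pipeline(path):
        """Hypothetical loader: in the real job this would be the slow HDFS load."""
        global LOAD_CALLS
        LOAD_CALLS += 1
        return {"path": path, "scale": 2}  # dummy stand-in for the loaded pipeline


    def embed(texts, path="hdfs:///<path>"):
        """Hypothetical transform that reuses the cached pipeline."""
        pipeline = load_pipeline(path)  # loaded on the first call, cached afterwards
        return [pipeline["scale"] * len(t) for t in texts]


    print(embed(["a", "bb"]))  # [2, 4]
    print(embed(["ccc"]))      # [6]
    print(LOAD_CALLS)          # 1

The second call to embed reuses the cached object, so LOAD_CALLS stays at 1.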

Please let me know what you think of this approach and what the best way to implement it would be.

Resources on YARN

Node name        Count   RAM (each)   Cores (each)
Master node          1        38 GB              8
Secondary node       1        38 GB              8
Worker nodes         4        24 GB              4
Total                6       172 GB             32
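The totals row can be checked with a little arithmetic and compared with what the YARN job requests (12 executors with 3 cores and 7 GB each, plus a 10 GB driver). This is only a sketch: it ignores per-container memory overhead and any limits the YARN scheduler is configured with, so it shows requested figures, not what YARN actually grants. The names NODES, cluster_totals and requested are illustrative.

    # Cluster resources from the table: (node type, count, RAM in GB, cores per node)
    NODES = [
        ("Master node", 1, 38, 8),
        ("Secondary node", 1, 38, 8),
        ("Worker nodes", 4, 24, 4),
    ]

    # Job settings described in the question
    EXECUTORS = 12
    EXECUTOR_CORES = 3
    EXECUTOR_MEMORY_GB = 7
    DRIVER_MEMORY_GB = 10


    def cluster_totals(nodes):
        """Return (total nodes, total RAM in GB, total cores)."""
        count = sum(n for _, n, _, _ in nodes)
        ram = sum(n * gb for _, n, gb, _ in nodes)
        cores = sum(n * c for _, n, _, c in nodes)
        return count, ram, cores


    def requested(executors, cores, mem_gb, driver_gb):
        """Return (executor cores, executor memory GB, executor + driver memory GB)."""
        return executors * cores, executors * mem_gb, executors * mem_gb + driver_gb


    print("cluster:", cluster_totals(NODES))  # cluster: (6, 172, 32)
    print("requested:", requested(EXECUTORS, EXECUTOR_CORES,
                                  EXECUTOR_MEMORY_GB, DRIVER_MEMORY_GB))  # requested: (36, 84, 94)

With these figures the executors alone ask for 36 cores, while the table lists 32 cores across all six nodes (16 on the worker nodes). Whether that matters depends on how YARN is configured to account for vcores.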

Thanks


Solution

  • As discussed in the comments, this solution uses PyTorch and Hugging Face Transformers rather than Spark NLP. Simplified code:

    # labse_spark.py
    import pandas as pd
    import torch
    from pyspark import SparkFiles
    from pyspark.sql import types as T
    from transformers import AutoModel, AutoTokenizer
    
    # Loaded lazily on the executors and kept as module-level globals,
    # so a reused Python worker does not reload the model for every partition.
    LABSE_MODEL, LABSE_TOKENIZER = None, None
    
    
    def transform(spark, df, input_col='text', output_col='output'):
        # Ship the saved model directory to every executor (directories need recursive=True).
        spark.sparkContext.addFile('hdfs:///path/to/labse_model', recursive=True)
        # Output schema = input columns + one array<float> column for the embedding.
        output_schema = T.StructType(df.schema.fields + [T.StructField(output_col, T.ArrayType(T.FloatType()))])
    
        rdd = df.rdd.mapPartitions(_map_partitions_func(input_col, output_col))
        res = spark.createDataFrame(data=rdd, schema=output_schema)
        return res
    
    
    def _map_partitions_func(input_col, output_col):
        def executor_func(rows):
            global LABSE_MODEL, LABSE_TOKENIZER
            # load everything to memory (partitions should be small, ~1k rows per partition):
            pandas_df = pd.DataFrame([r.asDict() for r in rows])
            if pandas_df.empty:  # an empty partition has no columns, so skip it
                return []
            if LABSE_MODEL is None or LABSE_TOKENIZER is None:  # roughly once per Python worker process
                model_dir = SparkFiles.get('labse_model')
                LABSE_TOKENIZER = AutoTokenizer.from_pretrained(model_dir)
                LABSE_MODEL = AutoModel.from_pretrained(model_dir)
    
            # copied from HF model card:
            encoded_input = LABSE_TOKENIZER(
                pandas_df[input_col].tolist(), padding=True, truncation=True, max_length=64, return_tensors='pt')
            with torch.no_grad():
                model_output = LABSE_MODEL(**encoded_input)
            embeddings = model_output.pooler_output
            embeddings = torch.nn.functional.normalize(embeddings)
    
            pandas_df[output_col] = pd.Series(embeddings.tolist())
            return pandas_df.to_dict('records')  # dicts are matched to output_schema by field name
    
        return executor_func
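To see the control flow of executor_func without a cluster, the sketch below replays the same pattern on plain Python lists: each partition is materialized, the model is loaded lazily into a module-level global the first time a non-empty partition arrives, and the whole partition is embedded in one batched call. fake_load_model and its dummy 2-dimensional vectors are hypothetical stand-ins for the Hugging Face calls, so only the structure carries over, not the embeddings.

    # Stdlib-only simulation of the per-partition pattern in labse_spark.py.
    # fake_load_model is a hypothetical stand-in for AutoTokenizer/AutoModel.from_pretrained.

    MODEL = None  # module-level cache, like LABSE_MODEL
    LOADS = 0     # how many times the "model" was loaded in this process


    def fake_load_model():
        global LOADS
        LOADS += 1
        # dummy "embedding": [text length, 1.0] for each input text
        return lambda texts: [[float(len(t)), 1.0] for t in texts]


    def map_partitions_func(input_col, output_col):
        def executor_func(rows):
            global MODEL
            records = [dict(r) for r in rows]  # Row.asDict() in Spark; materializes the partition
            if not records:                    # empty partitions produce no output
                return []
            if MODEL is None:                  # load lazily, once per process
                MODEL = fake_load_model()
            # one batched call per partition instead of one call per row
            vectors = MODEL([rec[input_col] for rec in records])
            for rec, vec in zip(records, vectors):
                rec[output_col] = vec
            return records
        return executor_func


    # Each inner list plays the role of one Spark partition.
    partitions = [[{"text": "hola"}, {"text": "hi"}], [], [{"text": "bonjour"}]]
    func = map_partitions_func("text", "output")
    results = [rec for part in partitions for rec in func(iter(part))]
    print(LOADS)  # 1

Because LOADS stays at 1 after three partitions, the load cost is paid once per process. In Spark that corresponds to once per Python worker, as long as workers are reused (spark.python.worker.reuse, which defaults to true).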