I am using the code below to read a Spark DataFrame (Delta format) from HDFS:
from delta import *
from pyspark.sql import SparkSession
builder = SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()
#change file path here
delta_df = spark.read.format("delta").load('hdfs://localhost:9000/final_project/data/2022-03-30/')
delta_df.show(10, truncate=False)
and the code below to use the pretrained pipeline:
from sparknlp.pretrained import PipelineModel
from pyspark.sql import SparkSession
import sparknlp
# spark session one way
spark = SparkSession.builder \
    .appName("Spark NLP") \
    .master("local[4]") \
    .config("spark.driver.memory", "16G") \
    .config("spark.driver.maxResultSize", "0") \
    .config("spark.kryoserializer.buffer.max", "2000M") \
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp-spark32_2.12:3.4.2") \
    .getOrCreate()
# alternate way #uncomment below to use
#spark=sparknlp.start(spark32=True)
# unzip the file and change path here
pipeline = PipelineModel.load("/home/sidd0613/final_project/classifierdl_bertwiki_finance_sentiment_pipeline_en_3.3.0_2.4_1636617651675")
print("-------")
# creating a spark data frame from the sentence
df = spark.createDataFrame([["As interest rates have increased, housing rents have also increased."]]).toDF('text')
# passing dataframe to the pipeline to derive sentiment
result = pipeline.transform(df)
#printing the result
print(result)
print("DONE!!!")
I wish to merge these two pieces of code, but the two Spark sessions won't merge and won't work for both tasks together. Please help!
I tried merging the .config() options of both Spark sessions and it did not work; I also tried creating two separate Spark sessions, and that did not work either.
A normal Spark session is enough to read files in other formats, but to read a Delta file I strictly had to use configure_spark_with_delta_pip(builder).
Is there any way to bypass this, or to make the code work?
The configure_spark_with_delta_pip function is just a shortcut for setting up the correct parameters of the SparkSession. If you look at its source code, you'll see that all it really does is configure spark.jars.packages. But because you're setting spark.jars.packages separately for Spark NLP, you're overwriting Delta's value.
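For intuition, the helper is roughly equivalent to the simplified sketch below. This is an assumption-laden illustration, not the actual implementation: the real function detects the installed delta-spark version and Scala build instead of hard-coding them as done here.
# Simplified sketch of what configure_spark_with_delta_pip effectively does.
# The delta-core coordinate below is hard-coded only for illustration; the
# real helper derives the version from the installed delta-spark package.
def configure_spark_with_delta_pip_sketch(builder, extra_packages=None):
    delta_artifact = "io.delta:delta-core_2.12:1.1.0"  # assumed version
    packages = [delta_artifact] + list(extra_packages or [])
    return builder.config("spark.jars.packages", ",".join(packages))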
Update 14.04.2022: this wasn't released at the time of the original answer, but it is now available in Delta 1.2.0.
To handle such situations, configure_spark_with_delta_pip has an additional parameter, extra_packages, for specifying additional packages to be configured. So in your case the code should look as follows:
builder = SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.driver.memory", "16G") \
    .config("spark.driver.maxResultSize", "0") \
    .config("spark.kryoserializer.buffer.max", "2000M")

my_packages = ["com.johnsnowlabs.nlp:spark-nlp-spark32_2.12:3.4.2"]

spark = configure_spark_with_delta_pip(builder, extra_packages=my_packages) \
    .getOrCreate()
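If you want to double-check that both coordinates made it into the session configuration, you can print the resolved spark.jars.packages value (standard Spark configuration, readable via the SparkConf of the running context):
# Should list both the Spark NLP and the Delta maven coordinates.
print(spark.sparkContext.getConf().get("spark.jars.packages"))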
Until the version with that extra parameter is released, you need to avoid using that function and simply configure all the parameters yourself, like this:
scala_version = "2.12"
delta_version = "1.1.0"
all_packages = ["com.johnsnowlabs.nlp:spark-nlp-spark32_2.12:3.4.2",
                f"io.delta:delta-core_{scala_version}:{delta_version}"]

spark = SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.driver.memory", "16G") \
    .config("spark.driver.maxResultSize", "0") \
    .config("spark.kryoserializer.buffer.max", "2000M") \
    .config("spark.jars.packages", ",".join(all_packages)) \
    .getOrCreate()
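With a single session configured either way, both tasks from the question should run back to back. A minimal end-to-end sketch, reusing the paths and sample sentence from the question:
from sparknlp.pretrained import PipelineModel

# Read the Delta table from HDFS (path taken from the question).
delta_df = spark.read.format("delta").load("hdfs://localhost:9000/final_project/data/2022-03-30/")
delta_df.show(10, truncate=False)

# Load the pretrained pipeline and run it on a small test DataFrame.
pipeline = PipelineModel.load("/home/sidd0613/final_project/classifierdl_bertwiki_finance_sentiment_pipeline_en_3.3.0_2.4_1636617651675")
df = spark.createDataFrame([["As interest rates have increased, housing rents have also increased."]]).toDF("text")
result = pipeline.transform(df)
result.show()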