python, apache-spark, pyspark, azure-hdinsight

Pyspark: Read data from table and write to File


I am using an HDInsight Spark cluster to run my PySpark code. I am trying to read data from a Postgres table and write it to a file, as shown below. pgsql_df is returning a DataFrameReader instead of a DataFrame, so I am unable to write the DataFrame to a file. Why is "spark.read" returning a DataFrameReader? What am I missing here?

from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark import SparkContext
from pyspark.sql import functions as dbpull
from datetime import datetime
from pyspark.sql.types import Row
from pyspark.sql import DataFrame
from pyspark.sql import DataFrameReader
from pyspark.sql import DataFrameWriter
import random
import string
from pyspark.sql.functions import *
import sys
spark = SparkSession.builder.master("local").appName("db pull").getOrCreate()

pgsql_df = spark.read.format("jdbc") \
    .option("driver", "org.postgresql.Driver") \
    .option("url", "jdbc:postgresql://<hostdetails>") \
    .option("dbtable", "table") \
    .option("user", "user") \
    .option("password", "password")

>>> pgsql_df
<pyspark.sql.readwriter.DataFrameReader object at 0x7fb43ce1f890>


pgsql_df.write.format("csv").mode("overwrite").options(sep=",", header="true").save(path=output)


**Error:** 
 Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: 'DataFrameReader' object has no attribute 'write'

Solution

  • Please check the code below. You are missing a call to load() on the DataFrameReader object. The spark.read.format(...).option(...) chain only configures a DataFrameReader; load() is what actually executes the read and returns a DataFrame that you can write out.

    pgsql_df = spark.read.format("jdbc") \
        .option("driver", "org.postgresql.Driver") \
        .option("url", "jdbc:postgresql://<hostdetails>") \
        .option("dbtable", "table") \
        .option("user", "user") \
        .option("password", "password") \
        .load()  # this was missing: load() turns the reader into a DataFrame

    pgsql_df.write.format("csv").mode("overwrite").options(sep=",", header="true").save(path=output)
    
    or 
    
    
    pgsql_df = spark.read.format("jdbc") \
        .option("driver", "org.postgresql.Driver") \
        .option("url", "jdbc:postgresql://<hostdetails>") \
        .option("dbtable", "table") \
        .option("user", "user") \
        .option("password", "password")

    # load() added here: it returns the DataFrame that .write is called on
    pgsql_df \
        .load() \
        .write \
        .format("csv").mode("overwrite").options(sep=",", header="true").save(path=output)