python, pyspark, apache-spark-sql, bigdata, pyspark-schema

How to define a schema for a semi-structured text file in PySpark


1 2013-07-25    11599,CLOSED
2 2013-07-25    256,PENDING_PAYMENT
3 2013-07-25    12111,COMPLETE
4 2013-07-25    8827,CLOSED
5 2013-07-25    11318,COMPLETE
6 2013-07-25    7130,COMPLETE
7 2013-07-25    4530,COMPLETE
8 2013-07-25    2911,PROCESSING
9 2013-07-25    5657,PENDING_PAYMENT
10 2013-07-25   5648,PENDING_PAYMENT
11 2013-07-25   918,PAYMENT_REVIEW
12 2013-07-25   1837,CLOSED

The data above comes from a semi-structured text file.

The 2nd column is separated from the 1st by a space, the 3rd from the 2nd by a tab, and the 4th from the 3rd by a comma.

How do I define the schema (datatype) for every column, e.g. 1st column int, 2nd column timestamp, 3rd column int, 4th column string?

I have tried to split each record into columns with the code below:

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_extract
from pyspark.sql.types import (IntegerType, StructField, StructType,
                               StringType, TimestampType)
 
my_conf = SparkConf()
my_conf.set("spark.app.name", "my first application")

my_conf.set("spark.master","local[*]")

spark = SparkSession.builder.config(conf=my_conf).getOrCreate()

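# Desired schema: order_id int, date timestamp, customer_id int, status string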
schema1 = StructType([StructField("order_id", IntegerType(), True),
                      StructField("date", TimestampType(), True),
                      StructField("customer_id", IntegerType(), True),
                      StructField("status", StringType(), True)])


myregex = r'^(\S+) (\S+)\t(\S+)\,(\S+)'
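# Capture groups in myregex:
#   1: order id (digits before the first space)
#   2: date (between the space and the tab)
#   3: customer id (between the tab and the comma)
#   4: status (everything after the comma)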

lines_df = spark.read.format("text")\
           .option("path", "C:/Users/Lenovo/Desktop/week11/week 11 datasets/orders_new.csv")\
           .load()
 
 
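# Extract each capture group from the text column 'value' into its own column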
final_df = lines_df.select(regexp_extract('value', myregex, 1).alias("order_id"),
                           regexp_extract('value', myregex, 2).alias("date"),
                           regexp_extract('value', myregex, 3).alias("customer_id"),
                           regexp_extract('value', myregex, 4).alias("status"))


final_df.show()

Output:
+--------+----------+-----------+---------------+
|order_id|      date|customer_id|         status|
+--------+----------+-----------+---------------+
|       1|2013-07-25|      11599|         CLOSED|
|       2|2013-07-25|        256|PENDING_PAYMENT|
|       3|2013-07-25|      12111|       COMPLETE|
|       4|2013-07-25|       8827|         CLOSED|
|       5|2013-07-25|      11318|       COMPLETE|
|       6|2013-07-25|       7130|       COMPLETE|
|       7|2013-07-25|       4530|       COMPLETE|
|       8|2013-07-25|       2911|     PROCESSING|
|       9|2013-07-25|       5657|PENDING_PAYMENT|
|      10|2013-07-25|       5648|PENDING_PAYMENT|
|      11|2013-07-25|        918| PAYMENT_REVIEW|
|      12|2013-07-25|       1837|         CLOSED|
+--------+----------+-----------+---------------+

final_df.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- status: string (nullable = true)

As printSchema() shows, every column comes back as a string.



But now, to apply schema1, whenever I do

    df = spark.createDataFrame(final_df.rdd, schema1)

    df.show()    # <-- this is where I get an error

So how do I define the schema? Please help.

Solution

  • regexp_extract always returns string columns, so after regexp_extract you need to cast them.

    Try the syntax below.

    Example:

    final_df = lines_df.select(regexp_extract('value', myregex, 1).cast("int").alias("order_id"),
                               regexp_extract('value', myregex, 2).cast("timestamp").alias("date"),
                               regexp_extract('value', myregex, 3).cast("int").alias("customer_id"),
                               regexp_extract('value', myregex, 4).alias("status"))
    
    final_df.show()
    df=spark.createDataFrame(final_df.rdd,schema1)
    df.printSchema()
    #root
    # |-- order_id: integer (nullable = true)
    # |-- date: timestamp (nullable = true)
    # |-- customer_id: integer (nullable = true)
    # |-- status: string (nullable = true)
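
    Note that once the casts are in place, final_df already carries the target datatypes, so the round trip through final_df.rdd and createDataFrame is only needed if you specifically want to attach schema1. As an alternative, the same types can be set on the original string-only final_df with withColumn and cast; a minimal sketch, assuming the uncast final_df from the question:

    from pyspark.sql.functions import col

    # Cast the string columns in place; status stays a string
    typed_df = (final_df
                .withColumn("order_id", col("order_id").cast("int"))
                .withColumn("date", col("date").cast("timestamp"))
                .withColumn("customer_id", col("customer_id").cast("int")))

    typed_df.printSchema()
    #root
    # |-- order_id: integer (nullable = true)
    # |-- date: timestamp (nullable = true)
    # |-- customer_id: integer (nullable = true)
    # |-- status: string (nullable = true)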