Search code examples
python-3.x, pyspark

Iterate text file contents in pyspark dataframe and generate the file with new dates in a list in pyspark


I have the Impala queries below in a file at an HDFS location. I am trying to read that file and expand each query whose end_date is "2211-01-01".

The list of dates is:

final_dates = ["2022-11-01", "2023-02-01", "2023-05-01", "2023-08-01"]

insert overwrite table randome_database.random_table partition (end_date) select * from randome_database.random_table_stg where end_date >= "2022-04-01" and end_date < "2022-07-01";

insert overwrite table randome_database.random_table partition (end_date) select * from randome_database.random_table_stg where end_date >= "2022-07-01" and end_date < "2022-08-01";

insert overwrite table randome_database.random_table partition (end_date) select * from randome_database.random_table_stg where end_date >= "2022-08-01" and end_date < "2211-01-01";

I read the file with file = spark.read.text("/usr/ihitha/var/hive/test_script.py"). How do I generate the new queries, using the dates in the list, after reading the file in PySpark?

The expected output is as below:

insert overwrite table randome_database.random_table partition (end_date) select * from randome_database.random_table_stg where end_date >= "2022-04-01" and end_date < "2022-07-01";

insert overwrite table randome_database.random_table partition (end_date) select * from randome_database.random_table_stg where end_date >= "2022-07-01" and end_date < "2022-08-01";

insert overwrite table randome_database.random_table partition (end_date) select * from randome_database.random_table_stg where end_date >= "2022-08-01" and end_date < "2022-11-01";

insert overwrite table randome_database.random_table partition (end_date) select * from randome_database.random_table_stg where end_date >= "2022-11-01" and end_date < "2023-02-01";

insert overwrite table randome_database.random_table partition (end_date) select * from randome_database.random_table_stg where end_date >= "2023-02-01" and end_date < "2023-05-01";

insert overwrite table randome_database.random_table partition (end_date) select * from randome_database.random_table_stg where end_date >= "2023-05-01" and end_date < "2023-08-01";

insert overwrite table randome_database.random_table partition (end_date) select * from randome_database.random_table_stg where end_date >= "2023-08-01" and end_date < "2211-01-01";


Solution

  • Here is a code snippet to get you started

    # NOTE(review): the original snippet had several defects -- it replaced every
    # query with one hard-coded template (losing the insert prefix and the real
    # lower bound), skipped the last entry of final_dates, never emitted the
    # closing query back up to the sentinel date, and unioned a single temp view
    # with itself. The logic is rewritten below as a pure helper so it can be
    # tested without a Spark cluster.

    def expand_queries(lines, final_dates, sentinel='"2211-01-01"'):
        """Expand each query whose upper bound is the open-ended sentinel date.

        A query ending in `end_date < <sentinel>` is replaced by a chain of
        queries: original lower bound -> final_dates[0] -> ... ->
        final_dates[-1] -> sentinel, one query per consecutive pair of bounds.
        Queries without the sentinel (and blank lines) pass through unchanged.

        Args:
            lines: iterable of raw text lines read from the script file.
            final_dates: ordered list of "YYYY-MM-DD" strings (unquoted).
            sentinel: the quoted open-ended upper bound to expand.

        Returns:
            list[str] of the rewritten queries, in order.
        """
        marker = f'end_date < {sentinel}'
        out = []
        for raw in lines:
            line = raw.strip()
            if not line:
                continue  # drop blank lines between queries
            if marker not in line:
                out.append(line)  # not open-ended: keep as-is
                continue
            # Recover the query's own lower bound (quoted) and its prefix up
            # to the WHERE clause, so the insert statement is preserved.
            lower = line.split('end_date >= ')[1].split(' and')[0]
            prefix = line.split(' where ')[0]
            bounds = [lower] + [f'"{d}"' for d in final_dates] + [sentinel]
            for lo, hi in zip(bounds, bounds[1:]):
                out.append(f'{prefix} where end_date >= {lo} and end_date < {hi};')
        return out

    if __name__ == "__main__":
        from pyspark.sql import SparkSession

        # Initialize SparkSession
        spark = SparkSession.builder.appName("DateExpansion").getOrCreate()

        # Define the list of dates
        final_dates = ["2022-11-01", "2023-02-01", "2023-05-01", "2023-08-01"]

        # Read the file; collect() is safe here because the script file is small.
        file_path = "/usr/ihitha/var/hive/test_script.py"
        lines = [row.value for row in spark.read.text(file_path).collect()]

        # Print the expanded queries
        for query in expand_queries(lines, final_dates):
            print(query)