Tags: pyspark, palantir-foundry, foundry-code-repositories, foundry-code-workbooks

How can I iterate over JSON files in Code Repositories and incrementally append to a dataset?


I have imported a dataset of 100,000 raw JSON files (about 100 GB) into Foundry through Data Connection. I want to use the Python Transforms raw file access API to read the files, flatten the structs and arrays of structs into a DataFrame, and append the result to an output dataset as an incremental update. I would like to adapt the example below from the documentation to *.json files and make it incremental using the @incremental() decorator.

>>> import csv
>>> from pyspark.sql import Row
>>> from transforms.api import transform, Input, Output
>>>
>>> @transform(
...     processed=Output('/examples/hair_eye_color_processed'),
...     hair_eye_color=Input('/examples/students_hair_eye_color_csv'),
... )
... def example_computation(hair_eye_color, processed):
...
...    def process_file(file_status):
...        with hair_eye_color.filesystem().open(file_status.path) as f:
...            r = csv.reader(f)
...
...            # Construct a pyspark.Row from our header row
...            header = next(r)
...            MyRow = Row(*header)
...
...            for row in csv.reader(f):
...                yield MyRow(*row)
...
...    files_df = hair_eye_color.filesystem().files('**/*.csv')
...    processed_df = files_df.rdd.flatMap(process_file).toDF()
...    processed.write_dataframe(processed_df)

With the help of @Jeremy David Gamet I was able to develop the code to get the dataset I want.

from transforms.api import transform, Input, Output
import json


@transform(
    out=Output('foundry/outputdataset'),
    inpt=Input('foundry/inputdataset'),
)
def update_set(ctx, inpt, out):
    spark = ctx.spark_session
    sc = spark.sparkContext

    # Every file is parsed on the driver and kept in a single Python list
    filesystem = list(inpt.filesystem().ls())
    file_dates = []
    for files in filesystem:
        with inpt.filesystem().open(files.path, 'r', encoding='utf-8-sig') as fi:
            data = json.load(fi)
        file_dates.append(data)

    json_object = json.dumps(file_dates)
    df_2 = spark.read.option("multiline", "true").json(sc.parallelize([json_object]))

    # drop_duplicates() returns a new DataFrame, so the result must be kept
    df_2 = df_2.drop_duplicates()
    # flatten() is the helper from the "Flatten array column" link (not shown here)
    df_2 = flatten(df_2)
    out.write_dataframe(df_2)

code to flatten__df
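
The flatten helper itself is not reproduced in the question. As a rough illustration of what "flatten array of structs and structs" means, here is a minimal pure-Python sketch that works on one parsed JSON record rather than on a Spark DataFrame. The name flatten_record and the underscore-joined column names are assumptions made for this example, not the code behind the link.

```python
from typing import Any, Dict, List


def flatten_record(record: Dict[str, Any], prefix: str = "") -> List[Dict[str, Any]]:
    """Flatten nested dicts into 'parent_child' keys and expand each list of
    dicts into one output row per element (similar in spirit to explode)."""
    rows: List[Dict[str, Any]] = [{}]
    for key, value in record.items():
        name = f"{prefix}{key}"
        if isinstance(value, dict):
            # Struct: flatten recursively, prefixing child keys with the parent name
            parts = flatten_record(value, prefix=f"{name}_")
        elif isinstance(value, list) and value and all(isinstance(v, dict) for v in value):
            # Array of structs: every element contributes its own row(s)
            parts = [row for item in value for row in flatten_record(item, prefix=f"{name}_")]
        else:
            # Scalar or list of scalars: keep as a single column
            parts = [{name: value}]
        # Combine the rows built so far with the rows produced by this field
        rows = [{**left, **right} for left in rows for right in parts]
    return rows


example = {"id": 1, "user": {"name": "a"}, "items": [{"sku": "x"}, {"sku": "y"}]}
for row in flatten_record(example):
    print(row)
```

A record with two array-of-struct fields yields the cross product of their elements, which is also what two successive explodes would produce in Spark.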

The transform above works for a few files, but since there are more than 100,000 files I am hitting the following error:

Connection To Driver Lost 

This error indicates that connection to the driver was lost unexpectedly, which is often caused by the driver being terminated due to running out of memory. Common reasons for driver out-of-memory (OOM) errors include functions that materialize data to the driver such as .collect(), broadcasted joins, and using Pandas dataframes.

Is there any way around this?


Solution

  • I have given an example of how this can be done dynamically as an answer to another question.

    Here is the link to that code answer: How to union multiple dynamic inputs in Palantir Foundry? and a copy of the same code:

    from transforms.api import Input, Output, transform
    from pyspark.sql import functions as F
    import json
    import logging
    
    
    def transform_generator():
        transforms = []
        transf_dict = {}  # enter your dynamic mappings here
    
        for value in transf_dict:
            @transform(
                out=Output(' path to your output here '.format(val=value)),
                inpt=Input(" path to input here ".format(val=value)),
            )
            def update_set(ctx, inpt, out):
                spark = ctx.spark_session
                sc = spark.sparkContext
    
                filesystem = list(inpt.filesystem().ls())
                file_dates = []
                for files in filesystem:
                    with inpt.filesystem().open(files.path) as fi:
                        data = json.load(fi)
                    file_dates.append(data)
    
                logging.info('info logs:')
                logging.info(file_dates)
                json_object = json.dumps(file_dates)
                df_2 = spark.read.option("multiline", "true").json(sc.parallelize([json_object]))
                df_2 = df_2.withColumn('upload_date', F.current_date())
    
                df_2 = df_2.drop_duplicates()
                out.write_dataframe(df_2)
            transforms.append(update_set)
        return transforms
    
    
    TRANSFORMS = transform_generator()
    

    Please let me know if there is anything I can clarify.
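
One possible way around the driver out-of-memory error is to avoid the json.load loop on the driver and parse each file inside a flatMap, as the documentation's CSV example does. The sketch below shows only the per-file function, written so it can be run without Foundry. The names json_lines_from_file, open_fake and FAKE_FILES are hypothetical, and the wiring in the comments is an untested assumption modeled on that CSV example. It does not cover the @incremental() part of the question.

```python
import io
import json
from typing import Callable, Dict, IO, Iterator


def json_lines_from_file(open_file: Callable[[str], IO[str]], path: str) -> Iterator[str]:
    """Yield one compact JSON string per record found in a single file."""
    with open_file(path) as handle:
        text = handle.read()
    # Drop a leading byte order mark in case the opener did not decode it away
    data = json.loads(text.lstrip("\ufeff"))
    # A file may hold a single object or a top-level array of objects
    records = data if isinstance(data, list) else [data]
    for record in records:
        yield json.dumps(record)


# Assumed wiring inside the transform (not executed here):
#   fs = inpt.filesystem()
#   lines = fs.files('**/*.json').rdd.flatMap(
#       lambda f: json_lines_from_file(
#           lambda p: fs.open(p, 'r', encoding='utf-8-sig'), f.path))
#   df = ctx.spark_session.read.json(lines)

# Stand-in for the dataset filesystem so the sketch runs locally
FAKE_FILES: Dict[str, str] = {
    "a.json": '\ufeff{"id": 1, "tags": ["x"]}',
    "b.json": '[{"id": 2}, {"id": 3}]',
}


def open_fake(path: str) -> IO[str]:
    return io.StringIO(FAKE_FILES[path])


lines = [line for path in sorted(FAKE_FILES) for line in json_lines_from_file(open_fake, path)]
print(lines)
```

Because each file is read and serialized where the flatMap runs, the driver never holds all parsed files in one list. Only one file at a time has to fit in an executor's memory.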