Search code examples
palantir-foundryfoundry-code-repositories

How can I process large files in Code Repositories?


I have a data feed that gives a large .txt file (50-75GB) every day. The file contains several different schemas within it, where each row corresponds to one schema. I would like to split this into partitioned datasets for each schema, how can I do this efficiently?


Solution

  • The largest problem you need to solve is the iteration speed to recover your schemas, which can be challenging for a file at this scale.

    Your best tactic here will be to get an example 'notional' file with each of the schemas you want to recover as a line within it, and to add this as a file within your repository. When you add this file into your repo (alongside your transformation logic), you will then be able to push it into a dataframe, much as you would with the raw files in your dataset, for quick testing iteration.

    First, make sure you specify txt files as a part of your package contents, this way your tests will discover them (this is covered in documentation under Read a file from a Python repository):

    You can read other files from your repository into the transform context. This might be useful in setting parameters for your transform code to reference.

    To start, In your python repository edit setup.py:

    setup(
       name=os.environ['PKG_NAME'],
    # ...
        package_data={
            '': ['*.txt']
        }
    )
    

    I am using a txt file with the following contents:

    my_column, my_other_column
    some_string,some_other_string
    some_thing,some_other_thing,some_final_thing
    

    This text file is at the following path in my repository: transforms-python/src/myproject/datasets/raw.txt

    Once you have configured the text file to be shipped with your logic, and after you have included the file itself in your repository, you can then include the following code. This code has a couple of important functions:

    1. It keeps raw file parsing logic completely separate from the stage of reading the file into a Spark DataFrame. This is so that the way this DataFrame is constructed can be left to the test infrastructure, or to the run time, depending on where you are running.
    2. This keeping of the logic separate lets you ensure the actual row-by-row parsing you want to do is its own testable function, instead of having it live purely within your my_compute_function
    3. This code uses the Spark-native spark_session.read.text method, which will be orders of magnitude faster than row-by-row parsing of a raw txt file. This will ensure the parallelized DataFrame is what you operate on, not a single file, line by line, inside your executors (or worse, your driver).
    from transforms.api import transform, Input, Output
    from pkg_resources import resource_filename
    
    
    def raw_parsing_logic(raw_df):
        return raw_df
    
    
    @transform(
        my_output=Output("/txt_tests/parsed_files"),
        my_input=Input("/txt_tests/dataset_of_files"),
    )
    def my_compute_function(my_input, my_output, ctx):
        all_files_df = None
        for file_status in my_input.filesystem().ls('**/**'):
            raw_df = ctx.spark_session.read.text(my_input.filesystem().hadoop_path + "/" + file_status.path)
            parsed_df = raw_parsing_logic(raw_df)
            all_files_df = parsed_df if all_files_df is None else all_files_df.unionByName(parsed_df)
        my_output.write_dataframe(all_files_df)
    
    
    def test_my_compute_function(spark_session):
        file_path = resource_filename(__name__, "raw.txt")
        raw_df = raw_parsing_logic(
          spark_session.read.text(file_path)
        )
        assert raw_df.count() > 0
        raw_columns_set = set(raw_df.columns)
        expected_columns_set = {"value"}
        assert len(raw_columns_set.intersection(expected_columns_set)) == 1
    
    

    Once you have this code up and running, your test_my_compute_function method will be very fast to iterate on, so that you can perfect your schema recovery logic. This will make it substantially easier to get your dataset building at the very end, but without any of the overhead of a full build.