Tags: palantir-foundry, foundry-code-repositories

Loop through a list of tables and save each to a CSV named after the table


I have a list of 120 tables, and I want to save a sample of the first 1000 and last 1000 rows from each table into an individual CSV file per table.

How can this be done in a Code Repository or in Code Authoring?

The following code saves one table to CSV. Can anyone help me loop through a list of tables from a project folder and create an individual CSV file for each table?

from transforms.api import transform, Input, Output


@transform(
    my_input=Input('/path/to/input/dataset'),
    my_output=Output('/path/to/output/dataset'),
)
def compute_function(my_input, my_output):
    # write the dataset's files as gzip-compressed CSV
    my_output.write_dataframe(
        my_input.dataframe(),
        output_format="csv",
        options={"compression": "gzip"},
    )
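
Note that with output_format="csv", Spark writes one part file per partition, so the output dataset can contain many CSV files. I assume coalescing to one partition before writing would give a single file per table, something like this (untested sketch, paths are placeholders):

from transforms.api import transform, Input, Output


@transform(
    my_input=Input('/path/to/input/dataset'),
    my_output=Output('/path/to/output/dataset'),
)
def compute_function(my_input, my_output):
    # coalesce(1) forces a single partition, and therefore a single CSV file
    my_output.write_dataframe(
        my_input.dataframe().coalesce(1),
        output_format="csv",
        options={"compression": "gzip"},
    )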

Pseudocode:

list_of_tables = [table1, table2, table3, ..., table120]
for table in list_of_tables:
    sample = table.limit(1000)
    sample.write_dataframe(
        sample.dataframe(),
        output_format="csv",
        options={"compression": "gzip"},
    )

I was able to get it working for one table; how can I loop through a list of tables and generate the outputs? The code for one table:

# to get the first and last 1000 rows
from transforms.api import transform_df, Input, Output
from pyspark.sql.functions import monotonically_increasing_id, col

table_name = 'stock'


@transform_df(
    output=Output(f"foundry/sample/{table_name}_sample"),
    my_input=Input(f"foundry/input/{table_name}"),
)
def compute_first_last_1000(my_input):
    # monotonically_increasing_id() is increasing but not guaranteed to be
    # contiguous, so sort by it and take 1000 rows from each end
    indexed_df = my_input.withColumn("index", monotonically_increasing_id())
    first_stock_df = indexed_df.orderBy(col("index").asc()).limit(1000).drop("index")
    last_stock_df = indexed_df.orderBy(col("index").desc()).limit(1000).drop("index")
    return first_stock_df.unionByName(last_stock_df)

# code to save as a csv file
import csv

from transforms.api import transform, Input, Output

table_name = 'stock'


@transform(
    output=Output(f"foundry/sample/{table_name}_sample_csv"),
    my_input=Input(f"foundry/sample/{table_name}_sample"),
)
def my_compute_function(my_input, output):
    df = my_input.dataframe()
    # the sample is small (at most 2000 rows), so collecting to the driver is safe
    with output.filesystem().open('stock.csv', 'w') as stream:
        csv_writer = csv.writer(stream)
        csv_writer.writerow(df.schema.names)
        csv_writer.writerows(df.collect())
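
I'm also wondering whether the two steps could be collapsed into a single transform by writing the sampled DataFrame directly as CSV with write_dataframe, which would avoid collect() on the driver entirely. Something like this (untested, same placeholder paths as above):

from transforms.api import transform, Input, Output
from pyspark.sql.functions import monotonically_increasing_id, col

table_name = 'stock'


@transform(
    output=Output(f"foundry/sample/{table_name}_sample_csv"),
    my_input=Input(f"foundry/input/{table_name}"),
)
def sample_to_csv(my_input, output):
    indexed = my_input.dataframe().withColumn("index", monotonically_increasing_id())
    # take 1000 rows from each end of the index order
    first = indexed.orderBy(col("index").asc()).limit(1000)
    last = indexed.orderBy(col("index").desc()).limit(1000)
    sample = first.unionByName(last).drop("index")
    # single partition -> a single CSV file per table
    output.write_dataframe(sample.coalesce(1), output_format="csv")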

Solution

  • Your best strategy here would be to programmatically generate your transforms. You could also use a multi-output transform if you don't fancy creating 120 separate transforms. Something like this (written live into the answer box; untested code, some syntax may be wrong):

    # you can generate this list programmatically
    my_inputs = [
        '/path/to/input/dataset1',
        '/path/to/input/dataset2',
        '/path/to/input/dataset3',
        # ...
    ]


    def make_transform(table_path):
        # builds and returns one transform per table path; passing the path
        # in as an argument gives each generated transform its own binding
        @transform_df(
            Output(table_path + '_out'),
            df=Input(table_path),
        )
        def compute(df):
            # your logic here
            return df
        return compute


    # a module-level list so the repository's pipeline discovery registers them
    TRANSFORMS = [make_transform(path) for path in my_inputs]
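
    If you'd rather keep it to a single transform, the multi-output route would use one @transform with several named Inputs and Outputs. A minimal two-table sketch (equally untested; 'trades' is a made-up dataset name, and this only grabs the first 1000 rows for brevity):

    from transforms.api import transform, Input, Output


    @transform(
        out_stock=Output('foundry/sample/stock_sample_csv'),
        out_trades=Output('foundry/sample/trades_sample_csv'),
        in_stock=Input('foundry/input/stock'),
        in_trades=Input('foundry/input/trades'),
    )
    def multi_output(out_stock, out_trades, in_stock, in_trades):
        # one transform, many outputs: each output gets its table's sample as CSV
        out_stock.write_dataframe(in_stock.dataframe().limit(1000), output_format="csv")
        out_trades.write_dataframe(in_trades.dataframe().limit(1000), output_format="csv")

    Note that multiple outputs require @transform rather than @transform_df, since the function has to write each output itself.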