
Kedro and Streamlit Integration - Running Kedro Pipeline with Custom DataCatalog


I am working on integrating Kedro, a data pipeline framework, with Streamlit, a popular Python web app framework, to build a data processing application. The primary goal is to run a specific Kedro pipeline from within my Streamlit app, using a custom DataCatalog to manage and load DataFrames.

Problem Details:

  1. Integration Background:

    I have successfully integrated Kedro and Streamlit, and I can run Kedro pipelines from my Streamlit app. However, I want to pass custom data loaded in Streamlit as a DataCatalog to the Kedro pipeline for processing.

  2. Data Loading in Streamlit:

    In Streamlit, I use file uploaders to load data from various sources (e.g., CSV, Excel) and then create DataFrames from that data (see the sketch after this list).

  3. Running Kedro Pipeline:

    After loading and processing the data in Streamlit, I want to execute a specific Kedro pipeline, passing the custom DataCatalog containing the loaded DataFrames. The pipeline processes this data and stores the results.

  4. Error Encountered:

    When I attempt to run the Kedro pipeline with the custom DataCatalog, I encounter the following error messages:

    • TypeError: KedroSession.run() got an unexpected keyword argument 'extra_params'
    • AttributeError: can't set attribute 'catalog'
    • AttributeError: 'KedroContext' object has no attribute 'pipeline_registry'
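
For reference, the upload step looks roughly like the sketch below. It is simplified: the uploader label is a placeholder, and df1 stands in for the DataFrames that feed the catalog in code block 1.

# Sketch: loading one uploaded file into a DataFrame
import streamlit as st
import pandas as pd

uploaded = st.file_uploader('Reagentes (CSV or Excel)', type=['csv', 'xlsx'])
if uploaded is not None:
    if uploaded.name.endswith('.csv'):
        df1 = pd.read_csv(uploaded)
    else:
        df1 = pd.read_excel(uploaded)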

How can I properly run a Kedro pipeline from within a Streamlit app, passing a dynamically created DataCatalog?

Kedro Version: Latest
Streamlit Version: Latest

# Code block 1: Loading data in Streamlit
import streamlit as st
from kedro.framework.session import KedroSession
from kedro.io import DataCatalog, MemoryDataSet

if st.button('Processar Dados de Entrada'):
    # Run the Kedro pipeline
    with KedroSession.create(project_path=project_path) as session:
        catalog = DataCatalog({
            "reagentes_raw": MemoryDataSet(df1),
            "balanco_de_massas_raw": MemoryDataSet(df2),
            "laboratorio_raw": MemoryDataSet(df3),
            "laboratorio_raiox_raw": MemoryDataSet(df4),
            "carta_controle_pims_raw": MemoryDataSet(df5),
            "blend_raw": MemoryDataSet(df6)
        })
        session.run(data_catalog=catalog, pipeline_name="tag_web_app")
    st.success('Dados de entrada processados com sucesso!')

    # Read the Parquet file after the pipeline run
    merged_data = catalog.load("merged_raw_data_process")
    # Assuming the Parquet file has a timestamp column
    last_update = merged_data['timestamp_column'].max()
    st.header('Resultado da Atualização da base de dados')
    st.write(f"Dados de entrada processados com sucesso: a data e hora da informação mais recente é {last_update.strftime('%d/%m/%Y %H:%M')}")

# Code block 2: Attempt to set the DataCatalog on the KedroContext
from kedro.framework.session import KedroSession
from kedro.runner import SequentialRunner

if st.button('Processar Dados de Entrada'):
    with KedroSession.create(project_path=project_path) as session:
        context = session.load_context()
        # `catalog` is the DataCatalog built in code block 1
        context.catalog = catalog  # This line triggers an AttributeError
        runner = SequentialRunner()
        session.run(pipeline_name="tag_web_app")
        runner.run(pipeline=context.pipeline_registry.get("tag_web_app"), catalog=catalog)  # This line triggers an AttributeError

Error 2:

AttributeError: can't set attribute 'catalog'

Error 3:

AttributeError: 'KedroContext' object has no attribute 'pipeline_registry'


Solution

  • Error 2

    AttributeError: can't set attribute 'catalog'

    KedroContext.catalog has been a read-only property for some time, so assigning a new catalog to an existing context (context.catalog = catalog) cannot be done.

  • Error 3

    AttributeError: 'KedroContext' object has no attribute 'pipeline_registry'

    KedroContext never had a pipeline_registry attribute. The pipelines returned by your project's pipeline_registry.py are exposed through kedro.framework.project.pipelines instead (see the sketch below).
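
Putting the two together: rather than mutating the context, build your own DataCatalog and hand it to a runner along with the registered pipeline. Below is a minimal sketch, not a definitive implementation. It assumes a Kedro 0.18.x-style API (where the in-memory dataset is spelled MemoryDataSet; newer releases rename it to MemoryDataset), that "tag_web_app" is registered in pipeline_registry.py, and that the pipeline needs no parameters or conf/-defined catalog entries; those would have to be added to the catalog by hand, since this bypasses session.run() and its hooks.

# Sketch: run a registered pipeline against a catalog built at runtime
import streamlit as st
from kedro.framework.project import pipelines
from kedro.framework.startup import bootstrap_project
from kedro.io import DataCatalog, MemoryDataSet
from kedro.runner import SequentialRunner

# Load the project's settings and pipeline registry once at startup
bootstrap_project(project_path)

if st.button('Processar Dados de Entrada'):
    catalog = DataCatalog({
        "reagentes_raw": MemoryDataSet(df1),
        "balanco_de_massas_raw": MemoryDataSet(df2),
        # ... the remaining *_raw datasets from code block 1 ...
    })
    # Run the pipeline directly with our own catalog; this sidesteps
    # the read-only KedroContext.catalog entirely.
    runner = SequentialRunner()
    outputs = runner.run(pipelines["tag_web_app"], catalog)
    # runner.run() returns any pipeline outputs not registered in `catalog`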