Tags: python, unit-testing, mocking

Is my integration test well structured, and why is it returning an import error?


I am learning mocking with unittest.mock and trying it out on a personal project.

This is my project structure:

project/
│
├── src/
│   └── My_app/
│       ├── __init__.py
│       ├── application/
│       │   ├── __init__.py
│       │   └── main_code.py
│       ├── infrastructure/
│       │   ├── __init__.py
│       │   ├── data_quality.py
│       │   └── s3_utils.py
│       └── settings/
│           ├── __init__.py
│           └── config.py
│
└── tests/
    └── integration_tests/
        └── application/
            └── test_main_code.py

The aim is to write an integration test for main_code.py.

Simplified version of main_code.py:

import sys
from typing import List

import My_app.settings.config as stg
from awsglue.utils import getResolvedOptions
from My_app.infrastructure.data_quality import evaluateDataQuality, generateSchema
from My_app.infrastructure.s3_utils import csvReader, dataframeWriter
from pyspark.sql import SparkSession


def main(argv: List[str]) -> None:
    args = getResolvedOptions(
        argv,
        [
            'JOB_NAME',
            'S3_BRONZE_BUCKET_NAME',
            'S3_PRE_SILVER_BUCKET_NAME',
            'S3_BRONZE_PATH',
            'S3_PRE_SILVER_PATH',
            'S3_DATA_QUALITY_LOGS_BUCKET_NAME',
        ],
    )

    s3_bronze_bucket_name = args['S3_BRONZE_BUCKET_NAME']
    s3_pre_silver_bucket_name = args['S3_PRE_SILVER_BUCKET_NAME']
    s3_bronze_path = args['S3_BRONZE_PATH']
    s3_pre_silver_path = args['S3_PRE_SILVER_PATH']
    s3_data_quality_logs_bucket_name = args['S3_DATA_QUALITY_LOGS_BUCKET_NAME']

    spark = SparkSession.builder.getOrCreate()  # TODO replace this init with common method (waiting for S3 part)
    spark.conf.set('spark.sql.sources.partitionOverwriteMode', 'dynamic')


    for table in list(stg.data_schema.keys()):
        raw_data = stg.data_schema[table].columns.to_dict()

        schema = generateSchema(raw_data)
        df = csvReader(spark, s3_bronze_bucket_name, s3_bronze_path, table, schema, '\t')


        (quality_df, table_ingestion_status) = evaluateDataQuality(spark, df, table)

        dataframeWriter(
            quality_df,
            s3_data_quality_logs_bucket_name,
            'data-quality/',
            'logs',
            'date',
            'append',
        )


if __name__ == '__main__':
    main(sys.argv)

Simplified version of data_quality.py:

import My_app.settings.config as stg
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsgluedq.transforms import EvaluateDataQuality
from pyspark.sql import SparkSession, Row
from pyspark.sql import functions as F
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.types import StructType, IntegerType, StringType, FloatType, DateType


def typeOf(t: str) -> IntegerType | StringType | FloatType | DateType:
    ...
    return StringType()


def generateSchema(columns_dict: dict) -> StructType:
    ...
    return schema


def evaluateDataQuality(spark: SparkSession, df: DataFrame, table: str) -> (DataFrame, bool):
    ...
    return (
        EvaluateDataQuality.apply(...)
        .toDF(),
        True,
    )

Simplified version of s3_utils.py:

from pyspark.sql import SparkSession
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.types import StructType


def csvReader(spark: SparkSession, bucket: str, path: str, table: str, schema: StructType, sep: str) -> DataFrame:
    """Reads a CSV file as a Dataframe from S3 using user parameters for format."""
    return (
        spark.read.format('csv')
        .option('header', 'true')
        .option('sep', sep)
        .schema(schema)
        .load(f's3a://{bucket}/{path}/{table}.csv')
    )


def dataframeWriter(
    df: DataFrame, bucket: str, path: str, table: str, partition_key: str, mode: str = 'overwrite'
) -> None:
    """Writes a dataframe in S3 in parquet format using user parameters to define path and partition key."""
    df.write.partitionBy(partition_key).mode(mode).parquet(f's3a://{bucket}/{path}/{table}/')

What I want to do

Write an integration test for main_code.py while:

  • Mocking the csvReader function and replacing it with local_csvReader.
  • Mocking the dataframeWriter function and replacing it with local_dataframeWriter.
  • Mocking the import of awsgluedq in order to avoid installing it locally.

What I did:

test_main_code.py

"""Module that contains unit tests for My_app pre silver job."""

import os
from unittest import TestCase
from unittest.mock import patch, Mock

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType


def local_csvReader(spark: SparkSession, bu: str, pa: str, table: str, schema: StructType, sep: str):
    """Mocked function that replaces real csvReader. this one reads from local rather than S3."""
    return (
        spark.read.format('csv')
        .option('header', 'true')
        .option('sep', ';')
        .schema(schema)
        .load(f'./tests/integration_tests/input_mock/{table}.csv')
    )


def local_dataframeWriter(df, bu: str, pa: str, table: str, partition_key: str):
    """Mocked function that replaces real dataframeWriter. this one writes in local rather than S3."""
    output_dir = f'./tests/integration_tests/output_mock/{table}/'
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    df.write.partitionBy(partition_key).mode('overwrite').parquet(output_dir)


class IntegrationTest(TestCase):
    @classmethod
    def setUpClass(cls):
        cls.spark = SparkSession.builder.master('local').appName('TestPerfmarketSilver').getOrCreate()
        cls.spark.conf.set('spark.sql.sources.partitionOverwriteMode', 'dynamic')

    @patch('My_app.application.main_code.getResolvedOptions')
    @patch('My_app.application.main_code.csvReader', side_effect=local_csvReader)
    @patch('My_app.application.main_code.dataframeWriter', side_effect=local_dataframeWriter)
    def test_main(self, mock_datawriter, mock_csvreader, mocked_get_resolved_options: Mock):  # mocks are injected bottom-up
        """Test the main function with local CSV and Parquet output."""
        import My_app.application.main_code as main_code

        import My_app.settings.config as stg
        import tests.integration_tests.settings.config as stg_new

        stg.data_schema = stg_new.data_schema_test

        expected_results = {'chemins': {'nbRows': 8}}

        # Mock the resolved options
        mocked_get_resolved_options.return_value = {
            'JOB_NAME': 'test_job',
            'S3_BRONZE_BUCKET_NAME': 'test_bronze',
            'S3_PRE_SILVER_BUCKET_NAME': 'test_pre_silver',
            'S3_BRONZE_PATH': './tests/integration_tests/input_mock',
            'S3_PRE_SILVER_PATH': './tests/integration_tests/output_mock',
            'S3_DATA_QUALITY_LOGS_BUCKET_NAME': 'test_dq',
        }

        main_code.main([])

        for table in stg.data_schema.keys():
            # Verify that the output Parquet file is created
            output_path = f'./tests/integration_tests/output_mock/{table}/'
            self.assertTrue(os.path.exists(output_path))

            # Read the written Parquet file and check the data
            written_df = self.spark.read.parquet(output_path)
            self.assertEqual(written_df.count(), expected_results[table]['nbRows'])  # Check row count
            self.assertTrue(
                set(
                    [column_data['bronze_name'] for column_data in stg.data_schema[table]['columns'].to_dict().values()]
                )
                == set(written_df.columns)
            )
            # Clean up
            os.system(f'rm -rf ./tests/integration_tests/output_mock/{table}/')

Questions:

Running the test class returns:

======================================================================
ERROR: test_main (tests.integration_tests.application.test_main_code.IntegrationTest)
Test the main function with local CSV and Parquet output.
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/Users/me/.asdf/installs/python/3.10.14/lib/python3.10/unittest/mock.py", line 1248, in _dot_lookup
    return getattr(thing, comp)
AttributeError: module 'My_app.application' has no attribute 'main_code'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/me/.asdf/installs/python/3.10.14/lib/python3.10/unittest/mock.py", line 1376, in patched
    with self.decoration_helper(patched,
  File "/Users/me/.asdf/installs/python/3.10.14/lib/python3.10/contextlib.py", line 135, in __enter__
    return next(self.gen)
  File "/Users/me/.asdf/installs/python/3.10.14/lib/python3.10/unittest/mock.py", line 1358, in decoration_helper
    arg = exit_stack.enter_context(patching)
  File "/Users/me/.asdf/installs/python/3.10.14/lib/python3.10/contextlib.py", line 492, in enter_context
    result = _cm_type.__enter__(cm)
  File "/Users/me/.asdf/installs/python/3.10.14/lib/python3.10/unittest/mock.py", line 1431, in __enter__
    self.target = self.getter()
  File "/Users/me/.asdf/installs/python/3.10.14/lib/python3.10/unittest/mock.py", line 1618, in <lambda>
    getter = lambda: _importer(target)
  File "/Users/me/.asdf/installs/python/3.10.14/lib/python3.10/unittest/mock.py", line 1261, in _importer
    thing = _dot_lookup(thing, comp, import_path)
  File "/Users/me/.asdf/installs/python/3.10.14/lib/python3.10/unittest/mock.py", line 1250, in _dot_lookup
    __import__(import_path)
  File "/Users/me/IdeaProjects/project_root/apps/project/src/My_app/application/main_code.py", line 10, in <module>
    from My_app.infrastructure.data_quality import evaluateDataQuality, generateSchema
  File "/Users/me/IdeaProjects/project_root/apps/project/src/My_app/infrastructure/data_quality.py", line 4, in <module>
    from awsgluedq.transforms import EvaluateDataQuality
ModuleNotFoundError: No module named 'awsgluedq'

----------------------------------------------------------------------
Ran 1 test in 2.114s

FAILED (errors=1)
  • Is my test class well structured? Am I importing main_code correctly? I don't think so, because of: AttributeError: module 'My_app.application' has no attribute 'main_code'

  • How can I use a mocking technique to replace the awsgluedq module with other code?


Solution

  • Here's the edited code that I copied and edited. Sorry for the wait, it took quite some time to debug and test:

    import sys
    from typing import List

    import My_app.settings.config as stg
    # Make sure to import runpy
    import runpy
    from awsglue.utils import getResolvedOptions
    from My_app.infrastructure.data_quality import evaluateDataQuality, generateSchema
    from My_app.infrastructure.s3_utils import csvReader, dataframeWriter
    from pyspark.sql import SparkSession
    
    
    def main(argv: List[str]) -> None:
        args = getResolvedOptions(
            argv,
            [
                'JOB_NAME',
                'S3_BRONZE_BUCKET_NAME',
                'S3_PRE_SILVER_BUCKET_NAME',
                'S3_BRONZE_PATH',
                'S3_PRE_SILVER_PATH',
                'S3_DATA_QUALITY_LOGS_BUCKET_NAME',
            ],
        )
    
        s3_bronze_bucket_name = args['S3_BRONZE_BUCKET_NAME']
        s3_pre_silver_bucket_name = args['S3_PRE_SILVER_BUCKET_NAME']
        s3_bronze_path = args['S3_BRONZE_PATH']
        s3_pre_silver_path = args['S3_PRE_SILVER_PATH']
        s3_data_quality_logs_bucket_name = args['S3_DATA_QUALITY_LOGS_BUCKET_NAME']
    
        spark = SparkSession.builder.getOrCreate()  # TODO replace this init with common method (waiting for S3 part)
        spark.conf.set('spark.sql.sources.partitionOverwriteMode', 'dynamic')
    
    
        for table in list(stg.data_schema.keys()):
            raw_data = stg.data_schema[table].columns.to_dict()
    
            schema = generateSchema(raw_data)
            df = csvReader(spark, s3_bronze_bucket_name, s3_bronze_path, table, schema, '\t')
    
    
            (quality_df, table_ingestion_status) = evaluateDataQuality(spark, df, table)
    
            dataframeWriter(
                quality_df,
                s3_data_quality_logs_bucket_name,
                'data-quality/',
                'logs',
                'date',
                'append',
            )
    

    You made a mistake here: __name__ and __main__ need two underscores on each side, not one. Also make sure the awsglue import at the top of the file is in place before the module is imported.

    if __name__ == '__main__':
        main(sys.argv)
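
    The edited code above imports runpy but never shows it being used; presumably the intent is to execute main_code as a script. A minimal, hypothetical sketch of that usage follows:

    import runpy

    # Hypothetical usage: execute main_code as if it were invoked as a script, so
    # the `if __name__ == '__main__':` block calls main(sys.argv). Note that
    # run_module re-executes the module source in a fresh namespace, so it
    # bypasses anything patched onto My_app.application.main_code, and the job
    # arguments would have to be supplied via sys.argv for getResolvedOptions.
    runpy.run_module('My_app.application.main_code', run_name='__main__')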
    
    # data_quality.py
    import My_app.settings.config as stg
    from awsglue.context import GlueContext
    from awsglue.dynamicframe import DynamicFrame
    from awsgluedq.transforms import EvaluateDataQuality
    from pyspark.sql import SparkSession, Row
    from pyspark.sql import functions as F
    from pyspark.sql.dataframe import DataFrame
    from pyspark.sql.types import StructType, IntegerType, StringType, FloatType, DateType
    
    
    def typeOf(t: str) -> IntegerType | StringType | FloatType | DateType:
        ...
        return StringType()
    
    
    def generateSchema(columns_dict: dict) -> StructType:
        ...
        return schema
    
    
    def evaluateDataQuality(spark: SparkSession, df: DataFrame, table: str) -> (DataFrame, bool):
        ...
        return (
            EvaluateDataQuality.apply(...)
            .toDF(),
            True,
        )
    
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from pyspark.sql import SparkSession
    from pyspark.sql.dataframe import DataFrame
    from pyspark.sql.types import StructType
    

    This is the CSV reader code; what you wrote did not read or write properly, so here it is.

    sc = SparkContext.getOrCreate()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    
    dynamicFrame = glueContext.create_dynamic_frame.from_options(
        connection_type="s3",
        connection_options={"paths": ["s3://s3path"]},
        format="csv",
        format_options={
            "withHeader": True,
            # "optimizePerformance": True,
        },
    )
    
    dataFrame = spark.read\
        .format("csv")\
        .option("header", "true")\
        .load("s3://s3path")
    

    Write CSV to S3

    This example customizes how string values are written: quoteChar is set to -1 so the values are not quoted.

    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    
    sc = SparkContext.getOrCreate()
    glueContext = GlueContext(sc)
    
    glueContext.write_dynamic_frame.from_options(
        frame=dynamicFrame,
        connection_type="s3",
        connection_options={"path": "s3://s3path"},
        format="csv",
        format_options={
            "quoteChar": -1,
        },
    )
    
    dataFrame.write\
        .format("csv")\
        .option("quote", None)\
        .mode("append")\
        .save("s3://s3path")
    
    # Read CSV from S3 using the optimized (vectorized) CSV reader.
    glueContext.create_dynamic_frame.from_options(
        connection_type="s3",
        connection_options={"paths": ["s3://s3path"]},
        format="csv",
        format_options={
            "optimizePerformance": True,
            "separator": ",",
        },
        transformation_ctx="datasink2")
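
One thing not covered above is the second question: how to avoid installing awsgluedq locally. One common approach, shown here only as a minimal sketch (assuming the tests never need the real awsgluedq behaviour), is to register stub modules in sys.modules before My_app.application.main_code is imported, for example at the top of test_main_code.py or in a conftest.py:

import sys
from unittest.mock import MagicMock

# Stub out awsgluedq so that `from awsgluedq.transforms import EvaluateDataQuality`
# in data_quality.py resolves to a mock attribute instead of raising
# ModuleNotFoundError. This must run before main_code is imported.
# (unittest.mock.patch.dict(sys.modules, {...}) works too and undoes itself.)
sys.modules.setdefault('awsgluedq', MagicMock())
sys.modules.setdefault('awsgluedq.transforms', MagicMock())

import My_app.application.main_code as main_code  # now imports cleanly

With the stub in place, the @patch('My_app.application.main_code.csvReader') and @patch('My_app.application.main_code.dataframeWriter') targets can be resolved, and the AttributeError: module 'My_app.application' has no attribute 'main_code' disappears as well, since it was only the chained result of the failed awsgluedq import. The traceback only fails on awsgluedq, which suggests awsglue itself is installed locally; if it is not, it can be stubbed the same way.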