I am learning mock and trying it on a personal project.
This is my project structure:
project/
│
├── src/
│   └── My_app/
│       ├── __init__.py
│       ├── application/
│       │   ├── __init__.py
│       │   └── main_code.py
│       ├── infrastructure/
│       │   ├── __init__.py
│       │   ├── data_quality.py
│       │   └── s3_utils.py
│       └── settings/
│           ├── __init__.py
│           └── config.py
│
└── tests/
    └── integration_tests/
        └── application/
            └── test_main_code.py
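(For reference: with a src/ layout like this, the tests can only do import My_app if the package is installed, e.g. pip install -e ., or if src/ is on sys.path, e.g. via PYTHONPATH=src or a conftest.py at the project root when pytest is the runner. Below is a minimal sketch of such a conftest.py; it is an assumption about the setup, not something shown above.)

# conftest.py (hypothetical) - put src/ on sys.path so 'import My_app ...' works from tests/
import os
import sys

sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src'))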
The aim is to write an integration test for main_code.py. Here is main_code.py:
import sys
from typing import List

import My_app.settings.config as stg
from awsglue.utils import getResolvedOptions
from My_app.infrastructure.data_quality import evaluateDataQuality, generateSchema
from My_app.infrastructure.s3_utils import csvReader, dataframeWriter
from pyspark.sql import SparkSession
def main(argv: List[str]) -> None:
args = getResolvedOptions(
argv,
[
'JOB_NAME',
'S3_BRONZE_BUCKET_NAME',
'S3_PRE_SILVER_BUCKET_NAME',
'S3_BRONZE_PATH',
'S3_PRE_SILVER_PATH',
'S3_DATA_QUALITY_LOGS_BUCKET_NAME',
],
)
s3_bronze_bucket_name = args['S3_BRONZE_BUCKET_NAME']
s3_pre_silver_bucket_name = args['S3_PRE_SILVER_BUCKET_NAME']
s3_bronze_path = args['S3_BRONZE_PATH']
s3_pre_silver_path = args['S3_PRE_SILVER_PATH']
s3_data_quality_logs_bucket_name = args['S3_DATA_QUALITY_LOGS_BUCKET_NAME']
spark = SparkSession.builder.getOrCreate() # TODO replace this init with common method (waiting for S3 part)
spark.conf.set('spark.sql.sources.partitionOverwriteMode', 'dynamic')
for table in list(stg.data_schema.keys()):
raw_data = stg.data_schema[table].columns.to_dict()
schema = generateSchema(raw_data)
df = csvReader(spark, s3_bronze_bucket_name, s3_bronze_path, table, schema, '\t')
(quality_df, table_ingestion_status) = evaluateDataQuality(spark, df, table)
dataframeWriter(
quality_df,
s3_data_quality_logs_bucket_name,
'data-quality/',
'logs',
'date',
'append',
)
if __name__ == '__main__':
main(sys.argv)
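For context, getResolvedOptions parses '--KEY value' pairs out of argv and returns a plain dict, which is why the test further down can replace it with a mock whose return_value is a dict of the same shape. Roughly (illustrative values only):

# What getResolvedOptions would return for an argv like
# ['main_code.py', '--JOB_NAME', 'pre_silver_job', '--S3_BRONZE_BUCKET_NAME', 'bronze-bucket', ...]
args = {
    'JOB_NAME': 'pre_silver_job',
    'S3_BRONZE_BUCKET_NAME': 'bronze-bucket',
    'S3_PRE_SILVER_BUCKET_NAME': 'pre-silver-bucket',
    'S3_BRONZE_PATH': 'bronze/path',
    'S3_PRE_SILVER_PATH': 'pre-silver/path',
    'S3_DATA_QUALITY_LOGS_BUCKET_NAME': 'dq-logs-bucket',
}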
Here is data_quality.py:
import My_app.settings.config as stg
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsgluedq.transforms import EvaluateDataQuality
from pyspark.sql import SparkSession, Row
from pyspark.sql import functions as F
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.types import StructType, IntegerType, StringType, FloatType, DateType
def typeOf(t: str) -> IntegerType | StringType | FloatType | DateType:
...
return StringType()
def generateSchema(columns_dict: dict) -> StructType:
...
return schema
def evaluateDataQuality(spark: SparkSession, df: DataFrame, table: str) -> tuple[DataFrame, bool]:
...
return (
EvaluateDataQuality.apply(...)
.toDF(),
True,
)
Here is s3_utils.py:
from pyspark.sql import SparkSession
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.types import StructType
def csvReader(spark: SparkSession, bucket: str, path: str, table: str, schema: StructType, sep: str) -> DataFrame:
"""Reads a CSV file as a Dataframe from S3 using user parameters for format."""
return (
spark.read.format('csv')
.option('header', 'true')
.option('sep', sep)
.schema(schema)
.load(f's3a://{bucket}/{path}/{table}.csv')
)
def dataframeWriter(
df: DataFrame, bucket: str, path: str, table: str, partition_key: str, mode: str = 'overwrite'
) -> None:
"""Writes a dataframe in S3 in parquet format using user parameters to define path and partition key."""
df.write.partitionBy(partition_key).mode(mode).parquet(f's3a://{bucket}/{path}/{table}/')
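For reference, dataframeWriter as written produces a partitioned layout like s3a://<bucket>/<path>/<table>/<partition_key>=<value>/part-*.parquet. A quick local smoke test of that layout could look like this (paths and column names are made up):

from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[1]').getOrCreate()
demo_df = spark.createDataFrame([(1, '2024-01-01'), (2, '2024-01-02')], ['id', 'date'])
# Same write pattern as dataframeWriter, pointed at a local path instead of S3.
demo_df.write.partitionBy('date').mode('overwrite').parquet('/tmp/demo_table/')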
I want to write an integration test for main_code.py while:

- mocking the csvReader function and replacing it with local_csvReader,
- mocking the dataframeWriter function and replacing it with local_dataframeWriter,
- mocking the awsgluedq module, to avoid installing it locally.

Here is tests/integration_tests/application/test_main_code.py:
"""Module that contains unit tests for My_app pre silver job."""
import os
from unittest import TestCase
from unittest.mock import patch, Mock
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
def local_csvReader(spark: SparkSession, bu: str, pa: str, table: str, schema: StructType, sep: str):
"""Mocked function that replaces real csvReader. this one reads from local rather than S3."""
return (
spark.read.format('csv')
.option('header', 'true')
.option('sep', ';')
.schema(schema)
.load(f'./tests/integration_tests/input_mock/{table}.csv')
)
def local_dataframeWriter(df, bu: str, pa: str, table: str, partition_key: str, mode: str = 'overwrite'):
    """Mocked function that replaces the real dataframeWriter; this one writes locally rather than to S3."""
    output_dir = f'./tests/integration_tests/output_mock/{table}/'
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    df.write.partitionBy(partition_key).mode(mode).parquet(output_dir)
class IntegrationTest(TestCase):
@classmethod
def setUpClass(cls):
cls.spark = SparkSession.builder.master('local').appName('TestPerfmarketSilver').getOrCreate()
cls.spark.conf.set('spark.sql.sources.partitionOverwriteMode', 'dynamic')
@patch('My_app.application.main_code.getResolvedOptions')
@patch('My_app.application.main_code.csvReader', side_effect=local_csvReader)
@patch('My_app.application.main_code.dataframeWriter', side_effect=local_dataframeWriter)
    def test_main(self, mock_dataframewriter: Mock, mock_csvreader: Mock, mocked_get_resolved_options: Mock):
"""Test the main function with local CSV and Parquet output."""
import My_app.application.main_code as main_code
import My_app.settings.config as stg
import tests.integration_tests.settings.config as stg_new
stg.data_schema = stg_new.data_schema_test
expected_results = {'chemins': {'nbRows': 8}}
# Mock the resolved options
mocked_get_resolved_options.return_value = {
'JOB_NAME': 'test_job',
'S3_BRONZE_BUCKET_NAME': 'test_bronze',
'S3_PRE_SILVER_BUCKET_NAME': 'test_pre_silver',
'S3_BRONZE_PATH': './tests/integration_tests/input_mock',
'S3_PRE_SILVER_PATH': './tests/integration_tests/output_mock',
'S3_DATA_QUALITY_LOGS_BUCKET_NAME': 'test_dq',
}
main_code.main([])
for table in stg.data_schema.keys():
# Verify that the output Parquet file is created
output_path = f'./tests/integration_tests/output_mock/{table}/'
self.assertTrue(os.path.exists(output_path))
# Read the written Parquet file and check the data
written_df = self.spark.read.parquet(output_path)
self.assertEqual(written_df.count(), expected_results[table]['nbRows']) # Check row count
            self.assertSetEqual(
                {column_data['bronze_name'] for column_data in stg.data_schema[table]['columns'].to_dict().values()},
                set(written_df.columns),
            )
# Clean up
os.system(f'rm -rf ./tests/integration_tests/output_mock/{table}/')
Running the test class returns:
======================================================================
ERROR: test_main (tests.integration_tests.application.test_main_code.IntegrationTest)
Test the main function with local CSV and Parquet output.
----------------------------------------------------------------------
Traceback (most recent call last):
File "/Users/me/.asdf/installs/python/3.10.14/lib/python3.10/unittest/mock.py", line 1248, in _dot_lookup
return getattr(thing, comp)
AttributeError: module 'My_app.application' has no attribute 'main_code'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/me/.asdf/installs/python/3.10.14/lib/python3.10/unittest/mock.py", line 1376, in patched
with self.decoration_helper(patched,
File "/Users/me/.asdf/installs/python/3.10.14/lib/python3.10/contextlib.py", line 135, in __enter__
return next(self.gen)
File "/Users/me/.asdf/installs/python/3.10.14/lib/python3.10/unittest/mock.py", line 1358, in decoration_helper
arg = exit_stack.enter_context(patching)
File "/Users/me/.asdf/installs/python/3.10.14/lib/python3.10/contextlib.py", line 492, in enter_context
result = _cm_type.__enter__(cm)
File "/Users/me/.asdf/installs/python/3.10.14/lib/python3.10/unittest/mock.py", line 1431, in __enter__
self.target = self.getter()
File "/Users/me/.asdf/installs/python/3.10.14/lib/python3.10/unittest/mock.py", line 1618, in <lambda>
getter = lambda: _importer(target)
File "/Users/me/.asdf/installs/python/3.10.14/lib/python3.10/unittest/mock.py", line 1261, in _importer
thing = _dot_lookup(thing, comp, import_path)
File "/Users/me/.asdf/installs/python/3.10.14/lib/python3.10/unittest/mock.py", line 1250, in _dot_lookup
__import__(import_path)
File "/Users/me/IdeaProjects/project_root/apps/project/src/My_app/application/main_code.py", line 10, in <module>
from My_app.infrastructure.data_quality import evaluateDataQuality, generateSchema
File "/Users/me/IdeaProjects/project_root/apps/project/src/My_app/infrastructure/data_quality.py", line 4, in <module>
from awsgluedq.transforms import EvaluateDataQuality
ModuleNotFoundError: No module named 'awsgluedq'
----------------------------------------------------------------------
Ran 1 test in 2.114s
FAILED (errors=1)
Is my test class well structured? Am I importing main_code correctly?
I don't think so, because of: AttributeError: module 'My_app.application' has no attribute 'main_code'.
How can I use a mocking technique to replace the awsgluedq module with other code, so that I can avoid installing it locally?
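One direction I came across (I am not sure it is the right approach) is to register a stub awsgluedq module in sys.modules before main_code is imported, so that the from awsgluedq.transforms import EvaluateDataQuality line in data_quality.py resolves against the stub. A rough sketch, where the stub is just a MagicMock (a similar stub might be needed for awsglue itself if it is not installed locally):

import sys
import types
from unittest.mock import MagicMock

# Build a fake 'awsgluedq' package with a 'transforms' submodule so the real
# library does not need to be installed for the import in data_quality.py to succeed.
fake_transforms = types.ModuleType('awsgluedq.transforms')
fake_transforms.EvaluateDataQuality = MagicMock(name='EvaluateDataQuality')
fake_package = types.ModuleType('awsgluedq')
fake_package.transforms = fake_transforms

sys.modules['awsgluedq'] = fake_package
sys.modules['awsgluedq.transforms'] = fake_transforms

# Only import modules that (transitively) import awsgluedq after the stubs are registered.
import My_app.application.main_code as main_code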
Here's the edited code that I copied and edited. Sorry for the wait, it took quite some time to debug and test:
import My_app.settings.config as stg
# Make sure to import runpy
import runpy
from awsglue.utils import getResolvedOptions
from My_app.infrastructure.data_quality import evaluateDataQuality, generateSchema
from My_app.infrastructure.s3_utils import csvReader, dataframeWriter
from pyspark.sql import SparkSession
def main(argv: List[str]) -> None:
args = getResolvedOptions(
argv,
[
'JOB_NAME',
'S3_BRONZE_BUCKET_NAME',
'S3_PRE_SILVER_BUCKET_NAME',
'S3_BRONZE_PATH',
'S3_PRE_SILVER_PATH',
'S3_DATA_QUALITY_LOGS_BUCKET_NAME',
],
)
s3_bronze_bucket_name = args['S3_BRONZE_BUCKET_NAME']
s3_pre_silver_bucket_name = args['S3_PRE_SILVER_BUCKET_NAME']
s3_bronze_path = args['S3_BRONZE_PATH']
s3_pre_silver_path = args['S3_PRE_SILVER_PATH']
s3_data_quality_logs_bucket_name = args['S3_DATA_QUALITY_LOGS_BUCKET_NAME']
spark = SparkSession.builder.getOrCreate() # TODO replace this init with common method (waiting for S3 part)
spark.conf.set('spark.sql.sources.partitionOverwriteMode', 'dynamic')
for table in list(stg.data_schema.keys()):
raw_data = stg.data_schema[table].columns.to_dict()
schema = generateSchema(raw_data)
        df = csvReader(spark, s3_bronze_bucket_name, s3_bronze_path, table, schema, '\t')
(quality_df, table_ingestion_status) = evaluateDataQuality(spark, df, table)
dataframeWriter(
quality_df,
s3_data_quality_logs_bucket_name,
'data-quality/',
'logs',
'date',
'append',
)
# Note: the __name__ / __main__ guard needs double underscores on both names.
if __name__ == '__main__':
main(sys.argv)
import My_app.settings.config as stg
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsgluedq.transforms import EvaluateDataQuality
from pyspark.sql import SparkSession, Row
from pyspark.sql import functions as F
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.types import StructType, IntegerType, StringType, FloatType, DateType
def typeOf(t: str) -> IntegerType | StringType | FloatType | DateType:
...
return StringType()
def generateSchema(columns_dict: dict) -> StructType:
...
return schema
def evaluateDataQuality(spark: SparkSession, df: DataFrame, table: str) -> tuple[DataFrame, bool]:
...
return (
EvaluateDataQuality.apply(...)
.toDF(),
True,
)
from pyspark.sql import SparkSession
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.types import StructType
# This is the csvReader code; what you wrote did not read or write properly, so here it is.
from pyspark.context import SparkContext
from awsglue.context import GlueContext

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
dynamicFrame = glueContext.create_dynamic_frame.from_options(
connection_type="s3",
connection_options={"paths": ["s3://s3path"]},
format="csv",
format_options={
"withHeader": True,
# "optimizePerformance": True,
},
)
dataFrame = spark.read\
.format("csv")\
.option("header", "true")\
.load("s3://s3path")
from pyspark.context import SparkContext
from awsglue.context import GlueContext
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
glueContext.write_dynamic_frame.from_options(
frame=dynamicFrame,
connection_type="s3",
connection_options={"path": "s3://s3path"},
format="csv",
format_options={
"quoteChar": -1,
},
)
dataFrame.write\
.format("csv")\
.option("quote", None)\
.mode("append")\
.save("s3://s3path")
glueContext.write_dynamic_frame.from_options(
    frame = datasource1,
    connection_type = "s3",
    connection_options = {"path": "s3://s3path"},
format = "csv",
format_options={
"optimizePerformance": True,
"separator": ","
},
transformation_ctx = "datasink2")
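If the job is rewritten around DynamicFrames as in the snippets above, a DataFrame produced by the existing code can be wrapped before being passed as frame=. A short sketch, reusing the dataFrame and glueContext names from the snippets above (the frame name is just a placeholder):

from awsglue.dynamicframe import DynamicFrame

# Hypothetical: wrap an existing Spark DataFrame so it can be handed to
# glueContext.write_dynamic_frame.from_options(frame=..., ...).
dynamic_frame = DynamicFrame.fromDF(dataFrame, glueContext, 'quality_logs')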