python · amazon-web-services · amazon-emr · aws-glue · amazon-sagemaker

Integrating Amazon SageMaker Endpoints into Batch ETL Workflows on Glue or EMR



As we see in the AWS documentation 'here', an endpoint named 'linear-learner-2019-11-04-01-57-20-572' is created. It can be invoked as:

  response = client.invoke_endpoint(
      EndpointName='linear-learner-2019-11-04-01-57-20-572',
      ContentType='text/csv',
      Body=values)
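
where client is a boto3 SageMaker runtime client and values is a CSV-formatted feature row. A self-contained version of that call would look roughly like this (the feature values below are made up for illustration):

  import boto3

  client = boto3.client('sagemaker-runtime')
  values = '4.9,3.0,1.4,0.2'   # one CSV feature row (made-up example values)
  response = client.invoke_endpoint(
      EndpointName='linear-learner-2019-11-04-01-57-20-572',
      ContentType='text/csv',
      Body=values)
  print(response['Body'].read().decode('utf-8'))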

However, assume we have a batch job that:

  • runs on a schedule on a big-data cluster and reads its input data from S3,
  • transforms that data by adding a new prediction column,
  • writes the resulting output back to S3, and
  • could be triggered on a daily basis or on arrival of a new file in the source folder (a rough sketch of the daily trigger follows this list).
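
For the daily case, the schedule would presumably be wired up as a Glue scheduled trigger along these lines (trigger and job names below are placeholders):

  import boto3

  glue = boto3.client('glue')
  glue.create_trigger(
      Name='daily-scoring-trigger',             # placeholder trigger name
      Type='SCHEDULED',
      Schedule='cron(0 2 * * ? *)',             # every day at 02:00 UTC
      Actions=[{'JobName': 'my-scoring-job'}],  # placeholder Glue job name
      StartOnCreation=True)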

How best can we wire that endpoint into Glue- or EMR-based Spark jobs?


Solution

  • from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType
    
    # Initialize Spark and Glue contexts
    spark_context = SparkContext()
    glue_context = GlueContext(spark_context)
    spark = glue_context.spark_session
    
    # Specify your SageMaker endpoint
    endpoint_name = 'your-sagemaker-endpoint-name'
    
    # Read data from S3 into a Spark DataFrame
    s3_path = 's3://your-bucket/your-data-prefix/'
    data_frame = spark.read.format('parquet').load(s3_path)
    
    # Define a UDF that invokes the SageMaker endpoint through the
    # boto3 runtime client (preinstalled on Glue workers); this avoids
    # shipping the sagemaker SDK and its session wiring to executors
    def invoke_sagemaker_udf(input_data):
        import boto3
        runtime = boto3.client('sagemaker-runtime')
        response = runtime.invoke_endpoint(
            EndpointName=endpoint_name,
            ContentType='text/csv',
            Body=input_data)
        return response['Body'].read().decode('utf-8')
    
    # Register the UDF
    sagemaker_udf = udf(invoke_sagemaker_udf, StringType())
    
    # Apply the UDF; 'input_data' stands in for the column holding
    # the CSV-formatted feature row
    result_df = data_frame.withColumn("prediction", sagemaker_udf("input_data"))
    
    # Perform any additional transformations if needed
    # ...
    
    # Write the result back to S3 or another destination
    output_s3_path = 's3://your-bucket/your-output-prefix/'
    result_df.write.format('parquet').mode('overwrite').save(output_s3_path)
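
One caveat with the per-row UDF above: it creates a boto3 client and issues an HTTPS request for every single record. For large batches it is usually much cheaper to score a whole partition per request with mapPartitions. A rough sketch, assuming the endpoint accepts newline-separated 'text/csv' rows and returns one prediction line per input row, and that each partition stays under the endpoint's payload limit of a few MB ('input_data' is again a placeholder column name):

    def predict_partition(rows):
        import boto3
        runtime = boto3.client('sagemaker-runtime')  # one client per partition, not per row
        rows = list(rows)
        # One request per partition: newline-separated CSV feature rows
        payload = '\n'.join(row['input_data'] for row in rows)
        response = runtime.invoke_endpoint(
            EndpointName=endpoint_name,
            ContentType='text/csv',
            Body=payload)
        predictions = response['Body'].read().decode('utf-8').splitlines()
        for row, prediction in zip(rows, predictions):
            yield row + (prediction,)   # Row + tuple yields a plain tuple
    
    result_df = data_frame.rdd.mapPartitions(predict_partition) \
                          .toDF(data_frame.columns + ['prediction'])

If partitions can exceed the payload limit, repartition the DataFrame first or chunk the rows inside predict_partition.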