I'm using the map method of DynamicFrame (or, equivalently, the Map.apply method). I've noticed that any errors in the function that I pass to these functions are silently ignored and cause the returned DynamicFrame to be empty.
Say I have a job script like this:
import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.transforms import *
glueContext = GlueContext(SparkContext.getOrCreate())
dyF = glueContext.create_dynamic_frame.from_catalog(database="radixdemo", table_name="census_csv")
def my_mapper(rec):
    import logging
    logging.error("[RADIX] An error-log from in the mapper!")
    print "[RADIX] from in the mapper!"
    raise Exception("[RADIX] A bug!")
dyF = dyF.map(my_mapper, 'my_mapper')
print "Count: ", dyF.count()
dyF.printSchema()
dyF.toDF().show()
If I run this script in my Glue Dev Endpoint with gluepython, I get output like this:
[glue@ip-172-31-83-196 ~]$ gluepython gluejob.py
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/share/aws/glue/etl/jars/glue-assembly.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/spark/jars/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
18/05/23 20:56:46 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
Count: 0
root
++
||
++
++
Notes about this output:
- It doesn't show the result of the print statement or the logging.error statement.
- There is no indication that my_mapper raised an exception.
- The printSchema call is showing that there is no schema metadata on the produced DynamicFrame.
- The show method also isn't producing any output, indicating that all the rows are gone.
Likewise, when I save this script as a job in the AWS Glue console and run it, the job doesn't indicate that any error occurred: the Job Status is "Succeeded". Notably, I do get the print statements and logging.error calls output to the job logs, but only in the regular "Logs", not the "Error Logs".
What I want is to be able to indicate that my job has failed, and to be able to easily find these error logs. Most important is to just indicate that it has failed.
Is there a way to log an error within a mapped function in such a way that Glue will pick it up as an "Error Log" (and put it in that separate AWS CloudWatch Logs path)? If this happens, will it automatically mark the entire Job as Failing? Or is there some other way to explicitly fail the job from within a mapped function?
(My plan, if there is a way to log errors and/or mark the job as failed, is to create a decorator or other utility function that will automatically catch exceptions in my mapped functions and ensure that they are logged and marked as a failure.)
The only way I have discovered to make a Glue job show up as "Failed" is to raise an exception from the main script (not inside a mapper or filter function, as those seem to get spun out to the Data Processing Units).
Fortunately, there is a way to detect if an exception occurred inside of a map or filter function: using the DynamicFrame.stageErrorsCount() method. It will return a number indicating how many exceptions were raised while running the most recent transformation.
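So the correct way to solve all the problems:
- Wrap your map or filter functions so that any exception is logged, ideally with a decorator so that you don't need try/except statements in every single function you write.
- After each transformation, call the stageErrorsCount() method and check if it's greater than 0. If you want to abort the job, just raise an exception.
For example: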
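import logging

def log_errors(inner):
    # Wrap a mapper/filter function so any exception is logged before it propagates.
    def wrapper(*args, **kwargs):
        try:
            return inner(*args, **kwargs)
        except Exception:
            logging.exception('Error in function: {}'.format(inner))
            raise  # re-raise so the failure still shows up in stageErrorsCount()
    return wrapper

@log_errors
def foo(record):
    1 / 0  # deliberately raises ZeroDivisionError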
Then, inside your job, you'd do something like:
df = df.map(foo, "foo")
if df.stageErrorsCount() > 0:
    raise Exception("Error in job! See the log!")
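If you apply many transformations, the check above can be folded into a small helper so it is never forgotten. This is only a sketch: map_or_fail is a hypothetical name, not part of the Glue API, and it assumes nothing about the frame beyond the two calls already used above, map(f, transformation_ctx) and stageErrorsCount().

```python
def map_or_fail(frame, func, name):
    """Apply func with frame.map and raise in the main script if any record failed.

    Hypothetical helper, not part of awsglue. `frame` is assumed to behave
    like a DynamicFrame: it must provide map(f, transformation_ctx) and
    stageErrorsCount().
    """
    result = frame.map(func, name)
    errors = result.stageErrorsCount()
    if errors > 0:
        # Raised from the main script, so the job run should be marked as failed.
        raise RuntimeError(
            "{} error(s) in transformation '{}'. See the log!".format(errors, name)
        )
    return result
```

The call site then becomes df = map_or_fail(df, foo, "foo"), with foo still decorated by log_errors so the details end up in the logs.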
Note that even calling logging.exception from inside the mapper function still doesn't write the logs to the error log in AWS CloudWatch Logs, for some reason. It gets written to the regular success logs. However, with this technique you will at least see that the job failed and be able to find the info in the logs. Another caveat: Dev Endpoints don't seem to show ANY logs from the mapper or filter functions.