I am continuously adding Parquet data sets to an S3 folder with a structure like this:
s3:::my-bucket/public/data/set1
s3:::my-bucket/public/data/set2
s3:::my-bucket/public/data/set3
At the beginning I only have set1, and my crawler is configured to run on the whole bucket s3:::my-bucket. This leads to the creation of a partitioned table named my-bucket with partitions named public, data and set1. What I actually want is a table named set1 without any partitions.
I understand why this happens, as explained under How Does a Crawler Determine When to Create Partitions?. But when a new data set is uploaded (e.g. set2), I don't want it to become another partition, because it is completely different data with a different schema.
How can I force the Glue crawler to NOT create partitions?
I know I could define the crawler path as s3:::my-bucket/public/data/, but unfortunately I don't know in advance where new data sets will be created (e.g. it could also be s3:::my-bucket/other/folder/set2).
Any ideas how to solve this?
My solution was to manually add the specific paths to the Glue crawler. The big picture: I am using a Glue job to transform data from one S3 bucket and write it to another. The crawler is initially configured to crawl the whole bucket, but every time the Glue transformation job runs it also updates the crawler: it removes the initial full-bucket location (if it still exists) and then adds the new path to the S3 targets.
In Python it looks something like this:
def update_target_paths(crawler):
    """
    Remove initial include path (whole bucket) from paths and
    add folder for current files to include paths.
    """

    def path_is(c, p):
        return c["Path"] == p

    # get S3 targets and remove initial bucket target
    s3_targets = list(
        filter(
            lambda c: not path_is(c, f"s3://{bucket_name}"),
            crawler["Targets"]["S3Targets"],
        )
    )
    # add new target path if not in targets yet
    if not any(filter(lambda c: path_is(c, output_loc), s3_targets)):
        s3_targets.append({"Path": output_loc})
        logging.info("Appending path '%s' to Glue crawler include path.", output_loc)
    crawler["Targets"]["S3Targets"] = s3_targets
    return crawler


def remove_excessive_keys(crawler):
    """Remove keys from the Glue crawler dict that are not needed/allowed to update the crawler."""
    for k in ["State", "CrawlElapsedTime", "CreationTime", "LastUpdated", "LastCrawl", "Version"]:
        try:
            del crawler[k]
        except KeyError:
            logging.warning(f"Key '{k}' not in crawler result dictionary.")
    return crawler


if __name__ == "__main__":
    logging.info(f"Transforming from {input_loc} to {output_loc}.")
    if prefix_exists(curated_zone_bucket_name, curated_zone_key):
        logging.info("Target object already exists, appending.")
    else:
        logging.info("Target object doesn't exist, writing to new one.")

    transform()  # do data transformation and write to output bucket

    while True:
        try:
            crawler = get_crawler(CRAWLER_NAME)
            crawler = update_target_paths(crawler)
            crawler = remove_excessive_keys(crawler)
            # update Glue crawler with the new include paths, then start it
            glue_client.update_crawler(**crawler)
            glue_client.start_crawler(Name=CRAWLER_NAME)
            logging.info("Started Glue crawler '%s'.", CRAWLER_NAME)
            break
        except (
            glue_client.exceptions.CrawlerRunningException,
            glue_client.exceptions.InvalidInputException,
        ):
            logging.warning("Crawler still running...")
            time.sleep(10)
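The script also relies on a boto3 Glue client and two small helpers (get_crawler and prefix_exists) that are not shown above. A minimal sketch of what they could look like, assuming plain boto3 and that prefix_exists only needs to check whether any object exists under a given key prefix:

import logging
import time

import boto3

glue_client = boto3.client("glue")
s3_client = boto3.client("s3")


def get_crawler(name):
    # get_crawler returns {"Crawler": {...}}; unwrap the inner dict so it can
    # later be passed to update_crawler(**crawler) after removing extra keys
    return glue_client.get_crawler(Name=name)["Crawler"]


def prefix_exists(bucket, prefix):
    # true if at least one object exists under the given prefix
    response = s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix, MaxKeys=1)
    return response["KeyCount"] > 0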
Variables defined globally: input_loc, output_loc, CRAWLER_NAME, bucket_name.
For every new data set a new path is added to the Glue crawler. No partitions will be created.
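To double-check, you can list the crawler's current include paths after a few job runs; each data set should show up as its own S3 target (the paths in the comment are just examples):

targets = glue_client.get_crawler(Name=CRAWLER_NAME)["Crawler"]["Targets"]["S3Targets"]
for target in targets:
    print(target["Path"])
# e.g.:
# s3://my-bucket/public/data/set1
# s3://my-bucket/other/folder/set2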