Tags: amazon-s3, parquet, aws-glue, partitioning, glue-crawler

Force Glue Crawler to create separate tables


I am continuously adding Parquet data sets to an S3 folder with a structure like this:

s3:::my-bucket/public/data/set1
s3:::my-bucket/public/data/set2
s3:::my-bucket/public/data/set3

At the beginning I only have set1, and my crawler is configured to run on the whole bucket s3:::my-bucket. This leads to the creation of a partitioned table named my-bucket with partitions named public, data and set1. What I actually want is a table named set1 without any partitions. I see why this happens, as explained under How Does a Crawler Determine When to Create Partitions?. But when a new data set is uploaded (e.g. set2), I don't want it to become another partition (because it is completely different data with a different schema).

How can I force the Glue crawler to NOT create partitions? I know I could define the crawler path as s3:::my-bucket/public/data/, but unfortunately I don't know where new data sets will be created (e.g. it could also be s3:::my-bucket/other/folder/set2).
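
For reference, pinning a crawler to explicit include paths (instead of the whole bucket) would look roughly like this with boto3; the crawler name, role and database below are placeholders, and the Glue API expects s3:// paths:

    import boto3

    glue = boto3.client("glue")
    glue.create_crawler(
        Name="set1-crawler",  # placeholder crawler name
        Role="arn:aws:iam::123456789012:role/GlueCrawlerRole",  # placeholder role
        DatabaseName="my_database",  # placeholder database
        # one include path per data set, so each path becomes its own table
        Targets={"S3Targets": [{"Path": "s3://my-bucket/public/data/set1"}]},
    )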

Any ideas how to solve this?


Solution

  • My solution was to manually add the specific paths to the Glue crawler. The big picture is that I am using a Glue job to transform data from one S3 bucket and write it to another one. I ended up initially configuring the Glue crawler to crawl the whole bucket, but every time the Glue transformation job runs, it also updates the crawler: it removes the initial full-bucket location (if it still exists) and then adds the new output path to the S3 targets.

    In Python it looks something like this:

    import logging
    import time

    import boto3

    glue_client = boto3.client("glue")


    def get_crawler(name):
        """Fetch the current crawler definition from Glue."""
        return glue_client.get_crawler(Name=name)["Crawler"]


    def update_target_paths(crawler):
        """
        Remove initial include path (whole bucket) from paths and
        add folder for current files to include paths.
        """
    
        def path_is(c, p):
            return c["Path"] == p
    
        # get S3 targets and remove initial bucket target
        s3_targets = list(
            filter(
                lambda c: not path_is(c, f"s3://{bucket_name}"),
                crawler["Targets"]["S3Targets"],
            )
        )
        # add new target path if not in targets yet
        if not any(path_is(c, output_loc) for c in s3_targets):
            s3_targets.append({"Path": output_loc})
            logging.info("Appending path '%s' to Glue crawler include path.", output_loc)
        crawler["Targets"]["S3Targets"] = s3_targets
        return crawler
    
    
    def remove_excessive_keys(crawler):
        """Remove keys from Glue crawler dict that are not needed/allowed to update the crawler"""
        for k in ["State", "CrawlElapsedTime", "CreationTime", "LastUpdated", "LastCrawl", "Version"]:
            try:
                del crawler[k]
            except KeyError:
                logging.warning(f"Key '{k}' not in crawler result dictionary.")
        return crawler
    
    
    if __name__ == "__main__":
        logging.info("Transforming from %s to %s.", input_loc, output_loc)
        if prefix_exists(curated_zone_bucket_name, curated_zone_key):
            logging.info("Target object already exists, appending.")
        else:
            logging.info("Target object doesn't exist, writing to new one.")
        transform()  # do data transformation and write to the output bucket
        # update_crawler and start_crawler fail while the crawler is running, so retry
        while True:
            try:
                crawler = get_crawler(CRAWLER_NAME)
                crawler = update_target_paths(crawler)
                crawler = remove_excessive_keys(crawler)
    
                # Update Glue crawler with updated include paths
                glue_client.update_crawler(**crawler)
    
                glue_client.start_crawler(Name=CRAWLER_NAME)
                logging.info("Started Glue crawler '%s'.", CRAWLER_NAME)
                break
            except (
                glue_client.exceptions.CrawlerRunningException,
                glue_client.exceptions.InvalidInputException,
            ):
                logging.warning("Crawler still running...")
                time.sleep(10)
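
    The reason for remove_excessive_keys is that get_crawler returns the full crawler description, including read-only fields that update_crawler rejects as input. Abbreviated, with illustrative values, the response looks roughly like this:

    # abbreviated shape of glue_client.get_crawler(Name=...)["Crawler"]; values illustrative
    crawler_description = {
        "Name": "my-crawler",
        "Role": "...",
        "Targets": {"S3Targets": [{"Path": "s3://my-bucket"}]},
        "State": "READY",  # read-only; must be stripped before update_crawler
        "Version": 3,      # read-only; must be stripped before update_crawler
    }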
    

    Variables defined globally: input_loc, output_loc, CRAWLER_NAME, bucket_name, curated_zone_bucket_name and curated_zone_key. The helpers prefix_exists and transform (omitted here) check for an existing output prefix and run the actual data transformation.

    For every new data set a new include path is added to the Glue crawler, so each data set gets its own table and no partitions are created.
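
    As a quick sanity check, this is what update_target_paths does to a crawler dict that still contains the full-bucket target (values purely illustrative):

    # purely illustrative values for the globals used by update_target_paths
    bucket_name = "my-bucket"
    output_loc = "s3://my-bucket/other/folder/set2"

    crawler = {"Targets": {"S3Targets": [{"Path": "s3://my-bucket"}]}}
    print(update_target_paths(crawler)["Targets"]["S3Targets"])
    # prints: [{'Path': 's3://my-bucket/other/folder/set2'}]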