apache-spark, pyspark, apache-spark-sql

Spark reading from multiple SQL databases in parallel


I have a scenario where I need to load large datasets from several SQL databases (each with its own endpoint and connection) into a Spark cluster. Given the size of these datasets (billions of rows), I want to maximize efficiency by loading the data in parallel rather than sequentially, to avoid overloading a single database and to reduce the overall loading time.

Is it feasible to perform this parallel data loading into Spark? If so, what are the best practices or methods for achieving this? Should this process be managed manually, or are there automated solutions or frameworks that can help?

Any guidance or suggestions on how to approach this would be greatly appreciated!


Solution

  • Since you mentioned that each database has its own connection and you want to trigger the fetches for all of them in parallel, you can use a thread pool. I used the code below to optimize reading 300+ tables of roughly 1 million rows each; you will need to test performance with varying fetch sizes.

    Also, Spark pushes filters down to the source when it can, so that only the reduced data is transferred; try to take advantage of that (see the first sketch after the main code below). Pushdown will not work when you cast the filter column, apply calculations to it, or when the filter column is of struct type.

    If the database has partitioning logic, try to use that too. In my case the SQL database had 3 partitions and I only needed data from one of them, so I restricted the SQL query to that partition (second sketch below).

    If you want to run transformations on these tables, do not perform them at this stage, as it will increase the load time. First save the tables somewhere such as a Hive database or Delta Lake, and then transform the saved copies; you will get better speed (third sketch below).

    from multiprocessing.pool import ThreadPool
    from itertools import product

    def query_table(params):
        # Each task receives one (connection, table_name) pair and builds a
        # JDBC-backed DataFrame for that table; the rows themselves are only
        # fetched when an action is run on the DataFrame.
        connection, table_name = params
        username = connection['username']
        password = connection['password']
        host = connection['host']
        url = f"jdbc:oracle:thin:{username}/{password}@{host}"
        query_str = f"SELECT * FROM {table_name}"
        driver = "oracle.jdbc.driver.OracleDriver"
        fetch_size = 30000  # tune this per database and network

        df = spark.read.format("jdbc") \
            .option("url", url) \
            .option("query", query_str) \
            .option("user", username) \
            .option("password", password) \
            .option("fetchsize", str(fetch_size)) \
            .option("driver", driver) \
            .load()
        return df

    connection_lists = [
        {'username': 'u1', 'password': 'p1', 'host': 'hostname1:port1/servicename1'},
        {'username': 'u2', 'password': 'p2', 'host': 'hostname2:port2/servicename2'},
        {'username': 'u3', 'password': 'p3', 'host': 'hostname2:port2/servicename2'}
    ]
    table_names = ['table1', 'table2', 'table3']

    # product() fans out every connection/table combination; the thread pool
    # triggers the reads concurrently instead of one table at a time.
    with ThreadPool(150) as pool:
        dataframes = pool.map(query_table, product(connection_lists, table_names))
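
    First sketch, on filter pushdown: this is a minimal, hypothetical example (the "orders" table and "status" column are placeholders, not from the code above). Reading through the "dbtable" option and filtering on an untouched column lets Spark translate the filter into a WHERE clause at the source, which is exactly what a cast or a calculation on the filter column would prevent.

    # All names here are placeholders: jdbc_url, credentials, and the "orders" table.
    jdbc_url = "jdbc:oracle:thin:u1/p1@hostname1:port1/servicename1"
    orders_df = spark.read.format("jdbc") \
        .option("url", jdbc_url) \
        .option("dbtable", "orders") \
        .option("user", "u1") \
        .option("password", "p1") \
        .option("driver", "oracle.jdbc.driver.OracleDriver") \
        .option("fetchsize", "30000") \
        .load() \
        .filter("status = 'SHIPPED'")  # simple comparison on a plain column gets pushed down

    # orders_df.explain() should show the condition under PushedFilters in the JDBC scan node.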
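
    Second sketch, on reading a single database partition: the exact syntax depends on your database; for Oracle, one option is the partition extension clause in the SQL passed to the "query" option. The table and partition names (sales, sales_q1) are placeholders.

    # Placeholder table/partition names; naming the partition in the query limits
    # the scan to that partition, so only that slice of data reaches Spark.
    query_str = "SELECT * FROM sales PARTITION (sales_q1)"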
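
    Third sketch, on "load first, transform later", assuming Delta Lake is available on the cluster; the output paths and the transformation are placeholders.

    # Persist each raw DataFrame as-is, then read the saved copies back and
    # transform those, rather than transforming during the JDBC fetch.
    for i, raw_df in enumerate(dataframes):
        raw_df.write.format("delta").mode("overwrite").save(f"/mnt/raw/table_{i}")

    saved = spark.read.format("delta").load("/mnt/raw/table_0")
    cleaned = saved.dropDuplicates()  # example transformation on the saved copy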