Search code examples
python pyspark google-bigquery aws-glue amazon-keyspaces

Spark ETL Large data transfer - how to parallelize


I am looking to move a large amount of data from one database to another, and I have seen that Spark is a good tool for doing this. I am trying to understand the process and the ideas behind Spark's big-data ETLs. I would also appreciate it if someone could explain how Spark goes about parallelizing (or splitting) the data into the various jobs that it spawns. My main aim is to move the data from BigQuery to Amazon Keyspaces, and the data is around 40 GB.

Below is what I have gathered so far from online sources.

This is the code to read the data from BigQuery.

from pyspark.sql import SparkSession

# Create (or reuse) a Spark session that runs on YARN.
spark = (
    SparkSession.builder
    .master('yarn')
    .appName('spark-bigquery-ks')
    .getOrCreate()
)

# Point the session at the service-account credentials file.
spark.conf.set("credentialsFile", "./cred.json")

# Load data from BigQuery through the 'bigquery' data source.
df = (
    spark.read.format('bigquery')
    .option('parentProject', 'project-id')
    .option('dataset', 'mydataset')
    .option('table', 'mytable')
    .option('query', 'SELECT * from mytable LIMIT 10')
    .load()
)


# Print the first row as a quick sanity check.
print(df.head())

Now I need to figure out the best way to transform the data (which is straightforward, and I can do that myself), but my most important question is about batching and handling such a large data set. Are there any considerations I need to keep in mind when moving this data (which won't fit in memory) to Keyspaces?

from ssl import SSLContext, CERT_REQUIRED, PROTOCOL_TLSv1_2
import boto3
from boto3 import Session
from cassandra_sigv4.auth import AuthProvider, Authenticator, SigV4AuthProvider
from cassandra.cluster import Cluster, ExecutionProfile, EXEC_PROFILE_DEFAULT


# TLS 1.2 context that requires a server certificate verifiable against the
# root CA file loaded below.
ssl_context = SSLContext(PROTOCOL_TLSv1_2)
ssl_context.load_verify_locations('./AmazonRootCA1.pem')
ssl_context.verify_mode = CERT_REQUIRED

# boto3 session carrying the AWS credentials and region handed to the SigV4
# auth provider.
boto_session = boto3.Session(
    aws_access_key_id="accesstoken",
    aws_secret_access_key="accesskey",
    aws_session_token="token",
    region_name="region",
)
auth_provider = SigV4AuthProvider(boto_session)

# Connect to the endpoint on port 9142 using the TLS context and SigV4 auth.
cluster = Cluster(
    ['cassandra.region.amazonaws.com'],
    ssl_context=ssl_context,
    auth_provider=auth_provider,
    port=9142,
)
session = cluster.connect()

Finally, pushing the data to Keyspaces would look something like this:

# Write data to Amazon Keyspaces, one INSERT per DataFrame row.
#
# The statement text is built once, outside the loop. Only the keyspace and
# table identifiers are interpolated (identifiers cannot be bound); every row
# value is passed as a bound parameter, so a quote character inside a value
# (for example a title containing "'") can no longer break or alter the
# statement.
#
# NOTE(review): `pdf`, `client`, `keyspace_name`, `table_name` and `ClientError`
# are not defined in this snippet. The `%s` parameter binding below assumes
# `client` is a cassandra-driver Session (such as the one returned by
# `cluster.connect()`) -- confirm before use.
insert_query = (
    f"INSERT INTO {keyspace_name}.{table_name} "
    "(keyfamilyid, recommendedfamilyid, rank, chi, recommendationtype, "
    "title, location, typepriority, customerid) "
    "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)"
)

for index, row in pdf.iterrows():
    # The original statement quoted these columns as text, so they are
    # converted with str() to keep the same CQL types; rank and chi were
    # written unquoted and are therefore passed through unchanged.
    params = (
        str(row["keyfamilyid"]),
        str(row["recommendedfamilyid"]),
        row["rank"],
        row["chi"],
        str(row["recommendationtype"]),
        str(row["title"]),
        str(row["location"]),
        str(row["typepriority"]),
        str(row["customerid"]),
    )
    try:
        client.execute(insert_query, params)
    except ClientError as e:
        # Report the failing row and keep going with the remaining rows.
        print(f"Error writing data for row {index}: {e.response['Error']['Message']}")


Solution

  • You can use the Spark Cassandra Connector to copy data to Amazon Keyspaces. In the following example I write to Keyspaces from S3, but BigQuery would be similar. The most important things to do when writing to Keyspaces are the following.

    1. Prewarm the tables using provisioned capacity mode. Provision a high amount of capacity to ensure the table has enough resources at the initial write rate.

    2. Shuffle the data from BigQuery, since it will most likely be exported in sorted order. When inserting into a NoSQL store, you should write in a random access pattern.

    /**
      * AWS Glue job that reads a dataset from S3 and appends it to an Amazon Keyspaces
      * table through the Spark Cassandra Connector.
      *
      * Required job arguments: `JOB_NAME`, `KEYSPACE_NAME`, `TABLE_NAME`, `DRIVER_CONF`,
      * `FORMAT` and `S3_URI`.
      */
    object GlueApp {

      /**
        * Job entry point.
        *
        * @param sysArgs raw Glue job arguments; resolved with `GlueArgParser`
        * @throws Exception if no system peers are visible or the table's partitioner is
        *                   `com.amazonaws.cassandra.DefaultPartitioner`
        */
      def main(sysArgs: Array[String]): Unit = {

        val args = GlueArgParser.getResolvedOptions(
          sysArgs,
          Array("JOB_NAME", "KEYSPACE_NAME", "TABLE_NAME", "DRIVER_CONF", "FORMAT", "S3_URI"))

        val driverConfFileName = args("DRIVER_CONF")

        val conf = new SparkConf()
          .setAll(
            Seq(
              ("spark.task.maxFailures", "10"),

              // Driver configuration profile supplied through the DRIVER_CONF argument.
              ("spark.cassandra.connection.config.profile.path", driverConfFileName),
              ("spark.cassandra.query.retry.count", "1000"),
              ("spark.cassandra.output.consistency.level", "LOCAL_QUORUM"),

              ("spark.cassandra.sql.inClauseToJoinConversionThreshold", "0"),
              ("spark.cassandra.sql.inClauseToFullScanConversionThreshold", "0"),
              ("spark.cassandra.concurrent.reads", "512"),

              // Write settings: no batch grouping and one row per batch.
              ("spark.cassandra.output.concurrent.writes", "15"),
              ("spark.cassandra.output.batch.grouping.key", "none"),
              ("spark.cassandra.output.batch.size.rows", "1")
            ))

        val spark: SparkContext = new SparkContext(conf)
        val glueContext: GlueContext = new GlueContext(spark)
        val sparkSession: SparkSession = glueContext.getSparkSession

        import sparkSession.implicits._

        Job.init(args("JOB_NAME"), glueContext, args.asJava)

        val logger = new GlueLogger

        // Fail fast, before any data is read, if the cluster is not usable from Spark.
        validatePeersAndPartitioner(conf, logger)

        val tableName = args("TABLE_NAME")
        val keyspaceName = args("KEYSPACE_NAME")
        val backupFormat = args("FORMAT")
        val s3bucketBackupsLocation = args("S3_URI")

        val orderedData = sparkSession.read.format(backupFormat).load(s3bucketBackupsLocation)

        // Randomize the data before loading to maximize table throughput and avoid
        // WriteThrottleEvents. Data exported from another database or Cassandra may be
        // ordered by primary key; with Amazon Keyspaces you want to load data in a random
        // order to use all available resources.
        val shuffledData = orderedData.orderBy(rand())

        shuffledData.write
          .format("org.apache.spark.sql.cassandra")
          .mode("append")
          .option("keyspace", keyspaceName)
          .option("table", tableName)
          .save()

        Job.commit()
      }

      /**
        * Checks that system peers are visible and that the configured partitioner is not
        * the Keyspaces default one, logging both values.
        *
        * The session opened for the checks is always closed, including when a check fails.
        *
        * @param conf   Spark configuration carrying the Cassandra connection settings
        * @param logger Glue logger used to report the peer count and partitioner
        * @throws Exception if no peers are found or the partitioner is
        *                   `com.amazonaws.cassandra.DefaultPartitioner`
        */
      private def validatePeersAndPartitioner(conf: SparkConf, logger: GlueLogger): Unit = {
        val session = CassandraConnector(conf).openSession()
        try {
          val peersCount = session.execute("SELECT * FROM system.peers").all().size()
          val partitioner =
            session.execute("SELECT partitioner from system.local").one().getString("partitioner")

          logger.info(s"Total number of seeds:$peersCount")
          logger.info(s"Configured partitioner:$partitioner")

          if (peersCount == 0) {
            throw new Exception("No system peers found. Check required permissions to read from the system.peers table. If using VPCE check permissions for describing VPCE endpoints. https://docs.aws.amazon.com/keyspaces/latest/devguide/vpc-endpoints.html")
          }

          if (partitioner == "com.amazonaws.cassandra.DefaultPartitioner") {
            throw new Exception("Spark requires the use of RandomPartitioner or Murmur3Partitioner. See Working with partitioners in Amazon Keyspaces documentation. https://docs.aws.amazon.com/keyspaces/latest/devguide/working-with-partitioners.html")
          }
        } finally {
          session.close()
        }
      }
    }