Tags: scala, apache-spark, amazon-dynamodb, dynamodb-queries

How to read data from a DynamoDB table into a DataFrame?


Below is the code where I am trying to read data from DynamoDB and load it into a DataFrame.

Is it possible to do the same using Scanamo?

import org.apache.hadoop.io.Text;
import org.apache.hadoop.dynamodb.DynamoDBItemWritable
import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat
import org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.io.LongWritable

var jobConf = new JobConf(sc.hadoopConfiguration)
jobConf.set("dynamodb.servicename", "dynamodb")
jobConf.set("dynamodb.input.tableName", "GenreRatingCounts")   // Pointing to DynamoDB table
jobConf.set("dynamodb.endpoint", "dynamodb.us-east-2.amazonaws.com")
jobConf.set("dynamodb.regionid", "us-east-2")
jobConf.set("dynamodb.throughput.read", "1")
jobConf.set("dynamodb.throughput.read.percent", "1")
jobConf.set("dynamodb.version", "2011-12-05")
jobConf.set("dynamodb.awsAccessKeyId", "XXXXX")
jobConf.set("dynamodb.awsSecretAccessKey", "XXXXXXX")
jobConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")
jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")

var orders = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable])

orders.map(t => t._2.getItem()).collect.foreach(println)    
val simple2: RDD[(String)] = orders.map { case (text, dbwritable) => (dbwritable.toString)}
spark.read.json(simple2).registerTempTable("gooddata")

The output is of type: org.apache.spark.sql.DataFrame = [count: struct<n: string>, genre: struct<s: string> ... 1 more field]

+------+---------+------+
| count|    genre|rating|
+------+---------+------+
|[4450]| [Action]|   [4]|
|[5548]|[Romance]| [3.5]|
+------+---------+------+

How can I convert these DataFrame column types to String instead of Struct?
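
One approach might be to select the nested struct fields directly and alias them, roughly as sketched below. This is illustrative only: it assumes count and rating are DynamoDB number attributes exposed as field n and genre is a string attribute exposed as field s (per the schema shown above), spark.implicits._ is in scope as it is in spark-shell, and flat is just a name for the sketch.

// Illustrative sketch: pull the nested attribute fields out of the structs so the
// columns become plain strings. Adjust n/s if an attribute is stored under a different type.
val flat = spark.table("gooddata")
  .select(
    $"count.n".as("count"),
    $"genre.s".as("genre"),
    $"rating.n".as("rating")
  )
flat.show()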

EDIT-1

Now I am able to create a DataFrame using the code below, and I can read data from the DynamoDB table as long as it doesn't contain nulls.

var orders = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable])

// Extracts the value from AttributeValue.toString() output such as "{S: Action,}" -> "Action"
def extractValue : (String => String) = (aws:String) => {
    val pat_value = "\\s(.*),".r

    val matcher = pat_value.findFirstMatchIn(aws)
                matcher match {
                case Some(number) => number.group(1).toString
                case None => ""
        }
  }
val col_extractValue = udf(extractValue)

val rdd_add = orders.map {
      case (text, dbwritable) => (dbwritable.getItem().get("genre").toString(),
                                  dbwritable.getItem().get("rating").toString(),
                                  dbwritable.getItem().get("ratingCount").toString())
}

val df_add = rdd_add.toDF()
                  .withColumn("genre", col_extractValue($"_1"))
                  .withColumn("rating", col_extractValue($"_2"))
                  .withColumn("ratingCount", col_extractValue($"_3"))
                  .select("genre","rating","ratingCount")

df_add.show

But I am getting the error below if a record has no data in one of the columns (null or blank).

 ERROR Executor: Exception in task 0.0 in stage 10.0 (TID 14)
java.lang.NullPointerException
        at $line117.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:67)
        at $line117.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:66)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
        at scala.collection.AbstractIterator.to(Iterator.scala:1334)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1334)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1334)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
19/12/20 07:48:21 WARN TaskSetManager: Lost task 0.0 in stage 10.0 (TID 14, localhost, executor driver): java.lang.NullPointerException
        ... (same stack trace as above)

How can I handle null/blank values while reading from DynamoDB into a DataFrame in Spark/Scala?
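
One way to avoid the NullPointerException might be to wrap each attribute lookup in an Option before calling toString, along the lines of the sketch below. This is illustrative only: attr and rdd_safe are just names invented for the sketch, and the attribute names are the ones used above.

// Illustrative sketch: a missing attribute becomes an empty string instead of
// throwing a NullPointerException when toString is called on null.
import com.amazonaws.services.dynamodbv2.model.AttributeValue

def attr(item: java.util.Map[String, AttributeValue], name: String): String =
  Option(item.get(name)).map(_.toString).getOrElse("")

val rdd_safe = orders.map { case (_, dbwritable) =>
  val item = dbwritable.getItem()
  (attr(item, "genre"), attr(item, "rating"), attr(item, "ratingCount"))
}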


Solution

  • After lots of trial and error, below is the solution I implemented. I was still getting an error while reading from DynamoDB into a DataFrame via an RDD whenever a column was blank (no data), so I made sure to write the string "null" instead of leaving that column blank.

    Another option would be to create external Hive tables on top of the DynamoDB tables and then read from those.

    Below is the code that first writes the data into DynamoDB and then reads it back using Spark/Scala.

    package com.esol.main
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.SparkContext
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types._
    import org.apache.spark.rdd.RDD
    import scala.util.matching.Regex
    import java.util.HashMap
    import com.amazonaws.services.dynamodbv2.model.AttributeValue
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.dynamodb.DynamoDBItemWritable
    import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat
    import org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat
    import org.apache.hadoop.mapred.JobConf
    import org.apache.hadoop.io
    import org.apache.spark.sql.{Row, SaveMode, SparkSession}
    object dynamoDB {
      def main(args: Array[String]): Unit = {
       // val enum = Configurations.initializer() 
        //val table_name = args(0).trim()
        implicit val spark = SparkSession.builder().appName("dynamoDB").master("local").getOrCreate()
        val sc = spark.sparkContext
        import spark.implicits._             
    
    // Writing data into table    
    var jobConf = new JobConf(sc.hadoopConfiguration)
    jobConf.set("dynamodb.output.tableName", "eSol_MapSourceToRaw")
    jobConf.set("dynamodb.throughput.write.percent", "0.5")
    jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")
    jobConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")
    jobConf.set("dynamodb.awsAccessKeyId", "XXXXXXXX")
    jobConf.set("dynamodb.awsSecretAccessKey", "XXXXXX")
    jobConf.set("dynamodb.endpoint", "dynamodb.us-east-1.amazonaws.com")
    jobConf.set("dynamodb.regionid", "us-east-1")
    jobConf.set("dynamodb.servicename", "dynamodb")
    
    // Giving the column names explicitly in the query below is mandatory; otherwise it will fail.
    var MapSourceToRaw = spark.sql("select RowKey,ProcessName,SourceType,Source,FileType,FilePath,FileName,SourceColumnDelimeter,SourceRowDelimeter,SourceColumn,TargetTable,TargetColumnFamily,TargetColumn,ColumnList,SourceColumnSequence,UniqueFlag,SourceHeader from dynamo.hive_MapSourceToRaw")
    
    MapSourceToRaw.show()
    println("read data from hive table")
    val df_columns = MapSourceToRaw.columns.toList
    
    var ddbInsertFormattedRDD = MapSourceToRaw.rdd.map(a => {
      var ddbMap = new HashMap[String, AttributeValue]()
      for (i <- 0 to df_columns.size - 1) {
        val col = df_columns(i)
        var column = new AttributeValue()
        if (a.get(i) == null || a.get(i).toString.isEmpty) {
          // write the literal string "null" instead of leaving the attribute blank
          column.setS("null")
          ddbMap.put(col, column)
        } else {
          column.setS(a.get(i).toString)
          ddbMap.put(col, column)
        }
      }
      var item = new DynamoDBItemWritable()
      item.setItem(ddbMap)
      (new Text(""), item)
    })
    println("ready to write into table")
    
    
    ddbInsertFormattedRDD.saveAsHadoopDataset(jobConf)
    
    println("data written in dynamo db")
    
    // READING DATA BACK
    println("reading data from dynamo db")
    jobConf.set("dynamodb.input.tableName", "eSol_MapSourceToRaw")
    
    def extractValue : (String => String) = (aws:String) => {
        val pat_value = "\\s(.*),".r
    
        val matcher = pat_value.findFirstMatchIn(aws)
                    matcher match {
                    case Some(number) => number.group(1).toString
                    case None => ""
            }
      }
    val col_extractValue = udf(extractValue)
     var dynamoTable = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable])
    val rdd_add = dynamoTable.map {
      case (text, dbwritable) => (dbwritable.getItem().get("RowKey").toString(),
        dbwritable.getItem().get("ProcessName").toString(),
        dbwritable.getItem().get("SourceType").toString(),
        dbwritable.getItem().get("Source").toString(),
        dbwritable.getItem().get("FileType").toString(),
        dbwritable.getItem().get("FilePath").toString(),
        dbwritable.getItem().get("TargetColumn").toString())
    }
    val df_add = rdd_add.toDF()
      .withColumn("RowKey", col_extractValue($"_1"))
      .withColumn("ProcessName", col_extractValue($"_2"))
      .withColumn("SourceType", col_extractValue($"_3"))
      .withColumn("Source", col_extractValue($"_4"))
      .withColumn("FileType", col_extractValue($"_5"))
      .withColumn("FilePath", col_extractValue($"_6"))
      .withColumn("TargetColumn", col_extractValue($"_7"))
      .select("RowKey", "ProcessName", "SourceType", "Source", "FileType", "FilePath", "TargetColumn")
    df_add.show
    }
    
    }
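
    As an aside, instead of regex-parsing AttributeValue.toString(), the typed accessors on AttributeValue (getS, getN) could be read directly. Below is a rough sketch, not part of the solution above: it reuses dynamoTable and the Spark implicits from the code above, assumes the attributes are stored as string or number types, and attrString / df_typed are just names invented for the sketch.

    // Illustrative sketch: read values through AttributeValue's typed accessors rather than
    // parsing its toString output. A missing or null attribute becomes "".
    def attrString(item: java.util.Map[String, AttributeValue], name: String): String =
      Option(item.get(name))
        .flatMap(av => Option(av.getS).orElse(Option(av.getN)))
        .getOrElse("")

    val df_typed = dynamoTable.map { case (_, dbwritable) =>
      val item = dbwritable.getItem()
      (attrString(item, "RowKey"), attrString(item, "ProcessName"), attrString(item, "SourceType"))
    }.toDF("RowKey", "ProcessName", "SourceType")

    df_typed.show()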