Below is the code where I am trying to read data from dynamo db and load it into a dataframe.
Is it possible to do the same using scanamo?
import org.apache.hadoop.io.Text;
import org.apache.hadoop.dynamodb.DynamoDBItemWritable
import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat
import org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.io.LongWritable
var jobConf = new JobConf(sc.hadoopConfiguration)
jobConf.set("dynamodb.servicename", "dynamodb")
jobConf.set("dynamodb.input.tableName", "GenreRatingCounts") // Pointing to DynamoDB table
jobConf.set("dynamodb.endpoint", "dynamodb.us-east-2.amazonaws.com")
jobConf.set("dynamodb.regionid", "us-east-2")
jobConf.set("dynamodb.throughput.read", "1")
jobConf.set("dynamodb.throughput.read.percent", "1")
jobConf.set("dynamodb.version", "2011-12-05")
jobConf.set("dynamodb.awsAccessKeyId", "XXXXX")
jobConf.set("dynamodb.awsSecretAccessKey", "XXXXXXX")
jobConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")
jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")
var orders = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable])
orders.map(t => t._2.getItem()).collect.foreach(println)
val simple2: RDD[(String)] = orders.map { case (text, dbwritable) => (dbwritable.toString)}
spark.read.json(simple2).registerTempTable("gooddata")
The output is of type: org.apache.spark.sql.DataFrame = [count: struct<n: string>, genre: struct<s: string> ... 1 more field]
+------+---------+------+
| count| genre|rating|
+------+---------+------+
|[4450]| [Action]| [4]|
|[5548]|[Romance]| [3.5]|
+------+---------+------+
How can I convert this dataframe column types to String instead of Struct?
EDIT-1
Now I am able to create dataframe using below code and able to read data from dynamodb table if it doesn't contain null.
var orders = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable])
def extractValue : (String => String) = (aws:String) => {
val pat_value = "\\s(.*),".r
val matcher = pat_value.findFirstMatchIn(aws)
matcher match {
case Some(number) => number.group(1).toString
case None => ""
}
}
val col_extractValue = udf(extractValue)
val rdd_add = orders.map {
case (text, dbwritable) => (dbwritable.getItem().get("genre").toString(), dbwritable.getItem().get("rating").toString(),dbwritable.getItem().get("ratingCount").toString())
val df_add = rdd_add.toDF()
.withColumn("genre", col_extractValue($"_1"))
.withColumn("rating", col_extractValue($"_2"))
.withColumn("ratingCount", col_extractValue($"_3"))
.select("genre","rating","ratingCount")
df_add.show
But I am getting below error if there is a record with no data in one of the column(null or blank).
ERROR Executor: Exception in task 0.0 in stage 10.0 (TID 14)
java.lang.NullPointerException
at $line117.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:67)
at $line117.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:66)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at scala.collection.AbstractIterator.to(Iterator.scala:1334)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1334)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1334)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
19/12/20 07:48:21 WARN TaskSetManager: Lost task 0.0 in stage 10.0 (TID 14, localhost, executor driver): java.lang.NullPointerException
at $line117.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:67)
at $line117.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:66)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at scala.collection.AbstractIterator.to(Iterator.scala:1334)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1334)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1334)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
How to handle null/blank while reading from Dynamodb to a dataframe in Spark/Scala?
After lots of trial and error, Below is the solution I have implemented. I am still getting error while reading from Dynamodb if any column is having blank (no data) using RDD to DATAFRAME. So, I made sure that instead of keeping that column blank, I write null.
Other option to handle that would be creating EXTERNAL HIVE tables on DYNAMO DB tables and then read from them.
Below is the code to first write the data into DYNAMO DB and then READ it back using SPARK/SCALA.
package com.esol.main
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.rdd.RDD
import scala.util.matching.Regex
import java.util.HashMap
import com.amazonaws.services.dynamodbv2.model.AttributeValue
import org.apache.hadoop.io.Text;
import org.apache.hadoop.dynamodb.DynamoDBItemWritable
import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat
import org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.io
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
object dynamoDB {
def main(args: Array[String]): Unit = {
// val enum = Configurations.initializer()
//val table_name = args(0).trim()
implicit val spark = SparkSession.builder().appName("dynamoDB").master("local").getOrCreate()
val sc = spark.sparkContext
import spark.implicits._
// Writing data into table
var jobConf = new JobConf(sc.hadoopConfiguration)
jobConf.set("dynamodb.output.tableName", "eSol_MapSourceToRaw")
jobConf.set("dynamodb.throughput.write.percent", "0.5")
jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")
jobConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")
jobConf.set("dynamodb.awsAccessKeyId", "XXXXXXXX")
jobConf.set("dynamodb.awsSecretAccessKey", "XXXXXX")
jobConf.set("dynamodb.endpoint", "dynamodb.us-east-1.amazonaws.com")
jobConf.set("dynamodb.regionid", "us-east-1")
jobConf.set("dynamodb.servicename", "dynamodb")
//giving column names is mandatory in below query. else it will fail.
var MapSourceToRaw = spark.sql("select RowKey,ProcessName,SourceType,Source,FileType,FilePath,FileName,SourceColumnDelimeter,SourceRowDelimeter,SourceColumn,TargetTable,TargetColumnFamily,TargetColumn,ColumnList,SourceColumnSequence,UniqueFlag,SourceHeader from dynamo.hive_MapSourceToRaw")
println("read data from hive table : "+ MapSourceToRaw.show())
val df_columns = MapSourceToRaw.columns.toList
var ddbInsertFormattedRDD = MapSourceToRaw.rdd.map(a => {
var ddbMap = new HashMap[String, AttributeValue]()
for(i <- 0 to df_columns.size -1)
{
val col=df_columns(i)
var column= new AttributeValue()
if(a.get(i) == null || a.get(i).toString.isEmpty)
{ column.setS("null")
ddbMap.put(col, column)
}
else
{
column.setS(a.get(i).toString)
ddbMap.put(col, column)
} }
var item = new DynamoDBItemWritable()
item.setItem(ddbMap)
(new Text(""), item)
})
println("ready to write into table")
ddbInsertFormattedRDD.saveAsHadoopDataset(jobConf)
println("data written in dynamo db")
// READING DATA BACK
println("reading data from dynamo db")
jobConf.set("dynamodb.input.tableName", "eSol_MapSourceToRaw")
def extractValue : (String => String) = (aws:String) => {
val pat_value = "\\s(.*),".r
val matcher = pat_value.findFirstMatchIn(aws)
matcher match {
case Some(number) => number.group(1).toString
case None => ""
}
}
val col_extractValue = udf(extractValue)
var dynamoTable = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable])
val rdd_add = dynamoTable.map {
case (text, dbwritable) => (dbwritable.getItem().get("RowKey").toString(), dbwritable.getItem().get("ProcessName").toString(),dbwritable.getItem().get("SourceType").toString(),
dbwritable.getItem().get("Source").toString(),dbwritable.getItem().get("FileType").toString(),
dbwritable.getItem().get("FilePath").toString(),dbwritable.getItem().get("TargetColumn").toString())
}
val df_add = rdd_add.toDF()
.withColumn("RowKey", col_extractValue($"_1"))
.withColumn("ProcessName", col_extractValue($"_2"))
.withColumn("SourceType", col_extractValue($"_3"))
.withColumn("Source", col_extractValue($"_4"))
.withColumn("FileType", col_extractValue($"_5"))
.withColumn("FilePath", col_extractValue($"_6"))
.withColumn("TargetColumn", col_extractValue($"_7"))
.select("RowKey","ProcessName","SourceType","Source","FileType","FilePath","TargetColumn")
df_add.show
}
}