I got an exception while executing the code snippet below. The dataset I was working with is "stocks.csv", which has the columns date, symbol, volume, open, close, high, low and adjclose.
val stock = sc.textFile("C:/Users/kondr/Desktop/stocks/stocks.csv")
val splits = stock.map(record => record.split(","))
val symvol = splits.map(arr => (arr(1), arr(2).toInt))
val maxvol = symvol.reduceByKey((vol1, vol2) => Math.max(vol1, vol2), 1)
maxvol.collect().foreach(println)
Error Message
21/05/05 14:09:31 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 2) java.lang.NumberFormatException: For input string: "volume" at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
The exception happens because the header row is parsed like any other line, so "volume" is passed to .toInt. Here is how you can skip the first row:
stock.zipWithIndex().filter(_._2 != 0)
  .map(_._1)
  .map(record => record.split(","))
  .map(arr => (arr(1), arr(2).toInt))
  .reduceByKey((vol1, vol2) => Math.max(vol1, vol2), 1)
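If you want to sanity-check the skip-header and max-per-symbol logic without a cluster, the same chain can be sketched on a plain Scala collection (the sample lines below are made up to mirror the stocks.csv layout; groupBy stands in for reduceByKey):

```scala
// Parse comma-separated stock lines, skip the header, keep the max volume per symbol.
def maxVolumes(lines: List[String]): Map[String, Int] =
  lines.zipWithIndex
    .filter(_._2 != 0)            // drop the header row, as in the RDD version
    .map(_._1)
    .map(_.split(","))
    .map(arr => (arr(1), arr(2).toInt))  // (symbol, volume)
    .groupBy(_._1)                // local stand-in for reduceByKey
    .map { case (sym, pairs) => (sym, pairs.map(_._2).max) }

// Hypothetical sample data in the same column order: date,symbol,volume,...
val sample = List(
  "date,symbol,volume,open,close,high,low,adjclose",
  "18-04-2019,A,2874100,75.73,76.17,76.54,75.31,76.17",
  "17-04-2019,A,4472000,78.15,75.43,78.32,74.46,75.43",
  "16-04-2019,B,3441500,80.82,77.55,80.96,77.19,77.55"
)
println(maxVolumes(sample).toList.sorted) // List((A,4472000), (B,3441500))
```

The same per-key max falls out once the header is gone; on the real RDD the only differences are zipWithIndex() being a transformation and reduceByKey doing the grouping.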
Or you can read it directly into a DataFrame, where the header option takes care of the first row:
val csvDF = spark.read
  .option("header", true)
  .option("delimiter", ",")
  .csv("C:/Users/kondr/Desktop/stocks/stocks.csv")
csvDF.show(false)
Output:
+----------+------+-------+-----------+-----------+-----------+-----------+-----------+
|date |symbol|volume |open |close |high |low |adjclose |
+----------+------+-------+-----------+-----------+-----------+-----------+-----------+
|18-04-2019|A |2874100|75.73000336|76.16999817|76.54000092|75.30999756|76.16999817|
|17-04-2019|A |4472000|78.15000153|75.43000031|78.31999969|74.45999908|75.43000031|
|16-04-2019|A |3441500|80.81999969|77.55000305|80.95999908|77.19000244|77.55000305|
|15-04-2019|A |1627300|81 |80.40000153|81.12999725|79.91000366|80.40000153|
|12-04-2019|A |1249300|81.43000031|80.98000336|82.05999756|80.90000153|80.98000336|
+----------+------+-------+-----------+-----------+-----------+-----------+-----------+
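Note that with the CSV read this way, volume comes in as a string column; to finish the original task in Spark you would cast it and aggregate, e.g. csvDF.groupBy("symbol").agg(max(col("volume").cast("int"))). A local sketch of that aggregation over the (symbol, volume) pairs from the rows shown above:

```scala
// (symbol, volume) pairs copied from the output rows above
val rows = List(
  ("A", 2874100), ("A", 4472000), ("A", 3441500),
  ("A", 1627300), ("A", 1249300)
)

// Equivalent of groupBy("symbol").agg(max("volume")) on a local collection
val maxPerSymbol = rows.groupBy(_._1).map { case (sym, vs) => (sym, vs.map(_._2).max) }
println(maxPerSymbol) // Map(A -> 4472000)
```

This matches what the reduceByKey version produces: one row per symbol carrying its largest volume.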