I am trying to analyze Twitter Tweet data with Apache Spark, working from a file of JSON Tweet objects.
Here's how I'm loading it in with Spark's jsonFile method:
val sqc = new org.apache.spark.sql.SQLContext(sc)
val tweets = sqc.jsonFile("stored_tweets/*.json")
tweets.registerTempTable("tweets")
Next, I sample only the hashtag entities with the following line:
val hashtags = sqc.sql("SELECT entities.hashtags FROM tweets LIMIT 3")
hashtags.take(1)
The result is:
res14: Array[org.apache.spark.sql.Row] = Array([ArrayBuffer([ArrayBuffer(43, 50),online], [ArrayBuffer(51, 61),marketing], [ArrayBuffer(88, 102),growthhacking], [ArrayBuffer(103, 111),inbound], [ArrayBuffer(112, 120),startup], [ArrayBuffer(121, 138),contentmarketing])])
If you look closely, the data is there, but each hashtag is wrapped as [ArrayBuffer(xx, yy),hashtag] inside an Array([ArrayBuffer(...)]).
Some people suggested using .map() or .flatMap() with or without some custom function using the .getAs() method, but I don't get how that's supposed to work.
Any ideas?
Update May 23rd:
Been going through Spark docs. Still no progress. The Spark SQL Row documentation (https://spark.apache.org/docs/1.0.1/api/java/org/apache/spark/sql/api/java/Row.html) suggests using code such as
import org.apache.spark.sql._
val row = hashtags.take(1)
row(0)
However, in this case, that yields
res124: org.apache.spark.sql.Row = [ArrayBuffer([ArrayBuffer(43, 50),online], [ArrayBuffer(51, 61),marketing], [ArrayBuffer(88, 102),growthhacking], [ArrayBuffer(103, 111),inbound], [ArrayBuffer(112, 120),startup], [ArrayBuffer(121, 138),contentmarketing])]
This StackOverflow post (org.apache.spark.sql.Row to Int) suggests using the .get() family of methods, such as .getString(), but my attempts at that didn't get far:
row(0).getString(0)
Yields:
java.lang.ClassCastException: scala.collection.mutable.ArrayBuffer cannot be cast to java.lang.String
And,
row(0).getString(1)
Yields:
:28: error: value getString is not a member of Any
row(0).getString(1)
And
row(0)(0)
Yields
res184: Any = ArrayBuffer([ArrayBuffer(43, 50),online], [ArrayBuffer(51, 61),marketing], [ArrayBuffer(88, 102),growthhacking], [ArrayBuffer(103, 111),inbound], [ArrayBuffer(112, 120),startup], [ArrayBuffer(121, 138),contentmarketing])
But then
row(0)(0)(0)
Yields
:28: error: Any does not take parameters
row(0)(0)(0)
So, still stuck.
Update May 24th:
I also tried the non-Spark-SQL route: reading the files with .textFile() and parsing the JSON with Scala's native JSON support, as instructed here: Spark SQL - How to select on dates stored as UTC millis from the epoch?. That got me stuck on the Spark/json4s compatibility issues described here: https://github.com/json4s/json4s/issues/212. So I decided to try Python and pyspark to see whether that would solve these problems.
Update 2, May 24th:
With help from a friend who suggested trying something like:
import scala.collection.mutable.ArrayBuffer
row(0)(0).asInstanceOf[ArrayBuffer[Any]](0).asInstanceOf[ArrayBuffer[Any, String]]
I finally made some progress, since this part worked:
row(0)(0).asInstanceOf[ArrayBuffer[Any]](0)
And produced:
res53: Any = [ArrayBuffer(43, 50),online]
However, when proceeding with the rest of the suggestion:
val a = row(0)(0).asInstanceOf[ArrayBuffer[Any]](0)
a.asInstanceOf[ArrayBuffer[Any, String]]
The result, frustratingly, is:
22: error: wrong number of type arguments for scala.collection.mutable.ArrayBuffer, should be 1
a.asInstanceOf[ArrayBuffer[Any, String]]
And trying this:
a.asInstanceOf[ArrayBuffer[Any]]
Yields:
java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRow cannot be cast to scala.collection.mutable.ArrayBuffer
Again, stuck.
Update 3, May 24th:
So, after getting some help from a friend, I got two possible solutions. Neither is a direct answer to the original question, but both do "kind of" solve the problem anyway.
Option 1 (the easy option, with Python): Use pyspark – there you can just say:
row[0][0][1]
Option 2 (the ugly solution in Scala):
val solution = row(0)(0).asInstanceOf[ArrayBuffer[Any]](0).toString().split(" ")(1).split(",")(1).split("]")(0)
Which yields:
scala> solution
res28: String = online
The reason I had to fall back on toString() instead of using .asInstanceOf() all the way through was that the final wrapped object was:
Any = [ArrayBuffer(43, 50),online]
...and we couldn't find an .asInstanceOf() approach that handles it. Here are the things we tried:
row(0)(0).asInstanceOf[ArrayBuffer[Any]](0).asInstanceOf[ArrayBuffer[Any, String]]
row(0)(0).asInstanceOf[ArrayBuffer[Any]](0).asInstanceOf[Row](0).getString(1)
row(0)(0).asInstanceOf[ArrayBuffer[Any]](0).instanceOf[Array[ArrayBuffer[Any], String]]
...but none of them worked.
Still, I'm hoping there is a much more elegant way of doing this in Scala, since the whole "pipeline building" nature of Spark + Scala is what attracted me to the package in the first place.
As I replied on Twitter: the Twitter schema is deeply nested, so this is pretty complicated in general. However, we can add support for accessing complicated nested fields by name in the future to simplify this.
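To make the nesting concrete, here is a rough sketch in plain Scala, with no Spark involved. The `cell` value and the `hashtagTexts` helper are made up for illustration, and a `Vector` stands in for the nested row. The only point is that a value typed as `Any` cannot be indexed until a cast or a pattern match recovers its shape.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for one cell of the query result: an array of
// (indices, text) structs. A Vector plays the role of the nested row.
val cell: Any = ArrayBuffer[Any](
  Vector[Any](ArrayBuffer(43L, 50L), "online"),
  Vector[Any](ArrayBuffer(51L, 61L), "marketing")
)

// cell(0) would not compile: the static type is Any, so the compiler
// knows nothing about indexing. Pattern matching recovers the shape.
def hashtagTexts(value: Any): List[String] = value match {
  case entries: scala.collection.Seq[_] =>
    // Keep only two-element entries whose second element is a String.
    entries.collect { case scala.collection.Seq(_, text: String) => text }.toList
  case _ =>
    Nil // anything that is not a sequence yields no hashtags
}

val texts = hashtagTexts(cell)
println(texts.mkString(", "))
```

Pasted into the Scala REPL, this prints online, marketing. In Spark itself the inner elements are rows rather than vectors, which is presumably why casting them to ArrayBuffer fails in the question.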
Both Python and Scala versions are attached.
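As a hedged sketch of the Scala route on Spark 1.x: the ClassCastException in the question suggests that each array element is a Row. If so, casting the cell to Seq[Row] and calling getString on each element should avoid both toString() and the ArrayBuffer casts. This assumes the `hashtags` result from the question is still in scope in spark-shell, and that the text field sits at position 1, as the printed output suggests. `texts` is just an illustrative name.

```scala
import org.apache.spark.sql.Row

// Assumes `hashtags` from the question: a single column whose cells are
// arrays of (indices, text) structs. Position 1 is taken from the printed
// output in the question, not from any documented guarantee.
val texts = hashtags.flatMap { row =>
  Option(row(0)) match {
    case Some(cell) => cell.asInstanceOf[Seq[Row]].map(_.getString(1))
    case None       => Seq.empty[String] // cell was null: no hashtags array
  }
}

// Bring a few results back to the driver to inspect them.
texts.take(5).foreach(println)
```

The last attempt in the question was close. As far as I can tell, `.asInstanceOf[Row](0)` indexes into the row and gets back an `Any` again, whereas calling `.getString(1)` directly on the `Row` keeps the typed accessor available.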