I'm trying to save tweets from Twitter to a MongoDB database. I have an RDD<Status> and I'm trying to convert it to JSON format with the help of ObjectMapper, but there is some problem with this transformation:
import java.util.HashMap;
import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.mongodb.spark.MongoSpark;
import com.mongodb.spark.config.WriteConfig;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.twitter.TwitterUtils;

import twitter4j.Status;

public class Main {

    // set system credentials for access to Twitter
    private static void setTwitterOAuth() {
        System.setProperty("twitter4j.oauth.consumerKey", TwitterCredentials.consumerKey);
        System.setProperty("twitter4j.oauth.consumerSecret", TwitterCredentials.consumerSecret);
        System.setProperty("twitter4j.oauth.accessToken", TwitterCredentials.accessToken);
        System.setProperty("twitter4j.oauth.accessTokenSecret", TwitterCredentials.accessTokenSecret);
    }

    public static void main(String[] args) {
        setTwitterOAuth();

        SparkConf conf = new SparkConf().setMaster("local[2]")
                                        .setAppName("SparkTwitter");
        JavaSparkContext sparkContext = new JavaSparkContext(conf);
        JavaStreamingContext jssc = new JavaStreamingContext(sparkContext, new Duration(1000));
        JavaReceiverInputDStream<Status> twitterStream = TwitterUtils.createStream(jssc);

        // stream that contains just tweets in English
        JavaDStream<Status> enTweetsDStream =
                twitterStream.filter(status -> "en".equalsIgnoreCase(status.getLang()));
        enTweetsDStream.persist(StorageLevel.MEMORY_AND_DISK());

        // save each micro-batch to MongoDB
        enTweetsDStream.foreachRDD(rdd -> saveRawTweetsToMongoDb(rdd, sparkContext));
        enTweetsDStream.print();

        jssc.start();
        jssc.awaitTermination();
    }

    static void saveRawTweetsToMongoDb(JavaRDD<Status> rdd, JavaSparkContext sparkContext) {
        try {
            ObjectMapper objectMapper = new ObjectMapper();
            SQLContext sqlContext = new SQLContext(sparkContext);
            JavaRDD<String> tweet = rdd.map(status -> objectMapper.writeValueAsString(status));
            DataFrame dataFrame = sqlContext.read().json(tweet);

            Map<String, String> writeOverrides = new HashMap<>();
            writeOverrides.put("uri", "mongodb://127.0.0.1/forensicdb.LiveRawTweets");
            WriteConfig writeConfig = WriteConfig.create(sparkContext).withJavaOptions(writeOverrides);
            MongoSpark.save(dataFrame.write().mode("append"), writeConfig);
        } catch (Exception e) {
            System.out.println("Error saving to database: " + e.getMessage());
        }
    }
}
    JavaRDD<String> tweet = rdd.map(status -> objectMapper.writeValueAsString(status));

Here is the problem. The compiler reports: incompatible types, required JavaRDD<String>, but map was inferred to JavaRDD<R>.
Java type inference isn't always super smart, unfortunately, so what I do in these cases is extract all the bits of my lambda as variables until I find one that Java can't give an accurate type for. I then give the expression the type I think it should have and see why Java is complaining about it. Sometimes it will just be a limitation in the compiler and you'll have to explicitly "cast" the expression to the desired type; other times you'll find an issue with your code. In your case, the code looks fine to me, so there must be something else.
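Concretely, that debugging technique would look something like this on your snippet (a sketch only; note that Spark's own Function interface declares call with throws Exception, so the checked JsonProcessingException from writeValueAsString is not what blocks compilation):

    import org.apache.spark.api.java.function.Function;

    // Pull the lambda out into an explicitly typed variable: if inference
    // is the culprit, the error moves here and becomes easier to read.
    Function<Status, String> toJson = status -> objectMapper.writeValueAsString(status);
    JavaRDD<String> tweet = rdd.map(toJson);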
I have a comment, however: here you are paying the cost of JSON serialization once (from Status to a JSON string) and then deserialization (from a JSON string to Row). Plus, you're not providing any schema to your Dataset, so it will have to make two passes over the data (or a sample of it, depending on your config) to infer the schema. All of that can be quite expensive if the data is large. I would advise you to write a conversion from Status to Row directly if performance is a concern and if Status is relatively simple.
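A sketch of what I mean, keeping only a few illustrative fields (the getters are from twitter4j's Status; the schema here is just an assumption you'd extend to whatever subset of fields you actually need):

    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;

    // Explicit schema: Spark no longer needs a pass over the data to infer it.
    StructType schema = new StructType()
            .add("id", DataTypes.LongType)
            .add("text", DataTypes.StringType)
            .add("lang", DataTypes.StringType)
            .add("createdAt", DataTypes.TimestampType);

    // Status -> Row directly, no JSON round-trip.
    JavaRDD<Row> rows = rdd.map(status -> RowFactory.create(
            status.getId(),
            status.getText(),
            status.getLang(),
            new java.sql.Timestamp(status.getCreatedAt().getTime())));

    DataFrame dataFrame = sqlContext.createDataFrame(rows, schema);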
Another "by the way": you are implicitly serializing your ObjectMapper
, chances are you don't want to do that. It seems like the class does support Java serialization, but with special logic. Since the default config for Spark is to use Kryo (which has much better performance than Java serialization), I doubt it will do the right thing when using the default FieldSerializer
. You have three options:
ObjectMapper
with Java serialization. That would work but not worth the effort.
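For completeness, that first option would look roughly like this (a sketch; MapperRegistrator is a made-up name for illustration):

    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.serializers.JavaSerializer;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.spark.serializer.KryoRegistrator;

    // Tell Kryo to delegate ObjectMapper to plain Java serialization
    // instead of its default FieldSerializer.
    public class MapperRegistrator implements KryoRegistrator {
        @Override
        public void registerClasses(Kryo kryo) {
            kryo.register(ObjectMapper.class, new JavaSerializer());
        }
    }

You would then wire it in on your SparkConf with conf.set("spark.kryo.registrator", MapperRegistrator.class.getName()).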