java, json, apache-spark, spark-streaming, objectmapper

Problem transforming JavaRDD<Status> to JavaRDD<String>


I'm trying to save tweets from Twitter to a MongoDB database.

I have an RDD<Status> and I'm trying to convert it to JSON format with the help of ObjectMapper, but there is a problem with this transformation:

public class Main {


    //set system credentials for access to twitter
    private static void setTwitterOAuth() {
        System.setProperty("twitter4j.oauth.consumerKey", TwitterCredentials.consumerKey);
        System.setProperty("twitter4j.oauth.consumerSecret", TwitterCredentials.consumerSecret);
        System.setProperty("twitter4j.oauth.accessToken", TwitterCredentials.accessToken);
        System.setProperty("twitter4j.oauth.accessTokenSecret", TwitterCredentials.accessTokenSecret);
    }


    public static void main(String [] args) {

        setTwitterOAuth();

        SparkConf conf = new SparkConf().setMaster("local[2]")
                                        .setAppName("SparkTwitter");
        JavaSparkContext sparkContext = new JavaSparkContext(conf);
        JavaStreamingContext jssc = new JavaStreamingContext(sparkContext, new Duration(1000));
        JavaReceiverInputDStream<Status> twitterStream = TwitterUtils.createStream(jssc);

        //Stream that contains just tweets in english
        JavaDStream<Status> enTweetsDStream = twitterStream.filter((status) -> "en".equalsIgnoreCase(status.getLang()));
        enTweetsDStream.persist(StorageLevel.MEMORY_AND_DISK());


        enTweetsDStream.print();
        jssc.start();
        jssc.awaitTermination();
    }

    static void saveRawTweetsToMondoDb(JavaRDD<Status> rdd, JavaSparkContext sparkContext) {
        try {
            ObjectMapper objectMapper = new ObjectMapper();
            SQLContext sqlContext = new SQLContext(sparkContext);
            JavaRDD<String> tweet = rdd.map(status -> objectMapper.writeValueAsString(status));

            DataFrame dataFrame = sqlContext.read().json(tweet);

            Map<String, String> writeOverrides = new HashMap<>();
            writeOverrides.put("uri", "mongodb://127.0.0.1/forensicdb.LiveRawTweets");
            WriteConfig writeConfig = WriteConfig.create(sparkContext).withJavaOptions(writeOverrides);
            MongoSpark.write(dataFrame).option("collection", "LiveRawTweets").mode("append").save();

        } catch (Exception e) {
            System.out.println("Error saving to database");
        }
    }
}

JavaRDD<String> tweet = rdd.map(status -> objectMapper.writeValueAsString(status));

Here is the problem: incompatible types: required JavaRDD<String>, but map was inferred to JavaRDD<R>.


Solution

  • Java type inference isn't always super smart, unfortunately, so what I do in these cases is extract all the bits of my lambda as variables until I find one that Java can't give an accurate type for. I then give the expression the type I think it should have and see why Java is complaining about it. Sometimes it's just a limitation of the compiler and you have to explicitly "cast" the expression to the desired type; other times you'll find an issue with your code. In your case the code looks fine to me, so there must be something else.
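
    As a concrete illustration of that technique (just a sketch; the class, method, and variable names here are mine, not from your code), you can pull the lambda out into an explicitly typed org.apache.spark.api.java.function.Function. Its call() method declares throws Exception, so the checked JsonProcessingException from writeValueAsString is allowed there, and the explicit type either satisfies the compiler or makes the error point at the real problem:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import twitter4j.Status;

    class TweetJson {
        static JavaRDD<String> toJson(JavaRDD<Status> rdd) {
            ObjectMapper objectMapper = new ObjectMapper();
            // Explicitly typed instead of letting the compiler infer <R>:
            // if something is genuinely wrong, the error now points at this line.
            Function<Status, String> statusToJson = status -> objectMapper.writeValueAsString(status);
            return rdd.map(statusToJson);
        }
    }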

    I have a comment, however: here you are paying the cost of JSON serialization once (from Status to a JSON string) and then the cost of deserialization (from the JSON string to Row). Plus, you're not providing any schema for your DataFrame, so it will have to make two passes over the data (or over a sample of it, depending on your config) to infer the schema. All of that can be quite expensive if the data is large. If performance is a concern and Status is relatively simple, I would advise you to write a conversion from Status to Row directly, as sketched below.
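
    A hedged sketch of that direct Status-to-Row conversion, assuming the same Spark 1.x SQLContext/DataFrame API as your code and keeping only a few illustrative fields (add whichever Status getters you actually need):

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SQLContext;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;
    import twitter4j.Status;

    class TweetRows {
        static DataFrame toDataFrame(JavaRDD<Status> rdd, SQLContext sqlContext) {
            // Explicit schema: Spark does not need an extra pass over the data to infer it.
            StructType schema = DataTypes.createStructType(new StructField[]{
                    DataTypes.createStructField("id", DataTypes.LongType, false),
                    DataTypes.createStructField("text", DataTypes.StringType, true),
                    DataTypes.createStructField("lang", DataTypes.StringType, true),
                    DataTypes.createStructField("createdAt", DataTypes.TimestampType, true)
            });

            // One direct conversion per tweet, no JSON round trip.
            JavaRDD<Row> rows = rdd.map(status -> RowFactory.create(
                    status.getId(),
                    status.getText(),
                    status.getLang(),
                    new java.sql.Timestamp(status.getCreatedAt().getTime())));

            return sqlContext.createDataFrame(rows, schema);
        }
    }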

    Another "by the way": you are implicitly serializing your ObjectMapper, chances are you don't want to do that. It seems like the class does support Java serialization, but with special logic. Since the default config for Spark is to use Kryo (which has much better performance than Java serialization), I doubt it will do the right thing when using the default FieldSerializer. You have three options:

    • make the ObjectMapper static to avoid serializing it (see the sketch after this list)
    • configure your Kryo registrator to serialize/deserialize objects of type ObjectMapper with Java serialization. That would work, but it's probably not worth the effort.
    • use Java serialization everywhere instead of Kryo. Bad idea! It's slow and uses a lot of space (memory and disk, depending on where the serialized objects are written).
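
    For the first option, a minimal sketch (the holder class name is illustrative): keep the mapper in a static field so each executor JVM creates its own instance and nothing ObjectMapper-related is captured in the serialized closure:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.spark.api.java.JavaRDD;
    import twitter4j.Status;

    class JsonMapperHolder {
        // One mapper per JVM; static fields are not captured by Spark closures,
        // so the ObjectMapper itself is never serialized or shipped.
        static final ObjectMapper MAPPER = new ObjectMapper();
    }

    class RawTweets {
        static JavaRDD<String> toJson(JavaRDD<Status> rdd) {
            // The lambda references only the static field, not a captured instance.
            return rdd.map(status -> JsonMapperHolder.MAPPER.writeValueAsString(status));
        }
    }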