Search code examples
apache-sparkapache-kafkapysparkspark-streamingtwitter-streaming-api

Spark streaming duplicate network calls


I'm using pyspark with a Kafka Receiver to process a stream of tweets. One of the steps of my application includes a call to the Google Natural Language API to get a sentiment score per tweet. However, I'm seeing that the API is getting several calls per processed tweet (I see the number calls in the Google Cloud Console).

Also, if I print the tweetIDs (inside the mapped function) I get the same ID 3 or 4 times. At the end of my application, tweets are being sent to another topic in Kafka and there I get the correct count of tweets (no repeated ID's), so in principle everything is working correctly, but I don't know how to avoid calling Google API more than once per tweet.

Does this has to do with some configuration parameters in Spark or Kafka?

Here's an example of my console output:

TIME 21:53:36: Google Response for tweet 801181843500466177 DONE!
TIME 21:53:36: Google Response for tweet 801181854766399489 DONE!
TIME 21:53:36: Google Response for tweet 801181844808966144 DONE!
TIME 21:53:37: Google Response for tweet 801181854372012032 DONE!
TIME 21:53:37: Google Response for tweet 801181843500466177 DONE!
TIME 21:53:37: Google Response for tweet 801181854766399489 DONE!
TIME 21:53:37: Google Response for tweet 801181844808966144 DONE!
TIME 21:53:37: Google Response for tweet 801181854372012032 DONE!

But in the Kafka receiver I only get 4 processed tweets (which is the correct thing to receive since they are only 4 unique tweets).

The code that does this is:

def sendToKafka(rdd,topic,address):
    publish_producer = KafkaProducer(bootstrap_servers=address,\
                            value_serializer=lambda v: json.dumps(v).encode('utf-8'))
    records = rdd.collect()
    msg_dict = defaultdict(list)
    for rec in records:
        msg_dict["results"].append(rec)
    publish_producer.send(resultTopic,msg_dict)
    publish_producer.close()


kafka_stream = KafkaUtils.createStream(ssc, zookeeperAddress, "spark-consumer-"+myTopic, {myTopic: 1})

dstream_tweets=kafka_stream.map(lambda kafka_rec: get_json(kafka_rec[1]))\
                 .map(lambda post: add_normalized_text(post))\
                 .map(lambda post: tagKeywords(post,tokenizer,desired_keywords))\
                 .filter(lambda post: post["keywords"] == True)\
                 .map(lambda post: googleNLP.complementTweetFeatures(post,job_id))

dstream_tweets.foreachRDD(lambda rdd: sendToKafka(rdd,resultTopic,PRODUCER_ADDRESS))

Solution

  • I already found the solution to this! I just had to cache the DStream with:

    dstream_tweets.cache()
    

    The multiple network calls happened because Spark recalculated the RDDs inside that DStream before performing latter operations in my script. When I cache() the DStream it is only necessary to calculate it one time; and since it is saved in memory, later functions can access that information without re-calculations (in this case a re-calculation involved to call again the API, so it is worth to pay the price of more memory usage).