I am naive in Big data, I am trying to connect kafka to spark. Here is my producer code
import os
import sys
import pykafka
def get_text():
## This block generates my required text.
text_as_bytes=text.encode(text)
producer.produce(text_as_bytes)
if __name__ == "__main__":
client = pykafka.KafkaClient("localhost:9092")
print ("topics",client.topics)
producer = client.topics[b'imagetext'].get_producer()
get_text()
This is printing my generated text on console consumer when I do bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic imagetext --from-beginning
Now I want this text to be consumed using Spark and this is my Jupyter code
import findspark
findspark.init()
import os
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /spark-2.1.1-bin-hadoop2.6/spark-streaming-kafka-0-8-assembly_2.11-2.1.0.jar pyspark-shell'
conf = SparkConf().setMaster("local[2]").setAppName("Streamer")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc,5)
print('ssc =================== {} {}')
kstream = KafkaUtils.createDirectStream(ssc, topics = ['imagetext'],
kafkaParams = {"metadata.broker.list": 'localhost:9092'})
print('contexts =================== {} {}')
lines = kstream.map(lambda x: x[1])
lines.pprint()
ssc.start()
ssc.awaitTermination()
ssc.stop(stopGraceFully = True)
But this is producing output on my Jupyter as
Time: 2018-02-21 15:03:25
-------------------------------------------
-------------------------------------------
Time: 2018-02-21 15:03:30
-------------------------------------------
Not the text that is on my console consumer.. Please help, unable to figure out the mistake.
I found another solution to it. While the solution of putting get_text()
in a loop works, it is not the right solution. You data was not in continuous fashion when it was sent in Kafka. As a result, Spark streaming should not get it in such a way.
Kafka-python library provides a get(timeout)
functionality so that Kafka waits for a request.
producer.send(topic,data).get(timeout=10)
Since you are using pykafka
, I am not sure whether it will work. Nevertheless, you can still try once and dont put get_text()
in loop.