I tried this: the data is getting streamed, but I couldn't save it in the form of tuples to local disk or HDFS.

from pyspark import SparkConf, SparkContext
from operator import add
import sys
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
## Constants
APP_NAME = "PythonStreamingDirectKafkaWordCount"
##OTHER FUNCTIONS/CLASSES
def main():
    sc = SparkContext(appName=APP_NAME)
    ssc = StreamingContext(sc, 2)  # 2-second batch interval
    brokers, topic = sys.argv[1:]
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    lines = kvs.map(lambda x: x[1])  # drop the Kafka key, keep the message value
    # counts is a DStream of (word, count) tuples
    counts = lines.flatMap(lambda line: line.split(" ")) \
                  .map(lambda word: (word, 1)) \
                  .reduceByKey(lambda a, b: a + b)

    def process(rdd):
        # earlier attempt, never called (the foreachRDD call below is commented out);
        # note that the RDD method is saveAsTextFile, not saveAsTextFiles
        kvs2 = rdd.map(lambda x: x)
        kvs2.saveAsTextFile('path')

    # kvs.foreachRDD(lambda x: process(x))
    # kvs1 = kvs.map(lambda x: x)
    kvs.pprint()
    kvs.saveAsTextFiles('path', 'txt')
    ssc.start()
    ssc.awaitTermination()

if __name__ == "__main__":
    main()
At this line:
kvs.saveAsTextFiles('path','txt')
You are storing the raw stream, not the one with the tuples. Save the counts DStream instead:
counts.saveAsTextFiles('path','txt')
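With that change, the tail of main() would look like this (a minimal sketch; the 'counts' prefix is a placeholder, and each batch is written to its own counts-&lt;timestamp&gt;.txt directory):

counts.pprint()                          # show the (word, count) tuples per batch
counts.saveAsTextFiles('counts', 'txt')  # one output directory per batch interval
ssc.start()
ssc.awaitTermination()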
Note that the files are saved on the worker nodes, under the directory given in 'path'.
As of the latest version, the PySpark API does not provide saveAsHadoopFiles for saving to HDFS; the Scala and Java APIs do have it. Link to the doc.
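If you do need the data on HDFS, one workaround is to write each batch yourself with foreachRDD and the RDD method saveAsTextFile, which accepts a full hdfs:// URI. A minimal sketch, untested; the namenode address and output path are placeholders:

def save_batch(time, rdd):
    if not rdd.isEmpty():  # skip empty batches
        # HDFS path names cannot contain ':', so sanitize the batch time
        dirname = str(time).replace(" ", "_").replace(":", "-")
        rdd.saveAsTextFile("hdfs://namenode:8020/user/spark/counts-" + dirname)

counts.foreachRDD(save_batch)

foreachRDD runs the function on the driver, but saveAsTextFile writes each partition from the executors, so the part files land on HDFS instead of the workers' local disks.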