Tags: python, apache-spark, pyspark, rdd

PySpark direct stream's foreachRDD always has an empty RDD


I have been trying to read data from a Kafka topic and write it to a Parquet file. So far everything works except the foreachRDD step. I can see the data when I use map on the DStream, but in the next step, inside foreachRDD, the RDD is always empty and I don't know why.

My environment is Ubuntu running both Kafka and Spark in standalone mode, and I am using the pyspark shell. I'm new to Python, so I'm still stumbling over a lot of the syntax and am not sure whether that is where my problem lies.

Any help or insight would be greatly appreciated.

Here's a copy of the code I have been pasting into the pyspark shell:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql.types import *             
import json

kafkaBroker = 'localhost:9092'
consumer_group = 'spark-streaming'
topic = 'test'
batchTimeDur=5

ssc = StreamingContext(sc, batchTimeDur)
directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": kafkaBroker})

#change string to json string
lines = directKafkaStream.map(lambda v: json.loads(v[1]))

# show what is in the stream
lines.map(lambda x: 'rec in this line: %s\n' % x).pprint()

# save lines to file
lines.foreachRDD(lambda x: saveAsParquet(x))

def saveAsParquet(rdd):
    print('in save a parquet')
    if not rdd.isEmpty:
        df = sqlContext.createDataFrame(rdd, buildSchema())
        #df.write.parquet('file:///vagrant/streamed-parquet', mode='overwrite')
        print('  writing file')
        df.write.parquet('file:///vagrant/streamed-parquet', mode='append')
    print('return save as parquet')
    return rdd

ssc.start()

Solution

  • RDD.isEmpty is a method, not a property. A bound method object is itself a value, and according to the language definition it evaluates as true in a Boolean context:

    the following values are interpreted as false: False, None, numeric zero of all types, and empty strings and containers (including strings, tuples, lists, dictionaries, sets and frozensets). All other values are interpreted as true.

    Since the method object is always truthy, not rdd.isEmpty is always false, so the body of the if never runs.

    You should:

    if not rdd.isEmpty(): 
        ...
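The pitfall can be seen without a Spark cluster. Below is a minimal sketch using a hypothetical FakeRDD class whose isEmpty() mimics the signature of pyspark's RDD.isEmpty(); it shows that the bound method itself is truthy, while calling it returns the actual answer:

```python
class FakeRDD:
    """Stand-in for an RDD, so the example runs without Spark.

    isEmpty() mimics pyspark's RDD.isEmpty(): returns True when
    there are no rows.
    """
    def __init__(self, rows):
        self.rows = rows

    def isEmpty(self):
        return len(self.rows) == 0


empty = FakeRDD([])

# The buggy check from the question: the bound method object is truthy...
print(bool(empty.isEmpty))    # True
# ...so `not empty.isEmpty` is False and this branch never runs,
# even though the RDD really is empty:
if not empty.isEmpty:
    print("never reached")

# The corrected check actually calls the method:
print(empty.isEmpty())        # True
if not FakeRDD([1, 2]).isEmpty():
    print("non-empty RDD: safe to write")
```

With the parentheses in place, saveAsParquet in the question only builds and writes the DataFrame for batches that actually contain records.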