apache-spark, pyspark, spark-streaming, apache-spark-sql, flume-ng

Application hangs when I join a PipelinedRDD with an RDD from a DStream


I use Spark 1.6.0 with Spark Streaming and have a problem with wide operations.

Code example: there is an RDD called "a" of type class 'pyspark.rdd.PipelinedRDD'.

"a" was built as follows:

    from pyspark.sql import Row

    # Load a text file and convert each line to a Row.
    lines = sc.textFile(filename)
    parts = lines.map(lambda l: l.split(","))
    clients = parts.map(lambda p: Row(client_id=int(p[0]), clientname=p[1], ...))

    # Infer the schema and register the DataFrame as a temporary table.
    schemaPeople = sqlContext.createDataFrame(clients)
    schemaPeople.registerTempTable("clients")

    client_list = sqlContext.sql("SELECT * FROM clients")

and then:

    a = client_list.map(lambda entry: (int(entry[1]), value_from_entry(entry)))
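
Note: value_from_entry is my own helper (its body is omitted); the only thing the join below relies on is that "a" ends up as an RDD of (key, value) pairs. A hypothetical stand-in, just to make the snippet self-contained:

    # Hypothetical stand-in for the omitted helper: any function that
    # extracts the payload to carry alongside the integer key will do.
    def value_from_entry(entry):
        return entry  # e.g. keep the whole row as the value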

The second part, "b", has type class 'pyspark.streaming.dstream.TransformedDStream'. I receive "b" from Flume:

    DStreamB = flumeStream.map(lambda tup: function_for_map(tup[1].encode('ascii','ignore')))

and

    b = DStreamB.map(lambda event: (int(event[2]), value_from_event(event)))
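
For context, flumeStream above comes from Spark's Flume integration. A minimal setup sketch, assuming a push-based Flume agent (the host, port, and batch interval are placeholders, not the values from my actual job):

    from pyspark.streaming import StreamingContext
    from pyspark.streaming.flume import FlumeUtils

    # 5-second micro-batches (assumed); sc is the existing SparkContext
    ssc = StreamingContext(sc, 5)
    # Placeholder host/port of the sink the Flume agent pushes events to
    flumeStream = FlumeUtils.createStream(ssc, "localhost", 9999)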

The problem: when I try to join them as:

    mult = b.transform(lambda rdd: rdd.join(a))

my application hangs at this stage (the screenshot below shows the state after b.pprint() and before the .join() stage):

[screenshot: Spark UI showing the job stuck before the join stage]

But when I add the following:

  1. Declare an RDD "test":

    test = sc.parallelize(range(1, 100000)).map(lambda k: (k, 'value'))
    

    and do:

    mult0 = a.join(test)
    mult = b.transform(lambda rdd: rdd.join(mult0))
    

    Then it works(!!):

    [screenshot 2: the join completes and the streaming job proceeds]

  2. I can also do:

    mult0 = b.transform(lambda rdd: rdd.join(test))
    

Thus:

I have the RDDs "a" and "test" and the DStream "b", and I can join:

  • a with test, and then that result with b
  • b with test

But I cannot join "b" with "a" directly.

Any help is appreciated! Thanks!


Solution

  • On the advice of user6910411, I cached "a" as

    a = client_list.map(lambda entry: (int(entry[1]), value_from_entry(entry))).cache() 
    

    and the problem was resolved.
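
A plausible explanation: without cache(), each micro-batch's transform re-evaluates the full lineage of "a" (textFile, split, SQL query, map), whereas cache() materializes "a" once so every batch joins against the cached partitions. A minimal sketch of the fixed pipeline (the count() that forces materialization up front is my addition here, not strictly required):

    a = client_list.map(lambda entry: (int(entry[1]), value_from_entry(entry))).cache()
    a.count()  # optional: fill the cache before the streaming job starts
    mult = b.transform(lambda rdd: rdd.join(a))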