
PySpark DataFrames - way to enumerate without converting to Pandas?


I have a very big pyspark.sql.dataframe.DataFrame named df. I need some way of enumerating records, so that I can access a record with a certain index (or select a group of records within a range of indexes).

In pandas, I could simply do:

indexes=[2,3,6,7] 
df.loc[indexes]

Here I want something similar, without converting the DataFrame to pandas.

The closest I can get is:

  • Enumerating all the objects in the original dataframe by:

    indexes=np.arange(df.count())
    df_indexed=df.withColumn('index', indexes)
    
  • Searching for the values I need using the where() function.

QUESTIONS:

  1. Why doesn't it work, and how can I make it work? How can I add a row to a DataFrame?
  2. Would it later work to do something like:

     indexes=[2,3,6,7] 
     df1.where("index in indexes").collect()
    
  3. Is there a faster and simpler way to deal with it?


Solution

  • It doesn't work because:

    1. the second argument of withColumn has to be a Column, not a collection, so an np.array won't work here (see the short sketch below);
    2. when you pass "index in indexes" as a SQL expression to where, indexes is out of scope and is not resolved as a valid identifier.
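
    For the first point, a minimal sketch (assuming the df from the question; lit comes from pyspark.sql.functions and the column name "const" is arbitrary):

    from pyspark.sql.functions import lit

    # Works: lit() wraps a Python literal in a Column expression
    df.withColumn("const", lit(0))

    # Fails: a NumPy array is not a Column
    # df.withColumn("index", np.arange(df.count()))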

    PySpark >= 1.4.0

    You can add row numbers using the rowNumber window function (renamed row_number in later Spark versions) and query using the Column.isin method or a properly formatted query string:

    from pyspark.sql.functions import col, rowNumber
    from pyspark.sql.window import Window
    
    w = Window.orderBy()
    indexed = df.withColumn("index", rowNumber().over(w))
    
    # Using DSL
    indexed.where(col("index").isin(set(indexes)))
    
    # Using SQL expression
    indexed.where("index in ({0})".format(",".join(str(x) for x in indexes)))
    

    It looks like window functions called without a PARTITION BY clause move all the data to a single partition, so the above may not be the best solution after all.

    Is there a faster and simpler way to deal with it?

    Not really. Spark DataFrames don't support random row access.

    A PairRDD can be accessed using the lookup method, which is relatively fast if the data is partitioned using a HashPartitioner. There is also the indexed-rdd project, which supports efficient lookups.
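
    A minimal sketch of that lookup approach, assuming the df and indexes from the question and using each row's zipWithIndex position as the key (the partition count of 8 is arbitrary):

    pairs = (df.rdd
        .zipWithIndex()                  # (Row, index)
        .map(lambda ri: (ri[1], ri[0]))  # (index, Row)
        .partitionBy(8)                  # hash-partitions by key
        .cache())

    # lookup only scans the partition that can hold the key
    rows = [pairs.lookup(i) for i in indexes]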

    Edit:

    Independently of the PySpark version, you can try something like this:

    from pyspark.sql import Row
    from pyspark.sql.functions import col
    from pyspark.sql.types import StructType, StructField, LongType
    
    row = Row("char")
    row_with_index = Row("char", "index")
    
    df = sc.parallelize(row(chr(x)) for x in range(97, 112)).toDF()
    df.show(5)
    
    ## +----+
    ## |char|
    ## +----+
    ## |   a|
    ## |   b|
    ## |   c|
    ## |   d|
    ## |   e|
    ## +----+
    ## only showing top 5 rows
    
    # This part is not tested but should work and save some work later
    schema = StructType(
        df.schema.fields[:] + [StructField("index", LongType(), False)])
    
    indexed = (df.rdd # Extract rdd
        .zipWithIndex() # Add index
        .map(lambda ri: row_with_index(*list(ri[0]) + [ri[1]])) # Map to rows
        .toDF(schema)) # It will work without schema but will be more expensive
    
    # In Spark < 1.5 use Column.inSet instead of isin
    indexed.where(col("index").isin(indexes))
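
    With the indexes list from the question, this should return the rows for c, d, g and h:

    indexes = [2, 3, 6, 7]
    indexed.where(col("index").isin(indexes)).show()

    ## +----+-----+
    ## |char|index|
    ## +----+-----+
    ## |   c|    2|
    ## |   d|    3|
    ## |   g|    6|
    ## |   h|    7|
    ## +----+-----+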