
Why does Spark not recognize my "DataFrame boolean expression"?


Environment

  • pyspark 2.1.0
  • python 3.5.2

Problem

I have a join with multiple conditions:

join_cond = [
    (col("ltmr1.anc_ref") == col("global.anc_ref") &
     col("global.straight_distance") >= args.radius_1 &
     col("global.straight_distance") <= args.radius_2)
]

Which is used later on:

ltm_r1_in_r2 = data_with_straight_distance.alias("global") \
    .join(
        less_than_min_r1.select(
            col("anc_ref"),
            col("num_rep")
        ).alias("ltmr1"),
        join_cond,
        "leftsemi"
)

The program fails at the following line:

col("global.straight_distance") >= args.radius_1 &

Where args is the namespace returned by argparse.
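
For reference, a hypothetical argparse setup that would produce such an args object; the attribute names radius_1 and radius_2 come from the question, while the flags and types are assumptions:

import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--radius-1", type=float)  # assumed flag/type; becomes args.radius_1
parser.add_argument("--radius-2", type=float)  # assumed flag/type; becomes args.radius_2
args = parser.parse_args()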

Relevant stacktrace:

  File "/mnt/mesos/sandbox/full_load.py", line 114, in full_load
    col("global.straight_distance") >= args.radius_1 &
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/column.py", line 635, in __nonzero__
ValueError: Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.

Why is Spark not recognizing my condition, and how should I correct it?


Solution

  • In Python, the bitwise & operator binds more tightly than the comparison operators ==, >= and <=, so the unparenthesized condition is grouped as col("global.anc_ref") & col("global.straight_distance") before the comparisons are applied. The result is a chained comparison that Python combines with and, which forces Spark to convert a Column into a bool and raises the ValueError above.

  • Either parenthesize each comparison explicitly, or pass the conditions as a list, which join combines with AND. The following has the expected output:

    join_cond = [
        col("ltmr1.anc_ref") == col("global.anc_ref"),
        col("global.straight_distance") >= args.radius_1,
        col("global.straight_distance") <= args.radius_2
    ]
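
  • Alternatively, here is a minimal sketch that keeps the original single-expression form (the column and argument names are taken from the question, and it assumes col is imported from pyspark.sql.functions): wrap each comparison in its own parentheses so it is evaluated before &:

    # parentheses force each comparison to be evaluated before &
    join_cond = [
        (col("ltmr1.anc_ref") == col("global.anc_ref")) &
        (col("global.straight_distance") >= args.radius_1) &
        (col("global.straight_distance") <= args.radius_2)
    ]

    Both forms are equivalent here: when a list of Column conditions is passed to join, Spark combines them with AND.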