I am trying to perform a column transformation using map(). However, it does not work for a single column, as shown below:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

data = [('James','Smith','M',30),
        ('Anna','Rose','F',41),
        ('Robert','Williams','M',62)]
columns = ["firstname","lastname","gender","salary"]
df = spark.createDataFrame(data, schema=columns)
rdd2 = df.rdd.map(lambda x:
    (x["firstname"] + "," + x["lastname"])
)
df2 = rdd2.toDF(["fullname"])
df2.show()
It fails with a TypeError (Can not infer schema for type: &lt;class 'str'&gt;).
The same approach works for multiple columns. With the same setup as above, only the mapping (and the column list passed to toDF()) changes:
rdd2 = df.rdd.map(lambda x:
    (x["firstname"] + "," + x["lastname"], x["gender"])
)
df2 = rdd2.toDF(["fullname","gender"])
df2.show()
Output of the above code:
+---------------+------+
|       fullname|gender|
+---------------+------+
|    James,Smith|     M|
|      Anna,Rose|     F|
|Robert,Williams|     M|
+---------------+------+
So I want to understand why map() fails with a single column but works with multiple columns. What in the first snippet is causing the TypeError?
I can see the error occurs while converting the RDD back to a DataFrame. Please check and let me know.
Thank you
toDF() expects each row to be a tuple of values, but when you return a single column each row is just a string, and schema inference complains. The _infer_schema code looks something like:
if isinstance(row, dict):
    # ...
elif isinstance(row, (tuple, list)):
    # ...
elif hasattr(row, "__dict__"):  # object
    # ...
else:
    raise TypeError("Can not infer schema for type: %s" % type(row))
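You can see both branches in action directly. A quick sketch, assuming an active SparkSession bound to spark:

# A one-element tuple matches the (tuple, list) branch and converts cleanly
spark.sparkContext.parallelize([("a,b",)]).toDF(["fullname"]).show()
# A bare string falls through to the else branch and raises the TypeError above
spark.sparkContext.parallelize(["a,b"]).toDF(["fullname"])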
To fix this, add a trailing comma when you only have a single column; this turns the value into a one-element tuple:
rdd2 = df.rdd.map(lambda x:
    (x["firstname"] + "," + x["lastname"],)
)
This comparison should make the difference clear:
>>> print(type(("hello")))
<class 'str'>
>>> print(type(("hello",)))
<class 'tuple'>
Now it will print:
+---------------+
| fullname|
+---------------+
| James,Smith|
| Anna,Rose|
|Robert,Williams|
+---------------+
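As an aside, if the goal is simply to build a fullname column, you can avoid the RDD round-trip entirely with the built-in concat_ws function. A minimal sketch, assuming the same df as in the question:

from pyspark.sql.functions import concat_ws

# Concatenate the two columns with a comma separator, staying in the DataFrame API
df2 = df.select(concat_ws(",", "firstname", "lastname").alias("fullname"))
df2.show()

This sidesteps schema inference altogether and lets Spark optimize the query. Alternatively, returning Row(fullname=...) objects from map() also works, because a Row carries field names that toDF() can pick up.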