In pyspark sqlcontext sql, have written code to get text and then reformat it But something like this is the issue
Having something in the dataframe like this where the code is like
hash_tags_fun = udf(lambda t: re.findall('(#[^#]\w{3,})', t))
hash_tags_in_tweets_df.registerTempTable("hash_tags_table")
hash_tags_result = sqlContext.sql("SELECT text FROM hash_tags_table")
hash_tags_list = hash_tags_result.withColumn('text', hash_tags_fun('text'))
hash_tags_list.show(3)
+-------------------+
| text|
+-------------------+
| [#shutUpAndDANCE]|
| [#SHINee, #AMBER]|
|[#JR50, #flipagram]|
+-------------------+
I need something like
+-------------------+
| text|
+-------------------+
| #shutUpAndDANCE|
| #SHINee|
| #AMBER|
| #JR50|
| #flipagram|
+-------------------+
hash_tags_list.withColumn("text", explode("text")) has given an error saying
AnalysisException: u"cannot resolve 'explode(
text
)' due to data type mismatch: input to function explode should be array or map type, not string;; \n'Project [explode(text#24) AS text#68]\n+- AnalysisBarrier\n
+- Project [(text#9) AS text#24]\n
+- Project [text#9]\n
+- SubqueryAlias hash_tags_table\n
+- Project [text#9]\n
+- Filter text#9 LIKE %#%\n
+- SubqueryAlias twt\n
+- SubqueryAlias tweets\n
+- Relation[country#6,id#7,place#8,text#9,user#10] json\n"
Expanding on my comment:
Your column looks like an array but it is actually a string- this is why your call to explode()
didn't work. You will have to convert the column to an array first.
This will involve removing the leading and trailing square brackets and splitting on the comma character.
First to remove the leading and trailing brackets, you can use pyspark.sql.functions.regexp_replace()
:
from pyspark.sql.functions import regexp_replace, split
df = hash_tags_list.select(regexp_replace("text", r"(^\[)|(\]$)", "").alias("text"))
df.show()
#+-----------------+
#| text|
#+-----------------+
#| #shutUpAndDANCE|
#| #SHINee, #AMBER|
#|#JR50, #flipagram|
#+-----------------+
Now split on the comma followed by a space:
df = df.select(split("text", ", ").alias("text"))
df.show()
#+-------------------+
#| text|
#+-------------------+
#| [#shutUpAndDANCE]|
#| [#SHINee, #AMBER]|
#|[#JR50, #flipagram]|
#+-------------------+
You'll notice that this prints exactly like what you started with, but when we inspect the schema we see that these are actually arrays of strings:
df.printSchema()
#root
# |-- text: array (nullable = true)
# | |-- element: string (containsNull = true)
Compare this with your original DataFrame's schema:
hash_tags_list.printSchema()
#root
# |-- text: string (nullable = true)
Having the data as an array, makes calling explode()
now possible:
from pyspark.sql.functions import explode
df = df.select(explode("text").alias("hashtags"))
df.show()
#+---------------+
#| hashtags|
#+---------------+
#|#shutUpAndDANCE|
#| #SHINee|
#| #AMBER|
#| #JR50|
#| #flipagram|
#+---------------+