
How to infer a schema for a pyspark dataframe?


There are many questions on this site about how to convert a pyspark rdd to a dataframe. But none of them answer how to convert an SQL-table-style rdd to a dataframe while preserving the column types.

I have an rdd that is exactly a list of dicts in python:

>>> rdd.take(1)

[{'se_error': 0, 'se_subjective_count': 0, 'se_word_count': 10, 'se_entity_summary_topic_phrases': {}, 'se_entity_hits': 1, 'se_entity_summary': 'rt @mercuryinrx: disgusting. cut it out FOCALENTITY twitter.com/anons4cetacean', 'se_query_with_hits': 0, 'id': 180034992495.0, 'se_objective_count': 2, 'se_category': {}, 'se_sentence_count': 2, 'se_entity_sentiment': 0.0, 'se_document_sentiment': -0.49000000953674316, 'se_entity_themes': {}, 'se_query_hits': 0, 'se_named_entities': {}}]

>>> rdd.take(1)[0].keys()

dict_keys(['se_error', 'se_subjective_count', 'se_word_count', 'se_entity_summary_topic_phrases', 'se_entity_hits', 'se_entity_summary', 'se_query_with_hits', 'id', 'se_objective_count', 'se_category', 'se_sentence_count', 'se_entity_sentiment', 'se_document_sentiment', 'se_entity_themes', 'se_query_hits', 'se_named_entities'])

All rows have the same columns. All columns have the same datatype. This is trivial to turn into a dataframe in pandas.

import pandas as pd

out = rdd.take(rdd.count())   # collects every row to the driver
outdf = pd.DataFrame(out)

This of course defeats the purpose of using spark! I can demonstrate that the columns are all the same datatype as well.

>>> typemap = [{key: type(val) for key, val in row.items()} for row in out]
>>> typedf = pd.DataFrame(typemap)
>>> for col in typedf.columns:
...     print(typedf[col].value_counts())

<class 'float'>    1016
Name: id, dtype: int64
<class 'dict'>    1010
Name: se_category, dtype: int64
<class 'float'>    1010
Name: se_document_sentiment, dtype: int64
<class 'int'>    1010
Name: se_entity_hits, dtype: int64
...

It goes on further, but every column holds a single type; the only other values are Nones.

How do I do this in spark? Here are some attempts that don't work:

>>> outputDf = rdd.toDF()

...
ValueError: Some of types cannot be determined by the first 100 rows, please try again with sampling

>>> outputDf = rdd.toDF(sampleRatio=0.1)

...
File "/usr/hdp/current/spark-client/python/pyspark/sql/types.py", line 905, in <lambda>
    return lambda row: dict((kconv(k), vconv(v)) for k, v in row.items())
AttributeError: 'NoneType' object has no attribute 'items'

What is the issue here? Why is it so hard to figure out the datatype in a column that only has one python datatype?


Solution

  • The solution here is in these lines of output:

    <class 'float'>    1016
    Name: id, dtype: int64
    <class 'dict'>    1010
    Name: se_category, dtype: int64
    

    There are 1016 rows total in this rdd; but in 6 of those rows, the column se_category is absent. That is why you only see 1010 dict objects. This is no problem for pandas, which simply infers the type from the rest of the column and fills in the blanks with NaN.
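    A minimal, made-up illustration of the same effect in pandas (the field values here are just stand-ins):

    import pandas as pd

    # the second dict is missing 'se_category', just like the 6 real rows
    rows = [{"id": 1.0, "se_category": {"topic": "x"}},
            {"id": 2.0}]
    print(pd.DataFrame(rows))
    # the missing 'se_category' cell comes back as NaN, and the column is still built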

    Spark doesn't do that. If you think about it from the perspective of Java and Scala, the languages underlying the rdd objects, this makes complete sense. Since I have been programming mostly in Python, a dynamically typed language, for some time, it didn't occur to me immediately that this was a problem. But in a statically typed language, everything is expected to have a defined type at compile time.
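    (A related way to imitate that static typing, not what I ended up doing, is to skip inference entirely and hand Spark an explicit schema. Sketch below with placeholder field names and types; it assumes a SparkSession named spark, or sqlContext.createDataFrame on older releases.)

    from pyspark.sql.types import (StructType, StructField,
                                   DoubleType, LongType)

    # placeholder schema -- swap in the real column names and types
    schema = StructType([
        StructField("id", DoubleType(), True),
        StructField("se_entity_hits", LongType(), True),
        StructField("se_document_sentiment", DoubleType(), True),
    ])

    # build tuples in schema order; missing keys become None (nullable fields)
    typed = rdd.map(lambda d: tuple(d.get(f.name) for f in schema.fields))
    outputDf = spark.createDataFrame(typed, schema)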

    The solution is to 'declare' each row that will be returned to the rdd as a set of objects with fixed types, thereby imitating the static typing. So I declare

    {"int_field": 0, "list_field": [], "float_field": 0.0, "string_field": ""}
    

    before I fill in any of the values. This way, if a value is never updated by the function that generates the rdd, the row still has all the correct types in place, and

    from pyspark.sql import Row
    outputDf = rdd.map(lambda x: Row(**x)).toDF()
    

    successfully converts this rdd to a dataframe.
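
    Putting it together, a minimal sketch of the whole pattern (the field names are placeholders, not my real se_* columns):

    from pyspark.sql import Row

    # typed defaults, 'declared' before any values are filled in
    DEFAULTS = {"int_field": 0, "list_field": [], "float_field": 0.0, "string_field": ""}

    def with_defaults(partial):
        # start from the typed defaults, then overwrite with whatever was actually produced
        row = dict(DEFAULTS)
        row.update(partial)
        return row

    outputDf = rdd.map(with_defaults).map(lambda x: Row(**x)).toDF()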