python, apache-spark, pyspark

Converting a string name to a SQL DataType in Spark


I have been referring to the question below:

Spark cast column to sql type stored in string

I am looking for the equivalent code in pyspark.

The problem is that the answer in the above post uses classOf[DataTypes], but the DataTypes class is not available in PySpark.

What I am trying to do is create the schema dynamically. So, I have a list as below:

>>> sourceToHiveTypeList
['TimestampType', 'TimestampType', 'StringType', 'StringType', 'IntegerType', 'DoubleType']

and I have defined the following function:

import pyspark.sql.types
from pyspark.sql.types import StructType, StructField

def TableASchema(columnName, columnType):
    return StructType([
        StructField(columnName[0], getattr(pyspark.sql.types, columnType[0]), nullable=True),
        StructField(columnName[1], getattr(pyspark.sql.types, columnType[1]), nullable=True),
        StructField(columnName[2], getattr(pyspark.sql.types, columnType[2]), nullable=True),
        StructField(columnName[3], getattr(pyspark.sql.types, columnType[3]), nullable=True),
        StructField(columnName[4], getattr(pyspark.sql.types, columnType[4]), nullable=True),
        StructField(columnName[5], getattr(pyspark.sql.types, columnType[5]), nullable=True)
       ])

When I call the above function, I get an error:

>>> schema = TableASchema(headerColumns, sourceToHiveTypeList)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 3, in TableASchema
AttributeError: 'module' object has no attribute 'TimestampType()'

Solution

  • If you're looking for a solution that works only for atomic types (the same as the one in the linked question):

    import pyspark.sql.types

    def type_for_name(s):
        # Look up the type class by name and return an instance of it
        return getattr(pyspark.sql.types, s)()

    type_for_name("StringType")
    # StringType
    
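    Applied to the lists from the question, a minimal sketch of the dynamic schema could look like this (the helper name table_a_schema is just illustrative, and it assumes headerColumns and sourceToHiveTypeList have the same length):

    from pyspark.sql.types import StructType, StructField

    def table_a_schema(column_names, column_types):
        # Build one StructField per (name, type name) pair;
        # type_for_name instantiates the matching DataType class.
        return StructType([
            StructField(name, type_for_name(type_name), nullable=True)
            for name, type_name in zip(column_names, column_types)])

    # schema = table_a_schema(headerColumns, sourceToHiveTypeList)
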

    Complex types could be parsed with eval, but given the security implications, I would be very careful:

    def type_for_name_(s):
        # Restrict eval to the *Type classes exported by pyspark.sql.types
        types = {
            t: getattr(pyspark.sql.types, t)
            for t in dir(pyspark.sql.types) if t.endswith("Type")}
        t = eval(s, types, {})
        # Instantiate the result if eval returned a class rather than an instance
        return t if isinstance(t, pyspark.sql.types.DataType) else t()

    type_for_name_("DecimalType(10, 2)")
    # DecimalType(10,2)
    

    In general, I would recommend using simple string descriptions (e.g. string, double, struct<x:integer,y:integer>), which can be used directly:

    col("foo").cast("integer")
    
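    These short strings also work when building a schema dynamically, since PySpark accepts DDL-style schema strings in places like DataFrameReader.schema and createDataFrame. A minimal sketch, assuming the source types were kept as short names (the hive_type_names list below is hypothetical):

    # Hypothetical short-name types for the columns in the question
    hive_type_names = ["timestamp", "timestamp", "string", "string", "integer", "double"]

    # e.g. "col1 timestamp, col2 timestamp, ..., col6 double"
    ddl_schema = ", ".join(
        "{} {}".format(name, type_name)
        for name, type_name in zip(headerColumns, hive_type_names))

    # df = spark.read.schema(ddl_schema).csv(path)
    # df = spark.createDataFrame(rows, schema=ddl_schema)
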

    If you need a more complex representation, use JSON.
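
    A minimal sketch of that JSON round trip, assuming schema is an existing StructType (for example one built with type_for_name above):

    import json
    from pyspark.sql.types import StructType

    # Serialize the schema, e.g. to keep it in a config file
    schema_json = schema.json()

    # Rebuild an identical StructType from the stored JSON
    restored_schema = StructType.fromJson(json.loads(schema_json))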