Tags: python, pyspark, apache-spark-sql, singlestore

MemSQL::Streamliner Python Transform


I am working with the MemSQL::Streamliner::Transform (Python) utility. It exposes a transform method that must be overridden to provide custom transform functionality:

def transform(self, sql_context, dataframe, logger):
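
For reference, a minimal sketch of such an override (the Transform base class name, the logger.info call, and the return-a-DataFrame convention are assumptions; only the transform signature above comes from the utility):

class MyTransform(Transform):  # base class name is an assumption
    def transform(self, sql_context, dataframe, logger):
        # logger.info is assumed to exist on the provided logger
        logger.info("received a batch of %d rows" % dataframe.count())
        # returning the (possibly modified) DataFrame is assumed
        return dataframe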

The first column of the dataframe holds a byte array (a JSON string).

How do I convert the byte array into a DataFrame with named columns?

Goal: access the individual columns of the converted DataFrame.


Solution

  • You can access the underlying RDD with dataframe.rdd and map over it to transform each byte string into a list of column values. You can then turn the resulting RDD back into a DataFrame with named columns by passing the list of column names as the second argument to createDataFrame.

    Something like the following should work:

    import json

    def parse(row):
        # row[0] is a byte array holding a JSON document
        bytestring = row[0]
        json_data = json.loads(bytes(bytestring).decode("utf-8"))
        return [json_data["mycolumn1"], json_data["mycolumn2"]]

    parsedRDD = dataframe.rdd.map(parse)
    parsedDf = sql_context.createDataFrame(parsedRDD, ["mycolumn1", "mycolumn2"])

    # now you can access columns by name
    parsedDf.select(parsedDf["mycolumn1"])
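
    Alternatively, here is a sketch assuming Spark 1.4+, where sql_context.read.json accepts an RDD of JSON strings: you can let Spark infer the schema instead of listing the columns yourself (the column name below is taken from the example above).

    # decode each byte array into a JSON text string
    jsonStrings = dataframe.rdd.map(lambda row: bytes(row[0]).decode("utf-8"))
    # let Spark infer column names and types from the JSON documents
    inferredDf = sql_context.read.json(jsonStrings)
    inferredDf.select(inferredDf["mycolumn1"])

    The explicit createDataFrame call keeps the column order fixed, while schema inference saves you from maintaining the column list by hand.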