Search code examples

How can extract features sensitive to length in PySpark without using .toPandas() hack?

I'm a newbie in PySpark, and I want to translate the Feature Extraction (FE) part scripts which are pythonic, into PySpark. At first, I have Spark data frame so-called sdf including 2 columns A & B:

Below is the example:

data A B path/to/file param=42#fragment path/to/file NaN

now I want to apply some feature engineering and extract features out and concat the results with sdf out of column B. So far I can do it using pythonic scripts:

#================================> Type <==========================================
def getType(input_value):
  if pd.isna(input_value):
    return "-"
  type_ = "-"

  if input_value.isdigit():                                                 # Only numeric
    type_ = "Int"
  elif bool(re.match(r"^[a-zA-Z0-9_]+$", input_value)):                     # Consists of one or more of a-zA-Z, 0-9, underscore , and Chinese
    type_ = "String"
  elif bool(re.match(r"^[\d+,\s]+$", input_value)):                         # Only comma exists as separator "^[\d+,\s]+$"
    type_ = "Array"

    existing_separators = re.findall(r"([\+\;\,\:\=\|\\/\#\'\"\t\r\n\s])+", input_value)
    # There are one or more separators
    # when there is only one separator it is not comma (!= "^[\d+,\s]+$")
    if len(existing_separators) > 1 or (len(existing_separators) == 1 and existing_separators[0] != ","):
      type_ = "Sentence"                                                

  return type_

#================================> Length <==========================================
#Number of charactesrs in parameter value
getLength = lambda input_text: 0 if pd.isna(input_text) else len(input_text)

#================================> Token number <==========================================

double_separators_regex = re.compile(r"[\<\[\(\{]+[0-9a-zA-Z_\.\-]+[\}\)\]\>]+")
single_separators_regex = re.compile(r"([0-9a-zA-Z_\.\-]+)?[\+\,\;\:\=\|\\/\#\'‘’\"“â€\t\r\n\s]+([0-9a-zA-Z_\.\-]+)?")

token_number = lambda input_text: 0 if pd.isna(input_text) else len(double_separators_regex.findall(input_text) + [element for pair in single_separators_regex.findall(input_text) for element in pair if element != ""])

#quick test 
param_example = "url="
out = double_separators_regex.findall(param_example) + [element for pair in single_separators_regex.findall(param_example) for element in pair if element != ""] 

print(out)        #['url','http','','sem','resource','code','rss','rssfeed.jsp','type','list']
print(len(out))   #9

#===================================> Encoding type <============================================

import base64

def isBase64(input_value):
    return base64.b64encode(base64.b64decode(input_value)) == input_value
  except Exception as e:
    return False

#================================> Character feature <==========================================
N = 2

n_grams = lambda input_text: 0 if pd.isna(input_text) else len(set([input_text[character_index:character_index+N] for character_index in range(len(input_text)-N+1)]))

#quick test 
n_grams_example = 'zhang1997'  #output = [‘zh’, ‘ha’, ‘an’, ‘ng’, ‘g1’, ‘19’, ‘99’ , ‘97’]
n_grams(n_grams_example)       # 8

#frame the features
features_df = pd.DataFrame()

features_df["Type"] = df.fragment.apply(getType)
features_df["Length"] = df.fragment.apply(getLength)
features_df["Token_number"] = df.fragment.apply(token_number)
features_df["Encoding_type"] = df.fragment.apply(isBase64)
features_df["Character_feature"] = df.fragment.apply(n_grams)

features_df.columns  #Index(['Type', 'Length', 'Token number', 'Encoding type', 'Character feature'], dtype='object')

Problem: what is the best approach to translating FE without converting Spark dataframe to Pandas datafarame toPandas() to optimize the pipeline and process it 100% spark form?

so I kindly provided a colab notebook for quick debugging and commenting.

The expected output is shown below in form of a Spark dataframe:

|data                |A           |B                |Type    |Length|Token_number |Encoding_type |Character_feature|
|https://example1....|path/to/file|param=42#fragment|Sentence|17.0  |3.0          |False         |15.0             |
|https://example2....|path/to/file|Null             |-       |0.0   |0.0          |False         |0.0              |


  • I make a sample code here for you, it's not perfect but it at least followed your source code and should give you a direction on where to go next. I put some comments on each Spark transformation as well. Hope you found it useful

    from pyspark.sql import functions as F
    from pyspark.sql import functions as f
    from pyspark.sql import types as T
    from pyspark.sql import Window as W
    def count_token(input_text):
        import re
        if input_text is None:
            return 0
        double_separators_regex = re.compile(r"[\<\[\(\{]+[0-9a-zA-Z_\.\-]+[\}\)\]\>]+")
        single_separators_regex = re.compile(r"([0-9a-zA-Z_\.\-]+)?[\+\,\;\:\=\|\\/\#\'‘’\"“â€\t\r\n\s]+([0-9a-zA-Z_\.\-]+)?")
        return len(double_separators_regex.findall(input_text) + [element for pair in single_separators_regex.findall(input_text) for element in pair if element != ""])
    def n_grams(input_text):
        if input_text is None:
            return 0
        N = 2
        return len(set([input_text[character_index:character_index+N] for character_index in range(len(input_text)-N+1)]))
        .withColumn('test', F.base64(F.unbase64('fragment')))
        .withColumn('Type', F
            .when(F.isnull('fragment'), '-')
            .when(~F.isnull(F.col('fragment').cast('int')), 'Int')
            .when(F.regexp_extract('fragment', '^[a-zA-Z0-9_]+$', 0) == F.col('fragment'), 'String')
            .when(F.regexp_extract('fragment', '^[\d+,\s]+$', 0) == F.col('fragment'), 'Array') # not sure about this regex?
            .otherwise('Sentence') # not sure about this condition either, but you can utilize
                                   # `regexp_extract` like above and do any kind of comparision
        .withColumn('Length', F
            .when(F.isnull('fragment'), 0)
        .withColumn('Token_number', F.udf(count_token, T.IntegerType())('fragment')) # Spark doesn't provide `findall` alternative, so
                                                                                     # so we have to use UDF here, you can find document here
        .withColumn('Encoding_type', F
            .when(F.isnull('fragment'), False)
            .otherwise(F.base64(F.unbase64(F.col('fragment'))) == F.col('fragment')) # FYI, this is not always correct,
                                                                                     # for example `assert(isBase64('param123') == False)`
        .withColumn('Character_feature', F.udf(n_grams, T.IntegerType())('fragment')) # or you can use more advanced feature from SparkML
    # Output
    # +--------------------+------------+-----------------+----+--------+------+------------+-------------+-----------------+
    # |                data|        path|         fragment|test|    Type|Length|Token_number|Encoding_type|Character_feature|
    # +--------------------+------------+-----------------+----+--------+------+------------+-------------+-----------------+
    # |https://example1....|path/to/file|param=42#fragment|para|Sentence|    17|           3|        false|               15|
    # |https://example2....|path/to/file|             null|null|       -|     0|           0|        false|                0|
    # +--------------------+------------+-----------------+----+--------+------+------------+-------------+-----------------+