python, hadoop, hdfs, pyspark, custom-formatting

PySpark: read, map and reduce from multiline record textfile with newAPIHadoopFile


I'm trying to solve a problem that is similar to this post. My original data is a text file that contains values (observations) of several sensors. Each observation is given with a timestamp, but the sensor name is given only once, not on each line, and there are several sensors in one file.

Time    MHist::852-YF-007   
2016-05-10 00:00:00 0
2016-05-09 23:59:00 0
2016-05-09 23:58:00 0
2016-05-09 23:57:00 0
2016-05-09 23:56:00 0
2016-05-09 23:55:00 0
2016-05-09 23:54:00 0
2016-05-09 23:53:00 0
2016-05-09 23:52:00 0
2016-05-09 23:51:00 0
2016-05-09 23:50:00 0
2016-05-09 23:49:00 0
2016-05-09 23:48:00 0
2016-05-09 23:47:00 0
2016-05-09 23:46:00 0
2016-05-09 23:45:00 0
2016-05-09 23:44:00 0
2016-05-09 23:43:00 0
2016-05-09 23:42:00 0
Time    MHist::852-YF-008   
2016-05-10 00:00:00 0
2016-05-09 23:59:00 0
2016-05-09 23:58:00 0
2016-05-09 23:57:00 0
2016-05-09 23:56:00 0
2016-05-09 23:55:00 0
2016-05-09 23:54:00 0
2016-05-09 23:53:00 0
2016-05-09 23:52:00 0
2016-05-09 23:51:00 0
2016-05-09 23:50:00 0
2016-05-09 23:49:00 0
2016-05-09 23:48:00 0
2016-05-09 23:47:00 0
2016-05-09 23:46:00 0
2016-05-09 23:45:00 0
2016-05-09 23:44:00 0
2016-05-09 23:43:00 0
2016-05-09 23:42:00 0

Therefore I want to configure Hadoop to split the file at the lines where the sensor information is given, then read the sensor name (e.g. 852-YF-007 and 852-YF-008) from those lines and use MapReduce to read the values for each sensor accordingly.

I did this in Python (Jupyter Notebook):

sheet = sc.newAPIHadoopFile(
    '/user/me/sample.txt',
    'org.apache.hadoop.mapreduce.lib.input.TextInputFormat',
    'org.apache.hadoop.io.LongWritable',
    'org.apache.hadoop.io.Text',
    conf={'textinputformat.record.delimiter': 'Time\tMHist'}
)

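# Index into the (key, value) pair instead of unpacking it in the lambda
# signature, so the code also runs on Python 3.
sf = sheet.filter(lambda kv: kv[1])          # drop records with an empty value
sf = sf.map(lambda kv: kv[1].splitlines())   # one list of lines per record

sf.take(50)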

The output is like this:

[[u'::852-YF-007\t',
  u'2016-05-10 00:00:00\t0',
  u'2016-05-09 23:59:00\t0',
  u'2016-05-09 23:58:00\t0',
  u'2016-05-09 23:57:00\t0',
  u'2016-05-09 23:56:00\t0',
  u'2016-05-09 23:55:00\t0',
  u'2016-05-09 23:54:00\t0',
  u'2016-05-09 23:53:00\t0',
  u'2016-05-09 23:52:00\t0',
  u'2016-05-09 23:51:00\t0',
  u'2016-05-09 23:50:00\t0',
  u'2016-05-09 23:49:00\t0',
  u'2016-05-09 23:48:00\t0',
  u'2016-05-09 23:47:00\t0',
  u'2016-05-09 23:46:00\t0',
  u'2016-05-09 23:45:00\t0',
  u'2016-05-09 23:44:00\t0',
  u'2016-05-09 23:43:00\t0',
  u'2016-05-09 23:42:00\t0'],
 [u'::852-YF-008\t',
  u'2016-05-10 00:00:00\t0',
  u'2016-05-09 23:59:00\t0',
  u'2016-05-09 23:58:00\t0',
  u'2016-05-09 23:57:00\t0',
  u'2016-05-09 23:56:00\t0',
  u'2016-05-09 23:55:00\t0',
  u'2016-05-09 23:54:00\t0',
  u'2016-05-09 23:53:00\t0',
  u'2016-05-09 23:52:00\t0',
  u'2016-05-09 23:51:00\t0',
  u'2016-05-09 23:50:00\t0',
  u'2016-05-09 23:49:00\t0',
  u'2016-05-09 23:48:00\t0',
  u'2016-05-09 23:47:00\t0',
  u'2016-05-09 23:46:00\t0',
  u'2016-05-09 23:45:00\t0',
  u'2016-05-09 23:44:00\t0',
  u'2016-05-09 23:43:00\t0',
  u'2016-05-09 23:42:00\t0']]
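
The leading `::` in the output above follows from the delimiter stopping at `MHist`. A minimal plain-Python sketch of that behaviour, assuming the record delimiter acts like `str.split` (the delimiter itself is consumed). `SAMPLE` and `split_records` are hypothetical names used only for this illustration, and no Spark is involved:

```python
# Shortened stand-in for the sensor file; columns are tab-separated.
SAMPLE = (
    "Time\tMHist::852-YF-007\t\n"
    "2016-05-10 00:00:00\t0\n"
    "2016-05-09 23:59:00\t0\n"
    "Time\tMHist::852-YF-008\t\n"
    "2016-05-10 00:00:00\t0\n"
)


def split_records(text, delimiter):
    """Split text into records; the delimiter is dropped from the result."""
    return text.split(delimiter)


records = split_records(SAMPLE, "Time\tMHist")

# The text starts with the delimiter, so the first record is empty.
non_empty = [r for r in records if r]

# The first line of each remaining record still carries the '::' prefix.
first_lines = [r.splitlines()[0] for r in non_empty]
print(first_lines)
```

The empty first record is presumably why the empty values are filtered out before splitting into lines.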

My question is how to process this further so that I extract the sensor name and keep the value lines for that sensor, something like this:

852-YF-007 --> array of sensor_lines
852-YF-008 --> array of sensor_lines

The lines themselves will be split into timestamp and value later on. For now I'm more interested in separating the sensor names from the lines.


Solution

  • Personally I would:

    • extend the delimiter with :: so that each record starts with the sensor name:

      sheet = sc.newAPIHadoopFile(
          path,
          'org.apache.hadoop.mapreduce.lib.input.TextInputFormat',
          'org.apache.hadoop.io.LongWritable',
          'org.apache.hadoop.io.Text',
          conf={'textinputformat.record.delimiter': 'Time\tMHist::'}
      )
      
    • drop keys:

      values = sheet.values()
      
    • filter out empty entries:

      non_empty = values.filter(lambda x: x)
      
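    • split into lines (a lambda is used because the values are unicode objects on Python 2, where str.splitlines would raise a TypeError):

      grouped_lines = non_empty.map(lambda x: x.splitlines())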
      
    • separate keys and values:

      from operator import itemgetter
      
      pairs = grouped_lines.map(itemgetter(0, slice(1, None)))
      
    • and finally split values:

      pairs.flatMapValues(lambda xs: [x.split("\t") for x in xs])
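
The steps above can be checked locally without a cluster. The following is a minimal sketch that replays them on plain Python lists. The `records` list is hypothetical sample data shaped like the values the extended delimiter would leave behind, and stripping the key is an addition here, since the header line ends with a tab:

```python
from operator import itemgetter

# Hypothetical records, as left over once 'Time\tMHist::' has been consumed.
records = [
    "",
    "852-YF-007\t\n2016-05-10 00:00:00\t0\n2016-05-09 23:59:00\t0\n",
    "852-YF-008\t\n2016-05-10 00:00:00\t0\n",
]

# filter out empty entries
non_empty = [r for r in records if r]

# split each record into lines
grouped_lines = [r.splitlines() for r in non_empty]

# separate keys and values: (first line, remaining lines)
head_and_tail = itemgetter(0, slice(1, None))
pairs = [head_and_tail(lines) for lines in grouped_lines]

# Local equivalent of flatMapValues: one (key, fields) tuple per value line.
flat = [(key.strip(), line.split("\t")) for key, lines in pairs for line in lines]

for item in flat:
    print(item)
```

Each element of `flat` pairs a sensor name with one `[timestamp, value]` list, which is the shape the `flatMapValues` step is expected to produce on the RDD.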
      

    All of that can be done with a single function, of course:

    import dateutil.parser
    
    def process(pair):
        _, content = pair
        clean = [x.strip() for x in content.strip().splitlines()]
        if not clean:
            return  # bare return: this is a generator, and 'return []' is a SyntaxError on Python 2
        k, vs = clean[0], clean[1:]
        for v in vs:
            try:
                ds, x = v.split("\t")
                yield k, (dateutil.parser.parse(ds), float(x))  # or int(x)
            except ValueError:
                pass
    
    sheet.flatMap(process)
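
To get from these pairs to the "sensor --> array of lines" shape asked for in the question, the observations still have to be collected per key. A minimal local sketch, assuming the timestamps always follow `%Y-%m-%d %H:%M:%S`. It uses `datetime.strptime` from the standard library in place of `dateutil`, and `parse_record`, `records` and `by_sensor` are hypothetical names for this illustration:

```python
from collections import defaultdict
from datetime import datetime


def parse_record(content):
    """Yield (sensor, (timestamp, value)) for each well-formed line of a record."""
    lines = [x.strip() for x in content.strip().splitlines()]
    if not lines:
        return
    sensor, rows = lines[0], lines[1:]
    for row in rows:
        try:
            ds, x = row.split("\t")
            yield sensor, (datetime.strptime(ds, "%Y-%m-%d %H:%M:%S"), float(x))
        except ValueError:
            continue  # skip lines that do not parse


# Hypothetical records; the second one contains a malformed line.
records = [
    "",
    "852-YF-007\t\n2016-05-10 00:00:00\t0\ngarbage\n2016-05-09 23:59:00\t1\n",
    "852-YF-008\t\n2016-05-10 00:00:00\t0\n",
]

# Collect the observations per sensor.
by_sensor = defaultdict(list)
for record in records:
    for sensor, observation in parse_record(record):
        by_sensor[sensor].append(observation)

for sensor in sorted(by_sensor):
    print(sensor, by_sensor[sensor])
```

On the RDD, the same grouping could likely be expressed as `sheet.values().flatMap(parse_record).groupByKey().mapValues(list)`, though that materialises all observations of one sensor in a single task.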