Tags: python, numpy, pandas, multiprocessing, blaze

pydata blaze: does it allow parallel processing or not?


I am looking to parallelise numpy or pandas operations. For this I have been looking into pydata's blaze. My understanding was that seamless parallelisation was its major selling point.

Unfortunately I have been unable to find an operation that runs on more than one core. Is parallel processing in blaze available yet, or is it currently only a stated aim? Am I doing something wrong? I am using blaze v0.6.5.

Example of one function I was hoping to parallelise: (deduplication of a pytables column too large to fit in memory)

import pandas as pd
import blaze as bz

def f1():
    # `store` is an already-open pd.HDFStore containing the 'my_names' table
    counter = 0
    groups = pd.DataFrame(columns=['name'])
    t = bz.TableSymbol('t', '{name: string}')
    e = bz.distinct(t)
    for chunk in store.select('my_names', columns=['name'],
                              chunksize=int(1e5)):
        counter += 1
        print('processing chunk %d' % counter)
        groups = pd.concat([groups, chunk])
        # drop the duplicates seen so far before reading the next chunk
        groups = bz.compute(e, groups)
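For reference, the same chunked deduplication can be done with pandas alone, without blaze (a sketch; the chunk iterator would come from `store.select(..., chunksize=...)` as above):

```python
import pandas as pd

def distinct_names(chunks):
    """Accumulate the distinct 'name' values across DataFrame chunks,
    keeping only the running set of uniques in memory."""
    seen = pd.Series(dtype=object)
    for chunk in chunks:
        # append the new chunk, then drop any duplicates seen so far
        seen = pd.concat([seen, chunk['name']]).drop_duplicates()
    return seen.reset_index(drop=True)
```

This is single-core, which is exactly why a `compute` that parallelises across chunks would be attractive.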

Edit 1

I have had problems following Phillip's examples:

In [1]: from blaze import Data, compute

In [2]: d = Data('test.bcolz')

In [3]: d.head(5)
Out[3]: <repr(<blaze.expr.collections.Head at 0x7b5e300>) failed: NotImplementedError: Don't know how to compute:
expr: _1.head(5).head(11)
data: {_1: ctable((8769257,), [('index', '<i8'), ('date', 'S10'), ('accessDate', 'S26')])
  nbytes: 367.97 MB; cbytes: 35.65 MB; ratio: 10.32
  cparams := cparams(clevel=5, shuffle=True, cname='blosclz')
  rootdir := 'test.bcolz'
[(0L, '2014-12-12', '2014-12-14T17:39:19.716000')
 (1L, '2014-12-11', '2014-12-14T17:39:19.716000')
 (2L, '2014-12-10', '2014-12-14T17:39:19.716000') ...,
 (1767L, '2009-11-11', '2014-12-15T13:32:39.906000')
 (1768L, '2009-11-10', '2014-12-15T13:32:39.906000')
 (1769L, '2009-11-09', '2014-12-15T13:32:39.906000')]}>

My environment:

C:\Anaconda>conda list blaze
# packages in environment at C:\Anaconda:
#
blaze                     0.6.8               np19py27_69

Note, however, that blaze reports a different version than conda does:

In [5]: import blaze

In [6]: blaze.__version__
Out[6]: '0.6.7'

With other data sources blaze seems to work:

In [6]: d = Data([1,2,2,2,3,4,4,4,5,6])

In [7]: d.head(5)
Out[7]:
   _2
0   1
1   2
2   2
3   2
4   3

In [16]: list(compute(d._2.distinct()))
Out[16]: [1, 2, 3, 4, 5, 6]

Solution

  • Note: The example below requires the latest version of blaze, which you can get via

    conda install -c blaze blaze
    

    You'll also need the latest version of the nascent into project, installed from master, which you can do with

    pip install git+git://github.com/ContinuumIO/into.git
    

    You can't do "seamless" parallelization with an arbitrary backend, but the bcolz backend supports parallelization in a nice way. Here's an example with the NYC Taxi trip/fare dataset.

    Note: I've combined both the trip and fare datasets into a single dataset. There are 173,179,759 rows in the dataset

    In [28]: from blaze import Data, compute
    
    In [29]: ls -d *.bcolz
    all.bcolz/  fare.bcolz/ trip.bcolz/
    
    In [30]: d = Data('all.bcolz')
    
    In [31]: d.head(5)
    Out[31]:
                              medallion                      hack_license  \
    0  89D227B655E5C82AECF13C3F540D4CF4  BA96DE419E711691B9445D6A6307C170
    1  0BD7C8F5BA12B88E0B67BED28BEA73D8  9FD8F69F0804BDB5549F40E9DA1BE472
    2  0BD7C8F5BA12B88E0B67BED28BEA73D8  9FD8F69F0804BDB5549F40E9DA1BE472
    3  DFD2202EE08F7A8DC9A57B02ACB81FE2  51EE87E3205C985EF8431D850C786310
    4  DFD2202EE08F7A8DC9A57B02ACB81FE2  51EE87E3205C985EF8431D850C786310
    
      vendor_id  rate_code store_and_fwd_flag     pickup_datetime  \
    0       CMT          1                  N 2013-01-01 15:11:48
    1       CMT          1                  N 2013-01-06 00:18:35
    2       CMT          1                  N 2013-01-05 18:49:41
    3       CMT          1                  N 2013-01-07 23:54:15
    4       CMT          1                  N 2013-01-07 23:25:03
    
         dropoff_datetime  passenger_count  trip_time_in_secs  trip_distance  \
    0 2013-01-01 15:18:10                4                382            1.0
    1 2013-01-06 00:22:54                1                259            1.5
    2 2013-01-05 18:54:23                1                282            1.1
    3 2013-01-07 23:58:20                2                244            0.7
    4 2013-01-07 23:34:24                1                560            2.1
    
         ...     pickup_latitude  dropoff_longitude  dropoff_latitude  \
    0    ...           40.757977         -73.989838         40.751171
    1    ...           40.731781         -73.994499         40.750660
    2    ...           40.737770         -74.009834         40.726002
    3    ...           40.759945         -73.984734         40.759388
    4    ...           40.748528         -74.002586         40.747868
    
       tolls_amount  tip_amount  total_amount  mta_tax  fare_amount  payment_type  \
    0             0           0           7.0      0.5          6.5           CSH
    1             0           0           7.0      0.5          6.0           CSH
    2             0           0           7.0      0.5          5.5           CSH
    3             0           0           6.0      0.5          5.0           CSH
    4             0           0          10.5      0.5          9.5           CSH
    
      surcharge
    0       0.0
    1       0.5
    2       1.0
    3       0.5
    4       0.5
    
    [5 rows x 21 columns]
    

    To add process-based parallelism, we bring in the Pool class from the multiprocessing stdlib module, and pass the Pool instance's map method as a keyword argument to compute:

    In [32]: from multiprocessing import Pool
    
    In [33]: p = Pool()
    
    In [34]: %timeit -n 1 -r 1 values = compute(d.medallion.distinct())
    1 loops, best of 1: 1min per loop
    
    In [35]: %timeit -n 1 -r 1 values = compute(d.medallion.distinct(), map=p.map)
    1 loops, best of 1: 16.2 s per loop
    

    So, roughly a 3x speedup for an extra line of code. Note that this is a string column; string columns tend to be very inefficient to process compared to other types. A distinct expression over an integer column finishes in about 1 second with multiple cores versus about 3 seconds on a single core, so roughly the same relative speedup:

    In [38]: %timeit -n 1 -r 1 values = compute(d.passenger_count.distinct())
    1 loops, best of 1: 3.33 s per loop
    
    In [39]: %timeit -n 1 -r 1 values = compute(d.passenger_count.distinct(), map=p.map)
    1 loops, best of 1: 1.01 s per loop
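    The speedup comes from a simple split-apply-combine scheme: each worker computes `distinct` over its own chunk, and the partial results are merged at the end. Stripped of blaze, the pattern looks roughly like this (a sketch of the idea, not blaze's actual implementation):

    ```python
    from multiprocessing import Pool

    def chunk_distinct(chunk):
        # "apply" step: distinct values within one chunk
        return set(chunk)

    def parallel_distinct(chunks, pool):
        # the chunking itself is the "split"; the union is the "combine"
        partials = pool.map(chunk_distinct, chunks)
        return set().union(*partials)

    if __name__ == '__main__':
        data = [[1, 2, 2], [2, 3, 4], [4, 5, 6]]
        with Pool(2) as p:
            print(sorted(parallel_distinct(data, p)))  # [1, 2, 3, 4, 5, 6]
    ```

    Passing `map=p.map` to `compute` swaps blaze's built-in (serial) map over chunks for the pool's parallel one, which is why a chunked backend like bcolz is required for this to help.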