Tags: python, pyspark, scipy, typeerror, peak-detection

Problem with list values read from a Spark dataframe when plotting spike detection with find_peaks from SciPy


Let's say I have the following pandas dataframe containing a value for each date:

import pandas as pd

pdf = pd.DataFrame(data={'date':['2020-10-16','2020-10-17','2020-10-18','2020-10-19','2020-10-20','2020-10-21','2020-10-22','2020-10-23','2020-10-24','2020-10-25','2020-10-26','2020-10-27','2020-10-28','2020-10-29','2020-10-30','2020-10-31','2020-11-01','2020-11-02','2020-11-03','2020-11-04','2020-11-05','2020-11-06','2020-11-07','2020-11-08','2020-11-09','2020-11-10','2020-11-11','2020-11-12','2020-11-13','2020-11-14','2020-11-15'],
                        'value':[161967, 161270, 148508, 152442, 157504, 157118, 155674, 134522, 213384, 163242, 217415, 221502, 146267, 143621, 145875, 139488, 104466, 94825, 143686, 151952, 161074, 161417, 135042, 148768, 131428, 127816, 151905, 180498, 177899, 193950, 12]})
pdf

or the following Spark dataframe with the same data:

import pyspark.sql.types
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, Row, SQLContext
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, DateType

data  = [ ('2020-10-16', 161967),
          ('2020-10-17', 161270),
          ('2020-10-18', 148508),
          ('2020-10-19', 152442),
          ('2020-10-20', 157504),
          ('2020-10-21', 157118),
          ('2020-10-22', 155674),
          ('2020-10-23', 134522),
          ('2020-10-24', 213384),
          ('2020-10-25', 163242),
          ('2020-10-26', 217415),
          ('2020-10-27', 221502),
          ('2020-10-28', 146267),
          ('2020-10-29', 143621),
          ('2020-10-30', 145875),
          ('2020-10-31', 139488),
          ('2020-11-01', 104466),
          ('2020-11-02', 94825),
          ('2020-11-03', 143686),
          ('2020-11-04', 151952),
          ('2020-11-05', 161074),
          ('2020-11-06', 161417),
          ('2020-11-07', 135042),
          ('2020-11-08', 148768),
          ('2020-11-09', 131428),
          ('2020-11-10', 127816),
          ('2020-11-11', 151905),
          ('2020-11-12', 180498),
          ('2020-11-13', 177899),
          ('2020-11-14', 193950),
          ('2020-11-15', 12),

  ]

schema = StructType([
    StructField("date",  StringType(),  True),
    StructField("value", IntegerType(), True),
])
 
# create a Spark dataframe
sc = SparkContext()
sqlContext = SQLContext(sc)
sdf = sqlContext.createDataFrame(data=data, schema=schema)
sdf.printSchema()
sdf.sort('date').show(truncate=False)

I was inspired by this answer to detect peaks and valleys with the code below:

from scipy.signal import find_peaks
import numpy as np
import matplotlib.pyplot as plt

# Input signal from Pandas dataframe
t = pdf.date
x = pdf.value

# Set thresholds
# median +/- 1 standard deviation of the signal is used as the threshold
thresh_top    = np.median(x) + 1 * np.std(x)
thresh_bottom = np.median(x) - 1 * np.std(x)


# Find indices of peaks & of valleys (from inverting the signal)
peak_idx, _   = find_peaks(x,  height =  thresh_top)
valley_idx, _ = find_peaks(-x, height = -thresh_bottom)

# Plot signal
plt.figure(figsize=(14,12))
plt.plot(t, x   , color='b', label='data')
plt.scatter(t, x, s=10,c='b',label='value')

# Plot threshold
plt.plot([min(t), max(t)], [thresh_top, thresh_top],       '--',  color='r', label='peaks-threshold')
plt.plot([min(t), max(t)], [thresh_bottom, thresh_bottom], '--',  color='g', label='valleys-threshold')

# Plot peaks (red) and valleys (blue)
plt.plot(t[peak_idx],   x[peak_idx],   "x", color='r', label='peaks')
plt.plot(t[valley_idx], x[valley_idx], "x", color='g', label='valleys')


plt.xticks(rotation=45)
plt.ylabel('value')
plt.xlabel('timestamp')
plt.title(f'data over time')
plt.legend( loc='lower left')
plt.gcf().autofmt_xdate()
plt.show()

This works and plots successfully when I read the data from the pandas dataframe pdf before creating the Spark dataframe. But once I have created the Spark dataframe sdf, the very same notebook cell, still reading the input signal from the pandas pdf, no longer works and gives the following error:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-12-1c79b34272c5> in <module>()
     23 
     24 # Plot threshold
---> 25 plt.plot([min(t), max(t)], [thresh_top, thresh_top],       '--',  color='r', label='peaks-threshold')
     26 plt.plot([min(t), max(t)], [thresh_bottom, thresh_bottom], '--',  color='g', label='valleys-threshold')
     27 

2 frames
/content/spark-3.1.2-bin-hadoop2.7/python/pyspark/sql/column.py in _to_java_column(col)
     47             "{0} of type {1}. "
     48             "For column literals, use 'lit', 'array', 'struct' or 'create_map' "
---> 49             "function.".format(col, type(col)))
     50     return jcol
     51 

TypeError: Invalid argument, not a string or column: 0     2020-10-16
1     2020-10-17
2     2020-10-18
3     2020-10-19
4     2020-10-20
5     2020-10-21
6     2020-10-22
7     2020-10-23
8     2020-10-24
9     2020-10-25
10    2020-10-26
11    2020-10-27
12    2020-10-28
13    2020-10-29
14    2020-10-30
15    2020-10-31
16    2020-11-01
17    2020-11-02
18    2020-11-03
19    2020-11-04
20    2020-11-05
21    2020-11-06
22    2020-11-07
23    2020-11-08
24    2020-11-09
25    2020-11-10
26    2020-11-11
27    2020-11-12
28    2020-11-13
29    2020-11-14
30    2020-11-15
Name: date, dtype: object of type <class 'pandas.core.series.Series'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.

When I instead try to read the data directly from the Spark dataframe using:

# Input signal from Spark dataframe
t = [val.date  for val in sdf.select('date').collect()]
x = [val.value for val in sdf.select('value').collect()]

sadly the plotting code still doesn't work and throws the following error:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-14-9cfede77d0fb> in <module>()
     31 # Find indices of peaks & of valleys (from inverting the signal)
     32 peak_idx, _   = find_peaks(x,  height =  thresh_top)
---> 33 valley_idx, _ = find_peaks(-x, height = -thresh_bottom)
     34 
     35 

TypeError: bad operand type for unary -: 'list'

I have spent a lot of time on this but couldn't fix the bug. I am also open to other, non-find_peaks() solutions for plotting spike detection, like this numpythonic answer, if I can adapt the code and apply it to a Spark dataframe. I have tried many things, which you can check in this Google Colab notebook; feel free to run/test/edit it for quick debugging.


Solution

  • The problem is that you are not working with the same kind of object in the two cases.

    When you work with pandas, x = pdf.value gives you a Series object. A Series supports the unary - operator and knows how to negate the values it holds.

    But when you work with PySpark and collect the values, you get a plain Python list, and putting - in front of it raises the error:

    TypeError: bad operand type for unary -: 'list'
    

    This tells you that Python does not know how to negate a list.

    • So, first, instead of:
    valley_idx, _ = find_peaks(-x, height=-thresh_bottom)
    

    you have to convert the values to negative yourself, for example:

    valley_idx, _ = find_peaks([-i for i in x], height=-thresh_bottom)
    
    • Next, find_peaks returns an ndarray of indices, which again cannot be used to index a plain list in:
    plt.plot(t[peak_idx], x[peak_idx], "x", color="r", label="peaks")
    

    So you'll have to index manually, for example (a NumPy-based alternative is sketched at the end of this answer):

    plt.plot(
            [t[i] for i in peak_idx],
            [x[i] for i in peak_idx],
            "x",
            color="r",
            label="peaks",
        )
    

    I've reproduced your plot with the following code (plus calculating the median and standard deviation in PySpark as an example):

    # data is the same
    # ...
    
    # create a Spark dataframe
    spark = SparkSession.builder.getOrCreate()
    sdf = spark.createDataFrame(data=data, schema=schema)
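    # thresholds computed with Spark aggregations; note that F.stddev is the
    # *sample* standard deviation, while np.std defaults to the population
    # value, so the thresholds can differ slightly from the pandas version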
    std_dev = sdf.select(F.stddev(F.col("value")).alias("std")).collect()[0]["std"]
    median = (
        sdf.select(F.expr("percentile_approx(value, 0.5)").alias("med"))
        .collect()[0]["med"]
    )
    thresh_top = median + 1 * std_dev
    thresh_bottom = median - 1 * std_dev
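
    # pull the two columns back to the driver as plain Python lists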
    t = sdf.select("date").rdd.flatMap(lambda x: x).collect()
    x = sdf.select("value").rdd.flatMap(lambda x: x).collect()
    peak_idx, _ = find_peaks(x, height=thresh_top)
    valley_idx, _ = find_peaks([-i for i in x], height=-thresh_bottom)
    
    plt.figure(figsize=(14, 12))
    plt.plot(t, x, color="b", label="data")
    plt.scatter(t, x, s=10, c="b", label="value")
    
    # Plot threshold
    plt.plot(
        [min(t), max(t)],
        [thresh_top, thresh_top],
        "--",
        color="r",
        label="peaks-threshold",
    )
    plt.plot(
        [min(t), max(t)],
        [thresh_bottom, thresh_bottom],
        "--",
        color="g",
        label="valleys-threshold",
    )
    
    # Plot peaks (red) and valleys (blue)
    plt.plot(
        [t[i] for i in peak_idx],
        [x[i] for i in peak_idx],
        "x",
        color="r",
        label="peaks",
    )
    plt.plot(
        [t[i] for i in valley_idx],
        [x[i] for i in valley_idx],
        "x",
        color="g",
        label="valleys",
    )
    
    # ...
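
  • If you would rather keep the original pandas-style plotting code untouched, a minimal alternative sketch (reusing sdf, thresh_top and thresh_bottom from the example above) is to convert the collected lists to NumPy arrays once; an ndarray supports unary - and can be indexed with the index arrays that find_peaks returns:

    import numpy as np

    # converting the collected lists to ndarrays restores array semantics
    t = np.array([row["date"] for row in sdf.select("date").collect()])
    x = np.array([row["value"] for row in sdf.select("value").collect()])

    peak_idx, _   = find_peaks(x,  height=thresh_top)
    valley_idx, _ = find_peaks(-x, height=-thresh_bottom)  # unary minus works on an ndarray

    # ndarray fancy indexing also works, so the original plotting lines run unchanged
    plt.plot(t[peak_idx],   x[peak_idx],   "x", color="r", label="peaks")
    plt.plot(t[valley_idx], x[valley_idx], "x", color="g", label="valleys")

    Alternatively, since the data easily fits in memory here, sdf.toPandas() returns a pandas DataFrame on the driver, and the original pdf-based code then works as-is.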
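
  • A side note on the first traceback (the one raised while you were still reading the signal from the pandas pdf): it is most likely caused by from pyspark.sql.functions import * in your setup code, which shadows the built-in min and max with Spark's column functions, so min(t) receives a pandas Series and fails with the "not a string or column" message. A minimal sketch of the workaround, assuming the rest of your code stays unchanged, is to drop the wildcard import (you already have the F alias) or to call the built-ins explicitly:

    import builtins

    # use the Python built-ins explicitly so Spark's min/max don't get in the way
    plt.plot([builtins.min(t), builtins.max(t)],
             [thresh_top, thresh_top], "--", color="r", label="peaks-threshold")
    plt.plot([builtins.min(t), builtins.max(t)],
             [thresh_bottom, thresh_bottom], "--", color="g", label="valleys-threshold")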