Let's say I have the following pandas DataFrame containing values over time (by date):
import pandas as pd
pdf = pd.DataFrame(data={'date':['2020-10-16','2020-10-17','2020-10-18','2020-10-19','2020-10-20','2020-10-21','2020-10-22','2020-10-23','2020-10-24','2020-10-25','2020-10-26','2020-10-27','2020-10-28','2020-10-29','2020-10-30','2020-10-31','2020-11-01','2020-11-02','2020-11-03','2020-11-04','2020-11-05','2020-11-06','2020-11-07','2020-11-08','2020-11-09','2020-11-10','2020-11-11','2020-11-12','2020-11-13','2020-11-14','2020-11-15'],
'value':[161967, 161270, 148508, 152442, 157504, 157118, 155674, 134522, 213384, 163242, 217415, 221502, 146267, 143621, 145875, 139488, 104466, 94825, 143686, 151952, 161074, 161417, 135042, 148768, 131428, 127816, 151905, 180498, 177899, 193950, 12]})
pdf
Or I have the following Spark DataFrame with the same data:
import pyspark.sql.types
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, Row, SQLContext
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, TimestampType, DateType
data = [ ('2020-10-16', 161967),
('2020-10-17', 161270),
('2020-10-18', 148508),
('2020-10-19', 152442),
('2020-10-20', 157504),
('2020-10-21', 157118),
('2020-10-22', 155674),
('2020-10-23', 134522),
('2020-10-24', 213384),
('2020-10-25', 163242),
('2020-10-26', 217415),
('2020-10-27', 221502),
('2020-10-28', 146267),
('2020-10-29', 143621),
('2020-10-30', 145875),
('2020-10-31', 139488),
('2020-11-01', 104466),
('2020-11-02', 94825),
('2020-11-03', 143686),
('2020-11-04', 151952),
('2020-11-05', 161074),
('2020-11-06', 161417),
('2020-11-07', 135042),
('2020-11-08', 148768),
('2020-11-09', 131428),
('2020-11-10', 127816),
('2020-11-11', 151905),
('2020-11-12', 180498),
('2020-11-13', 177899),
('2020-11-14', 193950),
('2020-11-15', 12),
]
schema = StructType([
StructField("date", StringType(), True), \
StructField("value", IntegerType(), True), \
])
#create a Spark dataframe
sc= SparkContext()
sqlContext = SQLContext(sc)
sdf = sqlContext.createDataFrame(data=data, schema=schema)
sdf.printSchema()
sdf.sort('date').show(truncate = False)
I was inspired by this answer to detect peaks and valleys with the code below:
from scipy.signal import find_peaks
import numpy as np
import matplotlib.pyplot as plt
# Input signal from Pandas dataframe
t = pdf.date
x = pdf.value
# Set thresholds: median +/- 1 standard deviation
thresh_top = np.median(x) + 1 * np.std(x)
thresh_bottom = np.median(x) - 1 * np.std(x)
# Find indices of peaks & of valleys (from inverting the signal)
peak_idx, _ = find_peaks(x, height = thresh_top)
valley_idx, _ = find_peaks(-x, height = -thresh_bottom)
# Plot signal
plt.figure(figsize=(14,12))
plt.plot(t, x , color='b', label='data')
plt.scatter(t, x, s=10,c='b',label='value')
# Plot threshold
plt.plot([min(t), max(t)], [thresh_top, thresh_top], '--', color='r', label='peaks-threshold')
plt.plot([min(t), max(t)], [thresh_bottom, thresh_bottom], '--', color='g', label='valleys-threshold')
# Plot peaks (red) and valleys (blue)
plt.plot(t[peak_idx], x[peak_idx], "x", color='r', label='peaks')
plt.plot(t[valley_idx], x[valley_idx], "x", color='g', label='valleys')
plt.xticks(rotation=45)
plt.ylabel('value')
plt.xlabel('timestamp')
plt.title(f'data over time')
plt.legend( loc='lower left')
plt.gcf().autofmt_xdate()
plt.show()
This works and plots successfully when I read the data from the pandas DataFrame pdf
before creating the Spark DataFrame. Once I have created the Spark DataFrame sdf,
however, even re-running the same notebook cell that reads the input signal from the pandas pdf
no longer works, and it gives the following error:
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-12-1c79b34272c5> in <module>()
23
24 # Plot threshold
---> 25 plt.plot([min(t), max(t)], [thresh_top, thresh_top], '--', color='r', label='peaks-threshold')
26 plt.plot([min(t), max(t)], [thresh_bottom, thresh_bottom], '--', color='g', label='valleys-threshold')
27
/content/spark-3.1.2-bin-hadoop2.7/python/pyspark/sql/column.py in _to_java_column(col)
47 "{0} of type {1}. "
48 "For column literals, use 'lit', 'array', 'struct' or 'create_map' "
---> 49 "function.".format(col, type(col)))
50 return jcol
51
TypeError: Invalid argument, not a string or column: 0 2020-10-16
1 2020-10-17
2 2020-10-18
3 2020-10-19
4 2020-10-20
5 2020-10-21
6 2020-10-22
7 2020-10-23
8 2020-10-24
9 2020-10-25
10 2020-10-26
11 2020-10-27
12 2020-10-28
13 2020-10-29
14 2020-10-30
15 2020-10-31
16 2020-11-01
17 2020-11-02
18 2020-11-03
19 2020-11-04
20 2020-11-05
21 2020-11-06
22 2020-11-07
23 2020-11-08
24 2020-11-09
25 2020-11-10
26 2020-11-11
27 2020-11-12
28 2020-11-13
29 2020-11-14
30 2020-11-15
Name: date, dtype: object of type <class 'pandas.core.series.Series'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.
When I try to read the data directly from the Spark DataFrame using:
# Input signal from Spark dataframe
t = [val.date for val in sdf.select('date').collect()]
x = [val.value for val in sdf.select('value').collect()]
sadly the plotting code doesn't work and throws the following error:
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-14-9cfede77d0fb> in <module>()
31 # Find indices of peaks & of valleys (from inverting the signal)
32 peak_idx, _ = find_peaks(x, height = thresh_top)
---> 33 valley_idx, _ = find_peaks(-x, height = -thresh_bottom)
34
35
TypeError: bad operand type for unary -: 'list'
I have spent a lot of time on this but couldn't fix the bug. I'm also open to other, non-find_peaks()
solutions for plotting spike detection, like this numpythonic answer, if I can adapt them and apply them to the Spark DataFrame. I have tried many things, which you can check in this Google Colab notebook; feel free to run/test/edit it for quick debugging.
The problem is that you are not working with the same objects. (Your first traceback is actually a separate issue: from pyspark.sql.functions import * shadows the built-in min and max with Spark's column functions, so min(t) and max(t) on the pandas Series get routed into PySpark's _to_java_column.)
When you work with pandas, x = pdf.value gives you a Series object. A Series supports the unary - operator and knows how to negate every value it holds. But when you work with PySpark and collect the values, you get a plain Python list, and putting - in front of it raises:
TypeError: bad operand type for unary -: 'list'
which tells you that a list does not know how to negate itself.
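To see the difference in isolation, here is a minimal sketch (not from the original code) comparing the three container types:
import numpy as np
import pandas as pd

values = [1, 2, 3]
neg_series = -pd.Series(values)   # works: elementwise negation of a pandas Series
neg_array = -np.array(values)     # works: elementwise negation of a NumPy ndarray
# neg_list = -values              # TypeError: bad operand type for unary -: 'list'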
So this call fails:
valley_idx, _ = find_peaks(-x, height=-thresh_bottom)
You have to negate the values yourself, for example:
valley_idx, _ = find_peaks([-i for i in x], height=-thresh_bottom)
find_peaks returns an ndarray of indices, which again cannot be used to index a plain list, so this line fails as well:
plt.plot(t[peak_idx], x[peak_idx], "x", color="r", label="peaks")
So you'll have to do it manually, for example:
plt.plot(
[t[i] for i in peak_idx],
[x[i] for i in peak_idx],
"x",
color="r",
label="peaks",
)
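Alternatively (my own suggestion, not required for the fix above), you can convert the collected lists to NumPy arrays once; then both the unary minus and the index-by-array lines from the pandas version work unchanged:
# collect from Spark as before, then wrap in NumPy arrays
t = np.array([val.date for val in sdf.select("date").collect()])
x = np.array([val.value for val in sdf.select("value").collect()])

peak_idx, _ = find_peaks(x, height=thresh_top)
valley_idx, _ = find_peaks(-x, height=-thresh_bottom)   # unary minus works on ndarrays

plt.plot(t[peak_idx], x[peak_idx], "x", color="r", label="peaks")        # fancy indexing works on ndarrays
plt.plot(t[valley_idx], x[valley_idx], "x", color="g", label="valleys")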
I've reproduced your plot with the following code (+ calculating median and std_dev in PySpark as an example):
# data is the same
# ...
# create a Spark dataframe
spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame(data=data, schema=schema)
std_dev = sdf.select(F.stddev(F.col("value")).alias("std")).collect()[0]["std"]
# overall median via percentile_approx, aggregated over the whole column (no groupBy)
median = (
    sdf.agg(F.expr("percentile_approx(value, 0.5)").alias("med"))
    .collect()[0]["med"]
)
thresh_top = median + 1 * std_dev
thresh_bottom = median - 1 * std_dev
t = sdf.select("date").rdd.flatMap(lambda x: x).collect()
x = sdf.select("value").rdd.flatMap(lambda x: x).collect()
peak_idx, _ = find_peaks(x, height=thresh_top)
valley_idx, _ = find_peaks([-i for i in x], height=-thresh_bottom)
plt.figure(figsize=(14, 12))
plt.plot(t, x, color="b", label="data")
plt.scatter(t, x, s=10, c="b", label="value")
# Plot threshold
plt.plot(
[min(t), max(t)],
[thresh_top, thresh_top],
"--",
color="r",
label="peaks-threshold",
)
plt.plot(
[min(t), max(t)],
[thresh_bottom, thresh_bottom],
"--",
color="g",
label="valleys-threshold",
)
# Plot peaks (red) and valleys (blue)
plt.plot(
[t[i] for i in peak_idx],
[x[i] for i in peak_idx],
"x",
color="r",
label="peaks",
)
plt.plot(
[t[i] for i in valley_idx],
[x[i] for i in valley_idx],
"x",
color="g",
label="valleys",
)
# ...
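As a side note, the median could also be computed with DataFrame.approxQuantile instead of the percentile_approx SQL expression; a minimal sketch, assuming the same sdf as above:
# relativeError=0.0 asks for the exact quantile (more expensive on large data)
median = sdf.approxQuantile("value", [0.5], 0.0)[0]
std_dev = sdf.agg(F.stddev("value")).collect()[0][0]
thresh_top = median + 1 * std_dev
thresh_bottom = median - 1 * std_dev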