Search code examples
python · time-series · unsupervised-learning

Evaluation metric for parameter tuning for outlier detection (unsupervised learning) on time series


I'm working on implementing parameter tuning for outlier detection in time-series data using the DBSCAN algorithm. To maximize the Silhouette score (as the evaluation metric), I'm leveraging Optuna for tuning. However, after parameter tuning, the model still seems to underperform. Below is the complete code, which encompasses data generation, preprocessing, decomposition, parameter tuning, and applying the model.

I also tried the Isolation Forest, LOF, and One-Class SVM algorithms, and the results were similar. I tried other metrics, including davies_bouldin_score and calinski_harabasz_score, but did not achieve better results.

How can I improve the outlier detection parameter tuning?

import numpy as np
import matplotlib.pyplot as plt
from statsmodels.tsa.seasonal import seasonal_decompose
from sklearn.preprocessing import MinMaxScaler
from sklearn.cluster import DBSCAN
from sklearn.metrics import silhouette_score
import optuna

# Function to generate time series data
def generate_time_series(n_samples=300, n_outliers=30, seed=None):
    """Generate a noisy trending time series with injected outliers.

    Parameters
    ----------
    n_samples : int
        Number of points in the series.
    n_outliers : int
        Number of randomly chosen points perturbed with large noise.
    seed : int or None
        Seed for NumPy's global RNG. None (the default) keeps the original
        non-reproducible behaviour; pass an int for repeatable output.

    Returns
    -------
    tuple
        (y, t) where y is an (n_samples, 1) array and t the time axis.
    """
    # The original np.random.seed(np.random.randint(10000)) re-seeded the
    # global RNG from its own output — still random, but limited to 10 000
    # possible streams. Only seed when the caller explicitly asks for it.
    if seed is not None:
        np.random.seed(seed)
    t = np.linspace(0, 50, n_samples)
    # Random walk (trend + noise) plus a sinusoidal component.
    y = np.cumsum(np.random.randn(n_samples)) + np.sin(t)
    # Perturb a random subset of points with large-amplitude noise.
    outlier_indices = np.random.choice(n_samples, n_outliers, replace=False)
    y[outlier_indices] += 15 * np.random.randn(n_outliers)

    return y.reshape(-1, 1), t

# Generate the time series data
y, t = generate_time_series()

# Visualise the raw series before any processing.
fig = plt.figure(figsize=(10, 5))
ax = fig.gca()
ax.plot(t, y, color='blue', label='Time series')
ax.set_xlabel('Time')
ax.set_ylabel('Value')
ax.set_title('Generated Time Series Data')
ax.legend()
plt.show()

# Decompose the series into trend, seasonal and residual components.
result = seasonal_decompose(y, model='additive', period=30, two_sided=True)

# A two-sided decomposition leaves NaNs at both ends of the residual;
# keep only the valid portion, together with the matching time axis.
resid_flat = result.resid.flatten()
non_nan_indices = ~np.isnan(resid_flat)
residual = resid_flat[non_nan_indices].reshape(-1, 1)
t_residual = t[non_nan_indices]

# Plot the seasonal decomposition: one panel per component.
panels = [
    (t, y, 'Original', 'blue'),
    (t, result.trend, 'Trend', 'orange'),
    (t, result.seasonal, 'Seasonal', 'green'),
    (t_residual, residual, 'Residual', 'red'),
]
plt.figure(figsize=(10, 5))
for position, (x_vals, y_vals, name, colour) in enumerate(panels, start=1):
    plt.subplot(4, 1, position)
    plt.plot(x_vals, y_vals, label=name, color=colour)
    plt.legend(loc='best')
plt.tight_layout()
plt.show()

# Scale the residual data to [0, 1] so DBSCAN's eps (a distance threshold)
# operates on a known range. NOTE(review): min-max scaling is sensitive to
# the very outliers we want to detect — it compresses the inliers.
scaler = MinMaxScaler()
residual_scaled = scaler.fit_transform(residual)

# Optuna objective: maximise the silhouette score of a DBSCAN clustering.
def dbscan_objective(trial):
    """Fit DBSCAN with trial-suggested eps/min_samples and score the labels.

    Returns the silhouette score of the resulting labels, or -1.0 when
    DBSCAN produced fewer than two distinct labels (silhouette is
    undefined in that case).
    """
    params = {
        'eps': trial.suggest_float('eps', 0.01, 0.5, log=True),
        'min_samples': trial.suggest_int('min_samples', 2, 20),
    }

    labels = DBSCAN(**params).fit_predict(residual_scaled)

    # Penalise degenerate fits (all noise, or a single cluster).
    if len(np.unique(labels)) < 2:
        return -1.0

    return silhouette_score(residual_scaled, labels)

# Optimize DBSCAN using Optuna: 100 trials maximising the silhouette score.
optuna.logging.set_verbosity(optuna.logging.WARNING)
dbscan_study = optuna.create_study(direction='maximize')
dbscan_study.optimize(dbscan_objective, n_trials=100, show_progress_bar=True)
best_dbscan_params = dbscan_study.best_params
print(f"Best DBSCAN parameters: {best_dbscan_params}")

# Refit DBSCAN on the scaled residuals with the best parameters found.
dbscan = DBSCAN(**best_dbscan_params)
dbscan_clusters = dbscan.fit_predict(residual_scaled)
# DBSCAN labels noise points as -1; treat those as the detected outliers.
dbscan_outliers = (dbscan_clusters == -1)

# Shared helper: line plot of a series with flagged outliers overlaid.
def _plot_with_outliers(line_x, line_y, line_label, out_x, out_y, title):
    """Draw `line_y` vs `line_x` in blue with red outlier markers on top."""
    plt.figure(figsize=(10, 5))
    plt.plot(line_x, line_y, label=line_label, color='blue')
    plt.scatter(out_x, out_y, color='red', label='Outliers')
    plt.xlabel('Time')
    plt.ylabel('Value')
    plt.title(title)
    plt.legend()
    plt.show()


# Outliers highlighted on the residual series.
_plot_with_outliers(t_residual, residual, 'Residual',
                    t_residual[dbscan_outliers],
                    residual[dbscan_outliers],
                    'DBSCAN Outlier Detection on Residuals')

# The same outliers mapped back onto the original series.
_plot_with_outliers(t, y, 'Time series',
                    t_residual[dbscan_outliers],
                    y[non_nan_indices][dbscan_outliers],
                    'DBSCAN Outlier Detection on Original Time Series')

# Report how many points DBSCAN labelled as noise (-1).
print(f"Number of outliers detected by DBSCAN: {np.sum(dbscan_outliers)}")

enter image description here enter image description here


Solution

  • DBSCAN relies on distance measurements to find clusters, so it is sensitive to the scale and distribution of the data. Even though in your case you have just one feature vector, I don't think you need to scale it for outlier detection. Just use the residual variable in the hyper-parameter search and the final prediction. You may also need to increase eps, perhaps up to 2. So the final code would look like this:

    import numpy as np
    import matplotlib.pyplot as plt
    from statsmodels.tsa.seasonal import seasonal_decompose
    from sklearn.cluster import DBSCAN
    from sklearn.metrics import silhouette_score
    import optuna
    
    # Function to generate time series data
    def generate_time_series(n_samples=300, n_outliers=30):
        """Return (y, t): a random-walk-plus-sinusoid series of shape
        (n_samples, 1) with `n_outliers` points perturbed by large noise,
        and the matching time axis t."""
        # NOTE(review): seeding from np.random.randint draws a different
        # seed on every call, so results are not reproducible.
        np.random.seed(np.random.randint(10000))
        t = np.linspace(0, 50, n_samples)
        y = np.cumsum(np.random.randn(n_samples)) + np.sin(t)  # Adding trend and noise
        outlier_indices = np.random.choice(n_samples, n_outliers, replace=False)
        y[outlier_indices] += 15 * np.random.randn(n_outliers)  # Injecting outliers
    
        return y.reshape(-1, 1), t
    
    # Generate the time series data
    y, t = generate_time_series()
    
    # Plot the time series data
    plt.figure(figsize=(10, 5))
    plt.plot(t, y, label='Time series', color='blue')
    plt.xlabel('Time')
    plt.ylabel('Value')
    plt.title('Generated Time Series Data')
    plt.legend()
    plt.show()
    
    # Decompose the series into trend, seasonal and residual components.
    result = seasonal_decompose(y, period=30, model='additive', two_sided=True)
    residual = result.resid
    
    # Two-sided decomposition leaves NaNs at both ends of the residual;
    # keep only the valid portion, together with the matching time axis.
    non_nan_indices = ~np.isnan(residual).flatten()
    residual = residual[non_nan_indices].reshape(-1, 1)
    t_residual = t[non_nan_indices]
    
    # Plot the seasonal decomposition: one panel per component.
    plt.figure(figsize=(10, 5))
    plt.subplot(411)
    plt.plot(t, y, label='Original', color='blue')
    plt.legend(loc='best')
    plt.subplot(412)
    plt.plot(t, result.trend, label='Trend', color='orange')
    plt.legend(loc='best')
    plt.subplot(413)
    plt.plot(t, result.seasonal, label='Seasonal', color='green')
    plt.legend(loc='best')
    plt.subplot(414)
    plt.plot(t_residual, residual, label='Residual', color='red')
    plt.legend(loc='best')
    plt.tight_layout()
    plt.show()
    
    
    # Optuna objective: maximise the silhouette score of a DBSCAN clustering
    # on the RAW (unscaled) residuals — hence the wider eps range, up to 2.
    def dbscan_objective(trial):
        """Fit DBSCAN with trial-suggested eps/min_samples and return the
        silhouette score, or -1.0 when fewer than two labels emerge
        (silhouette is undefined in that case)."""
        eps = trial.suggest_float('eps', 0.01, 2, log=True)
        min_samples = trial.suggest_int('min_samples', 2, 20)
        
        dbscan = DBSCAN(eps=eps, min_samples=min_samples)
        clusters = dbscan.fit_predict(residual)
        
        # Ignore cases where all points are considered noise
        if len(set(clusters)) <= 1:
            return -1.0
        
        score = silhouette_score(residual, clusters)
        return score
    
    # Optimize DBSCAN using Optuna: 100 trials maximising the silhouette score.
    optuna.logging.set_verbosity(optuna.logging.WARNING)
    dbscan_study = optuna.create_study(direction='maximize')
    dbscan_study.optimize(dbscan_objective, n_trials=100, show_progress_bar=True)
    best_dbscan_params = dbscan_study.best_params
    print(f"Best DBSCAN parameters: {best_dbscan_params}")
    
    # Refit on the unscaled residuals with the best parameters found.
    dbscan = DBSCAN(**best_dbscan_params)
    dbscan_clusters = dbscan.fit_predict(residual)
    # DBSCAN labels noise points as -1; treat those as the detected outliers.
    dbscan_outliers = (dbscan_clusters == -1)
    
    # Plot the detected outliers in the residuals
    plt.figure(figsize=(10, 5))
    plt.plot(t_residual, residual, label='Residual', color='blue')
    plt.scatter(t_residual[dbscan_outliers], residual[dbscan_outliers], color='red', label='Outliers')
    plt.xlabel('Time')
    plt.ylabel('Value')
    plt.title('DBSCAN Outlier Detection on Residuals')
    plt.legend()
    plt.show()
    
    # Map the same outliers back onto the original time series.
    plt.figure(figsize=(10, 5))
    plt.plot(t, y, label='Time series', color='blue')
    plt.scatter(t_residual[dbscan_outliers], y[non_nan_indices][dbscan_outliers], color='red', label='Outliers')
    plt.xlabel('Time')
    plt.ylabel('Value')
    plt.title('DBSCAN Outlier Detection on Original Time Series')
    plt.legend()
    plt.show()
    
    # Report how many points DBSCAN labelled as noise (-1).
    print(f"Number of outliers detected by DBSCAN: {np.sum(dbscan_outliers)}")
    

    And you will get something like this:

    enter image description here