Search code examples
apache-flinkflink-streamingpyflink

Flink program to detect anomalies does not give any ouput when number of logs ingested by kafka topic is zero


I am writing a pyflink program that is used for anomaly detection on the number of logs coming into a kafka topic using exponentially weighted moving average. The kafka broker the topic is on is running within my local computer

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
from pyflink.datastream.formats.json import JsonRowDeserializationSchema
from pyflink.datastream.window import TumblingProcessingTimeWindows
from pyflink.datastream.functions import AggregateFunction, MapFunction
from pyflink.common.time import Time
from datetime import datetime, timedelta

class SimpleCountAggregateFunction(AggregateFunction):
    def create_accumulator(self):
        return (0, None)  # Include a placeholder for timestamp

    def add(self, value, accumulator):
        # Assume value is a tuple of (count, timestamp)
        count, timestamp = value
        if count == 0:
            timestamp = datetime.now()  # Update timestamp for zero count window
        return (accumulator[0] + 1, timestamp)

    def get_result(self, accumulator):
        return accumulator

    def merge(self, a, b):
        return (a[0] + b[0], a[1])

class EMACalculatorWithPrediction(MapFunction):
    def __init__(self, period, start_time):
        self.period = period
        self.smoothing_factor = 2 / (1 + period)
        self.ema = None
        # Convert start_time from datetime to timestamp for simpler handling
        self.window_start_time = start_time

    def map(self, value):
        count, _ = value  # Unpack the count, ignore the incoming timestamp
        original_value = float(count)
        if self.ema is None:
            self.ema = original_value
        else:
            self.ema = (original_value * self.smoothing_factor) + (self.ema * (1 - self.smoothing_factor))

        current_time = self.window_start_time.strftime("%Y-%m-%d %H:%M:%S")
        # Update start_time for the next window
        self.window_start_time += timedelta(seconds=60)

        # Check for anomaly
        if original_value > 0 and ((original_value - self.ema) / original_value) > 0.3:
            # Report anomaly
            return (current_time, original_value, self.ema, "Anomaly Detected")
        else:
            return (current_time, original_value, self.ema, "No Anomaly")

def main():
    env = StreamExecutionEnvironment.get_execution_environment()

    kafka_source = FlinkKafkaConsumer(
        topics="input-events",
        deserialization_schema=JsonRowDeserializationSchema.builder().type_info(
            type_info=Types.ROW_NAMED(["timestamp", "data"], [Types.INSTANT(), Types.STRING()])).build(),
        properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'count-group'}
    )

    data_stream = env.add_source(kafka_source)

    # Convert to a tuple of (count, timestamp) before windowing
    mapped_stream = data_stream.map(lambda x: (1, x[0]), output_type=Types.TUPLE([Types.LONG(), Types.INSTANT()]))

    counted_stream = mapped_stream \
        .window_all(TumblingProcessingTimeWindows.of(Time.seconds(60))) \
        .aggregate(SimpleCountAggregateFunction(), output_type=Types.TUPLE([Types.LONG(), Types.INSTANT()])) \
        .map(lambda count_timestamp: (str(count_timestamp[0]), count_timestamp[1]), output_type=Types.TUPLE([Types.STRING(), Types.INSTANT()]))

    # Initialize EMACalculatorWithPrediction with the current system time as the start time for the first window
    initial_window_start_time = datetime.now()
    ema_and_value_stream = counted_stream.map(EMACalculatorWithPrediction(period=10, start_time=initial_window_start_time), 
                                              output_type=Types.TUPLE([Types.STRING(), Types.FLOAT(), Types.FLOAT(), Types.STRING()]))

    
    ema_and_value_stream.map(lambda x: f"Timestamp: {x[0]}, Original: {x[1]}, EWMA: {x[2]}, Status: {x[3]}", output_type=Types.STRING()).print()

    env.execute("Kafka JSON Count, EWMA Calculation, and Anomaly Detection")

if __name__ == "__main__":
    main()

The code counts the number of logs coming into a kafka topic using a tumbling window, the ewma (exponential weighted moving average) and anomaly detection part of the code works fine, but the issue arises with the count part of the code.The count of tumbling window works when there are logs coming into the topic, however, the count does not work when the number of logs is zero, i want to modify the code such that it works when the count is zero

The things that I have done to solve this issue are •Writing an if else condition comparing it to 0 •Changing the way the map function is calculated

any other suggestions to help me fix the logic would be greatly appreciated, i have been stuck here for the past 6 hours


Solution

  • When you use the DataStream API, windows don't exist until the first record is assigned to them, so there's no chance for an empty window to produce results. The same is also true for the Table/SQL API.

    My suggestion would be to rewrite this using a KeyedProcessFunction rather than a window. I'm suggesting a KeyedProcessFuntion rather than a ProcessFunction, because you'll need to keep some state (the counter), and you'll need a timer, and timers are only available with keyed streams. Since it doesn't make sense to key partition the stream, you can use a key selector function that returns a constant -- i.e., assign the same key to every event.

    The keyed process function should be pretty straightforward:

    • in the open() method, set up the ValueState object holding the counter
    • in the processElement() method, if the value of the counter is null, initialize the counter to 1 and register a timer; otherwise, increment the counter
    • on the onTimer() method, emit the results, reset the counter to 0, and register a new timer