Tags: apache-kafka, apache-flink, pyflink, stream-processing

Why does PyFlink give me a time in the past?


I have a Kafka topic to which I produce an entry every 2-3 seconds. Then I have a PyFlink job that formats the entries and sends them to a DB.

Here's my Flink environment setup:

env = StreamExecutionEnvironment.get_execution_environment()
env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
env.set_restart_strategy(RestartStrategies.fixed_delay_restart(3, 10000))
env.set_parallelism(4)
env.set_stream_time_characteristic(TimeCharacteristic.ProcessingTime)

kafka_consumer = FlinkKafkaConsumer(
    topics=SOURCE_TOPIC,
    deserialization_schema=deserialization_schema,
    properties=KAFKA_PROPERTIES
)
kafka_consumer.set_start_from_group_offsets()

ds = env.add_source(kafka_consumer, "DataFlowSource")

Then here's the part where the problem happens:

class FormatData(BroadcastProcessFunction):
    def process_element(self, value, ctx):
        # something happens on value here
        time = ctx.current_processing_time() / 1000
        yield metrics_stream_tag, (time, value)
        yield ("some other information for another table")

I'm sure there are at least 2 seconds between consecutive values, yet when I look at the DB or print the stream, I see something like this:

2024-04-30 11:48:16+00  5
2024-04-30 11:48:16+00  7
2024-04-30 11:48:16+00  12

The time stays the same for 10-25 entries, and then it jumps to the current time, like:

2024-04-30 11:50:22+00  5

and then the pattern repeats.

Some more context:

  1. When I print instead of sending to the DB, I can see at least 2 seconds between entries, but the time value doesn't change (same pattern as above).
  2. Here's the part where I sink the metrics_stream_tag side output:
metrics_stream = ds.get_side_output(metrics_stream_tag)
# metrics_stream.add_sink(psql_metrics_sink)
metrics_stream.print()

I tried using datetime.now().timestamp() instead, but that didn't change anything. What I expect is timestamps 2-3 seconds apart; the time shouldn't stay stuck for about 2 minutes before changing.


Solution

  • I still don't know exactly why, but I found a solution, and it all comes down to the output types. I had declared the time field as Types.FLOAT(), which causes these odd conversions. When I use Types.DOUBLE() or Types.STRING() instead, it works like a charm. If anyone knows why, please let me know.
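A likely explanation, consistent with this fix: Types.FLOAT() is a 32-bit float, whose significand holds only 24 bits. Epoch times in seconds around 2024 (about 1.71 × 10^9) lie between 2^30 and 2^31, where adjacent 32-bit floats are 2^(30-23) = 128 seconds apart. Every timestamp therefore gets rounded to the nearest multiple of 128 s, which can be up to 64 s off in either direction. That matches the roughly 2-minute jumps, and also the first value in the question: 2024-04-30 11:48:16 UTC is 1714477696 s, which is exactly 128 × 13394357. The sketch below reproduces the effect with only the Python standard library. It assumes (rather than verifies) that values typed as Types.FLOAT() are stored as IEEE 754 single precision; the helpers to_float32 and fmt are hypothetical names for illustration.

```python
import struct
from datetime import datetime, timezone


def to_float32(x: float) -> float:
    """Round a Python float (64-bit) to the nearest 32-bit float and back.

    Assumption: this mimics what happens to a value typed as Types.FLOAT(),
    i.e. IEEE 754 single precision.
    """
    return struct.unpack("f", struct.pack("f", x))[0]


def fmt(ts: float) -> str:
    """Format epoch seconds as a UTC timestamp string."""
    return datetime.fromtimestamp(ts, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")


# Epoch seconds for 2024-04-30 11:48:16 UTC, the first timestamp in the question.
start = datetime(2024, 4, 30, 11, 48, 16, tzinfo=timezone.utc).timestamp()

# Simulate one entry every 2.5 seconds and compare the original timestamp
# (64-bit float) with what survives a round trip through a 32-bit float.
for i in range(0, 120, 10):
    t = start + 2.5 * i
    print(fmt(t), "->", fmt(to_float32(t)))
```

The right-hand column stays frozen for long runs and then jumps by 128 s, while the left-hand column advances by 25 s per line. This also explains why switching to datetime.now().timestamp() changed nothing: that value is still a float of the same magnitude and gets rounded the same way once it passes through a FLOAT-typed output. A 64-bit Types.DOUBLE() has a spacing of 2^(30-52) ≈ 2.4 × 10^-7 s at this magnitude, and Types.STRING() avoids binary rounding altogether, so both preserve the 2-3 second gaps.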