apache-kafka apache-flink pyflink

PyFlink DataStream API: get rowtime (Kafka message timestamp)


Is it possible to get the rowtime of a Kafka message within the DataStream API of Flink/PyFlink?

I'm subscribing to a Kafka topic with PyFlink and need to access the metadata (rowtime) of each message I receive:

# imports as of PyFlink 1.13/1.14 (module paths moved in later releases)
from pyflink.common.serialization import JsonRowDeserializationSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer

env = StreamExecutionEnvironment.get_execution_environment()
# the Kafka connector JAR must be on the job's classpath

# 1. row type of the JSON message value
types = Types.ROW_NAMED(
    ['name', 'platform', 'year', 'global_sales',
     'time_send', 'append_log_time', 'time_in_sps', 'write_time'],
    [Types.STRING(), Types.STRING(), Types.INT(), Types.DOUBLE(),
     Types.STRING(), Types.LONG(), Types.STRING(), Types.STRING()])

# 2. create source DataStream
deserialization_schema = JsonRowDeserializationSchema.builder() \
    .type_info(type_info=types).build()

kafka_props = {'bootstrap.servers': 'kafka:9092', 'group.id': 'test_group'}
kafka_source = FlinkKafkaConsumer(
    topics='test',
    deserialization_schema=deserialization_schema,
    properties=kafka_props)

ds = env.add_source(kafka_source)
# the mapped rows keep the same schema, so the same type info is reused
ds = ds.map(MyMapFunction(), output_type=types)
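One detail worth making explicit: a deserialization schema like the one above parses only the Kafka message value, so declaring an append_log_time field in the row type does not pull in the broker's timestamp, which lives in the record metadata. The plain-Python sketch below (not PyFlink code; the function value_to_row and the sample message are hypothetical) mimics that behavior, assuming the deserializer is not configured to fail on missing fields, so that absent keys come out as null (None here):

```python
import json

# Field order copied from the ROW_NAMED type info in the question.
FIELD_NAMES = ['name', 'platform', 'year', 'global_sales',
               'time_send', 'append_log_time', 'time_in_sps', 'write_time']


def value_to_row(raw_value: bytes) -> tuple:
    """Rough stand-in for a JSON row deserializer: it sees only the
    Kafka message *value*; the record timestamp, key and headers are
    not part of the bytes it parses."""
    doc = json.loads(raw_value)
    # Keys missing from the JSON body become None (assumed null-on-missing behavior).
    return tuple(doc.get(name) for name in FIELD_NAMES)


# Hypothetical message value that carries no append_log_time key.
raw = b'{"name": "some game", "platform": "PC", "year": 2010, "global_sales": 1.5, "time_send": "1600000000000"}'
row = value_to_row(raw)
print(row[5])  # None: the broker's timestamp is not inside the value
```

Unless the producer writes such a key into the JSON body itself, the field stays empty, and the broker timestamp has to be read from the record's metadata instead.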

I want to read the LogAppendTime that the Kafka broker attaches to each message's metadata. With the Table API I can get it as the rowtime attribute:

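from pyflink.table import StreamTableEnvironment
from pyflink.table.expressions import col

t_env = StreamTableEnvironment.create(env)
tbl = t_env.from_data_stream(
    ds, col('name'), col('platform'), col('year'), col('global_sales'),
    col('time_send'), col('append_log_time').rowtime, col('time_in_sps'),
    col('write_time'))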

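With .rowtime, the Table API gives me the LogAppendTime of the Kafka message. This works because the Kafka consumer attaches each message's Kafka timestamp to the stream record, and .rowtime exposes that record timestamp in place of the append_log_time field.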
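Kafka record timestamps, including LogAppendTime, are milliseconds since the Unix epoch, and that is also the form in which the DataStream API hands out record timestamps. A small plain-Python sketch for converting between that number and a UTC datetime (the helper names are made up for illustration):

```python
from datetime import datetime, timezone


def kafka_ts_to_datetime(ts_ms: int) -> datetime:
    """Convert a Kafka record timestamp (milliseconds since the Unix
    epoch) into a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc)


def datetime_to_kafka_ts(dt: datetime) -> int:
    """Inverse direction: aware datetime -> epoch milliseconds."""
    return round(dt.timestamp() * 1000)


print(kafka_ts_to_datetime(1_600_000_000_000))  # 2020-09-13 12:26:40+00:00
```

Working with an aware UTC datetime avoids the local-timezone ambiguity of datetime.now(), which the question's map function uses for time_in_sps.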

However, if I map my DataStream:

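from datetime import datetime

from pyflink.common import Row
from pyflink.datastream import MapFunction, RuntimeContext


class MyMapFunction(MapFunction):

    def open(self, runtime_context: RuntimeContext):
        pass

    def map(self, value):
        # this attempt fails: value[5] has no rowtime attribute
        return Row(value[0], value[1], value[2], value[3], value[4], value[5].rowtime,
                   str(datetime.timestamp(datetime.now()) * 1000), value[7])


ds.map(MyMapFunction())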

and try to access the rowtime attribute, I get AttributeError: rowtime.

I also tried plain rowtime, rowtime.rowtime and value[x].rowtime.



Solution

  • I can get the timestamp of the Kafka message with a KeyedProcessFunction; inside it, the timestamp is available via ctx.timestamp():

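    from pyflink.common.typeinfo import Types
    from pyflink.datastream import KeyedProcessFunction

    class MyProcessFunction(KeyedProcessFunction):

        def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
            # ctx.timestamp() is the timestamp attached to the current record;
            # for FlinkKafkaConsumer this is the Kafka message timestamp
            result = ("Current key: {}, name: {}, platform: {}, year: {}, global_sales: {}, "
                      "time_send: {}, timestamp: {}, time_in_sps: {}, write_time: {}").format(
                ctx.get_current_key(), value[0], value[1], value[2], value[3],
                value[4], ctx.timestamp(), value[6], value[7])
            yield result

        def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext'):
            yield "On timer timestamp: " + str(timestamp)

    # a KeyedProcessFunction needs a keyed stream, here keyed by 'name'
    ds.key_by(lambda row: row[0], key_type=Types.STRING()) \
        .process(MyProcessFunction(), output_type=Types.STRING())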
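To get back to the original goal of carrying the broker timestamp in the append_log_time field, the value of ctx.timestamp() can be copied into that slot inside process_element. A minimal plain-Python sketch of that step, assuming the field order of the question's ROW_NAMED type (the helper with_record_timestamp and the sample values are hypothetical); in the process function one would then yield Row(*with_record_timestamp(value, ctx.timestamp())) and pass the question's types as output_type instead of Types.STRING():

```python
# Index of 'append_log_time' in the question's ROW_NAMED field list.
APPEND_LOG_TIME_IDX = 5


def with_record_timestamp(value, record_ts_ms):
    """Copy the row's fields, putting the record timestamp
    (e.g. the result of ctx.timestamp()) into the 'append_log_time' slot.
    The input row is left unchanged."""
    fields = list(value)
    fields[APPEND_LOG_TIME_IDX] = record_ts_ms
    return tuple(fields)


# Plain tuple standing in for an incoming Row (hypothetical values).
incoming = ('some game', 'PC', 2010, 1.5, '1600000000000', None, None, None)
print(with_record_timestamp(incoming, 1_600_000_000_250))
```

Building a new tuple rather than mutating the incoming row keeps the function free of side effects, which is the safer habit inside Flink operators.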