Is it possible to get the rowtime of a Kafka message within the DataStream API of Flink/PyFlink?
I'm subscribing to a Kafka topic with PyFlink and need to access the metadata (rowtime) of the messages I receive:
types = Types.ROW_NAMED(
    ['name', 'platform', 'year', 'global_sales',
     'time_send', 'append_log_time', 'time_in_sps', 'write_time'],
    [Types.STRING(), Types.STRING(), Types.INT(), Types.DOUBLE(),
     Types.STRING(), Types.LONG(), Types.STRING(), Types.STRING()])
# 2. create source DataStream
deserialization_schema = JsonRowDeserializationSchema.builder() \
    .type_info(type_info=types).build()

kafka_props = {'bootstrap.servers': 'kafka:9092', 'group.id': 'test_group'}

kafka_source = FlinkKafkaConsumer(
    topics='test',
    deserialization_schema=deserialization_schema,
    properties=kafka_props)

ds = env.add_source(kafka_source)
ds = ds.map(MyMapFunction(), output_type=types)  # same Row type as the source
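For context, a JSON message on the `test` topic that matches this Row schema could look like the following; all field values here are invented for illustration:

```python
import json

# Example payload matching the ROW_NAMED schema above; the values are
# made up and only illustrate the expected field names and types.
sample = {
    "name": "Wii Sports",
    "platform": "Wii",
    "year": 2006,                      # INT
    "global_sales": 82.74,            # DOUBLE
    "time_send": "1609459200000",     # STRING
    "append_log_time": 1609459200123, # LONG
    "time_in_sps": "",
    "write_time": ""
}
payload = json.dumps(sample)
```

The `JsonRowDeserializationSchema` maps these JSON fields onto the named Row fields by name, which is why the two lists in `Types.ROW_NAMED` have to line up.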
I want to read the AppendLogTime that the Kafka broker attaches to the message as metadata. With the Table API I'm able to get the rowtime attribute:
tbl = t_env.from_data_stream(
    ds, col('name'), col('platform'),
    col('year'), col('global_sales'), col('time_send'),
    col('append_log_time').rowtime, col('time_in_sps'),
    col('write_time'))
With `.rowtime` I can get the AppendLogTime of the Kafka message within the Table API of Flink.
But if I map my DataStream:
ds.map(MyMapFunction())

class MyMapFunction(MapFunction):
    def open(self, runtime_context: RuntimeContext):
        pass

    def map(self, value):
        return Row(value[0], value[1], value[2], value[3], value[4],
                   value[5].rowtime,
                   str(datetime.timestamp(datetime.now()) * 1000),
                   value[7])
and try to access the rowtime attribute, I get an AttributeError: rowtime.
I also tried plain rowtime, rowtime.rowtime, and value[x].rowtime.
I'm able to get the timestamp of the Kafka message with a ProcessFunction; inside process_element I can access it via ctx.timestamp():
class MyProcessFunction(KeyedProcessFunction):
    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
        result = ("Current key: {}, name: {}, platform: {}, year: {}, "
                  "global_sales: {}, time_send: {}, timestamp: {}, "
                  "time_in_sps: {}, write_time: {}").format(
            "key", str(value[0]), str(value[1]), str(value[2]), str(value[3]),
            str(value[4]), str(ctx.timestamp()), str(value[6]), str(value[7]))
        yield result

    def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext'):
        yield "On timer timestamp: " + str(timestamp)

ds.process(MyProcessFunction(), output_type=Types.STRING())
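Note that Flink record timestamps are epoch milliseconds, so ctx.timestamp() yields a raw long. A small plain-Python helper (millis_to_iso is a name I made up) can turn it into something readable before formatting it into the result string:

```python
from datetime import datetime, timezone

def millis_to_iso(ts_millis):
    """Convert an epoch timestamp in milliseconds, as returned by
    ctx.timestamp(), into an ISO-8601 UTC string."""
    return datetime.fromtimestamp(ts_millis / 1000, tz=timezone.utc).isoformat()

print(millis_to_iso(1609459200000))  # 2021-01-01T00:00:00+00:00
```

(ctx.timestamp() can also be None when no timestamp is attached to the record, so a real pipeline would want to guard against that before converting.)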