Search code examples
apache-flinkcomplex-event-processing

Can I print Individual elements of DataSteam<T> in Apache Flink without using inbuilt print() function


I am trying to Print the values of warnings that have been detected in Flink

// Generate temperature warnings for each matched warning pattern

    DataStream<TemperatureEvent> warnings = tempPatternStream.select(
        (Map<String, MonitoringEvent> pattern) -> {
            TemperatureEvent first = (TemperatureEvent) pattern.get("first");


            return new TemperatureEvent(first.getRackID(), first.getTemperature()) ;
        }
    );



    // Print the warning and alert events to stdout



    warnings.print();

I am getting output as below(as per toString of eventSource function)

Rack id = 99 and temprature = 76.0

Can someone tell me, if there is any way I can print the values of DataStream without using print? An example would be, if I only want to print temperature, how can I access Individual elements in DataStream.

Thanks in Advance


Solution

  • I have figured out a way to access individual elements, Lets assume we have a DataStream

     HeartRate<Integer,Integer>
    

    It has 2 attributes

    private Integer Patient_id ;
    private  Integer HR;
    

    // Generating a Datasteam using custom function

    DataStream<HREvent> hrEventDataStream = envrionment
                    .addSource(new HRGenerator()).assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());
    

    Assuming that you have Generated a Datasteam using custom function ,now we can print the values of Individual Elements of HeartRateEvent as below

    hrEventDataStream.keyBy(new KeySelector<HREvent, Integer>() {
        @Override
        public Integer getKey(HREvent hrEvent) throws Exception {
            return hrEvent.getPatient_id();
        }
             })
            .window(TumblingEventTimeWindows.of(milliseconds(10)))
            .apply(new WindowFunction<HREvent, Object, Integer, TimeWindow>() {
                @Override
                public void apply(Integer integer, TimeWindow timeWindow, Iterable<HREvent> iterable, Collector<Object> collector) throws Exception {
    
                    for(HREvent in : iterable){
    
                        System.out.println("Patient id  = " + in.getPatient_id() + " Heart Rate  = " + in.getHR());
                    }//for
    
                }//apply
            });
    

    Hope it Helps !