Tags: json, apache-flink, flink-streaming, flink-cep, flink-sql

How to stream JSON using Flink?


I'm working on a stream that receives a bunch of strings, and I need to count them. The sums are aggregated, meaning that for the second day's record the sum is added to the day before. The output should be a JSON file looking like:

{
  "aggregationType" : "day",
  "days before" : 2,
  "aggregates" : [
    { "date" : "2018-03-03", "sum" : 120 },
    { "date" : "2018-03-04", "sum" : 203 }
  ]
}

I created a stream that looks like:

val eventStream : DataStream[String] = env
    .addSource(source)
    .keyBy("")
    .timeWindow(Time.days(1), Time.days(1))
    .trigger(new MyTriggerFunc)
    .aggregate(new MyAggregationFunc)

eventStream.addSink(sink)

thank you in advance for the help :)


Solution

  • Note on working with JSON in Flink:

    Use JSONDeserializationSchema to deserialize the events; it produces ObjectNode records. You can then map each ObjectNode to YourObject for convenience, or keep working with the ObjectNode directly.

    Tutorial on working with ObjectNode: http://www.baeldung.com/jackson-json-node-tree-model
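    For illustration, reading fields out of an ObjectNode with Jackson's tree model might look like this (plain Jackson, no Flink; the field names here are assumptions based on your target output):

    ```scala
    import com.fasterxml.jackson.databind.ObjectMapper
    import com.fasterxml.jackson.databind.node.ObjectNode

    object ObjectNodeExample {
      val mapper = new ObjectMapper()

      // Parse a raw JSON event into Jackson's tree model.
      def parse(raw: String): ObjectNode =
        mapper.readTree(raw).asInstanceOf[ObjectNode]

      def main(args: Array[String]): Unit = {
        val node = parse("""{"date": "2018-03-03", "sum": 120}""")
        // Read individual fields from the tree.
        println(node.get("date").asText())
        println(node.get("sum").asInt())
      }
    }
    ```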

    Back to your case, you can do it like the following:

    val oneMinuteAgg : DataStream[ObjectNode] = env
        .addSource(source)
        .timeWindowAll(Time.minutes(1))
        .trigger(new MyTriggerFunc)
        .aggregate(new MyAggregationFunc)
    

    This will output a stream of 1-minute aggregates:

    [
      { "date" : "2018-03-03", "sum" : 120 },
      { "date" : "2018-03-03", "sum" : 120 }
    ]
    
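    The MyAggregationFunc above is not shown; its core accumulator logic could be sketched Flink-free like this (the names and the (date, sum) shape are assumptions — in a real job this would live inside Flink's AggregateFunction[IN, ACC, OUT]):

    ```scala
    // Hypothetical per-window sum accumulator, tagged with its date.
    case class DailyAgg(date: String, sum: Long)

    object SumAgg {
      // createAccumulator(): start from zero for the window's date.
      def createAccumulator(date: String): DailyAgg = DailyAgg(date, 0L)

      // add(): fold one event's value into the accumulator.
      def add(acc: DailyAgg, value: Long): DailyAgg =
        acc.copy(sum = acc.sum + value)

      // merge(): combine two partial accumulators for the same date.
      def merge(a: DailyAgg, b: DailyAgg): DailyAgg =
        a.copy(sum = a.sum + b.sum)
    }
    ```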
    

    Then chain another operator to "oneMinuteAgg" that rolls the 1-minute aggregates up into 1-day aggregates:

    [...]
    oneMinuteAgg
        .timeWindowAll(Time.days(1))
        .trigger(new Whatever)
        .aggregate(new YourDayAggF)
    

    which will output what you need:

    {
      "aggregationType" : "day",
      "days before" : 4,
      "aggregates" : [
        { "date" : "2018-03-03", "sum" : 120 },
        { "date" : "2018-03-03", "sum" : 120 }
      ]
    }
    
    

    I used windowAll() assuming you don't need to key the stream.
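    Before the sink, the day-level result still has to be serialized back to a JSON string. A minimal sketch with Jackson (field names taken from your target output; everything else is an assumption):

    ```scala
    import com.fasterxml.jackson.databind.ObjectMapper

    object DayAggJson {
      val mapper = new ObjectMapper()

      // Build the final day-level JSON from (date, sum) pairs.
      def toJson(daysBefore: Int, aggregates: Seq[(String, Long)]): String = {
        val root = mapper.createObjectNode()
        root.put("aggregationType", "day")
        root.put("days before", daysBefore)
        val arr = root.putArray("aggregates")
        aggregates.foreach { case (date, sum) =>
          val entry = arr.addObject()
          entry.put("date", date)
          entry.put("sum", sum)
        }
        mapper.writeValueAsString(root)
      }
    }
    ```

    You could apply this in a final map() before addSink(sink).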