I am trying to implement window triggers in flink, which will fire if the average value is higher than a threshold.
The streaming data has student's name & mark seperated by ,
. The window has to be triggered, if the average mark of the student cross 90
irrespective of number of attempts.
example data:
Fred,88
Fred,91
Wilma,93
.
.
Current Flink Code:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
import org.apache.flink.streaming.api.windowing.triggers.{CountTrigger, PurgingTrigger, Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, Window}
case class Marks(name : String, mark : Double, count : Int)
class MarksTrigger[W <: Window] extends Trigger[Marks,W] {
override def onElement(element: Marks, timestamp: Long, window: W, ctx: TriggerContext): TriggerResult = {
if(element.mark > 90) TriggerResult.FIRE // fire if avg mark is > 90
else TriggerResult.CONTINUE
}
override def onProcessingTime(time: Long, window: W, ctx: TriggerContext): TriggerResult = {
TriggerResult.CONTINUE
}
override def onEventTime(time: Long, window: W, ctx: TriggerContext): TriggerResult = {
TriggerResult.CONTINUE
}
override def clear(window: W, ctx: TriggerContext) = ???
}
object Main {
def main(args: Array[String]) {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val data = env.socketTextStream("localhost", 9999)
val fdata = data.map { values =>
val columns = values.split(",")
Marks(columns(0), columns(1).toDouble, 1)
}
val keyed = fdata.keyBy(_.name).
window(GlobalWindows.create()).
trigger(new MarksTrigger[GlobalWindow]()). // TODO
keyed.print()
env.execute()
}
}
Calculating average: Tried the following in batch mode
case class Marks(name : String, mark : Double, count : Int)
val data = benv.fromElements(("Fred", 88.0), ("Fred", 95.0), ("Fred", 91.0), ("Wilma", 93.0), ("Wilma", 95.0), ("Wilma", 98.0))
data.map(x => (x._1, x._2, 1)).groupBy(0).reduce { (x, y) =>
(x._1, x._2 + y._2, x._3 + y._3)
}.map(x => Marks(x._1, x._2/x._3, x._3)).collect
How do i tie these together? should .window()
and .trigger()
be called before calculating average or average calculation should be done inside onElement()
?
I figured out the solution
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
import org.apache.flink.streaming.api.windowing.triggers.{CountTrigger, PurgingTrigger, Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, Window}
class MarksTrigger[W <: Window] extends Trigger[Marks,W] {
override def onElement(element: Marks, timestamp: Long, window: W, ctx: TriggerContext): TriggerResult = {
//trigger is fired if average marks of a student cross 80
if(element.mark > 90) TriggerResult.FIRE
else TriggerResult.CONTINUE
}
override def onProcessingTime(time: Long, window: W, ctx: TriggerContext): TriggerResult = {
TriggerResult.CONTINUE
}
override def onEventTime(time: Long, window: W, ctx: TriggerContext): TriggerResult = {
TriggerResult.CONTINUE
}
override def clear(window: W, ctx: TriggerContext) = ???
}
case class Marks(name : String, mark : Double, count : Int)
object Main {
def main(args: Array[String]) {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val data = env.socketTextStream("localhost", 9999)
// data is obtained in "name,mark" format
val fdata = data.map { values =>
val columns = values.split(",")
(columns(0), columns(1).toDouble, 1)
}
// calculating average mark and number of exam attempts
val keyed1 = fdata.keyBy(0).reduce { (x,y) =>
(x._1, x._2 + y._2, x._3 + y._3)
}.map( x => Marks(x._1, x._2 / x._3, x._3))
val keyed = keyed1.keyBy(_.name).
window(GlobalWindows.create()).
trigger(PurgingTrigger.of(new MarksTrigger[GlobalWindow]())).
maxBy(1)
keyed.print()
env.execute()
}
}