Search code examples
scalaclassapache-flink

Flink scala Case class


I want to know how to replace x._1._2, x._1._3 by the name of the field using case class

def keyuid(l:Array[String]) : (String,Long,String) ={
  //val l=s.split(",")
  val ip=l(3).split(":")(1)
  val values=Array("",0,0,0)
  val uid=l(1).split(":")(1)
  val timestamp=l(2).split(":")(1).toLong*1000
  val impression=l(4).split(":")(1)
  return (uid,timestamp,ip)
}

val cli_ip = click.map(_.split(","))
  .map(x => (keyuid(x), 1.0)).assignAscendingTimestamps(x=>x._1._2)
  .keyBy(x => x._1._3)
  .timeWindow(Time.seconds(10))
  .sum(1)

Solution

  • Pattern Matching is indeed a good idea and makes the code more readable. To answer your question, to make keyuid function returns a case class, you first need to define it, for instance :

    case class Click(string uid, long timestamp, string ip)

    Then instead of return (uid,timestamp,ip) you need to do return Click(uid,timestamp,ip)

    Case class are not related to flink, but to scala : https://docs.scala-lang.org/tour/case-classes.html