Search code examples
scalaapache-flinkflink-streamingscala-streams

How to call a REST API with each element of a Flink stream as a parameter and return the transformed stream


I'm new to Apache Flink. I have a Flink Scala project that consumes data from a Kafka cluster, and I need to pass each element of the resulting stream as a parameter to a REST API and get the transformed stream back. Here is my code:

/** Question snippet: sets up a Flink job that reads strings from Kafka; the REST call is still missing. */
class Testing {

  /** Empty body; does not call `streamTest`. */
  def main(args: Array[String]): Unit = ()

  /**
    * Adds a Kafka consumer for `topic_test` as a source (starting from the earliest
    * offset, parallelism 5) and executes the job. The stream is not yet sent to the API.
    */
  def streamTest(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val brokers = "test1.server.local:9092,test2.server.local:9092,test3.server.local:9092"
    val kafkaProps = new Properties()
    kafkaProps.setProperty("bootstrap.servers", brokers)

    val kafkaConsumer = new FlinkKafkaConsumer[String]("topic_test", new SimpleStringSchema(), kafkaProps)
    kafkaConsumer.setStartFromEarliest()

    val stream = env.addSource(kafkaConsumer).setParallelism(5)

    // URL template of the REST endpoint; %s is the placeholder for the message.
    val apiTemplate = "http://api-test.server.local/test/?msg=%s"
    // Here I need to pass the stream as a parameter to the API and return the transformed stream
    env.execute()
  }
}

Any help?


Solution

  • This is my final code. I hope it helps.

    /**
      * Flink job that reads strings from Kafka, sends each one to a REST endpoint and
      * collects the response body as the element of the transformed stream.
      *
      * The class is Serializable because the flatMap function below calls the instance
      * method `getUrl`, so the function references this instance.
      */
    class Testing extends Serializable{

      /** Empty body; does not call `streamTest`. */
      def main(args: Array[String]): Unit = {}

      /**
        * Adds a Kafka consumer for `topic_test` as a source (starting from the earliest
        * offset), maps every message through the REST API and executes the job.
        */
      def streamTest(): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

        val properties = new Properties()
        properties.setProperty("bootstrap.servers", "test1.server.local:9092,test2.server.local:9092,test3.server.local:9092")

        val kafkaConsumer = new FlinkKafkaConsumer[String]("topic_test", new SimpleStringSchema(), properties)
        kafkaConsumer.setStartFromEarliest()

        val stream = env.addSource(kafkaConsumer)

        // One blocking GET per element: the message is URL-encoded into the query string
        // and the response body is emitted as the transformed element.
        // NOTE(review): the call blocks the operator for up to the configured timeouts;
        // consider Flink's Async I/O if throughput matters.
        // NOTE(review): no sink is attached to `result` in this method; add one
        // (e.g. print or addSink) to actually observe or deliver the transformed stream.
        val result = stream.flatMap {
          (str: String, out: Collector[String]) =>
            val apiTest = "http://api-test.server.local/test/?msg=%s"
            out.collect(getUrl(apiTest.format(URLEncoder.encode(str, "UTF-8"))))
        }

        env.execute()
      }

      /**
        * Performs an HTTP GET and returns the response body as a string.
        *
        * Connect, connection-request and socket timeouts are all 5 seconds. The client
        * and the response are always closed, even when the request or the body read
        * fails, so repeated calls do not leak connections.
        *
        * @param url fully built request URL
        * @return the response entity converted to a string
        */
      def getUrl(url: String): String = {
        val timeoutMillis = 5 * 1000
        val config = RequestConfig.custom
          .setConnectTimeout(timeoutMillis)
          .setConnectionRequestTimeout(timeoutMillis)
          .setSocketTimeout(timeoutMillis)
          .build
        val client: CloseableHttpClient = HttpClientBuilder.create.setDefaultRequestConfig(config).build
        try {
          val response = client.execute(new HttpGet(url))
          // Read the body before closing the response; closing releases the connection.
          try EntityUtils.toString(response.getEntity)
          finally response.close()
        } finally client.close()
      }
    }