Stuck with type hints in clojure for generic class

I am trying to get a small example from Apache flink running in clojure, but right now I am stuck, because of the type hinting in clojure and some strange quirk in flink.

Here is my code:

(ns pipeline.core
 ( ExecutionEnvironment)
 (org.apache.flink.api.common.functions FlatMapFunction)
 ( Tuple2)
 (org.apache.flink.util Collector)
 (java.lang String)))

(def flink-env (ExecutionEnvironment/createLocalEnvironment))

(def dataset (.fromElements flink-env (to-array ["please test me"])))

(defn tokenizer [] (reify FlatMapFunction
                 ( flatMap [this value collector] 
                   (println value))))

(.flatMap dataset (tokenizer))

If I do not provide type hints, I get an error from the flink api:

Caused by: java.lang.IllegalArgumentException: The types of the interface org.apache.flink.api.common.functions.FlatMapFunction could not be inferred. Support for synthetic interfaces, lambdas, and generic types is limited at this point.

If I provide type hints:

(defn tokenizer [] (reify FlatMapFunction
                 ( ^void flatMap [this ^String value ^Collector collector] 
                   (println value))))

I get an error from the clojure compiler:

Caused by: java.lang.IllegalArgumentException: Can't find matching method: flatMap, leave off hints for auto match.
at clojure.lang.Compiler$NewInstanceMethod.parse( 

Is there a way to add type hints in clojure with generic classes? It should be something like this:

(defn tokenizer [] (reify FlatMapFunction
                 ( ^void flatMap [this ^String value ^Collector<Tuple2<String, Integer>> collector] 
                   (println value))))

But that doesn't work. Any ideas?

The lein config looks like this:

(defproject pipeline "0.1.0-SNAPSHOT"
 :description "FIXME: write description"
 :url ""
 :license {:name "Eclipse Public License"
        :url ""}
 :dependencies [[org.clojure/clojure "1.7.0"]               
             [org.apache.flink/flink-java "0.9.0"]              
  :aot :all)


  • Clojure cannot handle reflections, thus you need to specify the return type manually via Flink method returns.

    (.returns (.flatMap dataset (tokenizer)) String)

    Furthermore, you need to use deftype to define tokenizer and instantiate a new object when using it because Flink cannot handle anonymous classes:

    (deftype tokenizer [] FlatMapFunction
                          (flatMap [this value collector] 
                            (println value)))
    (.flatMap dataset (tokenizer.))

    Here is a full "Word-Count-Example" that can be packed into a jar and executed.

    Pay attention to the type hints and casts. For tokenizer output (int 1) is required, otherwise Long would be the second type of Tuple2. Furthermore, we use a String to declare the output type for tokenizer (a class type is not sufficient because the reflection types must also be specified). Finally, we need to type hint (int-array [0]) to resolve the overload of groupBy (without it, the method is ambiguous to the Clojure compiler).

    (ns org.apache.flink.flink-clojure.WordCount
     (org.apache.flink.api.common.functions FlatMapFunction)
     ( DataSet)
     ( ExecutionEnvironment)
     ( Tuple2)
     (org.apache.flink.util Collector)
     (java.lang String))
     (:require [clojure.string :as str])
    (def flink-env (ExecutionEnvironment/createLocalEnvironment))
    (def text (.fromElements flink-env (to-array ["please test me and me too"])))
    (deftype tokenizer [] FlatMapFunction
                          (flatMap [this value collector]
                            (doseq [v (str/split value #"\s")]
                              (.collect collector (Tuple2. v (int 1))))))
    (def tokens (.returns (.flatMap text (tokenizer.)) "Tuple2<String,Integer>"))
    (def counts (.sum (.groupBy tokens (int-array [0])) 1))
    (defn -main []
      (.print counts)