How to route custom objects in an embedded Flink Statefun module?

I have an embedded module in Apache Flink Statefun 3.0 (a customized Greeter example) that consumes JSON-serialized events. When I try to route() messages deserialized from the ingress, I get an exception saying that my custom type is not convertible to Protobuf (true, it isn't) -- but should it be? I followed the 3.x docs and did not find any restriction on the type that can be routed.

Any hints or pointers on this?


// The custom type (Bean-style and all)
public final class Message {
  @JsonProperty private String name;
  @JsonProperty private String id;
  @JsonProperty private int visits;
  public Message() {}
  public String getName() { return name; }
  public void setName(String s) { name = s; }
  public String getId() { return id; }
  public void setId(String s) { id = s; }
  public int getVisits() { return visits; }
  public void setVisits(int i) { visits = i; }
}

// The function
public class GreeterFn implements StatefulFunction {
    public static final FunctionType TYPE = new FunctionType("example", "greeter");
    public void invoke(Context ctx, Object msg) {
        // I never get here
    }
}

// The module
public class EmbeddedModule implements StatefulFunctionModule {
    static final IngressIdentifier<Message> INGRESS = new IngressIdentifier<>(Message.class, "example", "names");

    private static final class MsgDeser implements KafkaIngressDeserializer<Message> {
        private final ObjectMapper mapper = new ObjectMapper();
        public Message deserialize(ConsumerRecord<byte[], byte[]> input) {
            try { return mapper.readValue(new String(input.value(), StandardCharsets.UTF_8), Message.class); }
            catch (Exception e) { e.printStackTrace(); }
            return null;
        }
    }

    public void configure(Map<String, String> globalConfiguration, Binder binder) {
        binder.bindIngressRouter(INGRESS, new Router<Message>() {
            public void route(Message m, Downstream<Message> ds) {
                ds.forward(GreeterFn.TYPE, m.getName(), m); // <-- I get here OK but then the exception
            }
        });
        binder.bindFunctionProvider(GreeterFn.TYPE, x -> new GreeterFn());
    }
}

// And the logs (trimmed)
statefun-worker_1   | 2021-07-12 11:29:33,366 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Source: example-names-ingress -> router (names) (1/1)#0 (2b43e45ce4bcc61340ff131d147f3afe) switched from RUNNING to FAILED.         
statefun-worker_1   | java.lang.RuntimeException: class cannot be cast to class ( is in unnamed module of loader org.apache.flink.util.ChildFirstClassLoader @2aab3c1e; is in unnamed module of loader 'app')                                                                                                                                                                                                  
statefun-worker_1   |   at ~[flink-dist_2.12-1.12.1.jar:1.12.1]                                                                                      
statefun-worker_1   |   at ~[flink-dist_2.12-1.12.1.jar:1.12.1]                                                                                                  
statefun-worker_1   |   at ~[flink-dist_2.12-1.12.1.jar:1.12.1]                                                                                                  
statefun-worker_1   |   at org.apache.flink.streaming.api.operators.CountingOutput.collect( ~[flink-dist_2.12-1.12.1.jar:1.12.1]                                                                                                       
statefun-worker_1   |   at org.apache.flink.streaming.api.operators.CountingOutput.collect( ~[flink-dist_2.12-1.12.1.jar:1.12.1]                                                                                                       
statefun-worker_1   |   at org.apache.flink.statefun.flink.core.translation.IngressRouterOperator$DownstreamCollector.forward( ~[statefun-flink-core.jar:3.0.0]                                                                
statefun-worker_1   |   at$Downstream.forward( ~[statefun-flink-distribution.jar:3.0.0]                                                                                                                
statefun-worker_1   |   at$1.route( ~[myflink-1.jar:?]                                                                                                                                                 
statefun-worker_1   |   at$1.route( ~[myflink-1.jar:?]                                                                                                                                                 
statefun-worker_1   |   at org.apache.flink.statefun.flink.core.translation.IngressRouterOperator.processElement( ~[statefun-flink-core.jar:3.0.0]                                                                              
statefun-worker_1   | Caused by: java.lang.ClassCastException: class cannot be cast to class ( is in unnamed module of loader org.apache.flink.util.ChildFirstClassLoader @2aab3c1e; is in unnamed module of loader 'app')                                                                                                                                                                                     
statefun-worker_1   |   at org.apache.flink.statefun.flink.core.message.MessagePayloadSerializerPb.serialize( ~[statefun-flink-core.jar:3.0.0]                                                                             


  • Messages that are sent/received by embedded functions are assumed to be Protobuf by default. You can use Kryo (or even a custom serializer) instead by setting the following key in flink-conf.yaml:

    statefun.message.serializer: WITH_KRYO_PAYLOADS

    This is not really recommended, as it makes the application difficult to evolve over time.
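
    As for why the function is never reached: the stack trace in the question ends in MessagePayloadSerializerPb.serialize with a ClassCastException, which suggests the payload is cast to a Protobuf message type while it is being forwarded. Below is a minimal, dependency-free sketch of that failure mode. ProtoLike, PlainBean, serialize and isRejected are hypothetical stand-ins for illustration, not StateFun or Protobuf classes.

```java
/** Hypothetical stand-ins that mimic the failure; these are not StateFun classes. */
final class CastFailureDemo {
    /** Plays the role of a Protobuf message type. */
    interface ProtoLike {
        byte[] toByteArray();
    }

    /** A plain bean, like the Message class in the question. */
    static final class PlainBean {
        final String name;
        PlainBean(String name) { this.name = name; }
    }

    /** A serializer that assumes every payload is Protobuf-like. */
    static byte[] serialize(Object payload) {
        ProtoLike message = (ProtoLike) payload; // ClassCastException for a plain bean
        return message.toByteArray();
    }

    /** Returns true when the payload is rejected with a ClassCastException. */
    static boolean isRejected(Object payload) {
        try {
            serialize(payload);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(isRejected(new PlainBean("Bob")));
    }
}
```

    The cast happens on the sending side, so the exception surfaces in the router's forward() call rather than in invoke().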

    You can still stick with Protobuf by postponing the String -> Message deserialization and forwarding the raw JSON in a built-in Protobuf type called StringValue.

    I've adapted the code you pasted to use StringValue:

        public class EmbeddedModule implements StatefulFunctionModule {
            static final IngressIdentifier<StringValue> INGRESS = new IngressIdentifier<>(StringValue.class, "example", "names");

            private static final class MsgDeser implements KafkaIngressDeserializer<StringValue> {
                private final ObjectMapper mapper = new ObjectMapper();
                public StringValue deserialize(ConsumerRecord<byte[], byte[]> input) {
                    // Wrap the raw JSON in a StringValue; parsing is postponed
                    String json = new String(input.value(), StandardCharsets.UTF_8);
                    return StringValue.of(json);
                }
            }

            public void configure(Map<String, String> globalConfiguration, Binder binder) {
                binder.bindIngressRouter(INGRESS, new Router<StringValue>() {
                    public void route(StringValue m, Downstream<StringValue> ds) {
                        String json = m.getValue();
                        String name = ... ; // extract the name from this JSON
                        ds.forward(GreeterFn.TYPE, name, m);
                    }
                });
                binder.bindFunctionProvider(GreeterFn.TYPE, x -> new GreeterFn());
            }
        }
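
    The "extract the name" step is left open above. As a rough, dependency-free sketch, the helper below pulls a top-level "name" string out of the JSON with a regular expression. It assumes the value contains no escaped quotes and that no other string in the payload embeds a "name" key; RoutingKeys and extractName are hypothetical names. A real JSON parser, such as the Jackson ObjectMapper already used in the question, is the more robust choice.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper for deriving a routing key from a raw JSON payload. */
final class RoutingKeys {
    // Matches "name" : "value", where value has no quotes or backslashes
    private static final Pattern NAME_FIELD =
        Pattern.compile("\"name\"\\s*:\\s*\"([^\"\\\\]*)\"");

    private RoutingKeys() {}

    /** Returns the first "name" string value found, or empty if there is none. */
    static Optional<String> extractName(String json) {
        if (json == null) {
            return Optional.empty();
        }
        Matcher matcher = NAME_FIELD.matcher(json);
        return matcher.find() ? Optional.of(matcher.group(1)) : Optional.empty();
    }

    public static void main(String[] args) {
        String json = "{\"name\":\"Bob\",\"id\":\"42\",\"visits\":3}";
        System.out.println(extractName(json).orElse("<no name>"));
    }
}
```

    In the router this would be called as RoutingKeys.extractName(m.getValue()), with the empty case handled explicitly (for example by skipping the record) instead of forwarding with a null id.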

    Define your Message as a Protobuf Message

    To avoid the double deserialization (both at the router and in the function), you can define the following Protobuf message:

    message MyMessage {
      string name = 1;
      string id = 2;
      int32 visits = 3;
    }

    And then convert the JSON string to an instance of MyMessage:

    MyMessage.Builder builder = MyMessage.newBuilder();
    JsonFormat.parser().merge(jsonString, builder);
    MyMessage myMessage = builder.build();