apache-flink, flink-statefun

Flink statefun co-located functions communication


I have a working embedded job and I want to deploy additional co-located jobs. The new jobs will receive messages from the existing job and send them to a Kafka topic.

The code is as follows:

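import com.google.auto.service.AutoService;
import java.util.Map;
import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;

@AutoService(StatefulFunctionModule.class)
public final class CoLocatedModule implements StatefulFunctionModule {

  @Override
  public void configure(Map<String, String> globalConfiguration, Binder binder) {
    // FunctionProvider, CoLocated and KafkaSpecs are application classes (not shown).
    FunctionProvider provider = new FunctionProvider();
    binder.bindFunctionProvider(CoLocated.TYPE, provider);

    // Only an egress is bound here; no ingress or router.
    binder.bindEgress(KafkaSpecs.TO_TRANSACTION_SPEC);
  }
}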

I get the following error:

 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: There are no ingress defined.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
    at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: java.lang.IllegalStateException: There are no ingress defined.
    at org.apache.flink.statefun.flink.core.StatefulFunctionsUniverseValidator.validate(StatefulFunctionsUniverseValidator.java:25)
    at org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:66)
    at org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:41)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:567)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)

The error is pretty self-explanatory: it wants me to define an ingress.

A similarly defined embedded module appears in the docs: https://ci.apache.org/projects/flink/flink-statefun-docs-stable/sdk/modules.html#embedded-module

Newly defined modules will receive messages from the other module and send them to kafka.

  • Do I have to define ingress for every co-located job? If not how can I make this work?
  • How can I get co-located jobs to communicate? Is it enough to use the same FunctionType?
  • Are co-located functions communicating over ingress/egress?

Solution

  • Responses inline. FYI, nothing you are asking is specific to co-located functions; the same properties hold for remote modules and for jobs that mix co-located and remote workloads.

    Do I have to define ingress for every co-located job? If not how can I make this work?

    Yes, every job (remote or co-located) requires at least one ingress. An ingress is a channel that consumes messages from the outside world into a StateFun application. Think Kafka or Kinesis. Without an ingress, the job would never do anything because there would be no initial messages to begin the processing.

    To each ingress, you bind one or more routers. A router takes each message from the ingress and forwards it to zero or more functions based on their function types [1].
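
    The routing step can be pictured with a small plain-Java toy model. This is only a sketch of the idea, not the StateFun API: `RouterSketch`, the `"<key>:<payload>"` message layout and the `example/...` type names are all made up for illustration. In the embedded SDK the real pieces are, as far as I know, an ingress spec bound with `binder.bindIngress(...)` and a `Router` bound with `binder.bindIngressRouter(...)`; check the linked docs for the exact signatures.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: these names are hypothetical stand-ins, not StateFun classes.
final class RouterSketch {

    /** Routes one ingress message to zero or more "functionType/id" addresses. */
    static List<String> route(String message) {
        List<String> targets = new ArrayList<>();
        if (message == null || message.isEmpty()) {
            return targets; // nothing usable: forwarded to zero functions
        }
        // Assumed message layout for this sketch: "<key>:<payload>".
        String key = message.split(":", 2)[0];
        targets.add("example/co-located/" + key);
        if (message.endsWith(":audit")) {
            // The same message can also go to a second function type.
            targets.add("example/audit/" + key);
        }
        return targets;
    }

    public static void main(String[] args) {
        System.out.println(route("user-1:pay"));
        System.out.println(route("user-2:audit"));
        System.out.println(route(""));
    }
}
```

    The point of the model is that the router only decides which function type and id receive a message; it holds no business logic of its own.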

    How can I get co-located jobs to communicate? Is it enough to use the same FunctionType?

    Yes, functions simply message each other using their function types.

    Are co-located functions communicating over ingress/egress?

    No. Messages are passed between functions by the Apache Flink runtime, which contains a highly optimized network stack. Once a message is pulled from an ingress, it never interacts with that ingress again. If you are interested, you can read about how Flink's network stack works in blog posts written by the community, but this is not necessary to use StateFun successfully in production [2].
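
    To make the flow concrete, here is a rough plain-Java simulation of the scenario in the question: one message enters through an ingress, an existing function forwards it to a new co-located function by function type, and that function writes to an egress. Everything here (`MessagingSketch`, `ToyRuntime`, the `example/...` type names, the `kafka:` prefix) is hypothetical and only imitates the behaviour described above. In the real embedded SDK a function would, to my understanding, call `context.send(...)` with the target's `FunctionType` and id, or with an `EgressIdentifier` to write to an egress.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy simulation only: none of these types exist in StateFun.
final class MessagingSketch {

    /** Hypothetical function: handles a message and may send further messages. */
    interface Fn {
        void invoke(String message, ToyRuntime runtime);
    }

    /** Stand-in for the runtime: delivers messages by function type name. */
    static final class ToyRuntime {
        final Map<String, Fn> functions = new HashMap<>();
        final Deque<String[]> mailbox = new ArrayDeque<>();
        final List<String> egress = new ArrayList<>();
        int ingressReads = 0;

        /** Function-to-function messaging: addressed by type, no ingress involved. */
        void send(String functionType, String message) {
            mailbox.add(new String[] {functionType, message});
        }

        /** Writes a record to the (simulated) egress. */
        void emit(String record) {
            egress.add(record);
        }

        /** The only entry point from the outside world. */
        void ingest(String functionType, String message) {
            ingressReads++;
            send(functionType, message);
            while (!mailbox.isEmpty()) {
                String[] envelope = mailbox.poll();
                Fn target = functions.get(envelope[0]);
                if (target != null) {
                    target.invoke(envelope[1], this);
                }
            }
        }
    }

    static ToyRuntime demo() {
        ToyRuntime runtime = new ToyRuntime();
        // Existing function: forwards to the new function by its type name.
        runtime.functions.put("example/existing",
                (msg, rt) -> rt.send("example/co-located", "validated:" + msg));
        // New co-located function: has no ingress of its own, only an egress.
        runtime.functions.put("example/co-located",
                (msg, rt) -> rt.emit("kafka:" + msg));
        runtime.ingest("example/existing", "txn-1");
        return runtime;
    }

    public static void main(String[] args) {
        ToyRuntime runtime = demo();
        System.out.println(runtime.egress + " ingressReads=" + runtime.ingressReads);
    }
}
```

    In this model the ingress is touched exactly once; every later hop goes through the runtime's internal delivery, which mirrors why the new functions need a function type to be addressed by rather than a dedicated ingress.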

    [1] https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.0/io-module/index.html#router

    [2] https://flink.apache.org/2019/06/05/flink-network-stack.html