
Using Flink connector within Flink StateFun


I've managed to add the GCP PubSub connector dependency to the Flink StateFun JAR and then build the Docker image.

I added the following to the pom.xml:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-gcp-pubsub</artifactId>
    <version>1.16.0</version>
    <scope>test</scope>
</dependency>

It's not clear how to specify my PubSub ingress and egress in the module.yaml that we use with the StateFun image.

https://nightlies.apache.org/flink/flink-statefun-docs-master/docs/modules/overview/

For example, for Kafka you use:

kind: io.statefun.kafka.v1/egress
spec:
  id: com.example/my-egress
  address: kafka-broker:9092
  deliverySemantic:
    type: exactly-once
    transactionTimeout: 15min

I can see that the official connectors have a Kind constant in the Java code that you use to reference them in module.yaml, but I can't find in the docs how to reference a Flink connector that you plug into the StateFun image yourself.


Solution

  • GCP PubSub is not officially supported as a standard StateFun IO component; for now, only Kafka and Kinesis are. However, you can write your own custom ingress/egress connector relatively easily. Unfortunately, you won't be able to add a new YAML-based config item, because the module configurators for Kafka and Kinesis seem to be hard-coded in the runtime. You'll have to do the configuration in code:

    Looking at the source/ingress example:

    public class ModuleWithSourceSpec implements StatefulFunctionModule {
    
        @Override
        public void configure(Map<String, String> globalConfiguration, Binder binder) {
            IngressIdentifier<TypedValue> id =
                new IngressIdentifier<>(TypedValue.class, "com.example", "custom-source");
            IngressSpec<TypedValue> spec = new SourceFunctionSpec<>(id, new FlinkSource<>());
            binder.bindIngress(spec);
            binder.bindIngressRouter(id, new CustomRouter());
        }
    }
    

    Your goal is to provide the new FlinkSource<>() argument, which is an org.apache.flink.streaming.api.functions.source.SourceFunction.

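    You could declare it like this:

    // The schema's output type must match the ingress type (TypedValue in the
    // module above); IntegerSerializer stands in for your own schema.
    SourceFunction<Integer> source =
        PubSubSource.newBuilder()
          .withDeserializationSchema(new IntegerSerializer())
          .withProjectName(projectName)
          .withSubscriptionName(subscriptionName)
          .withMessageRateLimit(1)
          .build();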
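
    The snippet above does not show where IntegerSerializer comes from, so treat it as a placeholder for a schema you supply. As a rough sketch of the byte round trip such a schema might perform, assuming the payload is a single integer encoded with BigInteger (the class name IntPayloadCodec is hypothetical, and the Flink schema interfaces are deliberately left out so the example runs on the JDK alone):

```java
import java.math.BigInteger;

// Hypothetical helper: plain-JDK stand-in for the encode/decode logic of a
// PubSub (de)serialization schema. It does not implement any Flink interface.
public class IntPayloadCodec {

    // Encode an int as big-endian two's-complement bytes.
    public static byte[] serialize(int value) {
        return BigInteger.valueOf(value).toByteArray();
    }

    // Decode bytes produced by serialize() back into an int.
    public static int deserialize(byte[] payload) {
        return new BigInteger(payload).intValue();
    }

    public static void main(String[] args) {
        byte[] payload = serialize(42);
        System.out.println(deserialize(payload));
    }
}
```

    The same logic would go inside the serialize and deserialize methods of whatever schema class you pass to the builder.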
    

    You'll also have to write the CustomRouter, which determines which function instance initially handles each event. You can take inspiration from this example:

    public static class GreetingsStateBootstrapDataRouter implements Router<Tuple2<String, Integer>> {
      @Override
      public void route(
          Tuple2<String, Integer> message, Downstream<Tuple2<String, Integer>> downstream) {
        downstream.forward(new Address(GREETER_FUNCTION_TYPE, message.f0), message);
      }
    }            
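
    For the PubSub case, the router has to pick a function instance id from each incoming record. Here is a minimal sketch of that idea, assuming the payload is a string of the form "userId:text" (an invented format). Address, Downstream and Router below are simplified local stand-ins that mirror the shape of the SDK types so that the example runs without StateFun on the classpath; they are not the real classes, and UserIdRouter is a hypothetical name:

```java
import java.util.ArrayList;
import java.util.List;

public class RouterSketch {

    // Simplified stand-in for the SDK's Address: function type plus instance id.
    static final class Address {
        final String functionType;
        final String id;

        Address(String functionType, String id) {
            this.functionType = functionType;
            this.id = id;
        }
    }

    // Simplified stand-in for the SDK's Downstream.
    interface Downstream<T> {
        void forward(Address to, T message);
    }

    // Simplified stand-in for the SDK's Router.
    interface Router<T> {
        void route(T message, Downstream<T> downstream);
    }

    // Routes "userId:text" records to the greeter instance keyed by userId.
    static final class UserIdRouter implements Router<String> {
        static final String GREETER_FUNCTION_TYPE = "com.example.fns/greeter";

        @Override
        public void route(String message, Downstream<String> downstream) {
            int sep = message.indexOf(':');
            // Records without a separator all go to one fallback instance.
            String userId = sep < 0 ? "unknown" : message.substring(0, sep);
            downstream.forward(new Address(GREETER_FUNCTION_TYPE, userId), message);
        }
    }

    // Runs the router on one record and returns the ids it forwarded to.
    public static List<String> routedIds(String message) {
        List<String> ids = new ArrayList<>();
        new UserIdRouter().route(message, (to, msg) -> ids.add(to.id));
        return ids;
    }

    public static void main(String[] args) {
        System.out.println(routedIds("alice:hello"));
    }
}
```

    Keying on a stable id such as the user id means that all records for the same user reach the same function instance, and therefore the same state.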
    

    The sink/egress works the same way, except that there is no router to provide:

    public class ModuleWithSinkSpec implements StatefulFunctionModule {
    
        @Override
        public void configure(Map<String, String> globalConfiguration, Binder binder) {
            EgressIdentifier<TypedValue> id = new EgressIdentifier<>("com.example", "custom-sink", TypedValue.class);
            EgressSpec<TypedValue> spec = new SinkFunctionSpec<>(id, new FlinkSink<>());
            binder.bindEgress(spec);
        }
    }
    

    With new FlinkSink<>() replaced by this sink:

    SinkFunction<Integer> sink =
        PubSubSink.newBuilder()
            .withSerializationSchema(new IntegerSerializer())
            .withProjectName(projectName)
            .withTopicName(outputTopicName)
            .build();
    

    In the egress case, you would then send messages to it from a function like this:

    public class GreeterFn implements StatefulFunction {
    
        static final TypeName TYPE = TypeName.typeNameFromString("com.example.fns/greeter");
    
        static final TypeName CUSTOM_EGRESS = TypeName.typeNameFromString("com.example/custom-sink");
    
        static final ValueSpec<Integer> SEEN = ValueSpec.named("seen").withIntType();
    
        @Override
        public CompletableFuture<Void> apply(Context context, Message message) {
            if (!message.is(User.TYPE)) {
                throw new IllegalStateException("Unknown type");
            }
    
            User user = message.as(User.TYPE);
            String name = user.getName();
    
            var storage = context.storage();
            var seen = storage.get(SEEN).orElse(0);
            storage.set(SEEN, seen + 1);
    
            context.send(
                EgressMessageBuilder.forEgress(CUSTOM_EGRESS)
                    .withUtf8Value("Hello " + name + " for the " + seen + "th time!")
                    .build());
    
            return context.done();
        }
    }
    

    You'll also have to make your modules known to the runtime by listing them in a file named org.apache.flink.statefun.sdk.spi.StatefulFunctionModule in the META-INF/services directory of your JAR, like so:

    com.example.your.path.ModuleWithSourceSpec
    com.example.your.path.ModuleWithSinkSpec 
    

    Alternatively, if you prefer annotations, you can use Google AutoService.


    I hope it helps!