apache-flink flink-streaming

Distributed execution with Apache Flink and (de)serialization of class fields/members


I have a simple proof of concept for a Flink job. It receives messages (in JSON format) from a Kafka topic, deserializes them into a domain model, validates them against a predefined set of rules, applies some transformations, and finally publishes the resulting message to a Kafka sink.

I have several functions/operators that rely on behavior from other "service" classes. Those "service" classes may in turn pull in other dependencies.

As far as I know, Flink will (de)serialize those functions/operators in order to make the entire job truly distributed. It's not clear to me whether I have to mark those fields/members as transient to keep them out of serialization, or whether declaring them as static is enough.

This is an example of what I have:

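import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

public final class SomeFlatMapFunction implements FlatMapFunction<SomeMessage, Some> {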
  private static final long serialVersionUID = -5810858761065889162L;

  private static final SomeMapper MAPPER = SomeMapper.INSTANCE;

  private static final Validator VALIDATOR = Validator.INSTANCE;

  @Override
  public void flatMap(final SomeMessage value, final Collector<Some> out) {
    final var result = MAPPER.valueFrom(value);
    final var violations = VALIDATOR.getValidator().validate(result);
    if (violations.isEmpty()) {
      out.collect(result);
    }
  }
}

I haven't seen any issues with this so far, but I'm only running the application locally. What's the best/accepted approach here, even for those cases where one has to inject those dependencies through the function's constructor? It also looks like maintaining state between those functions is highly discouraged.


Solution

  • Operators do get serialized and deserialized. That is why there are Rich* versions of the operators with open and close methods: they can be used to set things up after deserialization, once the operator is already on the task manager where it will run. Flink respects Java's usual serialization rules and will not serialize static or transient members.
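
    The serialization rule can be checked without Flink at all. The following is a minimal sketch using only plain Java serialization; SketchFunction, ValidationService and the open() method here are hypothetical stand-ins that merely mimic the shape of a Rich* function, not Flink's actual classes. It shows that a regular field survives a round trip, a transient field comes back as null, and an open()-style hook is the place to rebuild it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical "service" dependency; deliberately NOT Serializable.
class ValidationService {
  boolean isValid(String value) {
    return value != null && !value.trim().isEmpty();
  }
}

// Hypothetical operator stand-in; open() only imitates the hook of Flink's Rich* functions.
class SketchFunction implements Serializable {
  private static final long serialVersionUID = 1L;

  // Regular field: written to the stream and restored after deserialization.
  private final String ruleSetName;

  // Transient field: skipped by Java serialization, so it is null after deserialization.
  private transient ValidationService service;

  SketchFunction(String ruleSetName) {
    this.ruleSetName = ruleSetName;
  }

  // Builds the non-serializable helper on the side where the function actually runs.
  void open() {
    service = new ValidationService();
  }

  boolean isOpened() {
    return service != null;
  }

  String ruleSetName() {
    return ruleSetName;
  }

  boolean accept(String value) {
    return service.isValid(value);
  }
}

class SerializationSketch {
  // In-memory serialize/deserialize, roughly what happens when a function is shipped to a worker.
  @SuppressWarnings("unchecked")
  static <T extends Serializable> T roundTrip(T original) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(original);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (T) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    SketchFunction local = new SketchFunction("default-rules");
    local.open();

    SketchFunction shipped = roundTrip(local);
    System.out.println(shipped.ruleSetName()); // prints "default-rules"
    System.out.println(shipped.isOpened());    // prints "false": the transient field was not shipped

    shipped.open();                            // re-create the helper after deserialization
    System.out.println(shipped.accept("hello")); // prints "true"
  }
}
```

    Note that serialization succeeds here even though ValidationService is not Serializable, precisely because the field holding it is transient. Static fields, as in the question's example, are likewise not part of the serialized object; they are initialized when the class is loaded in each JVM.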

    In my experience, injecting domain classes through operators' constructors isn't a problem. Where you need to be careful is with domain classes that travel over the network while the job is running, sometimes referred to as Data Transfer Objects. For those, the simplest thing is to implement them as POJOs, where two things are critical:

    • They must have a no-argument constructor.
    • They should be annotated with type information. See an example in this answer.

    The second is particularly important if such POJOs will be part of your application's state, i.e. if you are using Flink's managed state API.
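
    As a rough illustration of the first point, here is a sketch of a POJO-shaped class in plain Java. SomeEvent, its fields and the hasPublicNoArgConstructor helper are hypothetical names made up for this example. The getters and setters reflect my understanding of Flink's documented POJO rules, so treat that part as an assumption and check it against the docs for your version. The type information annotation from the second point is Flink-specific and is not shown.

```java
import java.lang.reflect.Modifier;

class PojoSketch {

  // Hypothetical DTO: public class, public no-argument constructor,
  // and private fields exposed through getters and setters.
  public static class SomeEvent {
    private String id;
    private long timestamp;

    // The no-argument constructor lets a framework create an empty instance
    // and then fill in the fields one by one.
    public SomeEvent() {}

    public SomeEvent(String id, long timestamp) {
      this.id = id;
      this.timestamp = timestamp;
    }

    public String getId() {
      return id;
    }

    public void setId(String id) {
      this.id = id;
    }

    public long getTimestamp() {
      return timestamp;
    }

    public void setTimestamp(long timestamp) {
      this.timestamp = timestamp;
    }
  }

  // Small reflection check for the first requirement: a public class
  // with a public no-argument constructor.
  static boolean hasPublicNoArgConstructor(Class<?> type) {
    if (!Modifier.isPublic(type.getModifiers())) {
      return false;
    }
    try {
      type.getConstructor(); // only finds public constructors
      return true;
    } catch (NoSuchMethodException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println(hasPublicNoArgConstructor(SomeEvent.class)); // prints "true"
    System.out.println(hasPublicNoArgConstructor(Integer.class));   // prints "false"
  }
}
```

    A check like this could sit in a unit test so that a DTO losing its no-argument constructor is noticed before the job is deployed.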

    And something you already considered: adding serialVersionUID is also a good idea.