
Storm-kafka: set startOffsetTime to kafka.api.OffsetRequest.LatestTime in an Apache Flux YAML topology


I am working on a topology using Apache Flux. Currently, Storm fetches messages from the beginning of the topic, but I want it to fetch only the latest messages from Kafka.

I am writing the topology in a YAML file.

This is what my spoutConfig looks like:

  - id: "stringScheme"
    className: "org.apache.storm.kafka.StringScheme"

  - id: "stringMultiScheme"
    className: "org.apache.storm.spout.SchemeAsMultiScheme"
    constructorArgs:
      - ref: "stringScheme"

  - id: "zkHosts"
    className: "org.apache.storm.kafka.ZkHosts"
    constructorArgs:
      - "172.25.33.191:2181"

  - id: "spoutConfig"
    className: "org.apache.storm.kafka.SpoutConfig"
    constructorArgs:
      - ref: "zkHosts"
      - "blockdata"
      - ""
      - "myId"
    properties:
      - name: "scheme"
        ref: "stringMultiScheme"

      - name: "ignoreZkOffsets"
        value: true

      - name: "startOffsetTime"
        ref: "XXXXXXXXX"

Now I am stuck: how do I set startOffsetTime so that the spout fetches only the latest messages from Kafka?

I have tried ref: "LatestTime", but no matter what I put there, I get this error:

java.lang.IllegalArgumentException: Can not set long field org.apache.storm.kafka.KafkaConfig.startOffsetTime to null value


Solution

  • I believe Flux can call static factory methods, so you should be able to define a component like this:

    - id: "startingOffsetTime"
      className: "kafka.api.OffsetRequest"
      factory: "LatestTime"
    

    and then reference it in your SpoutConfig definition like this:

    properties:
      - name: "startOffsetTime"
        ref: "startingOffsetTime"
    

    I haven't tested this, but I think it should work. The ability to call static factory methods was merged a while back (https://issues.apache.org/jira/browse/STORM-2796), but it seems to be missing from the documentation. I've raised an issue to update the docs (https://issues.apache.org/jira/browse/STORM-3086).

    In case you'd like to see an example of this feature, take a look at https://github.com/apache/storm/blob/master/flux/flux-core/src/test/resources/configs/config-methods-test.yaml#L38
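Putting the question's components and the answer's factory component together, the full `components` section might look like the following. This is an untested sketch, like the answer's snippet: it assumes a Flux version that includes STORM-2796 (so the `factory` key is supported) and that Kafka's `kafka.api.OffsetRequest` class is on the topology classpath. The factory component is listed before `spoutConfig` on the assumption that Flux builds components in the order they appear.

```yaml
components:
  - id: "stringScheme"
    className: "org.apache.storm.kafka.StringScheme"

  - id: "stringMultiScheme"
    className: "org.apache.storm.spout.SchemeAsMultiScheme"
    constructorArgs:
      - ref: "stringScheme"

  - id: "zkHosts"
    className: "org.apache.storm.kafka.ZkHosts"
    constructorArgs:
      - "172.25.33.191:2181"

  # Flux should call the static method kafka.api.OffsetRequest.LatestTime()
  # and register its return value under this id (assumes STORM-2796 support).
  # Listed before spoutConfig, assuming components are built in order.
  - id: "startingOffsetTime"
    className: "kafka.api.OffsetRequest"
    factory: "LatestTime"

  - id: "spoutConfig"
    className: "org.apache.storm.kafka.SpoutConfig"
    constructorArgs:
      - ref: "zkHosts"
      - "blockdata"
      - ""
      - "myId"
    properties:
      - name: "scheme"
        ref: "stringMultiScheme"

      - name: "ignoreZkOffsets"
        value: true

      # Must match the id of the factory component above exactly.
      - name: "startOffsetTime"
        ref: "startingOffsetTime"
```

For reference, `OffsetRequest.LatestTime()` evaluates to `-1L` (and `EarliestTime()` to `-2L`). The original error appears consistent with `ref: "LatestTime"` not matching any component id, so a `null` value ended up being assigned to the primitive `long` field `startOffsetTime`.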