
Trident kafka transactional spout


I need to implement a Trident transactional topology. I found out that I can use Kafka as a spout to make my topology transactional. I found https://github.com/nathanmarz/storm-contrib/tree/master/storm-kafka, a Kafka spout for Storm, but it's not transactional. I also found https://github.com/nathanmarz/storm/blob/master/storm-core/src/jvm/storm/trident/spout/IOpaquePartitionedTridentSpout.java, which is the Trident interface for opaque partitioned (transactional) spouts. However, I don't have a Maven dependency for it and there are no instructions for using it; there are only instructions for using the Kafka Storm spout. I also need instructions for implementing a partitioned Kafka spout. If you have experience implementing a transactional topology, please help me! Thank you!


Solution

  • Have you looked at this Kafka spout?

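    A transactional spout alone is not sufficient to guarantee that your topology is transactional. A transactional Trident spout (for Kafka, storm-kafka's TransactionalTridentKafkaSpout) guarantees the following properties:

    • Batches for a given txid are always the same. Replaying the batch for a txid emits exactly the same set of tuples as the first time that batch was emitted for that txid.
    • There's no overlap between batches of tuples (tuples are in one batch or another, never multiple).
    • Every tuple is in a batch (no tuples are skipped).

    The OpaqueTridentKafkaSpout offers a weaker guarantee: every tuple is processed successfully in exactly one batch, but a tuple that fails in one batch may be processed in a later batch, so the contents of the batch for a given txid can change on replay.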
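    To make the transactional-spout properties listed above concrete, here is a minimal, Storm-independent sketch. It assumes a single partition modelled as an in-memory list (standing in for a Kafka partition) and a fixed batch size; `FixedBatchLog` and `batchFor` are hypothetical names, not part of the storm-kafka API. Because the batch is computed only from the txid, a replay yields the same tuples, consecutive batches never overlap, and no offset is skipped.

```java
import java.util.List;

/**
 * Hypothetical stand-in for one Kafka partition (not the storm-kafka API).
 * Batch boundaries are derived only from the txid, which gives the three
 * transactional-spout properties: same tuples on replay, no overlap, no gaps.
 */
class FixedBatchLog {
    private final List<String> log; // messages in offset order
    private final int batchSize;    // assumed fixed number of tuples per batch

    FixedBatchLog(List<String> log, int batchSize) {
        this.log = List.copyOf(log);
        this.batchSize = batchSize;
    }

    /** Returns the batch for a 1-based txid; calling it again returns the same tuples. */
    List<String> batchFor(long txid) {
        long start = Math.min((txid - 1) * batchSize, log.size());
        long end = Math.min(start + batchSize, log.size());
        return List.copyOf(log.subList((int) start, (int) end));
    }
}

class FixedBatchDemo {
    public static void main(String[] args) {
        FixedBatchLog partition =
            new FixedBatchLog(List.of("man", "man", "dog", "apple", "dog"), 2);
        System.out.println(partition.batchFor(1)); // [man, man]
        System.out.println(partition.batchFor(2)); // [dog, apple]
        System.out.println(partition.batchFor(2)); // replay of txid 2: [dog, apple]
        System.out.println(partition.batchFor(3)); // [dog]
    }
}
```

    The real TransactionalTridentKafkaSpout records per-partition offsets for each txid rather than using a fixed batch size; the fixed size here only keeps the sketch short.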

    However, if you persist the results of your computations, you also have to implement a transactional state so that updates to your database are applied exactly once for each batch of tuples.

    To achieve exactly-once processing, you have to persist, alongside each value, the transaction id of the batch that last updated it (and, for an opaque state, the previous value as well).

    Let's consider the example from the documentation:

    Suppose you are processing the following batch of tuples, associated with transaction id 3: ["man"] ["man"] ["dog"]

    Your database currently holds these word counts as key/value pairs:

    man => [count=3, txid=1]; dog => [count=4, txid=3]; apple => [count=10, txid=2]

    Since the txid stored with the key "dog" is the same as the current transaction id, you can skip this update: the "dog" count has already been updated for this transaction. That is not the case for "man", whose stored txid is 1, so its count is incremented by 2 and its txid is set to 3. "apple" does not appear in the batch, so it is left untouched. After the update, your database looks like this:

    man => [count=5, txid=3]; dog => [count=4, txid=3]; apple => [count=10, txid=2]
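    The txid check in this example can be sketched in plain Java. This is a minimal in-memory illustration, not Trident's own TransactionalMap: `TxCount` and `TransactionalWordCounts` are hypothetical classes, and a `HashMap` stands in for the database. The batch is aggregated per key first, so each key receives one update per batch; combined with a spout that replays identical batches, that is what makes skipping a key with a matching txid safe.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Value stored per key: the count and the txid of the batch that last updated it. */
class TxCount {
    final long count;
    final long txid;

    TxCount(long count, long txid) {
        this.count = count;
        this.txid = txid;
    }

    @Override
    public String toString() {
        return "[count=" + count + ", txid=" + txid + "]";
    }
}

/** Hypothetical in-memory "transactional state"; the map stands in for the database. */
class TransactionalWordCounts {
    final Map<String, TxCount> db = new HashMap<>();

    void applyBatch(long txid, List<String> words) {
        // Aggregate the batch per key first, so each key gets one update per batch.
        Map<String, Long> deltas = new HashMap<>();
        for (String w : words) {
            deltas.merge(w, 1L, Long::sum);
        }
        for (Map.Entry<String, Long> e : deltas.entrySet()) {
            TxCount stored = db.get(e.getKey());
            if (stored != null && stored.txid == txid) {
                continue; // this txid was already applied to this key: skip it
            }
            long base = (stored == null) ? 0 : stored.count;
            db.put(e.getKey(), new TxCount(base + e.getValue(), txid));
        }
    }
}

class TransactionalDemo {
    public static void main(String[] args) {
        TransactionalWordCounts state = new TransactionalWordCounts();
        state.db.put("man", new TxCount(3, 1));
        state.db.put("dog", new TxCount(4, 3));
        state.db.put("apple", new TxCount(10, 2));
        state.applyBatch(3, List.of("man", "man", "dog"));
        System.out.println("man => " + state.db.get("man"));     // man => [count=5, txid=3]
        System.out.println("dog => " + state.db.get("dog"));     // dog => [count=4, txid=3]
        System.out.println("apple => " + state.db.get("apple")); // apple => [count=10, txid=2]
    }
}
```

    Calling `applyBatch(3, ...)` a second time with the same tuples leaves every count unchanged, which is exactly the replay case the txid is there to detect.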

    In this way, Trident compares the stored transaction id (and, for opaque state, the previous value) with the current batch to decide whether an update must be performed.
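    With an opaque spout the txid check alone is not enough, because a replayed txid may carry different tuples. Below is a minimal sketch of the opaque-state rule, using a hypothetical `OpaqueWordCounts` class over an in-memory map: each key stores its current value, the value before the last applied txid, and that txid. When the same txid arrives again, the new value is rebuilt from the previous value instead of being added to the current one.

```java
import java.util.HashMap;
import java.util.Map;

/** Opaque value: current count, count before the last applied txid, and that txid. */
class OpaqueCount {
    final long curr;
    final long prev;
    final long txid;

    OpaqueCount(long curr, long prev, long txid) {
        this.curr = curr;
        this.prev = prev;
        this.txid = txid;
    }
}

/** Hypothetical in-memory "opaque state"; the map stands in for the database. */
class OpaqueWordCounts {
    final Map<String, OpaqueCount> db = new HashMap<>();

    /** Applies one key's aggregated delta from the batch with the given txid. */
    void update(String key, long delta, long txid) {
        OpaqueCount stored = db.get(key);
        if (stored == null) {
            db.put(key, new OpaqueCount(delta, 0, txid));
        } else if (stored.txid == txid) {
            // Same txid seen again: the replayed batch may differ, so rebuild from prev.
            db.put(key, new OpaqueCount(stored.prev + delta, stored.prev, txid));
        } else {
            // New txid: the current value becomes the previous one.
            db.put(key, new OpaqueCount(stored.curr + delta, stored.curr, txid));
        }
    }
}

class OpaqueDemo {
    public static void main(String[] args) {
        OpaqueWordCounts state = new OpaqueWordCounts();
        state.db.put("man", new OpaqueCount(3, 1, 1));
        state.update("man", 2, 2); // txid 2 contained "man" twice -> curr=5, prev=3
        state.update("man", 1, 2); // txid 2 replayed with only one "man" -> curr=4, prev=3
        OpaqueCount man = state.db.get("man");
        System.out.println(man.curr + " " + man.prev + " " + man.txid); // 4 3 2
    }
}
```

    Keeping `prev` is what lets a changed replay overwrite the earlier attempt rather than accumulate on top of it.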

    Have a look at Trident's MapState implementations, such as TransactionalMap and OpaqueMap.

    In short, a transactional topology is composed of both a transactional spout and a transactional state.
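    To see why both halves are needed, the following hypothetical simulation (`ReplayDemo`, with an in-memory log of two tuples per batch standing in for Kafka) replays txid 2 once, as would happen after a failure. Because the spout is deterministic, the replayed batch is identical, and the txid-tagged counter ignores it, while a naive counter that increments blindly counts it twice. This is a sketch of the idea only, not Trident code.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ReplayDemo {
    // Hypothetical partition contents; 2 tuples per batch, boundaries derived from txid.
    static final List<String> LOG = List.of("man", "dog", "man", "apple", "dog", "man");

    // Deterministic "transactional spout": the same txid always yields the same tuples.
    static List<String> batchFor(long txid) {
        int start = (int) ((txid - 1) * 2);
        return LOG.subList(start, Math.min(start + 2, LOG.size()));
    }

    // Naive state: increments blindly, so a replayed batch is counted twice.
    final Map<String, Long> naive = new HashMap<>();
    // Transactional state: per key, {count, txid of the last batch applied to it}.
    final Map<String, long[]> transactional = new HashMap<>();

    void apply(long txid) {
        Map<String, Long> deltas = new HashMap<>();
        for (String w : batchFor(txid)) {
            naive.merge(w, 1L, Long::sum);
            deltas.merge(w, 1L, Long::sum);
        }
        deltas.forEach((w, d) -> {
            long[] v = transactional.get(w);
            if (v != null && v[1] == txid) {
                return; // already applied for this txid: skip the replay
            }
            long base = (v == null) ? 0 : v[0];
            transactional.put(w, new long[] {base + d, txid});
        });
    }

    public static void main(String[] args) {
        ReplayDemo demo = new ReplayDemo();
        demo.apply(1);
        demo.apply(2);
        demo.apply(2); // simulated failure: txid 2 is replayed
        demo.apply(3);
        System.out.println("naive man = " + demo.naive.get("man"));                    // 4
        System.out.println("transactional man = " + demo.transactional.get("man")[0]); // 3
    }
}
```

    The log contains "man" three times, so only the transactional counter ends with the correct value.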

    You can find more information about Trident state on this page: http://storm.incubator.apache.org/documentation/Trident-state

    I hope this will help you.