Tags: hazelcast, hazelcast-jet

Jet RoutingPolicy.ISOLATED not working as expected


I'm trying to use RoutingPolicy.ISOLATED to create dedicated connections between the upstream and downstream vertices, as described in this thread: Creating a new Jet custom Partitioner.

I'm also using DiagnosticProcessors.peekOutputP to verify that messages from the same Kafka partition are being sent to the same downstream processor.

Below are the logs from the Jet initialization:

45:29,049 Loading hazelcast-jet-default.xml from classpath.
45:29,225 Loading hazelcast-jet-member-default.xml from classpath.
45:30,293 [172.21.0.1]:5701 [jet] [0.5.1] Starting Jet 0.5.1 (20171206 - a2156c6)
45:30,294 [172.21.0.1]:5701 [jet] [0.5.1] Setting number of cooperative threads and default parallelism to 8
45:30,294 [172.21.0.1]:5701 [jet] [0.5.1] 
    o   o   o   o---o o---o o     o---o   o   o---o o-o-o        o o---o o-o-o
    |   |  / \     /  |     |     |      / \  |       |          | |       |  
    o---o o---o   o   o-o   |     o     o---o o---o   |          | o-o     |  
    |   | |   |  /    |     |     |     |   |     |   |      \   | |       |  
    o   o o   o o---o o---o o---o o---o o   o o---o   o       o--o o---o   o   
45:30,294 [172.21.0.1]:5701 [jet] [0.5.1] Copyright (c) 2008-2017, Hazelcast, Inc. All Rights Reserved.
45:32,256 [172.21.0.1]:5701 [jet] [0.5.1] 

Members {size:1, ver:1} [
    Member [172.21.0.1]:5701 - 71c9778e-ce8d-474e-9c8d-08616a229328 this
]

45:32,441 [172.21.0.1]:5701 [jet] [0.5.1] Starting job 0300-0f31-8123-97aa based on join/submit request from client
45:32,481 [172.21.0.1]:5701 [jet] [0.5.1] Start executing job 0300-0f31-8123-97aa, execution f272-4051-f961-34c2, status STARTING
dag
    .vertex("kafkaSource").localParallelism(2)
    .vertex("meta").localParallelism(2)
    .vertex("sink").localParallelism(1)
    .edge(between("kafkaSource", "meta").isolated())
    .edge(between("meta", "sink").partitioned(?))

45:32,551 [172.21.0.1]:5701 [jet] [0.5.1] Execution plan for job 0300-0f31-8123-97aa, execution f272-4051-f961-34c2 initialized
45:32,555 [172.21.0.1]:5701 [jet] [0.5.1] Start execution of job 0300-0f31-8123-97aa, execution f272-4051-f961-34c2 from coordinator [172.21.0.1]:5701
45:32,758 The configuration 'compression.type' was supplied but isn't a known config.
45:32,758 The configuration 'compression.type' was supplied but isn't a known config.
45:32,957 [172.21.0.1]:5701 [jet] [0.5.1] Partition assignments changed, new partitions: [jet-stream-0]
45:32,957 [172.21.0.1]:5701 [jet] [0.5.1] Partition assignments changed, new partitions: [jet-stream-1]

My Kafka producer routes messages with the same key to the same partition (a rough sketch of the producer side follows the log below):

log4j:WARN No appenders could be found for logger (org.apache.kafka.clients.producer.ProducerConfig).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
key=src1 partition=1
key=src1 partition=1
key=src0 partition=0
key=src1 partition=1
key=src0 partition=0
key=src1 partition=1
key=src1 partition=1
key=src0 partition=0
key=src0 partition=0
key=src1 partition=1
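
For context, a minimal sketch of such a keyed producer is shown below; the broker address, topic name, serializers and the plain-string payload are placeholders I'm assuming (the real producer sends the JSON that DecodeKafkaMessage parses), but the key-to-partition behaviour is the same because Kafka's default partitioner hashes the record key:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KeyedProducerSketch {

    public static void main(String[] args) {
        // Placeholder broker and serializers -- adjust to the actual setup.
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                String key = "src" + (i % 2); // src0 / src1, as in the log above
                // "jet-stream" is assumed from the partition names jet-stream-0 / jet-stream-1
                ProducerRecord<String, String> record =
                        new ProducerRecord<>("jet-stream", key, "Frame #" + i);
                producer.send(record, (metadata, exception) -> {
                    if (exception == null) {
                        System.out.println("key=" + record.key() + " partition=" + metadata.partition());
                    }
                });
            }
        }
    }
}

Because the default partitioner is a hash of the key, identical keys always land on the same partition, which matches the key=/partition= pairs above.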

Based on the logs, it looks like the kafkaSource vertex is outputting all the messages to the same processor:

54:39,991 [172.21.0.1]:5701 [jet] [0.5.1] Output to 0: com.krios.iot.hazelcast.KafkaMessage@6da59b20
54:39,993 [172.21.0.1]:5701 [jet] [0.5.1] src1 / Frame #0
54:40,842 [172.21.0.1]:5701 [jet] [0.5.1] Output to 0: com.krios.iot.hazelcast.KafkaMessage@69c2e820
54:40,844 [172.21.0.1]:5701 [jet] [0.5.1] src1 / Frame #1
54:41,843 [172.21.0.1]:5701 [jet] [0.5.1] Output to 0: com.krios.iot.hazelcast.KafkaMessage@489aca4c
54:41,845 [172.21.0.1]:5701 [jet] [0.5.1] src0 / Frame #2
54:42,838 [172.21.0.1]:5701 [jet] [0.5.1] Output to 0: com.krios.iot.hazelcast.KafkaMessage@2e0d6d1f
54:42,838 [172.21.0.1]:5701 [jet] [0.5.1] src1 / Frame #3
54:43,840 [172.21.0.1]:5701 [jet] [0.5.1] Output to 0: com.krios.iot.hazelcast.KafkaMessage@e1043d3
54:43,841 [172.21.0.1]:5701 [jet] [0.5.1] src0 / Frame #4
54:44,853 [172.21.0.1]:5701 [jet] [0.5.1] Output to 0: com.krios.iot.hazelcast.KafkaMessage@4e6aabe6
54:44,854 [172.21.0.1]:5701 [jet] [0.5.1] src1 / Frame #5
54:45,842 [172.21.0.1]:5701 [jet] [0.5.1] Output to 0: com.krios.iot.hazelcast.KafkaMessage@4ca9eb8a
54:45,842 [172.21.0.1]:5701 [jet] [0.5.1] src1 / Frame #6
54:46,847 [172.21.0.1]:5701 [jet] [0.5.1] Output to 0: com.krios.iot.hazelcast.KafkaMessage@10c630f7
54:46,849 [172.21.0.1]:5701 [jet] [0.5.1] src0 / Frame #7
54:47,848 [172.21.0.1]:5701 [jet] [0.5.1] Output to 0: com.krios.iot.hazelcast.KafkaMessage@4639cdac
54:47,849 [172.21.0.1]:5701 [jet] [0.5.1] src0 / Frame #8
54:48,843 [172.21.0.1]:5701 [jet] [0.5.1] Output to 0: com.krios.iot.hazelcast.KafkaMessage@325499d7
54:48,844 [172.21.0.1]:5701 [jet] [0.5.1] src1 / Frame #9

Below is my Jet code:

DAG dag = new DAG();
DecodeKafkaMessage decodeKafkaMessage = new DecodeKafkaMessage();

Vertex kafkaSource = dag.newVertex("kafkaSource", DiagnosticProcessors.peekOutputP(streamKafkaP(properties, decodeKafkaMessage, topic)))
        .localParallelism(2);

Vertex meta = dag.newVertex("meta", mapP(LogLine::parse))
        .localParallelism(2);

Vertex sink = dag.newVertex("sink", DiagnosticProcessors.writeLoggerP())
        .localParallelism(1);


dag.edge(between(kafkaSource, meta)
        .isolated())
.edge(between(meta, sink)
        .allToOne());

UPDATE

LogLine::parse

private static class LogLine {

    public static String parse(KafkaMessage m) {
        return m.getSourceId() + " / " + m.getData();
    }

}

DecodeKafkaMessage

public final class DecodeKafkaMessage implements DistributedBiFunction<Object, Object, Object>, Serializable {

    private static final long serialVersionUID = 478528451550904377L;

    @Override
    public Object apply(Object t, Object u) {

        Gson gson = new Gson();

        KafkaMessage kafkaMessage = gson.fromJson(u.toString(), KafkaMessage.class);

        byte[] encodedData = Base64.getDecoder().decode(kafkaMessage.getData());

        try {
            kafkaMessage.setData(new String(encodedData, "utf-8"));
        } catch (UnsupportedEncodingException e) {
            System.out.println(e.getMessage());
            e.printStackTrace();
        }

        return kafkaMessage;
    }

}

UPDATE: 2018-02-01

53:42,802 kafkaSource#0 [172.19.0.1]:5701 [jet] [0.5.1] Output to 0: com.krios.iot.hazelcast.KafkaMessage@ada19fa
53:42,803 meta#0 [172.19.0.1]:5701 [jet] [0.5.1] Input from 0: com.krios.iot.hazelcast.KafkaMessage@ada19fa
53:42,805 sink#0 [172.19.0.1]:5701 [jet] [0.5.1] src0 / Frame #0

53:43,625 kafkaSource#1 [172.19.0.1]:5701 [jet] [0.5.1] Output to 0: com.krios.iot.hazelcast.KafkaMessage@11b98cff
53:43,625 meta#1 [172.19.0.1]:5701 [jet] [0.5.1] Input from 0: com.krios.iot.hazelcast.KafkaMessage@11b98cff
53:43,626 sink#0 [172.19.0.1]:5701 [jet] [0.5.1] src1 / Frame #1

53:44,627 kafkaSource#0 [172.19.0.1]:5701 [jet] [0.5.1] Output to 0: com.krios.iot.hazelcast.KafkaMessage@3c501af0
53:44,627 meta#0 [172.19.0.1]:5701 [jet] [0.5.1] Input from 0: com.krios.iot.hazelcast.KafkaMessage@3c501af0
53:44,628 sink#0 [172.19.0.1]:5701 [jet] [0.5.1] src0 / Frame #2

53:45,624 kafkaSource#0 [172.19.0.1]:5701 [jet] [0.5.1] Output to 0: com.krios.iot.hazelcast.KafkaMessage@1dd2234b
53:45,624 meta#0 [172.19.0.1]:5701 [jet] [0.5.1] Input from 0: com.krios.iot.hazelcast.KafkaMessage@1dd2234b
53:45,625 sink#0 [172.19.0.1]:5701 [jet] [0.5.1] src0 / Frame #3

53:46,627 kafkaSource#0 [172.19.0.1]:5701 [jet] [0.5.1] Output to 0: com.krios.iot.hazelcast.KafkaMessage@25700cd7
53:46,628 meta#0 [172.19.0.1]:5701 [jet] [0.5.1] Input from 0: com.krios.iot.hazelcast.KafkaMessage@25700cd7
53:46,629 sink#0 [172.19.0.1]:5701 [jet] [0.5.1] src0 / Frame #4

53:47,625 kafkaSource#0 [172.19.0.1]:5701 [jet] [0.5.1] Output to 0: com.krios.iot.hazelcast.KafkaMessage@39b10238
53:47,626 meta#0 [172.19.0.1]:5701 [jet] [0.5.1] Input from 0: com.krios.iot.hazelcast.KafkaMessage@39b10238
53:47,627 sink#0 [172.19.0.1]:5701 [jet] [0.5.1] src0 / Frame #5

53:48,660 kafkaSource#1 [172.19.0.1]:5701 [jet] [0.5.1] Output to 0: com.krios.iot.hazelcast.KafkaMessage@136fd05c
53:48,660 meta#1 [172.19.0.1]:5701 [jet] [0.5.1] Input from 0: com.krios.iot.hazelcast.KafkaMessage@136fd05c
53:48,661 sink#0 [172.19.0.1]:5701 [jet] [0.5.1] src1 / Frame #6

53:49,629 kafkaSource#0 [172.19.0.1]:5701 [jet] [0.5.1] Output to 0: com.krios.iot.hazelcast.KafkaMessage@50a0864
53:49,629 meta#0 [172.19.0.1]:5701 [jet] [0.5.1] Input from 0: com.krios.iot.hazelcast.KafkaMessage@50a0864
53:49,630 sink#0 [172.19.0.1]:5701 [jet] [0.5.1] src0 / Frame #7

53:50,634 kafkaSource#1 [172.19.0.1]:5701 [jet] [0.5.1] Output to 0: com.krios.iot.hazelcast.KafkaMessage@7ef68ceb
53:50,635 meta#1 [172.19.0.1]:5701 [jet] [0.5.1] Input from 0: com.krios.iot.hazelcast.KafkaMessage@7ef68ceb
53:50,636 sink#0 [172.19.0.1]:5701 [jet] [0.5.1] src1 / Frame #8

53:51,632 kafkaSource#1 [172.19.0.1]:5701 [jet] [0.5.1] Output to 0: com.krios.iot.hazelcast.KafkaMessage@4c6c252c
53:51,632 meta#1 [172.19.0.1]:5701 [jet] [0.5.1] Input from 0: com.krios.iot.hazelcast.KafkaMessage@4c6c252c
53:51,633 sink#0 [172.19.0.1]:5701 [jet] [0.5.1] src1 / Frame #9

Solution

  • If you want to see which processor outputs an item and which processor receives it, also attach peekInputP to your meta vertex (sketched below). In your logging configuration, enable printing of the logger name (in log4j, add %c{1} to the ConversionPattern of the PatternLayout).

    The logger name ends with #X, where X is the global processor index. You'll see that items from one upstream instance always go to the same downstream instance.
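
    Assuming the same static imports as the DAG code above, a minimal sketch of that change looks like this (peekInputP wraps the meta supplier the same way peekOutputP already wraps the Kafka source):

    Vertex kafkaSource = dag.newVertex("kafkaSource",
            DiagnosticProcessors.peekOutputP(streamKafkaP(properties, decodeKafkaMessage, topic)))
            .localParallelism(2);

    // Wrapping mapP with peekInputP logs every received item under the meta#X logger name.
    Vertex meta = dag.newVertex("meta",
            DiagnosticProcessors.peekInputP(mapP(LogLine::parse)))
            .localParallelism(2);

    With a log4j ConversionPattern such as %d{HH:mm:ss,SSS} %c{1} %m%n, the log then carries the kafkaSource#0/#1 and meta#0/#1 prefixes visible in the 2018-02-01 update above, and each meta#X instance only ever logs input coming from its paired kafkaSource#X instance, confirming that the isolated edge works as intended.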