Tags: java, hazelcast-jet

How to get a simple DAG to work in Hazelcast Jet?


While working on my DAG in Hazelcast Jet, I stumbled into a weird problem. To track down the error I dumbed my approach down completely, and it seems that the edges are not working according to the tutorial.

The code below is almost as simple as it gets. Two vertices (one source, one sink), one edge.

The source is reading from a map, the sink should put into a map.

The data.addEntryListener correctly tells me that the map is filled with 100 lists (each with 25 objects of 400 bytes) by another application ... and then nothing. The map fills up, but the DAG doesn't interact with it at all.

Any idea where to look for the problem?

package be.andersch.clusterbench;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.hazelcast.config.Config;
import com.hazelcast.config.SerializerConfig;
import com.hazelcast.core.EntryEvent;
import com.hazelcast.jet.*;
import com.hazelcast.jet.config.JetConfig;
import com.hazelcast.jet.stream.IStreamMap;
import com.hazelcast.map.listener.EntryAddedListener;
import be.andersch.anotherpackage.myObject;

import java.util.List;
import java.util.concurrent.ExecutionException;

import static com.hazelcast.jet.Edge.between;
import static com.hazelcast.jet.Processors.*;

/**
 * Created by abernard on 24.03.2017.
 */
public class Analyzer {
    private static final ObjectMapper mapper = new ObjectMapper();
    private static JetInstance jet;
    private static final IStreamMap<Long, List<String>> data;
    private static final IStreamMap<Long, List<String>> testmap;

    static {
        JetConfig config = new JetConfig();
        Config hazelConfig = config.getHazelcastConfig();
        hazelConfig.getGroupConfig().setName( "name" ).setPassword( "password" );
        hazelConfig.getNetworkConfig().getInterfaces().setEnabled( true ).addInterface( "my_IP_range_here" );
        hazelConfig.getSerializationConfig().getSerializerConfigs().add(
                new SerializerConfig().
                        setTypeClass(myObject.class).
                        setImplementation(new OsamKryoSerializer()));
        jet = Jet.newJetInstance(config);
        data = jet.getMap("data");
        testmap = jet.getMap("testmap");
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        DAG dag = new DAG();
        Vertex source = dag.newVertex("source", readMap("data"));
        Vertex test = dag.newVertex("test", writeMap("testmap"));

        dag.edge(between(source, test));

        jet.newJob(dag).execute().get();

        data.addEntryListener((EntryAddedListener<Long, List<String>>) (EntryEvent<Long, List<String>> entryEvent) -> {
            System.out.println("Got data: " + entryEvent.getKey() + " at " + System.currentTimeMillis() + ", Size: " + jet.getHazelcastInstance().getMap("data").size());
        }, true);

        testmap.addEntryListener((EntryAddedListener<Long, List<String>>) (EntryEvent<Long, List<String>> entryEvent) -> {
            System.out.println("Got test: " + entryEvent.getKey() + " at " + System.currentTimeMillis());
        }, true);

        Runtime.getRuntime().addShutdownHook(new Thread(() -> Jet.shutdownAll()));
    }
}

Solution

  • The Jet job has already finished at the line jet.newJob(dag).execute().get(), before you even create the entry listeners. This means the job ran on an empty map. Maybe your confusion is about the nature of this job: it is a batch job, not an infinite stream-processing one. Jet version 0.3 does not yet support infinite stream processing, so the source vertex reads whatever is in the map at the moment the job starts and then completes.
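The ordering problem can be seen even without Jet. The sketch below (plain Java, no Hazelcast dependency; the helper runBatchJob is a hypothetical stand-in for the batch job, not a Jet API) shows that a batch-style copy only sees entries that exist before it runs, which is why the map in the question stayed empty: in the original code the map would have to be populated before the jet.newJob(dag).execute().get() line, not after.

```java
import java.util.HashMap;
import java.util.Map;

public class BatchOrdering {
    // Stand-in for the Jet batch job: copies whatever is in source *right now*.
    static void runBatchJob(Map<Long, String> source, Map<Long, String> sink) {
        sink.putAll(source);
    }

    public static void main(String[] args) {
        Map<Long, String> data = new HashMap<>();
        Map<Long, String> testmap = new HashMap<>();

        // Same ordering as the question: run the job first, fill the map later.
        runBatchJob(data, testmap);          // job runs against an empty map...
        data.put(1L, "entry");               // ...this entry arrives too late
        System.out.println(testmap.size());  // prints 0

        // Corrected ordering: populate the source, then run the batch job.
        testmap.clear();
        runBatchJob(data, testmap);
        System.out.println(testmap.size());  // prints 1
    }
}
```

The same reasoning applies to the listeners in the question: registering them after .get() returns means they can only report entries added after the job has already completed.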