Search code examples
ignite

Ignite dataStreamer is not working


I am exploring Ignite dataStreamer with the following code. But the output is:

For MessageKey0001, the output all displays data is null. For MessageKey0003, the output also all displays data is null For MessageKey0002, the output shows nothing, looks that receiver code is not run

When I change

dataStreamer.addData(i, "data-" + i);

to

    IgniteFuture future = dataStreamer.addData(i, "data-" + i);
    future.get();

The future.get() doesn't return, looks addData doesn't finish anyway? I am not sure where the problem is, can someone take a look? Thanks!

package ignite.streamer;

import org.apache.ignite.*;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.stream.StreamReceiver;

import javax.cache.Cache;
import java.io.Serializable;
import java.util.Collection;
import java.util.Map;

class IgniteDataStreamer_Person implements Serializable {
    @QuerySqlField(index = true)
    private String name;

    @QuerySqlField(index = true)
    private int age;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }
}

public class IgniteDataStreamerTest {
    public static void main(String[] args) {
        String configPath = "D:/Software/apache-ignite-fabric-1.7.0-bin/apache-ignite-fabric-1.7.0-bin/config/default-config.xml";
        Ignite ignite = Ignition.start(configPath);
        CacheConfiguration<Integer, String> cfg = new CacheConfiguration<Integer, String>();
        String cacheName = "stream_cache";
        cfg.setName(cacheName);
        cfg.setIndexedTypes(Integer.class, IgniteDataStreamer_Person.class);
        Cache cache = ignite.getOrCreateCache(cfg);

        IgniteDataStreamer<Integer, String> dataStreamer = ignite.dataStreamer(cacheName);

        for (int i = 0; i < 3; i++) {
            dataStreamer.addData(i, "data-" + i);
        }

        //null is got from cache
        for (int i = 0; i < 3; i++) {
            System.out.println(String.format("0001: data is %s ", cache.get(i)));
        }

        dataStreamer.receiver(new StreamReceiver<Integer, String>() {
            public void receive(IgniteCache<Integer, String> cache, Collection<Map.Entry<Integer, String>> entries) throws IgniteException {
                //nothing is printed to console
                for (Map.Entry<Integer, String> entry : entries) {
                    System.out.println(String.format("0002: key is: %s, value is: %s", entry.getKey(), entry.getValue()));
                }
            }
        });

        //null is got from cache
        for (int i = 0; i < 3; i++) {
            System.out.println(String.format("0003: data is %s ", cache.get(i)));
        }
        ignite.close();
    }
}

Solution

  • DataStreamer uses batches in order to provide good performance. You should flush data in your case (use flush() method) before block on future.get() method.

    Please, see IgniteDataStreamer javadocs for details.