future, ignite, data-stream

Apache Ignite DataStreamer: how do I set data into the IgniteFuture?


I am creating a batch data streamer in Apache Ignite and need to control what happens after the data is received. My batch has the following structure:

public class Batch implements Binarylizable, Serializable {

    private String eventKey;
    private byte[] bytes;

    etc..

Then I try to stream my data:

try (IgniteDataStreamer<Integer, Batch> streamer = serviceGrid.getIgnite().dataStreamer(cacheName);
     StreamBatcher batcher = StreamBatcherFactory.create(event)) {
    streamer.receiver(StreamTransformer.from(new BatchDataProcessor(event)));
    streamer.autoFlushFrequency(1000);
    streamer.allowOverwrite(true);
    statusService.updateStatus(event.getKey(), StatusType.EXECUTING);
    int counter = 0;
    Batch batch = null;
    IgniteFuture<?> future = null;
    while ((batch = batcher.batch()) != null) {
        future = streamer.addData(counter++, batch);
    }
    Object getted = future.get();

For testing purposes, I take only the last future and try to analyze the object it returns. The code above uses BatchDataProcessor, which looks like this:

public class BatchDataProcessor implements CacheEntryProcessor<Integer, Batch, Object> {

    private final Event event;
    private final String eventKey;

    public BatchDataProcessor(Event event) {
        this.event = event;
        this.eventKey = event.getKey();
    }

    @Override
    public Object process(MutableEntry<Integer, Batch> mutableEntry, Object... objects) throws EntryProcessorException {
        Node node = NodeIgniter.node(Ignition.localIgnite().cluster().localNode().id());
        ServiceGridContainer container = (ServiceGridContainer) node.getEnvironmentContainer().getContainerObject(ServiceGridContainer.class);
        ProcessMarshaller marshaller = (ProcessMarshaller) container.getService(ProcessMarshaller.class);
        LocalProcess localProcess = marshaller.intoProccessing(event.getLambdaExecutionKey());
        try {
            localProcess.addBatch(mutableEntry);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            return new String("111");
        }
    }
}

So after localProcess.addBatch(mutableEntry) I want to send back information about the status of this particular batch. I assume I should do this through the IgniteFuture object, but I can't find any information on how to control the future returned by addData.

Can anybody help me understand where I can control the future returned by addData, or suggest another way to implement a callback for a streamed batch?


Solution

  • When you use StreamTransformer.from(), you forfeit the result of your BatchDataProcessor, because the transformer invokes it like this:

    for (Map.Entry<K, V> entry : entries)
        cache.invoke(entry.getKey(), this, entry.getValue());
    //  ^ result of cache.invoke() is discarded here
    

    DataStreamer is meant for one-directional streaming of data. As far as I know, it is not supposed to return values.

    If you depend on the result of cache.invoke(), I recommend calling it directly instead of relying on DataStreamer.

    BTW, be careful with fut.get(). A future returned by addData() completes only once the buffered data has actually been flushed, so call dataStreamer.flush() first, or the DataStreamer's futures may wait indefinitely.
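
The two suggestions above can be sketched roughly as follows. This is a minimal illustration, not the asker's actual setup: it assumes ignite-core is on the classpath and a default local node is acceptable, and the names DirectInvokeSketch, StatusProcessor, the cache name "batches" and the use of a plain byte[] in place of Batch are all hypothetical. It calls cache.invokeAsync() directly so that each batch gets its own IgniteFuture carrying the processor's return value, and it shows flush() being called before get() on a streamer future.

```java
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.lang.IgniteFuture;

import javax.cache.processor.MutableEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DirectInvokeSketch {

    /** Hypothetical processor: stores the payload and returns a per-batch status. */
    static class StatusProcessor implements CacheEntryProcessor<Integer, byte[], String> {
        @Override
        public String process(MutableEntry<Integer, byte[]> entry, Object... args) {
            byte[] payload = (byte[]) args[0];   // the batch is passed as an invoke argument
            entry.setValue(payload);
            return "OK:" + payload.length;       // this value reaches the caller's future
        }
    }

    /** Sends every batch through invokeAsync() and collects the processor results in order. */
    static List<String> sendBatches(IgniteCache<Integer, byte[]> cache, List<byte[]> batches) {
        List<IgniteFuture<String>> futures = new ArrayList<>();
        for (int i = 0; i < batches.size(); i++)
            futures.add(cache.invokeAsync(i, new StatusProcessor(), batches.get(i)));

        List<String> statuses = new ArrayList<>();
        for (IgniteFuture<String> future : futures)
            statuses.add(future.get());          // blocks until that batch has been processed
        return statuses;
    }

    public static void main(String[] args) {
        // Assumption: a node with default configuration is enough for the demo.
        try (Ignite ignite = Ignition.start()) {
            IgniteCache<Integer, byte[]> cache = ignite.getOrCreateCache("batches");

            List<String> statuses = sendBatches(cache, Arrays.asList(new byte[3], new byte[5]));
            System.out.println(statuses);

            // With a data streamer, the future carries no processor result,
            // and flush() should come before get().
            try (IgniteDataStreamer<Integer, byte[]> streamer = ignite.dataStreamer("batches")) {
                streamer.allowOverwrite(true);
                IgniteFuture<?> last = streamer.addData(100, new byte[] {1});
                streamer.flush();                // push buffered entries to the cache
                last.get();                      // signals completion only
            }
        }
    }
}
```

Instead of blocking on get(), a callback can be registered on each future with listen(), which is closer to the per-batch notification asked about in the question. The trade-off is throughput: direct invocations do not get the batching that DataStreamer provides.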