I am creating a batch data streamer in apache ignite, and need to control what happening after data receive. My batch has a structure:
public class Batch implements Binarylizable, Serializable {
private String eventKey;
private byte[] bytes;
etc..
Then i trying to stream my data:
try (IgniteDataStreamer<Integer, Batch> streamer = serviceGrid.getIgnite().dataStreamer(cacheName);
StreamBatcher batcher = StreamBatcherFactory.create(event) ){
streamer.receiver(StreamTransformer.from(new BatchDataProcessor(event)));
streamer.autoFlushFrequency(1000);
streamer.allowOverwrite(true);
statusService.updateStatus(event.getKey(), StatusType.EXECUTING);
int counter = 0;
Batch batch = null;
IgniteFuture<?> future = null;
while ((batch = batcher.batch()) != null) {
future = streamer.addData(counter++, batch);
}
Object getted = future.get();
Just for test use lets get only the last future, and try to analyze this object. In the code above I'm using BatchDataProcessor, that look like this:
public class BatchDataProcessor implements CacheEntryProcessor<Integer, Batch, Object> {
private final Event event;
private final String eventKey;
public BatchDataProcessor(Event event) {
this.event = event;
this.eventKey = event.getKey();
}
@Override
public Object process(MutableEntry<Integer, Batch> mutableEntry, Object... objects) throws EntryProcessorException {
Node node = NodeIgniter.node(Ignition.localIgnite().cluster().localNode().id());
ServiceGridContainer container = (ServiceGridContainer) node.getEnvironmentContainer().getContainerObject(ServiceGridContainer.class);
ProcessMarshaller marshaller = (ProcessMarshaller) container.getService(ProcessMarshaller.class);
LocalProcess localProcess = marshaller.intoProccessing(event.getLambdaExecutionKey());
try {
localProcess.addBatch(mutableEntry);
} catch (IOException e) {
e.printStackTrace();
} finally {
return new String("111");
}
}
}
So after localProcess.addBatch(mutableEntry) I want to send back an information about the status of this particular batch, so I think that I should do this in IgniteFuture object, but I don't find any information how to control the future object that's received in addData function.
Can anybody help with understanding, where can I control future that receives in addData function or some other way to realize a callback to streamed batch?
When you do StreamTransformer.from()
, you forfeit the result of your BatchDataProcessor
, because
for (Map.Entry<K, V> entry : entries)
cache.invoke(entry.getKey(), this, entry.getValue());
// ^ result of cache.invoke() is discarded here
DataStreamer
is for one-directional streaming of data. It is not supposed to return values as far as I know.
If you depend on the result of cache.invoke()
, I recommend calling it directly instead of relying on DataStreamer
.
BTW, be careful with fut.get()
. You should do dataStreamer.flush()
first, or DataStreamer
's futures will wait indefinitely.