
Correct way of using the IgniteDataStreamer API


I have a multi-threaded application that keeps writing to an Ignite cache through a write() method. At startup it calls an initialize() method, which creates the cache and the streamer object. Once all threads are done, flush() is called, which rewrites any keys that failed to insert in write() and then closes the streamer. I have a few questions:

  1. In the write() method, what should I do with the IgniteFuture returned by addData()? Should I wait until it completes?

  2. Does this future complete when the entry is written into the cache, or when it is added to the streamer's internal buffer?

  3. In the flush() method, I rewrite all entries that failed in write() before closing the streamer. Is this the correct approach? Code below:

    public void initialize(final String cacheName) {
      getOrCreateCache(cacheName, true); // create a new cache, or get the existing one if it already exists
      this.streamer = ignite.dataStreamer(cacheName);
      this.cacheName = cacheName;
    }

    public void write(K key, V value) {
      try {
        IgniteFuture<?> future = streamer.addData(key, value); // What should I do with this future? Should I wait on it?
        numberOfEntriesWrittenIntoCache.incrementAndGet();
      } catch (IgniteInterruptedException | IgniteDataStreamerTimeoutException | CacheException | IllegalStateException e) {
        failedMap.put(key, value);
      }
    }

    public void flush() {
      try {
        if (streamer != null) {
          if (failedMap.size() > 0) {
            LOGGER.info("Writing " + failedMap.size() + " failed entries");
            failedMap.forEach((k, v) -> writeToGrid(k, v));
          }
        }
      } catch (IllegalStateException | CacheException | IgniteException e) {
        LOGGER.error("Exception while writing/closing ignite", e);
      } catch (Exception e) {
        LOGGER.error("Exception while writing/closing ignite", e);
      } finally {
        failedMap.clear();
        if (streamer != null) {
          streamer.close();
          streamer = null;
        }
        LOGGER.info("Number of entries written into cache: " + numberOfEntriesWrittenIntoCache.intValue());
      }
    }


Solution

  • The data streamer sends data to other nodes in batches. The future returned by the addData(...) method completes when the batch containing the provided entry has been flushed into the cache, not when the entry is added to the streamer's internal buffer.

    So if you wait for every future returned by addData(...), you may block indefinitely: when autoFlushFrequency is not configured and neither flush() nor close() is called, a batch is only sent once the per-node buffer fills up, which may never happen while your threads are blocked. Even with autoFlushFrequency configured, every call to write() would still block until its batch is flushed, which defeats the purpose of batching.

    I don't see anything wrong with retrying the failed entries through the data streamer at the end of processing, although I can't think of a case where it would actually help.

    The only thing I would change is to wrap writeToGrid(k, v) inside the forEach in its own try-catch block. Otherwise a single exception stops the processing of all remaining failed entries.
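A minimal sketch of that per-entry try-catch, written against plain java.util types so it runs without a cluster. The helper name retryAll and the simulated writer are hypothetical; in the question's flush() the writer would be writeToGrid(k, v).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

public class RetryFailedEntries {

    /**
     * Passes every entry of {@code failed} to {@code writer}. Each call has its
     * own try/catch, so one exception does not stop the remaining entries.
     *
     * @return the entries that failed again (empty if all succeeded)
     */
    static <K, V> Map<K, V> retryAll(Map<K, V> failed, BiConsumer<K, V> writer) {
        Map<K, V> stillFailed = new HashMap<>();
        failed.forEach((key, value) -> {
            try {
                writer.accept(key, value);
            } catch (RuntimeException e) {
                // Remember the entry and move on instead of aborting the whole loop.
                stillFailed.put(key, value);
            }
        });
        return stillFailed;
    }

    public static void main(String[] args) {
        Map<String, Integer> failed = new HashMap<>();
        failed.put("a", 1);
        failed.put("b", 2);
        failed.put("c", 3);

        Map<String, Integer> written = new HashMap<>();
        // Hypothetical writer standing in for writeToGrid(k, v); it fails only for "b".
        BiConsumer<String, Integer> writer = (k, v) -> {
            if (k.equals("b")) {
                throw new IllegalStateException("simulated failure for key " + k);
            }
            written.put(k, v);
        };

        Map<String, Integer> stillFailed = retryAll(failed, writer);
        System.out.println("written: " + written.keySet());
        System.out.println("still failed: " + stillFailed.keySet());
    }
}
```

Assuming writeToGrid takes (K, V), the forEach line in flush() could become something like Map<K, V> stillFailed = retryAll(failedMap, this::writeToGrid); and stillFailed can then be logged before the streamer is closed. Note that this only catches exceptions thrown synchronously by the writer; a failure reported later through the future returned by addData(...) does not show up here.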
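Going back to questions 1 and 2: the point that an addData(...) future completes only when its batch is flushed can be seen in a toy model. ToyBatchingStreamer below is hypothetical and is not how Ignite is implemented; it only assumes what the answer describes (entries are buffered, and the future completes once the batch containing the entry is flushed, either because the buffer filled up or because flush() was called). CompletableFuture stands in for IgniteFuture.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical toy model of a batching streamer (NOT Ignite's implementation).
 * Assumption: the future returned by addData() completes only when the batch
 * containing that entry is flushed.
 */
public class ToyBatchingStreamer<K, V> {

    /** One buffered entry plus the future handed back to the caller. */
    private static final class Pending<K, V> {
        final K key;
        final V value;
        final CompletableFuture<Void> future = new CompletableFuture<>();

        Pending(K key, V value) {
            this.key = key;
            this.value = value;
        }
    }

    private final int bufferSize;                         // hypothetical, loosely like a per-node buffer size
    private final Map<K, V> cache = new HashMap<>();      // stands in for the real cache
    private final List<Pending<K, V>> buffer = new ArrayList<>();

    public ToyBatchingStreamer(int bufferSize) {
        this.bufferSize = bufferSize;
    }

    /** Buffers the entry; the returned future completes only when its batch is flushed. */
    public synchronized CompletableFuture<Void> addData(K key, V value) {
        Pending<K, V> pending = new Pending<>(key, value);
        buffer.add(pending);
        if (buffer.size() >= bufferSize) {
            flush(); // a full buffer is sent as one batch
        }
        return pending.future;
    }

    /** Writes every buffered entry to the "cache" and completes its future. */
    public synchronized void flush() {
        List<Pending<K, V>> batch = new ArrayList<>(buffer);
        buffer.clear(); // detach the batch first so callbacks cannot disturb the iteration
        for (Pending<K, V> pending : batch) {
            cache.put(pending.key, pending.value);
            pending.future.complete(null);
        }
    }

    /** Snapshot of what has reached the "cache" so far. */
    public synchronized Map<K, V> cacheView() {
        return new HashMap<>(cache);
    }

    public static void main(String[] args) {
        ToyBatchingStreamer<String, Integer> streamer = new ToyBatchingStreamer<>(100);
        List<String> failedKeys = new ArrayList<>();

        CompletableFuture<Void> future = streamer.addData("k1", 1);
        // Non-blocking: register a callback instead of calling join()/get().
        future.whenComplete((ignored, error) -> {
            if (error != null) {
                failedKeys.add("k1"); // remember the key for a later retry
            }
        });

        // future.join() here would block forever in this single-threaded model:
        // only 1 of 100 buffer slots is used, so nothing triggers a flush.
        System.out.println("done before flush? " + future.isDone()); // prints "done before flush? false"

        streamer.flush(); // what flush()/close() or an auto-flush timer would trigger
        System.out.println("done after flush? " + future.isDone());  // prints "done after flush? true"
        System.out.println("failed keys: " + failedKeys);            // prints "failed keys: []"
    }
}
```

The design choice being illustrated is the callback: instead of calling get() on each future inside write(), record failures asynchronously and let flush()/close() (or autoFlushFrequency) push the batches out. In Ignite, as far as I know, the corresponding hook is IgniteFuture.listen(...); since such a listener may run on a different thread than the caller, failedMap would need to be a concurrent map.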