Tags: java, caching, ignite, apache-ignite

Streaming large data from an external database to an Ignite server takes too long


My requirement is to load 10 million records from an external SQL database into an Ignite server. I'm using Ignite's caching functionality to store those 10 million records on the server, and I've used batch processing and pagination to stream the data in with Ignite's DataStreamer. Even before the first batch is sent, I get a "Possible too long JVM pause" warning, and even when all 10 million records do get processed, loading them into the Ignite server takes about 40 minutes. What am I doing wrong? Is any modification required apart from increasing the heap? And is there a better way to stream that bulk data from the external DB into the cache?

My JVM_OPTS, on both the client and the server:

-Xms4g
-Xmx4g
-XX:+AlwaysPreTouch
-XX:+UseG1GC
-XX:+ScavengeBeforeFullGC
-XX:ParallelGCThreads=8
-XX:ConcGCThreads=2
-XX:InitiatingHeapOccupancyPercent=55

All of the code below runs on my client.

The client-side code that streams the bulk data:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.atomic.AtomicInteger;

    import org.apache.ignite.Ignite;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;

    @Service
    public class PosWavierService {
        private static final Logger logger = LoggerFactory.getLogger(PosWavierService.class);
        private static final int BATCH_SIZE = 100_000; // Adjust batch size as needed
        private static final int NUM_THREADS = 6;

        private final ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);

        @Autowired
        private IgniteCacheService igniteCacheService;

        @Autowired
        private Ignite ignite;

        @Autowired
        private Repo productRepo;

        public CompletableFuture<Void> processAllRecords(String cacheName) {
            long startTime = System.currentTimeMillis();

            // One CompletableFuture per worker thread
            List<CompletableFuture<Void>> futures = new ArrayList<>();

            // Submit tasks that fetch pages and stream them concurrently;
            // the shared AtomicInteger hands out page numbers across threads
            AtomicInteger pageNumber = new AtomicInteger(0);
            for (int i = 0; i < NUM_THREADS; i++) {
                CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
                    while (true) {
                        List<ProductLines> records = fetchDataFromRepo(pageNumber.getAndIncrement(), BATCH_SIZE);
                        if (records.isEmpty()) {
                            break;
                        }
                        igniteCacheService.streamBulkData(cacheName, records);
                        logger.info("Processed {} records for cache {}", records.size(), cacheName);
                    }
                    return null;
                }, executor); // run on the dedicated pool, not the common ForkJoinPool
                futures.add(future);
            }

            CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
                    futures.toArray(new CompletableFuture[0]));

            combinedFuture.thenRun(() -> {
                long endTime = System.currentTimeMillis();
                long totalTime = endTime - startTime;
                logger.info("Total time taken for processing all records: {} milliseconds", totalTime);
            });

            return combinedFuture;
        }
    }

The repository method that retrieves the data from the table:

    @Query(value = "SELECT * FROM table WHERE key IS NOT NULL AND key != '' AND key != ' ' ", nativeQuery = true)
    Page<ProductLines> findRecordsWithPanNotNull(Pageable pageable);
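
The question never shows the fetchDataFromRepo helper that processAllRecords calls. A plausible sketch, assuming standard Spring Data pagination (PageRequest.of / Page.getContent) and the repository typed to ProductLines as above:

    private List<ProductLines> fetchDataFromRepo(int pageNumber, int batchSize) {
        // Fetch one page of batchSize rows and unwrap the Page into a List
        Page<ProductLines> page = productRepo.findRecordsWithPanNotNull(
                PageRequest.of(pageNumber, batchSize));
        return page.getContent();
    }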

The streamBulkData method:

    public void streamBulkData(String cacheName, List<ProductLines> records) {
        try (IgniteDataStreamer<String, ProductLines> streamer = ignite.dataStreamer(cacheName)) {
            // FileWriter writer = new FileWriter(KEYS_FILE_PATH, true); // Append mode

            streamer.allowOverwrite(true);
            streamer.perNodeBufferSize(1024);
            streamer.perNodeParallelOperations(8);
            streamer.skipStore(true);

            for (ProductLines record : records) {
                String key = record.getPan_no();
                if (key != null) {
                    streamer.addData(key, record);
                    // writer.write(key);
                } else {
                    System.err.println("Skipping record with null key: " + record);
                }
            }
            streamer.flush();

        } catch (CacheException e) {
            System.err.println("Error streaming data to cache: " + e.getMessage());
            e.printStackTrace();
        }
    }


Solution

  • I believe you are instantiating a new streamer on every invocation of streamBulkData. I would recommend creating your streamer instance once and passing a reference to it into your streamBulkData method. That way it can keep alive its connections to the hosts it is streaming data into (see the sketch after this answer).

    You also did not provide any information about what you are streaming into: cluster size (number of nodes), off-heap memory configuration, heap memory size, network capabilities, latency between the client loader and the servers, persistence configuration, and so on.

    Please realize that multiple Ignite hosts can ingest faster than one, but I have no idea what you are streaming into!

    Hope that helps.
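
    A minimal sketch of that refactoring, reusing the ProductLines type, fetchDataFromRepo helper, and streamer settings from the question (the loadAllRecords driver method is hypothetical): one long-lived streamer serves every batch and is only flushed and closed once all pages have been fed to it.

        public void loadAllRecords(String cacheName) {
            // Create the streamer once; it keeps its per-node connections and
            // buffers warm across all batches instead of rebuilding them per call
            try (IgniteDataStreamer<String, ProductLines> streamer = ignite.dataStreamer(cacheName)) {
                streamer.allowOverwrite(true);
                streamer.perNodeBufferSize(1024);
                streamer.perNodeParallelOperations(8);
                streamer.skipStore(true);

                int page = 0;
                List<ProductLines> records;
                while (!(records = fetchDataFromRepo(page++, BATCH_SIZE)).isEmpty()) {
                    streamBulkData(streamer, records); // pass the shared streamer in
                }
            } // close() flushes whatever is still buffered
        }

        public void streamBulkData(IgniteDataStreamer<String, ProductLines> streamer,
                                   List<ProductLines> records) {
            for (ProductLines record : records) {
                String key = record.getPan_no();
                if (key != null) {
                    streamer.addData(key, record);
                }
            }
        }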