java concurrency runnable executorservice java.util.concurrent

Java ExecutorService Runnable doesn't update value


I'm using Java to download the HTML of websites whose URLs are stored in a database, and I'd like to store that HTML in the database too. I'm using Jsoup for this:

public String downloadHTML(String byLink) {
    String htmlInPage = "";
    try {
        Document doc = Jsoup.connect(byLink).get();
        htmlInPage = doc.html();
    } catch (org.jsoup.UnsupportedMimeTypeException e) {
        // process this and some other exceptions
    }
    return htmlInPage;
}

To download the websites concurrently, I use this method:

public void downloadURL(int websiteId, String url, 
                        String categoryName, ExecutorService executorService) {
   executorService.submit((Runnable) () -> {
       String htmlInPage = downloadHTML(url);
       System.out.println("Category: " + categoryName + " " + websiteId + " " + url);
       String insertQuery = 
              "INSERT INTO html_data (website_id, html_contents)  VALUES (?,?)";
       dbUtils.query(insertQuery, websiteId, htmlInPage);   
   });
}

dbUtils is my own class based on Apache Commons DbUtils. Details are here: http://pastebin.com/iAKXchbQ
I use everything above as follows (the List<Object[]> details are explained on Pastebin too):

public static void main(String[] args) {
        DbUtils dbUtils = new DbUtils("host", "db", "driver", "user", "pass");
        List<String> categoriesList = 
                     Arrays.asList("weapons", "planes", "cooking", "manga");
        String sql = "SELECT lw.id, lw.website_url, category_name " +
                "FROM list_of_websites AS lw JOIN list_of_categories AS lc " +
                "ON lw.category_id = lc.id " +
                "where category_name = ? ";

        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (String category : categoriesList) {
            List<Object[]> sitesInCategory = dbUtils.select(sql, category );
            for (Object[] entry : sitesInCategory) {
                int websiteId = (int) entry[0];
                String url = (String) entry[1];
                String categoryName = (String) entry[2];
                downloadURL(websiteId, url, categoryName, executorService);
            }
        }
        executorService.shutdown();
}

I'm not sure whether this solution is correct, but it works. Now I want to change the code so that it saves HTML not from every website in my database, but only from a fixed number of them in each category.
For example, download and save the HTML of 50 websites from the "weapons" category, 50 from "planes", and so on. I don't think limiting the query in SQL is the right tool here: selecting 50 sites per category doesn't mean all 50 get saved, because some may fail due to incorrect syntax or connection problems.
I've tried creating a separate class that implements Runnable, with the fields counter and maxWebsitesPerCategory, but these variables aren't updated. Another idea was to use a field Map<String,Integer> sitesInCategory instead of counter, put each category in it as a key, and increment its value until it reaches maxWebsitesPerCategory, but that didn't work either. Please help me!
P.S. I'd also be grateful for any recommendations on my implementation of concurrent downloading (I haven't worked with concurrency in Java before, and this is my first attempt).


Solution

  • How about this?

    for (String category : categoriesList) {
        dbUtils.select(sql, category).stream()
            .limit(50)
            .forEach(entry -> {
                int websiteId = (int) entry[0];
                String url = (String) entry[1];
                String categoryName = (String) entry[2];
                downloadURL(websiteId, url, categoryName, executorService);
            });
    }
    

    sitesInCategory has been replaced with a stream limited to at most 50 elements, and your code then runs on each entry.
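Note that `limit(50)` caps how many entries are submitted, not how many downloads succeed. A minimal sketch of what `limit` does on its own, assuming a made-up list of URLs in place of the `dbUtils.select(...)` result (the class and method names here are hypothetical):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class LimitDemo {
    // Returns at most `max` entries in encounter order; shorter lists pass through whole.
    static List<String> firstN(List<String> urls, int max) {
        return urls.stream().limit(max).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical data standing in for the rows of one category.
        List<String> urls = Arrays.asList("http://a.example", "http://b.example", "http://c.example");
        System.out.println(firstN(urls, 2));  // prints [http://a.example, http://b.example]
        System.out.println(firstN(urls, 50)); // prints all three URLs, since the list is shorter than 50
    }
}
```

If some of the selected entries fail to download, fewer than 50 pages end up saved, because nothing replaces the failed ones.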

    EDIT

    In response to the comments, I've restructured things a bit; you can modify or implement the bodies of the methods I've suggested.

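    public void werk(Queue<Object[]> q, ExecutorService executorService) {
        executorService.submit(() -> {
            Object[] o;
            // poll() returns null once the queue is empty, which ends the loop.
            // Retrying inside the task (rather than submitting a new task) also
            // keeps working after main has called executorService.shutdown().
            while ((o = q.poll()) != null) {
                try {
                    String html = downloadHTML(o); // this takes one of your object arrays and returns the text of an html page

                    insertIntoDB(html); // this is the code in the latter half of your downloadURL method
                    return; // one page saved, this task is done
                } catch (/*narrow exception type indicating download failure*/Exception e) {
                    // download failed: try the next entry in the queue
                }
            }
        });
    }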
    

    ^^^ This method does most of the work.

    for (String category : categoriesList) {
        Queue<Object[]> q = new ConcurrentLinkedQueue<>(dbUtils.select(sql, category));
        IntStream.range(0, 50).forEach(i -> werk(q, executorService));
    }
    

    ^^^ this is the for loop in your main

    Now each category tries to download 50 pages; when a download fails, it moves on and tries another page. This way you will either download 50 pages or have attempted every page in the category.
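For reference, here is a self-contained sketch of the same quota idea that can be run without a database or Jsoup. Everything in it is an assumption made for illustration: `fakeDownload` stands in for the real download and simply fails for URLs containing "bad", and `downloadUpTo` is a hypothetical helper. Each task loops over the shared queue until one page succeeds, so no task needs to submit new work after `shutdown()` has been called (an `ExecutorService` rejects submissions made after shutdown). It also waits with `awaitTermination` so the final count is reliable.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class QuotaDownloadSketch {
    // Stand-in for the real download: fails for URLs containing "bad" (demo assumption).
    static String fakeDownload(String url) throws Exception {
        if (url.contains("bad")) throw new Exception("download failed: " + url);
        return "<html>" + url + "</html>";
    }

    // Tries to save up to `quota` pages from `urls`; returns how many succeeded.
    static int downloadUpTo(List<String> urls, int quota) {
        Queue<String> pending = new ConcurrentLinkedQueue<>(urls);
        AtomicInteger saved = new AtomicInteger(); // thread-safe counter shared by all tasks
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < quota; i++) {
            pool.submit(() -> {
                String url;
                // poll() returns null once the queue is empty, which ends the loop.
                while ((url = pending.poll()) != null) {
                    try {
                        fakeDownload(url);       // a real version would also insert into the DB here
                        saved.incrementAndGet();
                        return;                  // this task's single page is done
                    } catch (Exception e) {
                        // download failed: loop around and try the next URL
                    }
                }
            });
        }
        pool.shutdown(); // no new tasks are accepted; already submitted ones still run
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES); // wait so the count is final
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return saved.get();
    }

    public static void main(String[] args) {
        List<String> urls = Arrays.asList("a", "bad1", "b", "bad2", "c");
        System.out.println(downloadUpTo(urls, 2));  // prints 2
        System.out.println(downloadUpTo(urls, 50)); // prints 3 (only three URLs can succeed)
    }
}
```

The `AtomicInteger` is there because a plain `int` field updated from several pool threads is not guaranteed to be updated correctly or seen consistently by other threads.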