Search code examples
javamultithreadingexecutorservicefiberquasar

Quasar Fiber equivalent of Java's ThreadPoolExecutor?


I've been curious about Quasar and its light weight Fibers as a replacement for Threads. After consulting their API docs, I have not been able to figure out how to go about converting a typical ThreadPoolExecutor into a pool of Fibers.

int maxThreadPoolSize = 10;

ThreadPoolExecutor executor = new ThreadPoolExecutor(
        maxThreadPoolSize,
        maxThreadPoolSize,
        10, TimeUnit.MINUTES,
        new ArrayBlockingQueue<Runnable>(maxThreadPoolSize),
        Executors.defaultThreadFactory(),
        new ThreadPoolExecutor.CallerRunsPolicy()
);

for (int i = 0; i < 100; i++) {
    executor.execute(new Runnable() {
        @Override
        public void run() {
            // run some code
        }
    });
}

The above code creates a pool with 10 threads, a queue in front of the pool that can hold 10 elements and a rejection policy (when queue is full) to have main thread execute a Runnable task itself. As the for loop creates 100 runnables, they will be executed 10 at a time in the pool, 10 queued up, and main thread picks up a Runnable itself until others are finished, after which main thread goes back to adding Runnables to executor.

How would you do this with Quasar's Fibers? Is it meant to be used as such in the first place?


EDIT: My original question was poorly phrased. Essentially I was trying to find a mechanism to limit how many Fibers can run concurrently. For example, do not launch more Fibers if there is already 200 Fibers running. If max number of Fibers are running, wait until one finishes before launching a new one.


Solution

  • java.util.concurrent.Semaphore ended up working well in my particular setup.

    General gist of my solution:

    • create Semaphore with desired max number of permits (aka max concurrent Fibers)
    • main thread is in charge of picking up tasks to process from a queue
    • main thread calls semaphore.acquire():
      • if a permit is available, then launch new Fiber to process task
      • if all permits are taken, then semaphore will block main thread and wait until a permit becomes available
    • once Fiber is launched, main thread repeats its logic. Picks up a new task from queue and attempts to launch a new Fiber.

    Bonus: standard Java's Semaphore is fixed and number of permits can not be dynamically adjusted. To make it dynamic this link came in handy: http://blog.teamlazerbeez.com/2009/04/20/javas-semaphore-resizing/