The Executor Framework

The Java executor framework runs and manages Runnable objects, called tasks. It does so with worker threads, which are usually managed as part of a thread pool. Depending on how the pool is configured, the executor reuses already created threads instead of creating a new thread for every task. Tasks that arrive while all threads in the pool are busy are held in a data structure such as a BlockingQueue. As soon as a thread finishes its task, it takes the next one from the queue.

The Executor interface declares a single method, void execute(Runnable task), which runs the given task. Depending on the implementation, the task runs on a thread pool, on a single dedicated thread, or even on the calling thread.
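To make the thread reuse described above visible, here is a minimal sketch. It assumes the standard Executors.newFixedThreadPool factory from java.util.concurrent; the class name ThreadReuseDemo and the helper workerNames are made up for this illustration. It hands ten tasks to a pool of two threads and records which threads ran them.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadReuseDemo {
    // Runs `tasks` short tasks on a pool of `poolSize` threads and returns
    // the names of the worker threads that actually executed them.
    static Set<String> workerNames(int poolSize, int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        Set<String> names = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(tasks);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    names.add(Thread.currentThread().getName());
                    done.countDown();
                });
            }
            done.await(); // wait until every task has run
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return names;
    }

    public static void main(String[] args) {
        Set<String> names = workerNames(2, 10);
        System.out.println(names.size() + " worker thread(s) ran 10 tasks: " + names);
    }
}
```

However many tasks are submitted, the set never contains more names than the pool has threads, because the workers are reused rather than recreated.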

Custom Executors

The following custom executor uses a simple thread pool: a fixed number of worker threads take tasks from a shared queue.

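import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;

class MyThreadPoolExecutor implements Executor {
    // Unbounded queue that holds tasks until a worker is free
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();

    // offer() always succeeds here because the queue is unbounded
    public void execute(Runnable r) { queue.offer(r); }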
 
    public MyThreadPoolExecutor(int nrThreads) {
        for (int i = 0; i < nrThreads; i++) { activate(); }
    }
 
    private void activate() {
        new Thread(() -> {
            try {
                while (true) { queue.take().run(); }
            } catch (InterruptedException e) { /* die */ }
        }).start();
    }
}

You can also create an executor that just executes the given task on the current thread.

class DirectExecutor implements Executor {
    public void execute(Runnable r) { r.run(); }
}

Or you can create an executor that creates a new thread for each task.

class ThreadPerTaskExecutor implements Executor {
    public void execute(Runnable r) {
        new Thread(r).start();
    }
}
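The difference between the last two executors is which thread ends up running the task. The sketch below repeats both classes so that it is self-contained; the wrapper class ExecutorThreadDemo and the helper threadUsedBy are hypothetical names introduced only for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

class ExecutorThreadDemo {
    static class DirectExecutor implements Executor {
        public void execute(Runnable r) { r.run(); }
    }

    static class ThreadPerTaskExecutor implements Executor {
        public void execute(Runnable r) { new Thread(r).start(); }
    }

    // Returns the thread that ran a task handed to the given executor.
    static Thread threadUsedBy(Executor executor) {
        AtomicReference<Thread> used = new AtomicReference<>();
        CountDownLatch ran = new CountDownLatch(1);
        executor.execute(() -> {
            used.set(Thread.currentThread());
            ran.countDown();
        });
        try {
            ran.await(); // the task may still be running on another thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return used.get();
    }

    public static void main(String[] args) {
        Thread caller = Thread.currentThread();
        // prints "direct runs on caller: true"
        System.out.println("direct runs on caller: "
                + (threadUsedBy(new DirectExecutor()) == caller));
        // prints "thread-per-task runs on caller: false"
        System.out.println("thread-per-task runs on caller: "
                + (threadUsedBy(new ThreadPerTaskExecutor()) == caller));
    }
}
```

The calling code is identical in both cases; only the Executor implementation decides where the task runs, which is the point of the interface.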

Built-in Executors

The executor framework ships with several built-in executors, which you create with the factory methods in the Executors class. These factories return instances of the ExecutorService interface, which extends the Executor interface and adds life-cycle management methods.

interface ExecutorService extends Executor {
    void shutdown(); // graceful: finishes all submitted tasks but accepts no new ones
    List<Runnable> shutdownNow(); // attempts to interrupt running tasks and returns the tasks that were still waiting to run
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; // after a shutdown request, blocks until all tasks have completed or the timeout elapses
}

  • Executors.newFixedThreadPool(int nThreads): Creates a thread pool that reuses a fixed number of threads operating off a shared unbounded queue. Threads that die due to an exception are replaced.
  • Executors.newCachedThreadPool(): Creates a thread pool that creates new threads as needed, but will reuse previously constructed threads when they are available.
  • Executors.newSingleThreadExecutor(): Creates an executor that uses a single worker thread operating off an unbounded queue. The worker thread is replaced if it dies due to an exception.
  • Executors.newScheduledThreadPool(int corePoolSize): Creates a thread pool that can schedule commands to run after a given delay, or to execute periodically.
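The life-cycle methods are usually combined into a two-phase shutdown. The following is a sketch of that pattern, not a definitive implementation; the class name ShutdownDemo, the helper names and the five-second timeout are assumptions chosen for this example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ShutdownDemo {
    // Tries a graceful shutdown first and falls back to interrupting
    // the remaining tasks if they do not finish within the timeout.
    static boolean shutdownGracefully(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown(); // no new tasks; queued tasks still run
        try {
            if (!pool.awaitTermination(timeout, unit)) {
                pool.shutdownNow(); // interrupt whatever is still running
                return false;
            }
            return true;
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Submits `tasks` trivial tasks, shuts the pool down and returns how
    // many tasks completed, or -1 if the pool did not terminate cleanly.
    static int runAndShutDown(int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.execute(completed::incrementAndGet);
        }
        boolean clean = shutdownGracefully(pool, 5, TimeUnit.SECONDS);
        return clean && pool.isTerminated() ? completed.get() : -1;
    }

    public static void main(String[] args) {
        System.out.println("completed tasks: " + runAndShutDown(5));
    }
}
```

Because shutdown() lets already submitted tasks finish, all five tasks are counted even though the shutdown request follows the submissions immediately.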

Callable and Future

Because Runnable.run() can neither return a result nor throw a checked exception, we need something else when we want either. The executor framework offers two interfaces for this: Callable, the alternative to Runnable, and Future, which is similar to a promise in JavaScript and represents the pending result of a task.

interface Callable<V> {
    V call() throws Exception;
}

interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    // Both get() variants block until the result is available; they also throw
    // the unchecked CancellationException if the task was cancelled
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

Instead of execute from the Executor interface, we then use the additional methods that ExecutorService declares alongside its life-cycle methods.

interface ExecutorService extends Executor {
    // ...lifecycle methods
    <T> Future<T> submit(Callable<T> task); // the key function
    Future<?> submit(Runnable task);
    <T> Future<T> submit(Runnable task, T result);
    // runs all tasks and returns one Future per task, in the same order, once all tasks have completed
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
    // Executes the given tasks, returning the result of one that has completed successfully if any do.
    <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
}
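As a small illustration of these methods, the sketch below uses invokeAll to collect results in submission order and shows that an exception thrown inside call() reaches the caller wrapped in an ExecutionException. The class FutureDemo, its helper methods and the "disk full" message are invented for this example.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FutureDemo {
    // Squares each input on a pool and collects the results in input order.
    static List<Integer> squares(List<Integer> inputs) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int n : inputs) {
                tasks.add(() -> n * n);
            }
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> f : pool.invokeAll(tasks)) { // blocks until all are done
                results.add(f.get());
            }
            return results;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    // Returns the message of the exception thrown inside call(), which
    // get() delivers as the cause of an ExecutionException.
    static String failureMessage() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Callable<Integer> failing = () -> { throw new IOException("disk full"); };
        try {
            pool.submit(failing).get();
            return null; // not reached: the task always fails
        } catch (ExecutionException e) {
            return e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(squares(Arrays.asList(1, 2, 3))); // prints [1, 4, 9]
        System.out.println(failureMessage());                // prints disk full
    }
}
```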

FactorialCalculator example

In this example, each task will return the factorial of a given number.

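import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class Main {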
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(2);
 
        List<Future<Long>> resultList = new ArrayList<>();
 
        for (long i = 1; i <= 20; i++) {
            Future<Long> result = executor.submit(new FactorialCalculator(i));
            resultList.add(result);
        }
 
        executor.shutdown(); // accept no new tasks; already submitted ones still run
        executor.awaitTermination(10, TimeUnit.SECONDS);

        for (int i = 0; i < resultList.size(); i++) {
            Future<Long> result = resultList.get(i);
            Long number = result.get(); // blocks if the task has not finished yet
            // resultList.get(i) holds the factorial of i + 1
            System.out.println((i + 1) + "::\t" + number);
        }
    }
 
    private static class FactorialCalculator implements Callable<Long> {
        private final Long number;
 
        public FactorialCalculator(Long number) {
            this.number = number;
        }
 
        @Override
        public Long call() throws Exception {
            long result = 1;
            if (number == 0 || number == 1) {
                result = 1;
            } else {
                for (int i = 2; i <= number; i++) {
                    result *= i;
                }
            }
            return result;
        }
    }
}

Fork-Join

If we implement a divide-and-conquer algorithm such as merge sort with plain executors, we run into a problem: every task that has split its work blocks a thread while it waits for its subtasks, so we end up with many threads that do nothing but wait.

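import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class MergeSortTask implements Runnable {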
    public final int[] elems, temp;
    private final int start, end;
 
    private final ExecutorService ex;
 
    public MergeSortTask(int[] elems, ExecutorService ex) {
        this.elems = elems;
        this.start = 0;
        this.end = elems.length;
        this.temp = new int[end];
        this.ex = ex;
    }
 
    public MergeSortTask(int[] elems, int[] temp, int start, int end, ExecutorService es) {
        this.elems = elems;
        this.temp = temp;
        this.start = start;
        this.end = end;
        this.ex = es;
    }
 
    @Override
    public void run() {
        if (end - start <= 1) {
            return;
        } else {
            int mid = (start + end) / 2;
 
            MergeSortTask left = new MergeSortTask(elems, temp, start, mid, ex);
            MergeSortTask right = new MergeSortTask(elems, temp, mid, end, ex);
 
            Future<?> lf = ex.submit(left);
            Future<?> rf = ex.submit(right);
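            try {
                // Blocks this worker thread until both halves are sorted
                lf.get();
                rf.get();
            } catch (InterruptedException | ExecutionException e) {
                // Fail loudly instead of merging halves that may be unsorted
                throw new RuntimeException(e);
            }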
            merge(elems, temp, start, mid, end);
        }
    }
 
    private static void merge(int[] elem, int[] tmp, int leftPos, int rightPos, int rightEnd) {
        if (elem[rightPos - 1] <= elem[rightPos]) return;
 
        int leftEnd = rightPos;
        int tmpPos = leftPos;
        int numElements = rightEnd - leftPos;
 
        while (leftPos < leftEnd && rightPos < rightEnd)
            if (elem[leftPos] <= elem[rightPos])
                tmp[tmpPos++] = elem[leftPos++];
            else
                tmp[tmpPos++] = elem[rightPos++];
 
        while (leftPos < leftEnd)
            tmp[tmpPos++] = elem[leftPos++];
 
        while (rightPos < rightEnd)
            tmp[tmpPos++] = elem[rightPos++];
 
        rightEnd--;
        for (int i = 0; i < numElements; i++, rightEnd--)
            elem[rightEnd] = tmp[rightEnd];
    }
 
    private static int[] randomInts(int n) {
        int[] l = new int[n];
        Random rnd = new Random();
 
        for (int i = 0; i < l.length; i++) {
            l[i] = rnd.nextInt(1000);
        }
        return l;
    }
 
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        int SIZE = 4;
        int[] data = randomInts(SIZE);
 
        System.out.println("Unsorted: " + Arrays.toString(data));
 
        ExecutorService es = Executors.newCachedThreadPool();
 
        MergeSortTask ms = new MergeSortTask(data, es);
        Future<?> f = es.submit(ms);
        f.get();
 
        es.shutdownNow();
        System.out.println("Sorted: " + Arrays.toString(data));
    }
}
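To put a number on the waiting threads, the following sketch runs the same split-and-wait structure without any sorting and asks the pool how many threads it needed. It builds a ThreadPoolExecutor by hand with the settings the cached pool uses in OpenJDK (no core threads, unbounded maximum, SynchronousQueue), which is an assumption about that implementation; the class name BlockedThreadsDemo and its methods are hypothetical.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BlockedThreadsDemo {
    // Splits [start, end) in half until one element is left. Every non-leaf
    // task blocks in get() while its two children run.
    static void split(int start, int end, ExecutorService ex) {
        if (end - start <= 1) return;
        int mid = (start + end) / 2;
        Future<?> left = ex.submit(() -> split(start, mid, ex));
        Future<?> right = ex.submit(() -> split(mid, end, ex));
        try {
            left.get();
            right.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    // Returns the largest number of threads the pool held at the same time
    // while splitting a range of n elements.
    static int largestPoolSize(int n) {
        // Assumed to mirror the configuration of Executors.newCachedThreadPool()
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
        try {
            pool.submit(() -> split(0, n, pool)).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return pool.getLargestPoolSize();
    }

    public static void main(String[] args) {
        System.out.println("threads needed for 16 elements: " + largestPoolSize(16));
    }
}
```

For 16 elements the recursion is four levels deep, so while a leaf runs, its four ancestors are all blocked; the pool therefore needs at least five threads, and typically more, for work that is trivial.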

Instead, it is better to do the work sequentially once the subproblem is smaller than a certain size, the so-called sequential threshold.

...
public void run() {
    if (end - start <= 1000) {
        Arrays.sort(elems, start, end); // small range: sort sequentially
        return;
    } else {
        int mid = (start + end) / 2;
 
        MergeSortTask left = new MergeSortTask(elems, temp, start, mid, ex);
        MergeSortTask right = new MergeSortTask(elems, temp, mid, end, ex);
...

Fork-Join Framework

For this reason, Java provides the fork-join framework, which supports forking work into subtasks and joining their results at the end. The framework creates a limited number of worker threads based on the number of CPU cores. Each worker thread maintains its own double-ended work queue (deque). When a task forks, the worker pushes the new subtask onto the head of its own deque. When the worker would otherwise wait or is idle, it pops a task off the head of its deque and executes it instead of sleeping. If a worker's deque is empty, it steals a task from the tail of the deque of another, randomly chosen worker.

// RecursiveAction has no result; RecursiveTask<V> returns a result of type V
public class ForkJoinMergeSort extends RecursiveAction {
    public final int[] is, tmp;
    private final int l, r;
 
    public ForkJoinMergeSort(int[] is, int[] tmp, int l, int r) {
        this.is = is; this.tmp = tmp; this.l = l; this.r = r;
    }
 
    protected void compute() {
        if (r - l <= 100000) Arrays.sort(is, l, r); // below the sequential threshold: sort directly
        else {
            int mid = (l + r) / 2;
            ForkJoinMergeSort left = new ForkJoinMergeSort(is, tmp, l, mid);
            ForkJoinMergeSort right = new ForkJoinMergeSort(is, tmp, mid, r);
            left.fork();
            right.invoke();
            left.join();
            merge(is, tmp, l, mid, r);
        }
    }
    private void merge(int[] es, int[] tmp, int l, int m, int r) { ... }
 
    private static int[] randomInts(int n) { ... }
 
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        int SIZE = 4;
        int[] data = randomInts(SIZE);
        int[] tmp = new int[data.length];
 
        System.out.println("Unsorted: " + Arrays.toString(data));
 
        ForkJoinPool fjPool = new ForkJoinPool();
        ForkJoinMergeSort ms = new ForkJoinMergeSort(data,tmp,0,data.length);
        fjPool.invoke(ms);
        fjPool.shutdown();
        System.out.println("Sorted: " + Arrays.toString(data));
    }
}
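The comment at the top of the example mentions RecursiveTask<V> for tasks that return a result. A minimal sketch of that variant, summing an array, could look as follows. The class name ForkJoinSum, the threshold of 1,000 elements and the use of ForkJoinPool.commonPool() are choices made for this illustration only.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class ForkJoinSum extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000; // hypothetical sequential threshold
    private final int[] data;
    private final int lo, hi;

    ForkJoinSum(int[] data, int lo, int hi) {
        this.data = data;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected Long compute() {
        if (hi - lo <= THRESHOLD) {
            // Small range: sum sequentially
            long sum = 0;
            for (int i = lo; i < hi; i++) sum += data[i];
            return sum;
        }
        int mid = (lo + hi) >>> 1;
        ForkJoinSum left = new ForkJoinSum(data, lo, mid);
        ForkJoinSum right = new ForkJoinSum(data, mid, hi);
        left.fork();                     // schedule the left half asynchronously
        long rightSum = right.compute(); // work on the right half in this thread
        return left.join() + rightSum;   // wait for the left half and combine
    }

    static long sum(int[] data) {
        return ForkJoinPool.commonPool().invoke(new ForkJoinSum(data, 0, data.length));
    }

    public static void main(String[] args) {
        int[] data = new int[100_000];
        for (int i = 0; i < data.length; i++) data[i] = i + 1;
        System.out.println(sum(data)); // 1 + 2 + ... + 100000 = 5000050000
    }
}
```

As in the merge sort above, only one half is forked while the current thread works on the other half itself, so the forking worker always has something to do.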