One important update that Java 8 brings to us is the introduction of parallel streams. Other important to mention is the Arrays class that now supports parallel operations. But Is it the same thing as concurrency?

councurrencyParallelism2.png
Concurrency vs Parallelism

So, it's time to start the fifth post of the Java 8 series about the changes that you can find from version 6 to 8.

Summary


What is it?

The concurrency is when more than one processes run simultaneously and use the same resource. That race condition needs to be monitored and controlled to avoid two or more threads have access to the same state information. It assures that one thread does not modify the state while the other is performing an atomic state-dependent operation.

Thread problems

Liveness is the ability of an application to be able to execute in a timely manner. Liveness problems, then, are those in which the application becomes unresponsive.

  • Deadlock: two or more threads are blocked forever, each waiting on the other.
  • Starvation: a single thread is perpetually denied access to a shared resource or lock.
  • Livelock: two or more threads are conceptually blocked forever, although they are each still active and trying to complete their task. It's a special case of resource starvation.
  • Race Condition: two or more threads try to complete a related task at the same time.

Effects of Concurrency


How to coordinate the access?

When concurrent activities interact, some sort of coordination is required. The Synchronization (synchronized keyword) is a solution to ensure that only one thread at a time can access the code and guarantee the consistency. When a thread uses a code this thread locks the code and any other thread can use that one. Every other thread is blocked. The first thread need release the code to that be available to be accessed by other threads.

# Lock an object
synchronized(object) { /*....*/ }

# Lock a method
private synchronized void methodA() {
    System.out.print((++count)+" ");
}

# static code
public static synchronized void methodB() {
    System.out.print("Finished work");
}

Lock, ReadWriteLock and ReentrantLock

To use synchronized keyword is a limited solution. It's not possible, for example, to check if a locked object is available. Concurrent API added the Lock framework to help on that. This framework offer method to acquire and release the lock explicitly. It is used to avoid deadlock.

Lock lock = new ReentrantLock();
try {
    lock.lock();
    System.out.print((++count)+" ");
} finally {
    lock.unlock();
}

PS: All other threads will wait until this thread call the unlock.
PS: You should unlock the thread that have the lock.
If you don't do that you will get a runtime exception (IllegalMonitorStateException)

The tryLock method is an alternative to the lock because it returns a boolean result indicating whether was obtained, which avoid thread to wait indefinitely.

Lock lock = new ReentrantLock();
if(lock.tryLock()) {
    try {
       System.out.print((++count)+" ");
    } finally { lock.unlock(); }
} else {
    System.out.println("Unable to acquire lock!");
}

# Alternativaly
lock.tryLock(10, TimeUnit.SECONDS)
// Return TRUE if the lock is acquired within this 10 seconds.

The lock guarantees mutually exclusivity of an object. But sometimes only the write processes need to lock the object, making the read available to other threads. To make this possible you can use the ReadWriteLock interface with the realLock and writeLock methods. This class doesn't implement the Lock interface, but these two methods return Lock objects. In fact, ReentrantReadWriteLock class implements the ReadWriteLock interface.

private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
Lock lockRead = readWriteLock.readLock();
Lock writeLock = readWriteLock.writeLock();

Atomic Package

Atomic operations are performed in a single step (package java.concurrent.atomic). This package helps to coordinate access to primitive value and the reference object. Furthermore, it offers classes such as AtomicInteger and AtomicLong that support atomic operation on single variables. Some common atomic methods are get, set, getAndSet, incrementAndGet, getAndIncrement, decrementAndGet, getAndDecrement.

# Example
private AtomicInteger count = new AtomicInteger(0);
private void incrementAndReport() {
    System.out.print(count.incrementAndGet()+" ");
}

The new methods are [JavaInAction]:

  • getAndUpdate - Atomically updates the current value with the results of applying the given function, returning the previous value.
  • updateAndGet - Atomically updates the current value with the results of applying the given function, returning the updated value.
  • getAndAccumulate - Atomically updates the current value with the results of applying the given function to the current and given values, returning the previous value.
  • accumulateAndGet - Atomically updates the current value with the results of applying the given function to the current and given values, returning the updated value.
// atomically set the minimum between an observed value of 10 and an
// existing atomic integer
int min = atomicInteger.accumulateAndGet(10, Integer::min);

[JavaInAction]: "The Java API recommends using the new classes LongAdder, LongAccumulator, Double-Adder, and DoubleAccumulator instead of the Atomic classes equivalent when multiple threads update frequently but read less frequently. These classes are designed to grow dynamically to reduce thread contention".

Concurrent Collections

If you are working in concurrent environments you can use directly the collections from the package java.util.concurrent that are already prepared for this. Beside the concurrent collections often include performance enhancements that avoid unnecessary synchronization.

Examples of concurrent interfaces available when you use Concurrency API are BlockingQueue, BlockingDeque, ConcurrentMap and NavigableMap.

# Using synchronized keyword
Map<String,Object> map = new HashMap<String,Object>();
public synchronized void put(String key, String value) {
    map.put(key, value);
}

// Alternative solution
Map<String,Object> map=new ConcurrentHashMap<String,Object>();
public void put(String key, String value) {
    map.put(key, value);
}

# More examples
Queue queue = new ConcurrentLinkedQueue<>();
queue.offer(31);
Deque deque = new ConcurrentLinkedDeque<>();
deque.offer(10);
deque.push(4);
BlockingQueue q = new LinkedBlockingQueue<>();
q.offer(3, 4, TimeUnit.SECONDS);
q.poll(10, TimeUnit.MILLISECONDS);

BlockingDeque d = new LinkedBlockingDeque<>();
d.offerFirst(5, 2, TimeUnit.MINUTES);
d.offerLast(47, 100, TimeUnit.MICROSECONDS);
d.pollFirst(200, TimeUnit.NANOSECONDS);
d.pollLast(1, TimeUnit.SECONDS);

CopyOnWrite Collections

CopyOnWriteArrayList and CopyOnWriteArraySet copy all of their elements to a new structure when an element is added, modified (change reference), or removed from the collection. CopyOnWriteArrayList is most useful when you have few updates and inserts and many concurrent reads because the CopyOnWrite classes can use a lot of memory.

Concurrent Collections by methods

It's possible to obtain synchronized versions of existing non-concurrent collection objects by methods from Collections class. Examples: synchronizedCollection, synchronizedList, synchronizedMap.

List list = Collections.synchronizedList(
        new ArrayList<>(Arrays.asList(1,6,16)));

synchronized(list) {
    for(int data: list){
        System.out.print(data+" ");
    }
}

CyclicBarrier and ForkPool

The CyclicBarrier and ForkPool are classes included in Concurrency API to help to coordinate concurrent tasks.

The CyclicBarrier's constructor you say a number of threads to wait for. When each thread finish the await method is called. When the number of calls is equal to the number informed in the constructor the threads can continue.

public void methodA(CyclicBarrier c1, CyclicBarrier c2) {
    // ...
    action1();
    c1.await();
    action2();
    c2.await();
    action3();
    // ...
}
// ...
ExecutorService service = Executors.newFixedThreadPool(4);
ClassA manager = new ClassA();
CyclicBarrier c1 = new CyclicBarrier(4);
CyclicBarrier c2 = new CyclicBarrier(4,
     () -> System.out.println("Continue!"));

for(int i=0; i<4; i++)     service.submit(() -> manager.methodA(c1,c2));
// ...

# Organized Result
1. Action 1 of all threads will be executed.
2. After that, action 2 of all threads,
3. after that will be printed "Continue"
4. and then action 3 of all threads.

If the current thread is interrupted, the await method throws an InterruptedException. If another thread is interrupted or time out a BrokenBarrierException is thrown.

The Fork/Join Framework

The ExecutorService was the solution to run asynchronous tasks, but it works best when the tasks are homogeneous and independent. The Fork/Join Framework is a new alternative that complements the concurrency API using the divide and conquer approach. It uses the work-stealing algorithm.

This framework is used to run large tasks split into small tasks and execute, using recursion,  in parallel to get results faster. The recursive process has to arrive at a base case. The small results are combined to get the final result. The split phase is the FORK and the combining phase is the JOIN.

  1. Create a ForkJoinTask: define the recursive process. The fork/join solution extending RecursiveAction class (implement the void compute method) or RecursiveTask class (implement compute method with a generic return type) which implement ForkJoinTask interface
  2. Create the ForkJoinPool: new ForkJoinPool(). It creates an instance with a number of threads equal to the number of processors of your machine. You can define the number of threads using ForkJoinPool(int parallelism) constructor. It uses the work-stealing algorithm (when a thread is free, it steals the pending work of other threads that are still busy)
  3. Start the ForkJoinTask: pool.invoke(task)
# RecursiveAction
protected class A extends RecursiveAction{
    //...
    @Override
    protected void compute(){
        if(....) { .... }
        else {
            // ...
            invokeAll(new A(...), new A(...));
        }
    }
}
// ...
ForkJoinTask<?> task = new A(...);
ForkJoinPool pool = new ForkJoinPool();
pool.invoke(task);

# RecursiveTask
protected class B extends RecursiveTask {
    //...
    @Override
    protected Double compute(){
        if(....) {return X;}
        else {
             // ...
             RecursiveTask o = new B(...);
             o.fork();
             return new B(...).compute() + o.join();
        }
   }
}
// ...
ForkJoinTask task = new B(...);
ForkJoinPool pool = new ForkJoinPool();
Double sum = pool.invoke(task);
  • This framework is useful for any task that can be solved recursively and can be computed independently (so the order doesn't matter)
  • The fork method  just add a task to the thread's queue
  • The fork method should be called before the join method.
  • The join method should be the last step. It'll block the program until the result is returned.
  • The compute method should be called before join method.

A complete code example you can see here.

Thread Basics

Threads are what make concurrency possible in java. In Java, it happens when multiple threads execute and process at the same time

Threads are units of code that can be executed at the same time. They are sometimes called lightweight processes, although, in fact, a thread is executed within a process (and every process has, at least, one thread, the main thread).

Generically, it's possible to say that thread uses directly a stack and, since objects are in heap, indirectly the threads use heap because a thread can have a reference of an object in the heap.

  1. Heap - Since the global variable is stored in the heap, the heap is shared among threads. All threads share a common heap. [1] [2]

  2. Stack - Since each thread can have its own execution sequence/code, it must have its own stack on which it might push/pop its program counter contents. So threads of the same process do not share a stack. Each thread has a private stack, which it can quickly add and remove items from. [1] [2]

Create Thread

The two-step process to execute a task using a thread is (1) defining the thread with the task and (2) start the execution. The first step can be done using the Runnable interface or subclass Thread. The second one is discouraged, so I will just show the first one example. And the second step is executed using the start method.

# Definition of Runnable Interface
@FunctionalInterface
public interface Runnable {
    void run();
}

# Traditional way
public class A implements Runnable {
    public void run() {        
       System.out.println("Running");    
    }
}
Thread thread = new Thread(new A());
thread.start();

# Using lambda
Thread thread = new Thread(() -> {
    System.out.println("Running");
});
thread.start();

PS: it might not start immediately
PS: Example from OCP8 - Threads

Executor

This interface introduces the replaces of the use of the Thread by Executor.

Runnable r = ...
//Thread t = new Thread(r);
//t.start();
Executor e = ...
e.execute(r);

ExecutorService

This interface comes in Concurrency API to help to create and manage the threads: get an instance and send to be processed. That instance you can get using the Executors factory introduced by Concurrency API.

ExecutorService service = null;
try {
   service = Executors.newSingleThreadExecutor();
   System.out.println("begining");
   service.execute(() -> System.out.println("A")); // Task 1
   service.execute(() -> System.out.println("B")); // Task 2
   System.out.println("end");
} finally {
   if(service != null) service.shutdown();
}

# Possible output
begining
A
end
B

PS: The threads will order their result
PS: The single thread executor will queue the tasks
PS: Each task execute after the previous task completes
PS: Task 2 always will execute after Task 1
PS: The shutdown() will not stop tasks that are running
PS: To really stop all threads you have to use shutdownNow()

# void execute(Runnable command)

The ExecutorService interface extends Executor interface. The execute method comes from that interface. A similar method is the submit() that also complete tasks asynchronously, but unlike execute, return a Future object. The Future can show the state of the task with the isDone, isCancelled, cancel and get methods.

The ExecutorService also have more two methods. The invokeAll and invokeAny methods take a collection object containing a list of tasks and execute the tasks synchronously. The first one doesn't stop until all tasks are completed. The second one will wait until at least one task complete. It's possible to inform a timeout to the execution.

//...
service = Executors.newSingleThreadExecutor();
Future<?> result = service.submit(() -> {
         for(int i=0; i<500; i++) CheckResults.counter++; });
result.get(10, TimeUnit.SECONDS);
//...

PS: After 10 seconds, throwing a TimeoutException if the task is not done.

This interface extends Executor to provide more features, like the submit method that accepts Runnable and Callable objects and allows them to return a value.

Callable

It is another important interface (a functional interface) that can be an alternative to the Runnable interface retrieving more detail after the task is complete. It returns a value and throws a checked exception.

// ...
service = Executors.newSingleThreadExecutor();
Future result = service.submit(() -> 30+11);
System.out.println(result.get());
// ...

PS: The submit is a overload method from ExecutorService to take Callable Object

If it is necessary, it is possible to schedule the execution.

ScheduledExecutorService service = Executors
       .newSingleThreadScheduledExecutor();
Runnable task1 = () -> System.out.println("A");
Callable task2 = () -> "B";
Future<?> result1 = service.schedule(task1, 3, TimeUnit.SECONDS);
Future<?> result2 = service.schedule(task2, 5, TimeUnit.MINUTES);

Thread pools

Executors provide factory and utility methods for Executor, ExecutorService, ScheduledExecutorService, ThreadFactory and Callable classes to act on a pool of threads. The thread pool cannot be created directly by the interfaces. The tasks in the pool can be executed concurrently.

A thread pool is a group of pre-instantiated reusable threads that are available to perform a set of arbitrary tasks.

ScheduledExecutorService service = Executors
       .newScheduledThreadPool(10);
service.scheduleAtFixedRate(command,3,1,TimeUnit.MINUTE);
  • newCachedThreadPool
  • newFixedThreadPool
  • newScheduledThreadPool

Tip

If you need to generate a random number in a thread-safe manner without performance degradation you should use ThreadLocalRandom class. Using java.util.Random and java.lang.Math.danrom is also thread-safe, but they can have poor performance.

Conclusion

Concurrency API helps a lot the work to control the shared resource. That API brings good things to help us in sequential and parallel execution. You need to pay attention to the right scenarios to use this resource and use the best of the benefits.

More examples of concurrency you can see in my GittHub.

Related Posts

Reference