A bit of theory
If you’ve played with Java a little bit, you’ll agree with me that this code takes at least 6 seconds to complete.
public static void main(String[] args) {
int[] ids = new int[]{1, 2, 3};
for (int id : ids) {
doFakeNetworkCall();
}
}
static void doFakeNetworkCall() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
This code waits for each item to finish its sleep before moving on to the next one. This is a common problem when we’re talking about slow I/O, but we can solve it.
First, let’s understand how our applications interact with the system.
What are our programs in the OS?
In the Operating System (OS), a program (or application) runs inside an isolated and independent structure called a process. This allows the OS to securely allocate CPU and memory to each application.
Remember the problem I showed before? One solution is forking, which creates a full copy of a process, so each copy can handle one item from the list. Another solution is threads: a thread runs inside an existing process and shares its memory (each thread only gets its own stack), which makes it way lighter than a fork.
Okay, but what is concurrency?
Concurrency is about managing multiple tasks that appear to run simultaneously, even if they are actually executed sequentially in small time slices. One thread can run while another is waiting for a response.
Parallelism happens when multiple threads actually run at the same time on different CPU cores.
Hands-on Java Concurrency API!
[Image: picture I took of Deshmukh’s book about the Java 17 OCP certification]
Although you don’t manage OS threads directly in Java, you can use platform threads, which are thin wrappers around OS threads. That means every platform thread still costs you an OS thread, with its own stack and OS-level scheduling.
Remember the public static void main method Java forces us to create? Well, we also have a main thread! It’s the one you’re in when you haven’t created any other threads.
Creating Platform threads
You can create a thread in Java just like that:
Thread thread = new Thread(() -> System.out.println("Hello, World! I'm inside a Thread"));
thread.start(); // now it will run :)
You can loop through a small list and create a thread for each item:
public static void main(String[] args) {
int[] ids = new int[]{1, 2, 3};
for (int id : ids) {
new Thread(() -> {
doFakeNetworkCall(id);
}).start();
}
}
static void doFakeNetworkCall(int id) {
try {
System.out.println("Starting " + Thread.currentThread().getName());
Thread.sleep(2000);
System.out.println(Thread.currentThread().getName() + " finished");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
If you run it, you’ll notice that instead of taking 6 seconds it takes only about 2 seconds! Remember why? It’s because the threads sleep at the same time: while one thread is “blocked”, the others keep going.
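Here is a minimal sketch of the same idea that also waits for the workers with join() and measures the elapsed time. The class name ThreadTimingSketch, the runAll method and the 200 ms sleep are placeholders I made up for illustration; only Thread, join() and AtomicInteger come from the JDK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class ThreadTimingSketch {
    // Starts one platform thread per id, waits for all of them,
    // and returns how many fake calls completed.
    static int runAll(int[] ids, long sleepMillis) {
        AtomicInteger completed = new AtomicInteger();
        List<Thread> workers = new ArrayList<>();
        for (int id : ids) {
            Thread worker = new Thread(() -> {
                try {
                    Thread.sleep(sleepMillis); // stands in for a slow network call
                    completed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "worker-" + id);
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join(); // block until this worker is done
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return completed.get();
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        int done = runAll(new int[]{1, 2, 3}, 200);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        // The sleeps overlap, so this is usually close to 200 ms, not 600 ms.
        System.out.println(done + " calls finished in about " + elapsedMs + " ms");
    }
}
```

Without the join() calls, main would reach its last line before the workers are done, and the measured time would be meaningless.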
Daemon VS non-daemon threads
Daemon threads are background threads that do not prevent the JVM from exiting when the program finishes. Non-daemon threads, on the other hand, are user threads that keep the JVM running until they complete their execution.
You can set a thread as a daemon by calling setDaemon(true) before starting it.
public static void main(String[] args) {
Thread daemonThread = new Thread(() -> {
while (true) {
System.out.println("Daemon thread is running");
}
});
daemonThread.setDaemon(true);
daemonThread.start();
System.out.println("Main thread is finishing");
}
See? The infinite loop in the daemon thread will not prevent the JVM from exiting after the main method finishes. However, if you set daemon to false (which is the default value), the JVM will wait for the thread to finish before exiting, so… in this example it will never finish.
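The "before starting it" part matters. A small sketch, assuming nothing beyond the standard Thread API (the class and method names are mine): setting the flag on a thread that is already running throws an IllegalThreadStateException.

```java
class DaemonFlagSketch {
    // Returns true when setDaemon() is rejected because the thread is already running.
    static boolean setDaemonAfterStartFails() {
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(5_000); // keep the thread alive while we poke at it
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        try {
            worker.setDaemon(true); // too late: the thread is already alive
            return false;
        } catch (IllegalThreadStateException expected) {
            return true;
        } finally {
            worker.interrupt(); // wake the sleeper up so it ends right away
        }
    }

    // Returns the daemon flag of a thread configured before start().
    static boolean daemonFlagBeforeStart() {
        Thread worker = new Thread(() -> { });
        worker.setDaemon(true); // fine: the thread has not been started yet
        return worker.isDaemon();
    }

    public static void main(String[] args) {
        System.out.println("daemon flag set before start: " + daemonFlagBeforeStart());
        System.out.println("setDaemon after start rejected: " + setDaemonAfterStartFails());
    }
}
```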
Virtual Threads
Since platform threads are backed by OS threads, they are not cheap. They consume memory and CPU resources, and if you create too many of them, the OS may not be able to handle them all, leading to performance issues or even crashes.
Project Loom introduced lightweight threads called virtual threads. They arrived as a preview feature in Java 19 and became a final feature in Java 21. They are scheduled by the JVM instead of the OS, so they are much cheaper to create and manage than platform threads.
Virtual threads are always daemon threads, and you can’t change that: calling setDaemon(false) on one throws an IllegalArgumentException.
public static void main(String[] args) throws InterruptedException {
// Thread.ofVirtual() returns a builder; start() creates and starts the virtual thread
Thread thread = Thread.ofVirtual().start(() -> System.out.println("Hello, World! I'm inside a Virtual Thread"));
// Since all virtual threads are daemon, the JVM will not wait for them. Therefore, this is necessary.
thread.join(); // It's like we're saying "hey wait for this thread to finish!"
}
Executors
Managing threads gets ugly after a while if we’re doing everything manually, so Java provides a higher-level API called the Executor framework. It allows you to manage a pool of threads and submit tasks to be executed by those threads. (A Thread pool is a collection of threads that can be reused).
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(3); // This method receives the number of threads in the pool
int[] ids = new int[]{1, 2, 3};
for (int id : ids) {
executor.submit(() -> doFakeNetworkCall(id));
}
executor.close(); // Java 19+: waits for the submitted tasks to finish, then shuts the pool down
}
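If the tasks return something, you can collect the results through the same API. This is just a sketch: fetch() is a made-up stand-in for a real network call, and I use shutdown() instead of close() so it also runs on Java versions older than 19.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ExecutorResultsSketch {
    // Hypothetical stand-in for a network call that returns a value.
    static String fetch(int id) {
        return "response-" + id;
    }

    static List<String> fetchAll(int[] ids) {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            for (int id : ids) {
                tasks.add(() -> fetch(id));
            }
            List<String> results = new ArrayList<>();
            // invokeAll blocks until every task is done and keeps the input order.
            for (Future<String> future : executor.invokeAll(tasks)) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a task failed", e.getCause());
        } finally {
            executor.shutdown(); // stop accepting tasks and let the pool threads end
        }
    }

    public static void main(String[] args) {
        System.out.println(fetchAll(new int[]{1, 2, 3})); // prints [response-1, response-2, response-3]
    }
}
```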
Completable Future
CompletableFuture is a class that represents the future result of an asynchronous computation, letting you think in a more functional & asynchronous way.
It allows you to write non-blocking code and chain multiple asynchronous operations together. Kinda like promises in JS.
import java.util.concurrent.CompletableFuture;
public static void main(String[] args) {
CompletableFuture
.supplyAsync(() -> "Hello") // Add a supplier to run in a separate thread
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World!")) // Chaining with another CompletableFuture
.thenApply(s -> s + " from CompletableFuture!") // Transforming the result
.thenAccept(System.out::println) // Consuming the final result
.exceptionally(e -> { // Handling exceptions
System.out.println("Error: " + e.getMessage());
return null;
})
.join(); // Wait for the completion of the CompletableFuture
}
When you use a CompletableFuture without passing an executor, it runs on the common ForkJoinPool (ForkJoinPool.commonPool()), a thread pool shared by the whole application. This means that if you have multiple CompletableFutures running at the same time, they will all share the same pool of threads, which can lead to contention and performance issues. To avoid this, you can create your own thread pool and pass it as the second argument to methods like supplyAsync.
Also: don’t confuse it with the Future interface. CompletableFuture implements Future and is a more powerful and flexible alternative: with a plain Future, pretty much all you can do is block on get() or poll isDone().
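A minimal sketch of passing your own pool, assuming a small fixed pool is what you want. The pool size, the thread name io-pool-worker and the class name are arbitrary choices of mine; supplyAsync(supplier, executor) is the standard overload.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CustomPoolFutureSketch {
    static String greetOnOwnPool() {
        // A dedicated pool, so this work does not compete with the common ForkJoinPool.
        ExecutorService ioPool = Executors.newFixedThreadPool(2, runnable -> {
            Thread thread = new Thread(runnable, "io-pool-worker");
            thread.setDaemon(true); // don't keep the JVM alive because of this pool
            return thread;
        });
        try {
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), ioPool) // runs on ioPool
                    .thenApply(name -> "Hello from " + name)
                    .join();
        } finally {
            ioPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(greetOnOwnPool()); // prints Hello from io-pool-worker
    }
}
```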
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(3);
Future<String> future = executor.submit(() -> {
Thread.sleep(2000);
return "Hello, World!";
});
try {
String result = future.get(); // This blocks the thread until the result is available
System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
} finally {
executor.shutdown();
}
}
Locks and Synchronization Mechanisms
Well… with great power comes great responsibility. When multiple threads access shared resources, you need to ensure that only one thread can access the resource at a time.
Because things can get UGLY if you don’t do that.
Let’s make a multi-threaded counter:
class Counter {
private int count = 0;
public void increment() {
count++;
}
public int getCount() {
return count;
}
}
public class Main {
public static void main(String[] args) {
Counter counter = new Counter();
ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < 100; i++) {
executor.submit(counter::increment);
}
executor.close();
System.out.println(counter.getCount());
}
}
If you run this code you’re going to see that the output is not always 100. Sometimes it is 99, 98, or even less. This is what we call a race condition: count++ is really three steps (read, add, write), so when multiple threads change the shared data at the same time, some updates get lost.
To fix this, we can add synchronization to the increment method:
class Counter {
private int count = 0;
public synchronized void increment() { // Only one thread at a time can run this on the same Counter instance
count++;
}
public int getCount() {
return count;
}
}
If you’re familiar with multi-threading, this is like a mutex lock.
It ensures that only one thread can execute the increment method at a time, while other threads are blocked until the current thread finishes executing the method.
It’s also important to know that each lock can be held by only one thread at a time (a single thread, however, can hold several locks at once).
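You don’t have to lock the whole method, either. A sketch of the same counter using a synchronized block on a private lock object (the class and field names are mine, and the thread counts are arbitrary):

```java
class SynchronizedBlockSketch {
    private final Object lock = new Object(); // private monitor nobody else can lock on
    private int count = 0;

    void increment() {
        synchronized (lock) { // only the read-modify-write is guarded
            count++;
        }
    }

    int getCount() {
        synchronized (lock) { // same lock, so readers see the latest value
            return count;
        }
    }

    // Runs the given number of threads, each incrementing the shared counter.
    static int countWith(int threads, int incrementsPerThread) {
        SynchronizedBlockSketch counter = new SynchronizedBlockSketch();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < incrementsPerThread; i++) {
                    counter.increment();
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter.getCount();
    }

    public static void main(String[] args) {
        System.out.println(countWith(8, 1_000)); // prints 8000
    }
}
```

Locking on a private object instead of this means outside code can’t accidentally grab the same monitor.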
Synchronized Blocks are Reentrant
When a thread enters a synchronized method or block, it acquires the monitor (the built-in lock) of an object. Think of it as a key to a door; only one thread can hold the key at a time, so only that thread can enter the room (the synchronized block).
Something being reentrant means that if a thread already holds the lock, it can enter other synchronized blocks that are locked with the same monitor object. Therefore, the thread doesn’t need to wait to get the lock again.
public class ReentrantExample {
private int count = 0;
public synchronized void increment() {
count++;
if (count < 5) {
increment(); // Reentrant call, will not wait for the lock to be released
}
}
}
Manual Locks with java.util.concurrent.locks
Lock implementations offer a more specific and flexible approach than synchronized methods or blocks.
We have methods such as:
- tryLock() -> Tries to acquire the lock without blocking
- lockInterruptibly() -> Acquires the lock, but the wait can be interrupted
- lock() -> Acquires the lock, blocking until it’s available
- unlock() -> Releases the lock
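To see tryLock() in action, here is a small sketch (class and method names are mine): a second thread asks for a lock that main is holding, gets false right away instead of waiting, and succeeds once the lock is free.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

class TryLockSketch {
    // Tries to take the lock from a second thread and reports whether it succeeded.
    static boolean otherThreadCanLock(ReentrantLock lock) {
        AtomicBoolean acquired = new AtomicBoolean(false);
        Thread other = new Thread(() -> {
            if (lock.tryLock()) { // returns immediately instead of blocking
                try {
                    acquired.set(true);
                } finally {
                    lock.unlock();
                }
            }
        });
        other.start();
        try {
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return acquired.get();
    }

    // Returns {result while the caller holds the lock, result after releasing it}.
    static boolean[] demo() {
        ReentrantLock lock = new ReentrantLock();
        boolean whileHeld;
        lock.lock();
        try {
            whileHeld = otherThreadCanLock(lock);
        } finally {
            lock.unlock(); // always unlock in finally
        }
        boolean afterRelease = otherThreadCanLock(lock);
        return new boolean[]{whileHeld, afterRelease};
    }

    public static void main(String[] args) {
        boolean[] result = demo();
        System.out.println("while held: " + result[0] + ", after release: " + result[1]);
        // prints while held: false, after release: true
    }
}
```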
There is also the concept of fairness.
Fairness
Until now, I didn’t show a way to guarantee the order in which threads waiting for a lock will get it. If we have many threads, the last one to ask for the lock may be the first one to acquire it, while other threads may never get access at all (this is called starvation).
ReentrantLock
This one is, obviously, reentrant (see the section above for what that means).
It has 2 constructors:
ReentrantLock() // Creates an instance of ReentrantLock.
ReentrantLock(boolean fair) // Creates an instance of ReentrantLock with the given fairness policy
When you pass true to the constructor, the lock favors granting access to the longest-waiting thread. Otherwise, no order is guaranteed.
Programs using fair locks may be less efficient (because the implementation has to check a queue of waiting threads) but it can be a worthwhile trade-off in some cases.
Always benchmark it! :)
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;
public static void main(String[] args) {
try (ExecutorService executor = Executors.newFixedThreadPool(10)) { // This is called try-with-resources
ReentrantLock lock = new ReentrantLock(true); // Fair lock
for (int i = 0; i < 10; i++) {
executor.submit(() -> {
lock.lock(); // Acquire the lock
try {
// Critical section
System.out.println("Thread " + Thread.currentThread().getName() + " is in the critical section");
} finally {
lock.unlock(); // Release the lock
}
});
}
}
}
There is also the concept of Conditions, where a thread waits until another thread signals that it can keep going. It’s really nice.
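import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class SimpleProducerConsumer {
// Single-slot buffer: the producer waits while it is full, the consumer waits while it is empty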
private int buffer = -1;
private boolean isEmpty = true;
private final Lock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
public void produce(int value) throws InterruptedException {
lock.lock();
try {
while (!isEmpty) {
notFull.await();
}
buffer = value;
isEmpty = false;
System.out.println("Produced: " + value);
notEmpty.signal();
} finally {
lock.unlock();
}
}
public int consume() throws InterruptedException {
lock.lock();
try {
while (isEmpty) {
notEmpty.await();
}
int value = buffer;
buffer = -1;
isEmpty = true;
System.out.println("Consumed: " + value);
notFull.signal();
return value;
} finally {
lock.unlock();
}
}
}
ReentrantReadWriteLock
It has 2 locks: one for reading and one for writing. Several threads can hold the read lock at the same time, but the write lock is exclusive.
It also supports the same fairness policy as ReentrantLock.
private static final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); // Fair lock
private static final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
private static final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
private static final List<String> sharedData = new ArrayList<>();
public static void main(String[] args) {
// Initialize shared data
sharedData.add("Initial Data");
// Executor service to manage threads
ExecutorService executor = Executors.newFixedThreadPool(10);
// Create and submit writer and reader tasks
for (int i = 0; i < 5; i++) {
executor.submit(new WriterTask("New Data " + i));
executor.submit(new ReaderTask());
}
executor.shutdown();
}
static class ReaderTask implements Runnable {
@Override
public void run() {
readLock.lock(); // Acquire the read lock
try {
Thread.sleep(1000);
System.out.println("Thread " + Thread.currentThread().getName() + " is reading: " + sharedData);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
readLock.unlock(); // Release the read lock
}
}
}
static class WriterTask implements Runnable {
private final String newData;
public WriterTask(String newData) {
this.newData = newData;
}
@Override
public void run() {
writeLock.lock(); // Acquire the write lock
try {
Thread.sleep(1000);
System.out.println("Thread " + Thread.currentThread().getName() + " is writing: " + newData);
sharedData.add(newData);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
writeLock.unlock(); // Release the write lock
}
}
}
The difference with this one is that while the write lock is held, it blocks both reads and writes until the write operation is completed, whereas plain reads don’t block each other. That’s why it’s useful only if you have more reads than writes.
StampedLock
This one is an alternative to ReentrantReadWriteLock. The difference is that it’s not reentrant and it allows you to use optimistic locking for read operations.
Ok, but what does optimistic locking mean?
Optimistic Locking
The thread proceeds with its operations without locking at the beginning (yeah, it’s optimistic because it assumes no other threads will interfere with its operations). Then, before committing to the result, it checks whether another thread has made conflicting changes. Therefore, it’s suitable for read-heavy workloads where write conflicts are not frequent: with no blocked threads, things are more efficient :)
Pessimistic Locking
The thread acquires an exclusive lock at the beginning of its operations, so any other thread will wait for the lock to be unlocked.
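A sketch of both styles with StampedLock, loosely following the point example from its Javadoc. The class and method names are mine: the write path is pessimistic, and the read path tries an optimistic read first and falls back to a real read lock only if a write slipped in between.

```java
import java.util.concurrent.locks.StampedLock;

class StampedPointSketch {
    private final StampedLock lock = new StampedLock();
    private double x, y;

    void move(double dx, double dy) {
        long stamp = lock.writeLock(); // pessimistic: exclusive access
        try {
            x += dx;
            y += dy;
        } finally {
            lock.unlockWrite(stamp);
        }
    }

    double distanceFromOrigin() {
        long stamp = lock.tryOptimisticRead(); // no blocking, just a version stamp
        double currentX = x, currentY = y;
        if (!lock.validate(stamp)) { // a write happened while we were reading
            stamp = lock.readLock(); // fall back to a real read lock
            try {
                currentX = x;
                currentY = y;
            } finally {
                lock.unlockRead(stamp);
            }
        }
        return Math.hypot(currentX, currentY);
    }

    // Shows that an optimistic stamp becomes invalid once a write lock was taken.
    static boolean stampSurvivesWrite() {
        StampedLock lock = new StampedLock();
        long readStamp = lock.tryOptimisticRead();
        long writeStamp = lock.writeLock();
        lock.unlockWrite(writeStamp);
        return lock.validate(readStamp);
    }

    public static void main(String[] args) {
        StampedPointSketch point = new StampedPointSketch();
        point.move(3, 4);
        System.out.println(point.distanceFromOrigin()); // prints 5.0
        System.out.println(stampSurvivesWrite());        // prints false
    }
}
```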
Synchronizers for Coordination
Synchronizers are more ways to coordinate threads: instead of protecting data, they make threads wait for each other at specific points :)
CountDownLatch
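This one lets you wait for a certain number of tasks to finish with a count-like approach: each task calls countDown(), and await() blocks until the count reaches zero. It’s useful when you’re using daemon threads, since the JVM won’t wait for them on its own.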
public static void main(String[] args) {
CountDownLatch countDownLatch = new CountDownLatch(8);
for (int i = 0; i < 8; i++) {
Thread.ofVirtual().name("Virtual Thread-" + i).start(() -> {
System.out.println(Thread.currentThread().getName() + " is doing some work...");
System.out.println(Thread.currentThread().getName() + " finished work.");
countDownLatch.countDown(); // count down last, so main can't exit before the line above is printed
});
}
try {
countDownLatch.await(); // Waiting :)
System.out.println("countDownLatch is zero!!!");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
CyclicBarrier
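It’s similar to CountDownLatch, but here the threads wait for each other: await() blocks until the given number of threads has reached the barrier. Unlike a CountDownLatch, it can be reused.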
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
System.out.println("3 threads have reached the barrier...");
});
for (int i = 0; i < 9; i++) {
Thread thread = new Thread(() -> {
try {
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
thread.start();
}
}
Since I’m creating 9 threads, the function I passed to the cyclic barrier will run 3 times.
Semaphore
It lets only a specified number of threads hold a permit at the same time.
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 9; i++) {
Thread thread = new Thread(() -> {
System.out.println("Thread " + Thread.currentThread().getName() + " is trying to acquire the semaphore");
try {
semaphore.acquire();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return; // no permit was acquired, so there is nothing to release
}
try {
System.out.println("Thread " + Thread.currentThread().getName() + " acquired the semaphore");
} finally {
semaphore.release(); // only release a permit we actually hold
}
});
thread.start();
}
}
Phaser
This one is a bit more complex than the others. It allows you to manage multiple phases of execution, where each phase can have a different number of parties (threads) involved.
public static void main(String[] args) {
Phaser phaser = new Phaser(4); // 4 parties: the 3 worker threads + the main thread
for (int i = 0; i < 3; i++) {
Thread thread = new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " is starting phase 1");
phaser.arriveAndAwaitAdvance(); // Wait for all parties to arrive at this phase
System.out.println(Thread.currentThread().getName() + " is starting phase 2");
phaser.arriveAndAwaitAdvance(); // Wait for all parties to arrive at this phase
});
thread.start();
}
phaser.arriveAndDeregister(); // Main thread deregisters from the phaser
}
Atomic Variables and Concurrent Data Structures
AtomicInteger, AtomicLong, AtomicReference
Although you can use the volatile keyword to make every thread see the latest value of a variable, that is not enough to make operations on it atomic. Atomic variables are designed to be thread-safe and provide many operations without the need for explicit synchronization.
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public static void main(String[] args) {
AtomicInteger atomicCounter = new AtomicInteger(0);
try (ExecutorService executor = Executors.newFixedThreadPool(10)) {
for (int i = 0; i < 100; i++) {
executor.submit(() -> atomicCounter.updateAndGet(operand -> operand + 3));
}
}
System.out.println("Final value of atomicCounter: " + atomicCounter.get());
}
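Under the hood, these classes rely on compare-and-set: "write the new value only if nobody changed it since I read it". A sketch of using it directly, with made-up names (updateMax, run) and plain threads; the counter uses incrementAndGet and the maximum uses a compareAndSet retry loop.

```java
import java.util.concurrent.atomic.AtomicInteger;

class AtomicSketch {
    // Raises 'max' to 'candidate' if candidate is larger, retrying when another thread interferes.
    static void updateMax(AtomicInteger max, int candidate) {
        int current = max.get();
        while (candidate > current && !max.compareAndSet(current, candidate)) {
            current = max.get(); // another thread won the race; re-read and retry
        }
    }

    // Returns {total increments, largest value seen} after all threads finish.
    static int[] run(int threads, int perThread) {
        AtomicInteger counter = new AtomicInteger();
        AtomicInteger max = new AtomicInteger(Integer.MIN_VALUE);
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            final int base = t * perThread; // each thread reports its own range of values
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    counter.incrementAndGet(); // atomic read-modify-write
                    updateMax(max, base + i);
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return new int[]{counter.get(), max.get()};
    }

    public static void main(String[] args) {
        int[] result = run(4, 1_000);
        System.out.println(result[0] + " increments, max " + result[1]); // prints 4000 increments, max 3999
    }
}
```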
You can use AtomicReference for custom classes.
static class FizzBuzz {
int value;
public FizzBuzz(int value) {
this.value = value;
}
}
public static void main(String[] args) {
AtomicReference<FizzBuzz> fizzBuzz = new AtomicReference<>(new FizzBuzz(0));
try (ExecutorService executor = Executors.newFixedThreadPool(5)) {
for (int i = 0; i < 10; i++) {
executor.submit(() -> {
fizzBuzz.updateAndGet(f -> {
int newValue = f.value + 3;
return new FizzBuzz(newValue);
});
});
}
}
System.out.println("Final FizzBuzz Value: " + fizzBuzz.get().value);
}
ConcurrentHashMap and synchronizedMap
ConcurrentHashMap is a thread-safe implementation of the Map interface, having atomic operations and allowing concurrent access.
public static void main(String[] args) {
List<String> keys = List.of("foo", "bar", "baz");
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
for (String key : keys) {
map.put(key, 0); // start every counter at zero
}
try (ExecutorService executorService = Executors.newFixedThreadPool(10)) {
for (int i = 0; i < 10; i++) {
executorService.submit(() -> {
for (int j = 0; j < 100; j++) {
String key = keys.get(j % keys.size());
map.compute(key, (k, v) -> {
if (v == null) {
return 1;
} else {
return v + 1;
}
});
}
});
}
} finally {
System.out.println("final map " + map);
}
}
synchronizedMap is a wrapper around a regular Map that synchronizes access to it, but it can be less efficient than ConcurrentHashMap because it locks the entire map for each operation.
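import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
public static void main(String[] args) {
Map<String, Integer> map = Collections.synchronizedMap(new HashMap<>());
map.put("foo", 1);
// Conceptually, each call on the wrapper is like doing this on a plain map:
Map<String, Integer> plainMap = new HashMap<>();
synchronized (plainMap) {
plainMap.put("foo", 1);
}
}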
CopyOnWriteArrayList, CopyOnWriteArraySet
These are thread-safe variants of ArrayList and of an array-backed Set, respectively, in which all mutative operations (add, set, etc.) make a fresh copy of the underlying array.
It’s useful when you don’t want to synchronize traversals, since iterators work on a snapshot and never throw a ConcurrentModificationException. However, it can be very expensive if you do a lot of modifications, so it’s suitable for situations where read operations vastly outnumber writes.
CopyOnWriteArraySet<Integer> numberSet = new CopyOnWriteArraySet<>();
CopyOnWriteArrayList<Integer> numberList = new CopyOnWriteArrayList<>();
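You can see the snapshot behavior even with a single thread. In this sketch (names are mine), the same loop that adds to the list while iterating over it blows up on an ArrayList but works on a CopyOnWriteArrayList, because the iterator keeps looking at the old array.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class CopyOnWriteSketch {
    // Appends value * 10 for every element while iterating; returns how many elements the loop visited.
    static int growWhileIterating(List<Integer> list) {
        int visited = 0;
        for (Integer value : list) {
            visited++;
            list.add(value * 10); // modifies the list in the middle of the traversal
        }
        return visited;
    }

    // The iterator sees only the 3 original elements; the 3 new ones land in a fresh copy.
    static String demo() {
        List<Integer> list = new CopyOnWriteArrayList<>(List.of(1, 2, 3));
        int visited = growWhileIterating(list);
        return visited + " " + list;
    }

    // A plain ArrayList detects the modification and fails fast.
    static boolean arrayListFails() {
        List<Integer> list = new ArrayList<>(List.of(1, 2, 3));
        try {
            growWhileIterating(list);
            return false;
        } catch (ConcurrentModificationException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());           // prints 3 [1, 2, 3, 10, 20, 30]
        System.out.println(arrayListFails()); // prints true
    }
}
```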
BlockingQueue implementations
These are thread-safe queues with blocking operations. For example:
- put(element) -> Inserts the specified element into the queue, blocking if it’s full until space is available.
- take() -> Retrieves the head of the queue, blocking if it’s empty until an element is available.
Some classes implement this interface:
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
LinkedBlockingQueue<Integer> queue2 = new LinkedBlockingQueue<>(10);
PriorityBlockingQueue<Integer> queue3 = new PriorityBlockingQueue<>(10); // 10 is only the initial capacity: this queue is unbounded, so put() never blocks
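A minimal producer/consumer sketch on top of put() and take(). The capacity of 2 and the -1 "poison pill" that tells the consumer to stop are my own choices for the example, not something the API requires.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BlockingQueueSketch {
    private static final int POISON_PILL = -1; // hypothetical "no more items" marker

    // Produces 1..items on one thread, sums them on another, and returns the sum.
    static int produceAndConsume(int items) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2); // tiny capacity to force blocking
        int[] sum = new int[1];
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= items; i++) {
                    queue.put(i); // blocks while the queue is full
                }
                queue.put(POISON_PILL);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                // take() blocks while the queue is empty
                for (int value = queue.take(); value != POISON_PILL; value = queue.take()) {
                    sum[0] += value;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join(); // after join, the consumer's writes to sum are visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum[0];
    }

    public static void main(String[] args) {
        System.out.println(produceAndConsume(10)); // prints 55
    }
}
```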
Fork/Join Framework
ForkJoinPool is an ExecutorService implementation that helps you take advantage of multiple processors, making it possible for you to apply not only concurrency but also parallelism.
ForkJoinPool executor = new ForkJoinPool();
This pool follows a work-stealing approach, where all the threads in the pool attempt to find and execute the submitted tasks (even stealing tasks from other threads when they run out of their own). Therefore, ForkJoinPool is really useful for tasks that can be divided into smaller subtasks.
Here is an example summing a large array of numbers:
class SumTask extends RecursiveTask<Integer> {
int[] array;
int start, end;
public SumTask(int[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
if (end - start <= 10) { // Case where the task is small enough to process
int sum = 0;
for (int i = start; i < end; i++) {
sum += array[i];
}
return sum;
} else {
// Since it's a large task, we split it into two smaller tasks
int mid = (start + end) / 2;
SumTask left = new SumTask(array, start, mid);
SumTask right = new SumTask(array, mid, end);
left.fork();
return right.compute() + left.join();
}
}
}
public class Main {
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool();
int[] array = new int[10_000]; // kept small so the sum (50,005,000) fits in an int
for (int i = 0; i < array.length; i++) {
array[i] = i + 1;
}
SumTask task = new SumTask(array, 0, array.length);
int sum = pool.invoke(task);
System.out.println("Sum: " + sum);
}
}
Streams and parallelism
In streams, intermediate operations are lazy and do not execute until a terminal operation is invoked. Each element also flows through all the intermediate operations one after the other in a single pass, and the documentation itself tells you which operations are intermediate and which are terminal.
List<String> names = List.of("Alice", "Bob", "Charlie", "David", "Eve");
names.stream()
.filter(name -> {
System.out.println("Filtering: " + name);
return name.length() > 3;
}) // Intermediate operation
.map(s -> {
System.out.println("Mapping: " + s);
return s.toUpperCase();
}) // Intermediate operation
.forEach(System.out::println); // Terminal operation
It’s like all of these operations run in a single loop. Nice O(n) :)
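To convince yourself that nothing runs before the terminal operation, here is a small sketch that counts how many times the filter is called (the class name and the counter are mine):

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LazyStreamSketch {
    // Returns {filter calls before the terminal op, filter calls after it, elements kept}.
    static int[] run() {
        AtomicInteger filterCalls = new AtomicInteger();
        Stream<String> pipeline = Stream.of("Alice", "Bob", "Charlie", "David", "Eve")
                .filter(name -> {
                    filterCalls.incrementAndGet();
                    return name.length() > 3;
                })
                .map(String::toUpperCase); // still nothing has run at this point
        int callsBeforeTerminalOp = filterCalls.get();
        List<String> result = pipeline.collect(Collectors.toList()); // terminal operation
        return new int[]{callsBeforeTerminalOp, filterCalls.get(), result.size()};
    }

    public static void main(String[] args) {
        int[] result = run();
        System.out.println("filter calls before: " + result[0]
                + ", after: " + result[1] + ", kept: " + result[2]);
        // prints filter calls before: 0, after: 5, kept: 3
    }
}
```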
Parallel streams
If your task is heavy on the CPU and you have a lot of data, it might be worth using more cores in these loops. Luckily, doing that with streams is straightforward.
List<String> names = List.of("Alice", "Bob", "Charlie", "David", "Eve");
names.parallelStream() // Now it's a parallel stream :)
.filter(name -> name.length() > 3)
.map(String::toUpperCase)
.forEach(x -> {
try {
Thread.sleep(1000); // Simulate a long-running operation
System.out.println(x);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
People on Stack Overflow say that the streams implementation uses ForkJoinPool. Honestly, I tried to investigate and the code looked like Greek to me, so I gave up. But it checks out: by default, parallel streams run their work on the common ForkJoinPool.
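A quick way to peek at this yourself is to record which threads do the work. This is only a sketch with names I made up; the exact thread names depend on your machine and JVM, so I’m not promising any particular output for that part.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

class ParallelStreamSketch {
    // Collects the names of the threads that processed at least one element.
    static Set<String> threadsUsed(int elements) {
        Set<String> names = ConcurrentHashMap.newKeySet(); // thread-safe set
        IntStream.range(0, elements)
                .parallel()
                .forEach(i -> names.add(Thread.currentThread().getName()));
        return names;
    }

    // The result is the same as the sequential version; only the execution is split up.
    static long sumOfSquares(int n) {
        return IntStream.rangeClosed(1, n)
                .parallel()
                .mapToLong(i -> (long) i * i)
                .sum();
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(100)); // prints 338350
        // Typically shows the calling thread plus some ForkJoinPool.commonPool-worker threads.
        System.out.println(threadsUsed(10_000));
    }
}
```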
Next Steps
This is just a cheatsheet with simple examples, because the documentation can sometimes be a bit bad at keeping things simple. Now, my plan is to study a bit of java.io and then build something multi-threaded involving blocking I/O.