Structured Concurrency in Java 23: Improved Reliability and Thread Management

Structured Concurrency is a preview feature in Java 23 that aims to simplify and structure multithreaded code. It treats multiple tasks running in different threads as a single unit of work, which makes the business logic of a program easier to reason about and implement. It improves reliability, enhances observability, and streamlines error handling and cancellation.

Structured concurrency was first incubated in JDK 19. Java 21 brought a significant change: the StructuredTaskScope::fork(…) method now returns a Subtask rather than a Future (more on this shortly). The API lets you coordinate, orchestrate and observe threads, which leads to more reliable and maintainable concurrent code.


Fig: Structured Task Scope

Source: https://docs.oracle.com/en/java/javase/21/core/img/basic-usage-structuredtaskscope-class.png

Core to the idea of structured concurrency is the StructuredTaskScope class in the java.util.concurrent package. It enables you to coordinate a group of concurrent subtasks as a unit. You can fork many subtasks, where each subtask runs in a thread of its own, and later these subtasks can be “joined” as a single unit.  Thus, StructuredTaskScope ensures that each of the subtasks is completed before the main task completes.  In other words, the task scope has to wait for all subtasks to complete, and then it can further process the results of these subtasks.  

Let’s illustrate this with two code samples: one prior to structured concurrency and one which uses it. 

Prior to Structured Concurrency, a group of threads representing subtasks of a bigger task was handled using the java.util.concurrent.ExecutorService API.

With this API, thread A can create an ExecutorService, another thread B can submit work to it, and yet another thread C can wait for the work that B submitted. Additionally, code in a totally unrelated thread D can join any of these tasks with nothing more than a reference to the corresponding Future.

So, any code with a reference to a Future can join it (i.e., await its result by calling get()), even code in a thread other than the one which obtained the Future.

In other words, if a subTask1 is created by Task1, then subTask1 need not return to Task1 (the task that submitted it); it can return to any of a number of other tasks, or even to none.
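To make the point about joining from any thread concrete, here is a minimal sketch that uses only the standard ExecutorService and Future APIs. The class name UnrelatedJoinDemo, the method joinFromUnrelatedThread and the "balance-report" value are hypothetical, chosen for illustration only.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; the names are illustrative only.
class UnrelatedJoinDemo {

    // The calling thread submits a task, then a different thread collects
    // the result using nothing but the Future reference.
    static String joinFromUnrelatedThread() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicReference<String> seenByOtherThread = new AtomicReference<>("nothing");
        try {
            Future<String> future = executor.submit(() -> "balance-report");

            Thread unrelated = new Thread(() -> {
                try {
                    // This thread never submitted the task, yet it may join it.
                    seenByOtherThread.set(future.get());
                } catch (Exception e) {
                    seenByOtherThread.set("failed: " + e);
                }
            }, "unrelated-thread");
            unrelated.start();
            unrelated.join(); // wait so the result can be returned to the caller
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdown();
        }
        return seenByOtherThread.get();
    }

    public static void main(String[] args) {
        System.out.println("Unrelated thread received: " + joinFromUnrelatedThread());
    }
}
```

Nothing in the API records or restricts who may wait on whom, which is exactly the freedom that structured concurrency takes away.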

The banking example below shows the pre-structured-concurrency style in full:

package org.example;

import java.util.concurrent.*;

public class UnstructuredBankingExample {
    public static void main(String[] args) {
        // A fixed pool of platform threads; Executors.newVirtualThreadPerTaskExecutor() is the option for many virtual threads
        ExecutorService executor = Executors.newFixedThreadPool(4);

        // submit() returns a Future, representing the result of an asynchronous computation that may or may not
        // be realised. No clear Subtask is returned.
        Future<Double> balance1Future = executor.submit(() -> fetchAccountBalance("Account-1"));
        Future<Double> balance2Future = executor.submit(() -> fetchAccountBalance("Account-2"));

        // Other unrelated work can be submitted to this executor; there is no clear task and subtask relationship.
        Future<Boolean> doSomething = executor.submit(() -> someOperation("Starter String"));

        try {
            double balance1 = balance1Future.get(); // Blocking call: returns only once the computation has completed
            double balance2 = balance2Future.get();
            final double balance1f = balance1; // Final copies, because the lambda below cannot capture variables that change
            final double balance2f = balance2;

            System.out.println("Initial Balances:");
            System.out.println("Account-1: $" + balance1);
            System.out.println("Account-2: $" + balance2);

            double transferAmount = 1000.0;
            Future<Boolean> transferFuture = executor.submit(() -> transferFundsEligibility(balance1f, balance2f, transferAmount));

            // To join subtasks as a single unit of work, one has to manually check that each subtask has performed as desired
            if (transferFuture.get()) { // Blocking call
                balance1 -= transferAmount;
                balance2 += transferAmount;
                System.out.println("Transfer Successful! New Balances:");
            } else {
                System.out.println("Transfer Failed! Insufficient funds.");
            }

            System.out.println("Account-1: $" + balance1);
            System.out.println("Account-2: $" + balance2);

        } catch (InterruptedException | ExecutionException e) {
            // Manual cancellation in case of an error; true means the thread executing the task is interrupted to stop it
            balance1Future.cancel(true);
            balance2Future.cancel(true);
            System.err.println("Error: " + e.getMessage());
        } finally {
            executor.shutdown(); // Requires manual shutdown to allow reclamation of its resources
        }
    }

    private static double fetchAccountBalance(String accountId) throws InterruptedException {
        System.out.println("Fetching balance for " + accountId);
        TimeUnit.SECONDS.sleep(2); // Simulate API call delay
        return 2000.0 + (Math.random() * 3000); // Simulated balance
    }

    private static boolean transferFundsEligibility(double balance1, double balance2, double amount) throws InterruptedException {
        System.out.println("Processing transfer of $" + amount + " from Account-1 to Account-2...");
        TimeUnit.SECONDS.sleep(1); // Simulate processing delay
        return balance1 >= amount; // Only transfer if Account-1 has enough funds
    }

    private static boolean someOperation(String init) throws InterruptedException {
        System.out.println("Processing some Operation ...");
        TimeUnit.SECONDS.sleep(1); // Simulate processing delay
        return true; // Placeholder result
    }
}

As you can see above, an ExecutorService must be shut down manually to reclaim its resources. Once an ExecutorService has terminated, it has no tasks actively executing, no tasks awaiting execution, and no new tasks can be submitted to it.

Most of the thread orchestration has to be done by the programmer, who is responsible for shutting down the executor and its threads properly.
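The shutdown behaviour described above can be observed with a small sketch. ExecutorShutdownDemo and shutdownAndObserve are hypothetical names chosen for illustration. The sketch assumes the default rejection policy of the pool returned by Executors.newFixedThreadPool, which throws RejectedExecutionException for work submitted after shutdown().

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the names are illustrative only.
class ExecutorShutdownDemo {

    // Returns a short description of what was observed after shutdown().
    static String shutdownAndObserve() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        executor.submit(() -> System.out.println("task running"));

        // Stop accepting new work; tasks that were already submitted still run.
        executor.shutdown();

        boolean rejected = false;
        try {
            executor.submit(() -> System.out.println("never runs"));
        } catch (RejectedExecutionException e) {
            rejected = true; // new tasks are refused once shutdown has begun
        }

        boolean terminated = false;
        try {
            // The programmer has to wait explicitly for the remaining work to drain.
            terminated = executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "rejected=" + rejected + ", terminated=" + terminated;
    }

    public static void main(String[] args) {
        System.out.println(shutdownAndObserve());
    }
}
```

Every step here (shutdown, handling rejected work, waiting for termination) is the caller's responsibility, and forgetting any one of them is how executors end up leaking threads.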

While all this gives power and flexibility into the hands of the programmer, it’s also a dangerous proposition which could lead to deadlocks, resource leaks and lack of readability/extensibility.  

Problems With Old-style Pre-19 Concurrency 

  • Thread dumps and observability tools show related threads in separate, unconnected call stacks, even though the threads depend on each other logically as parts of the same bigger task.
  • There is no well-defined structure for implementing concurrency. Each developer is free to structure task-subtask relationships in their own style, which makes the code error-prone and difficult to trace and debug.
  • Future does not offer a way to wait for a task that has been cancelled.
  • Keeping track of inter-task relationships, and manually restoring equilibrium in case of errors or failures, is a significant burden on developers.
  • There are no constraints on the creation, relation and ordering of threads, and these unrestricted patterns of concurrency can lead to confusion, bugs and wasted resources.
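The point about cancelled tasks in the list above can be demonstrated with a short sketch. CancelledFutureDemo and getReturnsWhileTaskStillRuns are hypothetical names; the latches exist only to make the timing deterministic. The sketch cancels without interruption (cancel(false)) so that the task body keeps running after the Future is marked as cancelled.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; the names are illustrative only.
class CancelledFutureDemo {

    // Returns true if get() gave up with CancellationException
    // while the task body had not yet finished.
    static boolean getReturnsWhileTaskStillRuns() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        boolean stillRunningAfterGet = false;
        try {
            Future<?> future = executor.submit(() -> {
                started.countDown();
                try {
                    release.await(); // stands in for long-running work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    finished.countDown();
                }
            });
            started.await();      // make sure the task is really running
            future.cancel(false); // mark as cancelled without interrupting the thread
            try {
                future.get();     // does not wait for the task to stop
            } catch (CancellationException e) {
                stillRunningAfterGet = finished.getCount() == 1;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            // Not expected here: the task was cancelled, not failed.
        } finally {
            release.countDown();  // let the task finish so the pool can terminate
            executor.shutdown();
        }
        return stillRunningAfterGet;
    }

    public static void main(String[] args) {
        System.out.println("Task still running after get(): " + getReturnsWhileTaskStillRuns());
    }
}
```

The caller learns only that the Future was cancelled; finding out when the underlying work has actually stopped requires extra coordination such as the latches used here.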

What We Would Rather Expect From the Executor Service: 

  • There should be a logically-structured clear relationship between tasks and subtasks, ideally relating in a tree-like structure and being dependent on each other for completion of a task. 
  • Clear boundaries and demarcations between subtasks of two different tasks, i.e. no random intermingling between subtasks of different parent tasks. 
  • There should be an enforceable relationship between tasks and subtasks, i.e. when subtasks are submitted and joined in the same parent task, the failure of one subtask should  automatically cause the cancellation of other subtasks.  
  • There should also be the flexibility to either consider a task complete only when all its subtasks complete OR even if one of them completes, or consider it failed if one or more subtasks fail. 
  • It should reduce the risk of resource leaks and unhandled errors. 

In Java 23, we have the StructuredTaskScope class to implement structured concurrency.
Here, task scopes form a tree structure: a parent-child relationship is established when a thread started in a task scope opens its own task scope. If task scope “A” opens task scope “B”, then task scope “A” is the parent of task scope “B”. Further nesting of parent-child relationships is supported.

Let’s see how things work here:

package org.example;

import java.util.concurrent.*;
import java.util.concurrent.StructuredTaskScope.Subtask;

public class StructuredBankingExample {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            // fork() accepts a Callable and returns a Subtask handle; each subtask runs in a thread of its own
            Subtask<Double> balance1Task = scope.fork(() -> fetchAccountBalance("Account-1"));
            Subtask<Double> balance2Task = scope.fork(() -> fetchAccountBalance("Account-2"));

            scope.join(); // Wait for all subtasks to complete; ONLY the thread that opened the scope can join(), no unrelated threads can
            scope.throwIfFailed(); // Propagate any failure as an ExecutionException

            double balance1 = balance1Task.get(); // Processing after join() confirms that all subtasks have finished
            double balance2 = balance2Task.get();
            final double balance1f = balance1; // Final copies, because the lambda below cannot capture variables that change
            final double balance2f = balance2;

            System.out.println("Initial Balances:");
            System.out.println("Account-1: $" + balance1);
            System.out.println("Account-2: $" + balance2);

            double transferAmount = 1000.0;
            Subtask<Boolean> transferTask = scope.fork(() -> transferFundsEligibility(balance1f, balance2f, transferAmount));

            scope.join();
            scope.throwIfFailed();

            if (transferTask.get()) { // Processing after join() confirms that all subtasks have finished
                balance1 -= transferAmount;
                balance2 += transferAmount;
                System.out.println("Transfer Successful! New Balances:");
            } else {
                System.out.println("Transfer Failed! Insufficient funds.");
            }

            System.out.println("Account-1: $" + balance1);
            System.out.println("Account-2: $" + balance2);
        } // StructuredTaskScope is automatically closed here, at the end of the try block
    }

    private static double fetchAccountBalance(String accountId) throws InterruptedException {
        System.out.println("Fetching balance for " + accountId);
        Thread.sleep(2000); // Simulate API call delay
        return 2000.0 + (Math.random() * 3000); // Simulated balance
    }

    private static boolean transferFundsEligibility(double balance1, double balance2, double amount) throws InterruptedException {
        System.out.println("Processing transfer of $" + amount + " from Account-1 to Account-2...");
        Thread.sleep(1000); // Simulate processing delay
        return balance1 >= amount; // Only transfer if Account-1 has enough funds
    }
}

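To compile, you have to use the --enable-preview flag together with the target release number. Because the class is declared in the org.example package, the -d . option is added here so that the compiled class lands under org/example, where the run command expects it:

javac --release 23 --enable-preview -d . StructuredBankingExample.java

To run:

java --enable-preview org.example.StructuredBankingExample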

Here, to ensure correct usage and enforce semantics, only the thread that opened the task scope can call its join() and close() methods, and close() throws an exception if join() was never called after fork().

For specific use cases, StructuredTaskScope has a subclass named ShutdownOnSuccess, which implements a policy that shuts down the task scope as soon as one subtask completes successfully, and a complementary subclass ShutdownOnFailure, which shuts down the task scope as soon as any subtask fails or throws an exception. So, you can easily implement a policy that does not require every subtask to run to completion.

Conclusion

You can see how the StructuredTaskScope class makes things conceptually clearer and enforces a certain structure that improves correctness and readability. 

To monitor what’s actually happening inside the JVM with this new feature, you can use our comprehensive tool, YC_CRASH 360* Script. This tool gives a holistic view of how Java programs are running on the JVM, with stats such as vmstat, thread dumps, memory dumps and netstat.

Thanks for your time. 
