You have probably encountered “data streams” if you have written code in any modern programming language. As the name suggests, a data stream is a continuous, ordered sequence of incoming data that can arrive in various formats and at varying speeds and frequencies.
Streams are often generated in real time from many sources. They represent a flow of information rather than a static dataset, allowing for immediate processing and analysis.
Most data streams demand real-time processing to produce useful results, since there is no opportunity to collect batches and process them later.
Since Java 8, the Stream API has provided “Collectors” that help in summarizing, grouping, partitioning and accumulating data.
A Collector defines four functions:
- Supplier: Creates a new mutable result container, or a data collection.
- Accumulator: Adds an element to the mutable result container.
- Combiner: Merges two mutable result containers when processing in parallel.
- Finisher (optional): Performs a final transformation, converting the intermediate accumulation into the final result.
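The four functions can be seen together in a hand-rolled Collector. The following is a minimal sketch using the standard Collector.of factory; the class and variable names are illustrative:

```java
import java.util.StringJoiner;
import java.util.stream.Collector;
import java.util.stream.Stream;

public class CollectorDemo {
    public static void main(String[] args) {
        // A custom Collector that joins strings, with all four functions spelled out
        Collector<String, StringJoiner, String> joining = Collector.of(
                () -> new StringJoiner(", "), // supplier: creates a new mutable result container
                StringJoiner::add,            // accumulator: adds one element to the container
                StringJoiner::merge,          // combiner: merges two containers (parallel streams)
                StringJoiner::toString);      // finisher: final transformation of the result
        System.out.println(Stream.of("a", "b", "c").collect(joining)); // a, b, c
    }
}
```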
With the passage of time, Java developers started demanding new and sophisticated processes/transformations to be added to Collectors. As these demands were diverse and often specific, Java decided to introduce the concept of “Gatherers”, which provides the basic infrastructure on which custom transformations can be programmed and executed. It’s something like providing a “rail and communication” framework to develop custom business processes on.
The Stream Gatherers Feature
The Stream Gatherers feature, first previewed in Java 22 (JEP 461) and finalized in Java 24 (JEP 485), provides a flexible way to transform streams using custom intermediate operations, extending the expressiveness of the Stream API.
It offers significant improvements in efficiency and resource optimization, particularly for data processing pipelines. It achieves this by enabling custom intermediate operations, which allow for more expressive and concise stream transformations compared to the built-in operations. This can lead to reduced boilerplate code and improved execution speed, especially in scenarios like big data processing and real-time analytics, where efficient data collection is crucial.
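As a small illustration of the reduced boilerplate, the built-in Gatherers.windowFixed operation replaces hand-written batching logic in a single intermediate step. This is a minimal sketch and requires a JDK where Gatherers are available (Java 22/23 with preview enabled, or Java 24+):

```java
import java.util.stream.Gatherers;
import java.util.stream.Stream;

public class WindowFixedDemo {
    public static void main(String[] args) {
        // windowFixed(3) groups elements into lists of at most 3,
        // with the final partial window emitted at the end
        Stream.of(1, 2, 3, 4, 5, 6, 7)
              .gather(Gatherers.windowFixed(3))
              .forEach(System.out::println); // [1, 2, 3], [4, 5, 6], [7]
    }
}
```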
We will demonstrate below how this API can deliver better performance, especially as the data size increases.
Some significant features of the Stream Gatherers API are:
- Customizable Intermediate Operations: The Stream Gatherers API allows developers to define their own intermediate operations, tailoring the stream processing logic to specific needs.
- Improved Scalability: Stream Gatherers can be particularly beneficial for large-scale data processing, enabling more efficient handling of data streams and potentially reducing memory consumption.
- Parallel Processing: The API is designed to work well with parallel streams, allowing for efficient utilization of multi-core processors for faster data processing.
- Stateful Operations: The Gatherers API allows for stateful operations within a stream, which can be important in scenarios where you need to maintain context or accumulate data across multiple elements.
So, Stream Gatherers enhance the Stream API by allowing the creation of custom intermediate operations, enabling more flexible and expressive data processing pipelines. They support incoming data from diverse streams that vary in nature and in the rate of inflowing data.
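As one example of a stateful built-in operation, Gatherers.scan carries running state across elements, emitting the accumulated value after each one. This is a minimal sketch, again assuming a JDK where Gatherers are available (Java 22/23 with preview enabled, or Java 24+):

```java
import java.util.stream.Gatherers;
import java.util.stream.Stream;

public class ScanDemo {
    public static void main(String[] args) {
        // scan emits the running state for every input element: a cumulative sum here
        Stream.of(1, 2, 3, 4)
              .gather(Gatherers.scan(() -> 0, Integer::sum))
              .forEach(System.out::println); // 1, 3, 6, 10
    }
}
```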
Stream Gatherers: An Example
The code below shows how to manage and streamline such situations, and how Gatherers help in modern data analytics.
We take a list of integers in increasing order of magnitude, perform a simple stateful transformation (i * 2), apply basic filtering (% 5 == 0), and batch the results into groups of 100. Even in this simple pipeline, you will see how gatherers outperform collectors in efficiency and resource conservation.
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.*;

public class ExtendedGathererBenchmark {

    static class MyBatchingGatherer {
        public static Gatherer<Integer, List<Integer>, List<Integer>> create(int batchSize) {
            return Gatherer.ofSequential(
                ArrayList::new, // initializer: creates the mutable state (the current batch)
                (list, item, downstream) -> { // integrator: "list" is the state, "item" is the next
                                              // input element, "downstream" receives the output
                    int transformed = item * 2;
                    if (transformed % 5 == 0) {
                        list.add(transformed);
                        if (list.size() == batchSize) {
                            downstream.push(new ArrayList<>(list));
                            list.clear();
                        }
                    }
                    return true;
                },
                (list, downstream) -> { // finisher: flushes any remaining partial batch
                                        // to the provided downstream
                    if (!list.isEmpty()) {
                        downstream.push(new ArrayList<>(list));
                    }
                }
            );
        }
    }

    public static void main(String[] args) {
        int[] dataSizes = {10, 100, 1000, 10_000, 50_000, 100_000, 500_000, 1_000_000, 10_000_000, 100_000_000};
        for (int size : dataSizes) {
            System.out.println("\n--- Data Size: " + size + " ---");
            System.gc(); // Try to clear memory
            pause();
            long memBeforeGatherer = usedMemory();
            long gathererTime = runWithGatherer(size, false);
            long memAfterGatherer = usedMemory();
            System.gc();
            pause();
            long memBeforeCollector = usedMemory();
            long collectorTime = runWithCollector(size);
            long memAfterCollector = usedMemory();
            System.gc();
            pause();
            long gathererParallelTime = runWithGatherer(size, true);
            System.out.printf("Gatherer Time: %d ms, Memory: %d KB\n", gathererTime, (memAfterGatherer - memBeforeGatherer) / 1024);
            System.out.printf("Collector Time: %d ms, Memory: %d KB\n", collectorTime, (memAfterCollector - memBeforeCollector) / 1024);
            System.out.printf("Parallel Gatherer Time: %d ms\n", gathererParallelTime);
        }
        runWithDistinctAdjacent();
        runWithSlidingWindow();
    }

    static long runWithGatherer(int size, boolean parallel) {
        long start = System.nanoTime();
        Stream<Integer> stream = Stream.iterate(0, i -> i + 1).limit(size);
        if (parallel) stream = stream.parallel();
        stream.gather(MyBatchingGatherer.create(100))
              .forEach(batch -> {
                  int ignored = batch.size();
              });
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    static long runWithCollector(int size) {
        long start = System.nanoTime();
        List<List<Integer>> batches = Stream.iterate(0, i -> i + 1).limit(size)
            .map(i -> i * 2)
            .filter(x -> x % 5 == 0)
            .collect(Collectors.collectingAndThen(
                Collectors.toList(),
                list -> {
                    List<List<Integer>> out = new ArrayList<>();
                    for (int i = 0; i < list.size(); i += 100) {
                        out.add(list.subList(i, Math.min(i + 100, list.size())));
                    }
                    return out;
                }
            ));
        for (List<Integer> batch : batches) {
            int ignored = batch.size();
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    static void runWithDistinctAdjacent() {
        System.out.println("\n--- Distinct Adjacent Demo ---");
        List<String> list = List.of("a", "a", "b", "b", "b", "c", "a", "a");
        // A small stateful gatherer that drops elements equal to their predecessor
        list.stream()
            .gather(Gatherer.<String, String[], String>ofSequential(
                () -> new String[1], // state: the previously seen element
                (prev, e, down) -> e.equals(prev[0]) || down.push(prev[0] = e)))
            .forEach(System.out::println); // Output: a, b, c, a
    }

    static void runWithSlidingWindow() {
        System.out.println("\n--- Sliding Window Demo ---");
        Stream.iterate(1, i -> i + 1).limit(10)
              .gather(Gatherers.windowSliding(3))
              .forEach(window -> System.out.println(window));
        // Output: [1, 2, 3], [2, 3, 4], ..., [8, 9, 10]
    }

    static long usedMemory() {
        Runtime runtime = Runtime.getRuntime();
        return runtime.totalMemory() - runtime.freeMemory();
    }

    static void pause() {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
On JDK 22 or 23, where Stream Gatherers are still a preview feature, you must compile and run the code with preview features enabled (the --release value must match your JDK version):
javac --enable-preview --release 22 ExtendedGathererBenchmark.java
java --enable-preview ExtendedGathererBenchmark
From Java 24 onward (JEP 485) the feature is final, and no flags are required.
When I ran the code on my local machine, I got the following results:

You can clearly see that as the data size (here, the number of elements in the stream) grows, the time taken by the “Collector” approach increases sharply, while the “Gatherer” approach remains much quicker. Moreover, “Gatherer” methods consume less JVM memory and are more CPU efficient too.
The graph below demonstrates the performance gains clearly.

The data stream used here was modest: the values in it were only a few thousand in magnitude. Even so, the “Gatherer” methods performed much better and faster.
You can imagine the performance gains you would experience when the data streams are huge and the numbers to be processed are larger in magnitude.
To confirm this, the program was run with only the Collector path and then with only the Gatherer path (i.e. not both together), and the GC logs from each run were evaluated.
You can see our GCeasy reports below:
1. “Collector-only” – when only the runWithCollector() function was executed, JVM memory usage went up to 2.86GB, throughput was only 70.743%, and the average and maximum GC pause times were quite high: 159ms and 280ms respectively.

Fig: Garbage Collection Statistics for Collector-only Sample Program
Source: https://gceasy.io/diamondgc-report.jsp?oTxnId_value=e601722f-4c3b-49d7-85cb-d33b6d91e9c2
2. “Gatherer-only” – when only the runWithGatherer() function was executed, JVM memory usage was only 192.62MB, throughput went up to 98.857%, and the average and maximum GC pause times were quite low: 4.58ms and 10ms respectively.

Fig: Garbage Collection Statistics for Gatherer-only Sample Program
Source: https://gceasy.io/diamondgc-report.jsp?oTxnId_value=861a7c7f-3b4e-4ec4-9e25-9f5c1ab07f5c
You can clearly see that even though the operations are memory intensive, “Gatherers” perform much better and use fewer computing resources.
Conclusion
Stream Gatherers provide substantial performance and memory gains for:
- High-throughput stream processing
- Complex transformations with batching or filtering
- Real-time and near-real-time pipelines
