This is the fourth post in the Java 8 series about the changes introduced between versions 6 and 8. This time we will talk about the new way to manipulate collections: streams.

What is Stream

Streams are an important addition in Java 8. Before JDK 8, collections were manipulated through external iteration (for or while loops), following the imperative programming paradigm, where you explicitly say how the algorithm should work. As an alternative, Java provides the Stream API (package java.util.stream), where stream operations do the iteration behind the scenes for us. The programmer no longer has to worry about how the iteration takes place. In other words, you let the Streams API manage how to process the data.
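To make the contrast concrete, here is a minimal sketch that solves the same small task both ways. The class and method names (LoopVersusStream, shortNamesWithLoop, shortNamesWithStream) and the task itself are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical example class, not part of any library.
class LoopVersusStream {

    // Imperative style: we spell out how to iterate and how to accumulate.
    static List<String> shortNamesWithLoop(List<String> names) {
        List<String> result = new ArrayList<>();
        for (String name : names) {
            if (name.length() <= 4) {
                result.add(name.toUpperCase());
            }
        }
        return result;
    }

    // Stream style: we describe what we want and the iteration is internal.
    static List<String> shortNamesWithStream(List<String> names) {
        return names.stream()
                .filter(name -> name.length() <= 4)
                .map(String::toUpperCase)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Toby", "Anna", "Leroy", "Alex");
        System.out.println(shortNamesWithLoop(names));   // [TOBY, ANNA, ALEX]
        System.out.println(shortNamesWithStream(names)); // [TOBY, ANNA, ALEX]
    }
}
```

Both methods return the same list; the difference is only in who controls the iteration.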

Another important difference between streams and collections is that a collection holds all of its values in memory, while a stream is a conceptually fixed data structure whose elements are computed on demand. Consequently, a stream is like a lazily constructed collection, whereas a collection is eagerly constructed.

A stream in Java is a sequence of data. A stream pipeline is the operations that run on a stream to produce a result. [1]

Reference: OCP8 Stream


Some important features are:

  • Streams don't store their elements
  • Streams are immutable
  • Streams are not reusable
  • Streams don't support indexed access to their elements
  • Streams are easily parallelizable
  • Stream operations are lazy when possible (values are computed when they are requested by a consumer)
  • It is not possible for programmers to create their own operators.
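The "not reusable" point is easy to check. The sketch below (StreamReuseDemo is a made-up name) runs a terminal operation twice on the same stream; the second call throws an IllegalStateException.

```java
import java.util.stream.Stream;

// Hypothetical example class, not part of any library.
class StreamReuseDemo {

    // Returns true if a second terminal operation on the same stream fails.
    static boolean secondUseFails() {
        Stream<String> stream = Stream.of("a", "b", "c");
        stream.count(); // the first terminal operation consumes the stream
        try {
            stream.count(); // second use of the same stream object
            return false;
        } catch (IllegalStateException e) {
            return true; // the stream has already been operated upon
        }
    }

    public static void main(String[] args) {
        System.out.println(secondUseFails()); // true
    }
}
```

If you need the data twice, create a new stream from the source each time.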

Most stream operations return a new stream, which makes it possible to chain operations. The complete chain, from the source of the stream to the operation that produces the result, is the pipeline. Formally, a pipeline has three parts:

  • Source: where the stream is created
  • Intermediate Operations: operations that can be chained to form the pipeline; each one returns another stream.
  • Terminal Operation: the operation that closes the stream and produces the result (no more streams).

Reference: Harnessing the Power of Java 8 Streams


The OCP Certified Study Guide gives us a great example to understand how this works.

List<String> list = Arrays.asList("Toby", "Anna", "Leroy", "Alex");
list.stream()
   .filter(n -> n.length() == 4)
   .sorted().limit(2)
   .forEach(System.out::println);

stream [list] -> intermediate [filter, sorted, limit] -> terminal [forEach]

Java works out how best to run the stream pipeline. To make that possible, limit signals when it has received enough elements, and sorted runs only once, after it has received all the elements it needs to sort. So:

  1. Each element is (a) sent from the stream to filter, where (b) the filter checks the length of the element; (c) if the length does not match, the element leaves the assembly line, and (d) if it matches, the element is sent to sorted.
  2. Once every element has been processed, sorted executes.
  3. After that, sorted sends the elements one by one to limit, which checks whether it has already let enough elements through.
  4. If not, limit passes the element on to forEach.
  5. When limit has let two elements through, Java stops the line and no more processing occurs in the pipeline.
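The steps above can be observed with peek. This is only a sketch: PipelineTrace and the log labels are invented for the illustration, and the log is collected in a list instead of being printed directly so that the order is easy to inspect.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical example class, not part of any library.
class PipelineTrace {

    // Runs the pipeline and records the order in which the stages see elements.
    static List<String> trace() {
        List<String> log = new ArrayList<>();
        Arrays.asList("Toby", "Anna", "Leroy", "Alex").stream()
                .peek(n -> log.add("source:" + n))    // every element leaves the source
                .filter(n -> n.length() == 4)
                .peek(n -> log.add("filtered:" + n))  // only the 4-letter names pass
                .sorted()                             // waits for all elements
                .limit(2)
                .forEach(n -> log.add("printed:" + n));
        return log;
    }

    public static void main(String[] args) {
        trace().forEach(System.out::println);
    }
}
```

The log shows each name travelling through filter one at a time (Leroy is dropped), and the two "printed" entries (Alex, Anna) appear only after the last source element, because sorted has to wait for everything.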

Create a stream

The interface for working with streams is java.util.stream.Stream, and the related primitive specializations are IntStream, LongStream and DoubleStream.

# Collections
Stream<String> stream = Arrays.asList(new String[]{"test"}).stream();

# Individual values
Stream<String> stream = Stream.of("test", "test2");

# Arrays
Stream<String> stream = Stream.of(new String[]{"test", "test2"});

Integer[] nums = {1, 2, 3, 4, 5};
Stream.of(nums); // Five elements

# Arrays of primitives
int[] nums = {1, 2, 3, 4, 5};

Stream.of(nums);              // One element: a Stream<int[]> holding one object (int[])
Arrays.stream(nums).count();  // Five elements
IntStream.of(nums).count();   // Five elements

# static <T> Stream<T> generate(Supplier<T> s)
Stream<Double> s = Stream.generate(() -> Math.random()).limit(5);
Stream<Double> s = Stream.generate(Math::random).limit(5);

# static <T> Stream<T> iterate(T seed, UnaryOperator<T> f)
Stream<Integer> s = Stream.iterate(1, t -> t * 2).limit(5);

# Stream.Builder
void accept(T t)
Stream.Builder<T> add(T t)

# IntStream and LongStream
static IntStream range(int startInclusive, int endExclusive)
static IntStream rangeClosed(int startInclusive, int endInclusive)
static LongStream range(long startInclusive, long endExclusive)
static LongStream rangeClosed(long startInclusive, long endInclusive)  
IntStream s1 = new Random().ints(5, 1, 10);  

# Infinite
Stream.generate(Math::random).forEach(System.out::println);
Stream.iterate(1, n -> n + 2).forEach(System.out::println);

# Primitive streams
# IntStream: used for the primitive types int, short, byte, and char
# LongStream: used for the primitive type long
# DoubleStream: used for the primitive types double and float
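The listing above only gives the signatures for Stream.Builder and the range methods, so here is a small sketch of how they might be used. The class name CreationDemo and its method names are made up for the example.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

// Hypothetical example class, not part of any library.
class CreationDemo {

    // Builds a stream element by element with Stream.Builder.
    static List<String> fromBuilder() {
        Stream.Builder<String> builder = Stream.builder();
        builder.accept("test");             // accept returns void
        builder.add("test2").add("test3");  // add returns the builder, so calls chain
        return builder.build().collect(Collectors.toList());
    }

    // range excludes the end value: 1, 2, 3, 4
    static long countRange() {
        return IntStream.range(1, 5).count();
    }

    // rangeClosed includes the end value: 1 + 2 + 3 + 4 + 5
    static int sumRangeClosed() {
        return IntStream.rangeClosed(1, 5).sum();
    }

    public static void main(String[] args) {
        System.out.println(fromBuilder());    // [test, test2, test3]
        System.out.println(countRange());     // 4
        System.out.println(sumRangeClosed()); // 15
    }
}
```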

Intermediate Operations

Intermediate operations return streams, which makes it possible to chain operations, similar to the builder pattern. They are considered lazy because they do not process any elements until a terminal operation is invoked. This lets the intermediate operations be merged or optimized when the terminal operation runs.

Some operations, such as limit, are called short-circuiting because they do not need to process all the elements of the stream.

  • Stateless Operations: each element is processed independently of the others, so no state is kept. Examples: filter, flatMap, map, peek.
  • Stateful Operations: state is retained, so processing an element may depend on previously seen elements. Examples: distinct, limit, skip, sorted.

# filter
Stream<String> s = Stream.of("monkey", "gorilla", "bonobo");
s.filter(x -> x.startsWith("m")).forEach(System.out::print);
// monkey

# distinct
Stream<String> s = Stream.of("duck", "duck", "duck", "goose");
s.distinct().forEach(System.out::print); // duckgoose

# limit # skip
Stream<Integer> s = Stream.iterate(1, n -> n + 1);
s.skip(5).limit(2).forEach(System.out::print); // 67

# map: it creates a one-to-one mapping from the elements in
# the stream to the elements of the next step in the stream
Stream<String> s = Stream.of("monkey", "gorilla", "bonobo");
s.map(String::length).forEach(System.out::print); // 676

# flatMap: list of a list
List<String> zero = Arrays.asList();
List<String> one = Arrays.asList("Bonobo");
List<String> two = Arrays.asList("Mama Gorilla", "Baby Gorilla");
Stream<List<String>> animals = Stream.of(zero, one, two);
animals.flatMap(l -> l.stream()).forEach(System.out::println);

// Bonobo
// Mama Gorilla
// Baby Gorilla

# sorted
Stream<String> s = Stream.of("brown-", "bear-");
s.sorted().forEach(System.out::print);
// bear-brown-

# peek: it is our final intermediate operation
Stream<String> stream = Stream.of("black bear", "brown bear", "grizzly");
long count = stream.filter(s -> s.startsWith("g"))
      .peek(System.out::println).count();  // grizzly
System.out.println(count); // 1

PS: Examples copied from OCP8

Remember that an intermediate operation does nothing until a terminal operation is started. So, in the example below, nothing happens.

List<Integer> list = Arrays.asList(1, 2, 3, 4);
list.stream().filter(i -> {
    System.out.print(i);
    return i % 2 == 0;
});
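To see the laziness for yourself, you can count how often the filter lambda runs before and after a terminal operation is added. This is a sketch with made-up names (LazyDemo, evaluations); the counter is only there for the demonstration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical example class, not part of any library.
class LazyDemo {

    // Returns {filter calls before the terminal op, calls after it, result size}.
    static int[] evaluations() {
        AtomicInteger calls = new AtomicInteger();
        Stream<Integer> evens = Arrays.asList(1, 2, 3, 4).stream()
                .filter(i -> {
                    calls.incrementAndGet();
                    return i % 2 == 0;
                });
        int before = calls.get(); // still 0: the pipeline has only been described
        List<Integer> result = evens.collect(Collectors.toList()); // terminal operation
        int after = calls.get();  // now the filter has seen all four elements
        return new int[] {before, after, result.size()};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(evaluations())); // [0, 4, 2]
    }
}
```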

Terminal Operations

It is possible to use a terminal operation without any intermediate operations. A terminal operation does not return a stream, so once it has finished, the stream pipeline cannot be used anymore. For the same reason, a stream pipeline can have only one terminal operation.

# findFirst, findAny (return Optional)
Stream<String> s = Stream.of("monkey", "gorilla", "bonobo");
s.findAny().ifPresent(System.out::println); // monkey

# anyMatch, noneMatch, allMatch (return boolean)
List<String> list = Arrays.asList("monkey", "2", "chimp");
Predicate<String> pred = x -> Character.isLetter(x.charAt(0));
System.out.println(list.stream().anyMatch(pred)); // true

# forEach
Stream<String> s = Stream.of("Monkey", "Gorilla", "Bonobo");
s.forEach(System.out::print); // MonkeyGorillaBonobo
PS: Streams cannot be used in a traditional for-each loop because
they don't implement the Iterable interface.

PS: Examples copied from OCP8

The *Match methods, for example, are short-circuiting: as soon as the outcome is known (for anyMatch, as soon as a matching element is found), there is no need to continue processing the stream.
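A quick way to check this is to count how many elements anyMatch actually looks at. The sketch below uses invented names (MatchShortCircuit and its methods) and also shows that short-circuiting is what lets anyMatch finish on an infinite stream.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

// Hypothetical example class, not part of any library.
class MatchShortCircuit {

    // Counts how many elements are inspected before anyMatch has its answer.
    static int elementsInspected() {
        AtomicInteger inspected = new AtomicInteger();
        Stream.of("monkey", "2", "chimp")
                .peek(x -> inspected.incrementAndGet())
                .anyMatch(x -> Character.isLetter(x.charAt(0)));
        return inspected.get(); // "monkey" already matches, so the rest is skipped
    }

    // anyMatch can terminate on an infinite stream once a match shows up.
    static boolean matchOnInfiniteStream() {
        return Stream.iterate(1, n -> n + 1).anyMatch(n -> n > 10);
    }

    public static void main(String[] args) {
        System.out.println(elementsInspected());     // 1
        System.out.println(matchOnInfiniteStream()); // true
    }
}
```

Note that allMatch or noneMatch on an infinite stream only stop if an element decides the outcome; otherwise they run forever.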

Let's check your understanding of terminal operations. Can you see the problem in the example?

Stream<String> stream = Stream.iterate("", (s) -> s + "a");
System.out.println(stream.limit(2).map(x -> x + "b"));

There is no terminal operation! So the result is just another stream, and the code prints something like java.util.stream.ReferencePipeline$3@65ab7765. The stream itself would produce the elements "b" and "ab".
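One possible fix, sketched here with a made-up class name (QuizFix), is to end the pipeline with a terminal operation such as collect and print its result instead of the stream object.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical example class, not part of any library.
class QuizFix {

    // Same pipeline as in the quiz, but closed with a terminal operation.
    static List<String> elements() {
        Stream<String> stream = Stream.iterate("", s -> s + "a");
        return stream.limit(2)
                .map(x -> x + "b")
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(elements()); // [b, ab]
    }
}
```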

One more concept to mention here is reduction. Examples of reduction operations: collect, count, min, max, reduce.

Reductions are a special type of terminal operation where all of the contents of the stream are combined into a single primitive or Object.

Let’s see some examples:

# Traditional (sum the elements of a list of numbers)
int sum = 0;
for (int x : numbers) {
    sum += x;
}

# Using reduce method (sum elements of a list of numbers)
int sum = numbers.stream().reduce(0, (a,b)->a+b);
# The first parameter is the initial value
# The second one is the lambda expression to combine all the elements

# reduce
Stream<String> stream = Stream.of("w", "o", "l", "f");
String word = stream.reduce("", (s, c) -> s + c);
// or stream.reduce("", String::concat)
System.out.println(word); // wolf

# count
Stream<String> s = Stream.of("monkey", "gorilla", "bonobo");
System.out.println(s.count()); // 3

# min, max
Stream<String> s = Stream.of("monkey", "ape", "bonobo");
Optional<String> min = s.min((s1, s2) -> s1.length() - s2.length());
min.ifPresent(System.out::println); // ape

# collect: use the same mutable object while accumulating
- Example 1
Stream<String> stream = Stream.of("w", "o", "l", "f");
StringBuilder word = stream.collect(StringBuilder::new,
    StringBuilder::append, StringBuilder::append);

- Example 2
Stream<String> stream = Stream.of("w", "o", "l", "f");
TreeSet<String> set = stream.collect(Collectors.toCollection(TreeSet::new));
System.out.println(set); // [f, l, o, w]

- Example 3
String[] roseQuote = "a rose is a rose is a rose".split(" ");
Set<String> words = Arrays.stream(roseQuote).collect(Collectors.toSet());
words.forEach(System.out::println); // a // rose // is (order not guaranteed)

# Collecting results: to group the results.
Some examples: averagingDouble, counting, groupingBy,
   joining, maxBy, mapping, partitioningBy, summarizingDouble, toList, toMap

# Each snippet below assumes ohMy was just created, because a stream cannot be reused
Stream<String> ohMy = Stream.of("lions", "tigers", "bears");
String result = ohMy.collect(Collectors.joining(", "));
System.out.println(result); // lions, tigers, bears

Double result = ohMy.collect(Collectors.averagingInt(String::length));
System.out.println(result); // 5.333333333333333

Map<String, Integer> map = ohMy.collect(
   Collectors.toMap(s -> s, String::length));
System.out.println(map); // {lions=5, bears=5, tigers=6}

Map<Integer, List<String>> map = ohMy.collect(
   Collectors.groupingBy(String::length));
System.out.println(map); // {5=[lions, bears], 6=[tigers]}

Map<Boolean, List<String>> map = ohMy.collect(
   Collectors.partitioningBy(s -> s.length() <= 5));
System.out.println(map); // {false=[tigers], true=[lions, bears]}
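Collectors can also be combined. As a small sketch (GroupingDemo and countByLength are made-up names), groupingBy accepts a map factory and a downstream collector, so counting can be applied to each group. A TreeMap is used here only to get a predictable key order when printing.

```java
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical example class, not part of any library.
class GroupingDemo {

    // Groups the words by length and counts how many fall into each group.
    static TreeMap<Integer, Long> countByLength() {
        return Stream.of("lions", "tigers", "bears")
                .collect(Collectors.groupingBy(
                        String::length,          // classifier: the key of each group
                        TreeMap::new,            // map factory: sorted keys
                        Collectors.counting())); // downstream: count per group
    }

    public static void main(String[] args) {
        System.out.println(countByLength()); // {5=2, 6=1}
    }
}
```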

You can see a complete example and compare the different ways to implement it, using an array and using a stream.

Parallel streams

Streams can be split into multiple parts that are processed at the same time by different threads, without you writing any multithreaded code. The number of available processor cores determines how many threads can run at the same time.
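If you are curious what your machine offers, the sketch below prints the number of cores the JVM sees and the parallelism of the common fork/join pool, which is the pool parallel streams use by default. ParallelismInfo is a made-up class name, and the printed values depend on your hardware and JVM settings.

```java
import java.util.concurrent.ForkJoinPool;

// Hypothetical example class, not part of any library.
class ParallelismInfo {

    // Number of processors available to the JVM.
    static int cores() {
        return Runtime.getRuntime().availableProcessors();
    }

    // Target parallelism of the common pool used by parallel streams.
    static int commonPoolParallelism() {
        return ForkJoinPool.commonPool().getParallelism();
    }

    public static void main(String[] args) {
        System.out.println("cores: " + cores());
        System.out.println("common pool parallelism: " + commonPoolParallelism());
    }
}
```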

# Create parallel streams (http://ocpj8.javastudyguide.com/ch18.html)
Stream<String> parallelStream = Stream.of("a","b","c").parallel();

# Collection
List<String> list = Arrays.asList("a","b","c");
Stream<String> parStream = list.parallelStream();

# Execution
Stream.of("a","b","c","d","e").parallel().forEach(System.out::print);
Result 1: cbade
Result 2: cebad
Result 3: cbdea

PS: If the order is important, you shouldn't use a parallel stream.
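If you still want a parallel stream but need the encounter order at the end, forEachOrdered and order-preserving collectors such as joining are two options, usually at the cost of some of the parallel speed-up. A minimal sketch with made-up names (OrderedParallel and its methods):

```java
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical example class, not part of any library.
class OrderedParallel {

    // forEachOrdered handles the elements one at a time, in encounter order.
    static String withForEachOrdered() {
        StringBuilder sb = new StringBuilder();
        Stream.of("a", "b", "c", "d", "e").parallel().forEachOrdered(sb::append);
        return sb.toString();
    }

    // joining combines the partial results in encounter order as well.
    static String withJoining() {
        return Stream.of("a", "b", "c", "d", "e").parallel()
                .collect(Collectors.joining());
    }

    public static void main(String[] args) {
        System.out.println(withForEachOrdered()); // abcde
        System.out.println(withJoining());        // abcde
    }
}
```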

Parallel Examples: Java In Action - Parallel Streams
https://github.com/java8/Java8InAction/blob/master/src/main/java/lambdasinaction/chap7/ParallelStreams.java

Parallel processing can sometimes be slower than sequential processing. If you use a stateful operation (sorted, limit, distinct, skip), for example, it may be necessary to go through the entire stream to produce a result.

Moreover, a parallel process may or may not return the correct result. When several threads update shared state, the answer is only guaranteed to be correct if access to that state is synchronized. To avoid that, you can use reduce to combine the elements of a stream into a single value.

With parallel streams, the reduce method creates intermediate values and then combines them. This avoids the "ordering" problem while still allowing the stream to be processed in parallel, because the shared state is eliminated and kept inside the reduction process. The only requirement is that the reducing operation must be associative.
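The difference can be sketched as follows. ParallelSum and its method names are invented for the example; the second method is the pattern to avoid and is shown only for contrast.

```java
import java.util.stream.LongStream;

// Hypothetical example class, not part of any library.
class ParallelSum {

    // Safe: reduce with an associative operation (addition) and identity 0.
    static long sumWithReduce(int n) {
        return LongStream.rangeClosed(1, n).parallel().reduce(0L, Long::sum);
    }

    // Unsafe: several threads update the same variable without synchronization,
    // so updates can be lost and the result may differ from run to run.
    static long sumWithSharedState(int n) {
        long[] total = {0};
        LongStream.rangeClosed(1, n).parallel().forEach(i -> total[0] += i);
        return total[0];
    }

    public static void main(String[] args) {
        System.out.println(sumWithReduce(1_000));      // 500500
        System.out.println(sumWithSharedState(1_000)); // may or may not be 500500
    }
}
```

A non-associative operation such as subtraction would not be a valid choice for reduce on a parallel stream, because the way the stream is split would change the result.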

To work recursively with parallel tasks you can use the fork/join framework. You can see an example here.

The fork/join framework was designed to recursively split a parallelizable task into smaller tasks and then combine the results of each subtask to produce the overall result.
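As a rough sketch of that idea, the task below sums an array by splitting it in half until the pieces are small enough to sum directly. SumTask and the THRESHOLD value are assumptions made for this illustration, not something taken from a particular library or book.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.stream.LongStream;

// Hypothetical example class, not part of any library.
class SumTask extends RecursiveTask<Long> {

    private static final int THRESHOLD = 1_000; // arbitrary cut-off for this sketch
    private final long[] numbers;
    private final int start; // inclusive
    private final int end;   // exclusive

    SumTask(long[] numbers, int start, int end) {
        this.numbers = numbers;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        if (end - start <= THRESHOLD) {
            // Small enough: sum sequentially.
            long sum = 0;
            for (int i = start; i < end; i++) {
                sum += numbers[i];
            }
            return sum;
        }
        int mid = start + (end - start) / 2;
        SumTask left = new SumTask(numbers, start, mid);
        SumTask right = new SumTask(numbers, mid, end);
        left.fork();                         // run the left half asynchronously
        long rightResult = right.compute();  // compute the right half in this thread
        return left.join() + rightResult;    // combine the two partial results
    }

    static long sum(long[] numbers) {
        return ForkJoinPool.commonPool().invoke(new SumTask(numbers, 0, numbers.length));
    }

    public static void main(String[] args) {
        long[] numbers = LongStream.rangeClosed(1, 10_000).toArray();
        System.out.println(sum(numbers)); // 50005000
    }
}
```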

Rules to choose parallel or sequential:

  • For a small set of data, sequential streams are almost always the best choice due to the overhead of the parallelism.
  • When using parallel streams, avoid stateful (like sorted()) and order-based (like findFirst()) operations.
  • Operations that are computationally expensive (considering all the operations in the pipeline) generally perform better with a parallel stream.

Now you can see a practical example that Java In Action gives us.

Conclusion

The main reason to use streams is to make code more readable. Sometimes traditional loops can be faster than a sequential stream (Java performance tutorial, Performance With Java8 Streams, 3 Reasons why You Shouldn’t Replace Your for-loops by Stream.forEach()), but this will probably improve in new versions.

Debugging stream code can look a little difficult, but you can find some tips in a StackOverflow discussion and in an IBM post.

Streams are an extensive topic. You need to look at each method and test it. Go on and do your homework.

Go deeply!!

Reference