Parallel Streams


Martin Fowler : Collection pipeline pattern

  • Imperative style has accidental complexity.
    • Converting a for loop into a parallel program is a nightmare.
    • Unnecessary complexity creeps in while trying to synchronize and parallelize.
      • The structure of the program needs to change.
  • Functional style has less complexity. The code clearly expresses the business logic.
    • Easier to parallelize.
    • Maintainable: the structure remains the same; parallelism just gets added.
  • Mutability and parallelism don’t go together. Watch out for mutability.

With the Collection Pipeline pattern, simply changing stream() to parallelStream() parallelizes the code.

The only requirement is that the lambda should be a pure function: no shared mutability.

stream.parallel() - when the source is outside your control and you are forced to work with a stream that has already been created.

parallelStream() - when you are the source of the stream (you hold the collection).
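A minimal sketch contrasting the two entry points (the list and the filter here are illustrative):

```java
import java.util.List;
import java.util.stream.Stream;

public class ParallelSources {
    public static void main(String[] args) {
        List<Integer> list = List.of(1, 2, 3, 4, 5);

        // You own the source: ask the collection for a parallel stream directly
        long evensA = list.parallelStream()
                          .filter(n -> n % 2 == 0)
                          .count();

        // The stream was created elsewhere: switch it to parallel afterwards
        Stream<Integer> given = list.stream();
        long evensB = given.parallel()
                           .filter(n -> n % 2 == 0)
                           .count();

        System.out.println(evensA + " " + evensB); // 2 2
    }
}
```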

Be careful with sequential() placed right before the terminal operation.

Intermediate operations return a stream and are evaluated lazily. A terminal operation is evaluated eagerly and triggers the whole pipeline.
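A small sketch of that laziness (the print inside map is only for illustration):

```java
import java.util.List;

public class LazyPipeline {
    public static void main(String[] args) {
        List<Integer> list = List.of(1, 2, 3);

        // Intermediate operation only: the lambda is never invoked,
        // because no terminal operation pulls elements through
        list.stream().map(n -> {
            System.out.println("mapping " + n); // never prints
            return n * 2;
        });

        // Adding a terminal operation triggers evaluation of the pipeline
        long count = list.stream().map(n -> n * 2).count();
        System.out.println(count); // 3
    }
}
```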

list.parallelStream()
    .map(num -> incrementWith1SecDelay(num))
    .sequential()//This takes precedence due to its proximity to the terminal operation (forEach)
    .forEach(num -> System.out.print(num+" "));
| Streams | Reactive Streams |
| --- | --- |
| Sequential vs parallel | Synchronous vs asynchronous |
| Entire pipeline is either sequential or parallel | Depends |
| No segments | subscribeOn - no segments; observeOn - segments |

Order of execution

Use forEachOrdered. The upstream operations still execute in parallel, but forEachOrdered delivers each element to the consumer in encounter order.

list.parallelStream()//Simple conversion to parallel stream
      .map(num -> incrementWith1SecDelay(num))
      .forEachOrdered(num -> System.out.print(num+" "));//Enforces Ordering on the consumer function being passed
  • forEach is unordered.
  • forEachOrdered imposes only ordering, not sequential execution.
  • forEachOrdered can guarantee ordering only for ordered streams. E.g. a List is ordered, but a Set is not.
  • The order is the order in which the elements appear in the collection (**insertion order**, NOT the sort order).
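A sketch of forEachOrdered preserving encounter order while map still runs on worker threads (the values are illustrative):

```java
import java.util.List;

public class OrderedConsumer {
    public static void main(String[] args) {
        List<Integer> list = List.of(3, 1, 4, 1, 5);

        // map may execute on several worker threads, but forEachOrdered
        // hands elements to the consumer in encounter (insertion) order,
        // one at a time, so appending to a StringBuilder is safe here
        StringBuilder out = new StringBuilder();
        list.parallelStream()
            .map(n -> n * 10)
            .forEachOrdered(n -> out.append(n).append(' '));

        System.out.println(out.toString().trim()); // 30 10 40 10 50
    }
}
```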

Parallel and Reduce

filter and map run in parallel without issues.

The first argument to reduce is not an initial value.

Integer result = list
      .parallelStream()//Using Parallel Stream
      .reduce(1, //First parameter is not an INITIAL value, it is an identity
              (total, e) -> add(total, e));//In parallel the result is unpredictable (not the expected 55 for 1 to 10), because the non-identity seed 1 is applied once per segment

Reduce does not take an initial value; it takes an identity value.

  • integer and + -> identity = 0
  • integer and * -> identity = 1

Should be a monoid:

  • there should be an identity value
  • the operation should be associative (so it can be split across threads in any grouping)
  • the operation should be closed: the results must belong to the same set as the operands
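A sketch contrasting a proper identity with a non-identity seed for addition:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class MonoidReduce {
    public static void main(String[] args) {
        List<Integer> list = IntStream.rangeClosed(1, 10)
                                      .boxed()
                                      .collect(Collectors.toList());

        // (Integer, +, 0) is a monoid: associative, closed, identity 0.
        // The parallel reduce is therefore deterministic.
        int sum = list.parallelStream().reduce(0, Integer::sum);
        System.out.println(sum); // 55, no matter how the stream is split

        // 1 is NOT an identity for +: every parallel segment starts from 1,
        // so the result exceeds 55 and depends on how the work was split
        int wrong = list.parallelStream().reduce(1, Integer::sum);
        System.out.println(wrong > 55); // true
    }
}
```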

# of Threads

The total number of threads created depends on the task, but by default # of threads <= # of cores.

IO intensive Problem vs Computationally intensive problem

For computationally intensive tasks, the number of threads should be less than or equal to the number of cores.

For IO intensive tasks, the number of threads may be greater than the number of cores.

Consider the equation

# of threads = # of cores / (1 - blocking factor)

For IO intensive tasks, the blocking factor is usually large. The thread keeps waiting for long stretches, and that waiting time can be utilized by the CPU for some other task.

For example, if the thread is waiting for half the time (50%), the blocking factor is 0.5 and the number of threads will be double the number of CPU cores; in effect, twice as many threads can be created.
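The arithmetic above can be sketched as follows (the helper name is illustrative):

```java
public class ThreadSizing {
    // # of threads = # of cores / (1 - blocking factor)
    static int threadsFor(int cores, double blockingFactor) {
        return (int) (cores / (1 - blockingFactor));
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();

        // Computationally intensive: blocking factor ~0 -> threads == cores
        System.out.println(threadsFor(cores, 0.0));

        // IO intensive, blocked half the time: twice the number of cores
        System.out.println(threadsFor(cores, 0.5));
    }
}
```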

Default number of threads

Configuring JVM - Not recommended

-Djava.util.concurrent.ForkJoinPool.common.parallelism=100

Sending the stream to a custom ForkJoinPool: the thread that invokes the terminal (reduce) operation decides which thread pool the stream gets evaluated in.

Stream<Integer> integerParallelStream =
    list.parallelStream()
    .filter(num -> num == num)
    .map(num -> incrementWith1SecDelay(num))
    //.forEach(num -> {})
    ;

customizingForkJoinPool(integerParallelStream);//Sending the stream

The pipeline runs in the pool whose thread invokes the terminal (reduce) operation:

ForkJoinPool forkJoinPool = new ForkJoinPool(100);//parallelism = 100
forkJoinPool.submit(
        () -> integerStream.forEach(e -> {}))
    .join();//Wait for the pipeline to finish before shutting the pool down
//Running the reduction operation in another method with another thread

forkJoinPool.shutdown();

parallel with find first

 //Find the name of the first employee greater than 25 years of age
 String empName = employees.parallelStream()
         .filter(Objects::nonNull).filter(emp -> null != emp.getAge()).filter(emp -> null != emp.getName())
         .filter(emp -> emp.getAge() > 25)
         .map(emp -> emp.getName())
         .findFirst()//Ordered and thus yields same result in both parallel and sequential
         .orElse("No Emp Found");

Erratic behaviour

//Find the name of the any employee greater than 25 years of age
 String empName = employees.parallelStream()
         .filter(Objects::nonNull).filter(emp -> null != emp.getAge()).filter(emp -> null != emp.getName())
         .filter(emp -> emp.getAge() > 25)
         .map(emp -> emp.getName())
         .findAny()//Nondeterministic with a parallel stream: may return a different match on each run. Stable with sequential execution
         .orElse("No Emp Found");
