Kondah Mouad

Summary

This article explores the FlatMap operator in Reactive Programming, focusing on its implementation and usage in Reactor Project.

Abstract

In this article, the author delves into the complexities of the FlatMap operator, an essential component of Reactive Programming. The FlatMap operator projects each element from upstream to a new sequence and aggregates them into a single stream, similar to a fork-join operation. The article discusses the merge operator, which is used when you want to listen for values from several publishers of events and emit them in a single Flux. The author then explains the FlatMap operator's implementation, including its fields, such as mapper, delayError, maxConcurrency, prefetch, and innerQueueSupplier. The article also covers the FlatMapMain and FlatMapInner components, which play crucial roles in the FlatMap mechanism. The author provides code examples and explanations to illustrate the concepts discussed.

Opinions

  • The FlatMap operator is a complex but essential component of Reactive Programming.
  • The merge operator is used when you want to listen for values from several publishers of events and emit them in a single Flux.
  • The FlatMap operator projects each element from upstream to a new sequence and aggregates them into a single stream, similar to a fork-join operation.
  • The FlatMapMain and FlatMapInner components play crucial roles in the FlatMap mechanism.
  • The article provides code examples and explanations to illustrate the concepts discussed.
  • The author recommends understanding the basic principles of reactive programming before diving into the FlatMap operator.
  • The author plans to cover Schedulers and Error Handling in future parts of this series.

Reactive Programming (Reactor) - Part 2 - FlatMap

In Reactive Programming (Reactor) - Part 1, we explored the fundamentals of reactive programming and briefly discussed Project Reactor.

In this article, we will explore one of the most important and complex operators: FlatMap.

I highly recommend spending some time reading part 1, or any article explaining the basic principles of reactive programming (whether it uses Reactor or RxJava does not matter), so that you can follow along easily.

QueueSubscription

A QueueSubscription is essentially a queue and a subscription combined under one interface. It allows adjacent operators to negotiate a fusion mode.

The idea is that the upstream provides not only a subscription but also lets the subscriber access its internal queue directly, instead of the subscriber allocating a queue of its own. Generally, once fusion is established, the subscriber pulls items from that queue rather than having the publisher push them.
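To make the idea concrete, here is a minimal sketch in plain Java. FusedSubscription, ArraySubscription and the NONE/SYNC constants are hypothetical names chosen for illustration; Reactor's actual contract is Fuseable.QueueSubscription, which has more modes and methods. java.util.concurrent.Flow.Subscription stands in for the Reactive Streams Subscription.

```java
import java.util.concurrent.Flow;

// Hypothetical contract: a Subscription that also exposes queue-like pull methods.
// Reactor's real interface (Fuseable.QueueSubscription) has more modes and methods.
interface FusedSubscription<T> extends Flow.Subscription {
    int NONE = 0; // no fusion: items are pushed via request(n) / onNext
    int SYNC = 1; // synchronous fusion: consumer pulls with poll(); null means "no more items"

    int requestFusion(int requestedMode); // returns the mode the source actually grants
    T poll();
    boolean isEmpty();
    void clear();
}

// A source backed by a fixed array: all items are available up front, so SYNC can be granted.
final class ArraySubscription<T> implements FusedSubscription<T> {
    private final T[] items;
    private int index;
    private boolean cancelled;

    ArraySubscription(T[] items) {
        this.items = items;
    }

    @Override
    public int requestFusion(int requestedMode) {
        return (requestedMode & SYNC) != 0 ? SYNC : NONE;
    }

    @Override
    public T poll() {
        // The consumer reads directly from our storage instead of allocating its own queue.
        if (cancelled || index >= items.length) {
            return null;
        }
        return items[index++];
    }

    @Override
    public boolean isEmpty() {
        return cancelled || index >= items.length;
    }

    @Override
    public void clear() {
        index = items.length;
    }

    @Override
    public void request(long n) {
        // Push-mode (NONE) delivery through onNext is not modeled in this sketch.
    }

    @Override
    public void cancel() {
        cancelled = true;
    }
}
```

A subscriber that is granted SYNC loops on poll() until it returns null, instead of calling request(n) and waiting for onNext signals.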

Operator fusion is a topic of its own and is not covered in this post; if you are interested, here is an excellent article explaining the concept:

https://proandroiddev.com/operator-fusion-in-rxjava-2-dcd6612cffae

Merge operator

The merge operator is used when you want to listen for values from several Publishers of events (so you subscribe to all of them) and emit them in a single Flux.
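A minimal sketch of merge semantics, assuming each "publisher" is just a java.util.List emitted from its own thread (MergeSketch and merge are hypothetical names, not Reactor's API). All sources are started at once, values reach the single downstream consumer in whatever order they arrive, and a lock keeps the downstream calls serialized, as Reactive Streams requires for onNext.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical merge sketch: every source is a List emitted by its own thread.
final class MergeSketch {
    static <T> void merge(List<List<T>> sources, Consumer<? super T> downstream) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, sources.size()));
        Object lock = new Object(); // serializes downstream calls: one "onNext" at a time
        for (List<T> source : sources) {
            // "Subscribe" to every source up front; each one emits at its own pace.
            pool.execute(() -> {
                for (T value : source) {
                    synchronized (lock) {
                        downstream.accept(value);
                    }
                }
            });
        }
        pool.shutdown();
        try {
            // The merged stream "completes" only after every source has completed.
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("sources did not complete in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

Values from different sources may interleave in any order, but each source's own values keep their relative order, because a single thread emits them one by one.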

FlatMap operator

FlatMap projects each element from upstream into a new sequence and aggregates these sequences into a single stream. Think of it like a fork-join operation: tasks are executed in parallel and, once they complete, their results are combined. Keep in mind that order is not preserved in this case; if order matters to you, use concatMap.
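The fork-join analogy and the ordering caveat can be illustrated without Reactor. In this JDK-only sketch (ForkJoinAnalogy, flatMapLike and concatMapLike are hypothetical names), flatMapLike forks every inner task onto a thread pool and appends each inner result as soon as it completes, while concatMapLike runs the inner tasks one after another in source order. Note that the real flatMap interleaves individual elements rather than whole inner results; the sketch only shows why completion order, not source order, decides the output.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// JDK-only analogy, not Reactor code: each element is "forked" into an inner task.
final class ForkJoinAnalogy {
    // flatMap-like: all inner tasks run concurrently; results are joined in completion order.
    static <T, R> List<R> flatMapLike(List<T> input, Function<T, List<R>> mapper) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, input.size()));
        try {
            CompletionService<List<R>> completed = new ExecutorCompletionService<>(pool);
            for (T t : input) {
                completed.submit(() -> mapper.apply(t));
            }
            List<R> out = new ArrayList<>();
            for (int i = 0; i < input.size(); i++) {
                out.addAll(completed.take().get()); // whichever inner task finished next
            }
            return out;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    // concatMap-like: inner tasks run one after another, so source order is preserved.
    static <T, R> List<R> concatMapLike(List<T> input, Function<T, List<R>> mapper) {
        List<R> out = new ArrayList<>();
        for (T t : input) {
            out.addAll(mapper.apply(t)); // the next inner starts only after this one is done
        }
        return out;
    }
}
```

With inner tasks that sleep for different durations, concatMapLike always returns results in source order, whereas the order of flatMapLike's output depends on which task finishes first.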

FlatMap Implementation

FlatMap creates an inner publisher for each received element, then flattens these publishers into a single Flux by merging them.

When it comes to the implementation, a natural question is which operator to build on. If you implement flatMap in terms of merge, you need two operators, map followed by merge, and more operators mean more allocations at assembly time, more garbage and more GC churn. Reactor therefore implements flatMap as an operator of its own, FluxFlatMap.

Let’s sketch out the skeleton of the flatMap operator:

We have a couple of fields here:

  • mapper: the callback function that generates an inner publisher from each upstream element.
  • delayError: whether an error is propagated immediately or delayed until all other sources have terminated.
  • maxConcurrency: controls how many inner publishers can be subscribed to and merged concurrently.
  • prefetch: the size of the initial request made to each inner (merged) Publisher.
  • innerQueueSupplier: supplies the queue each inner subscriber uses to buffer the values it receives until they can be emitted downstream.
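A hypothetical Java sketch of a class holding the fields listed above might look like this. It uses java.util.concurrent.Flow.Publisher in place of Reactor's Publisher, the defaults of 256 and 32 are the values cited later in this article, and backing each inner queue with an ArrayBlockingQueue sized to prefetch is an assumption.

```java
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Flow;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical skeleton that only holds flatMap's configuration; it is not Reactor's FluxFlatMap.
final class FlatMapSketch<T, R> {
    static final int DEFAULT_MAX_CONCURRENCY = 256; // default cited in this article
    static final int DEFAULT_PREFETCH = 32;         // default cited in this article

    final Flow.Publisher<? extends T> source;
    final Function<? super T, ? extends Flow.Publisher<? extends R>> mapper; // element -> inner publisher
    final boolean delayError;  // if true, errors wait until all other sources have terminated
    final int maxConcurrency;  // max number of inner publishers subscribed to at once
    final int prefetch;        // initial request sent to each inner publisher
    final Supplier<? extends Queue<R>> innerQueueSupplier; // creates each inner subscriber's buffer

    FlatMapSketch(Flow.Publisher<? extends T> source,
                  Function<? super T, ? extends Flow.Publisher<? extends R>> mapper,
                  boolean delayError,
                  int maxConcurrency,
                  int prefetch,
                  Supplier<? extends Queue<R>> innerQueueSupplier) {
        if (maxConcurrency <= 0 || prefetch <= 0) {
            throw new IllegalArgumentException("maxConcurrency and prefetch must be positive");
        }
        this.source = source;
        this.mapper = mapper;
        this.delayError = delayError;
        this.maxConcurrency = maxConcurrency;
        this.prefetch = prefetch;
        this.innerQueueSupplier = innerQueueSupplier;
    }

    // Convenience factory using the defaults; sizing each inner queue to prefetch is an assumption.
    static <T, R> FlatMapSketch<T, R> withDefaults(
            Flow.Publisher<? extends T> source,
            Function<? super T, ? extends Flow.Publisher<? extends R>> mapper) {
        return new FlatMapSketch<T, R>(source, mapper, false,
                DEFAULT_MAX_CONCURRENCY, DEFAULT_PREFETCH,
                () -> new ArrayBlockingQueue<R>(DEFAULT_PREFETCH));
    }
}
```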

When the subscribe signal is triggered, subscribeOrReturn is invoked to obtain the subscriber that the operator places between its source and the downstream subscriber (each source sets a producer on its subscriber in order to process the stream). In the case of flatMap, this subscriber is FluxFlatMap.FlatMapMain, unless an optimization (for instance, for a scalar source) can be applied instead.

FlatMapMain drives the whole flatMap mechanism. By default, Reactor sets maxConcurrency to 256 and prefetch to 32: each inner subscriber gets a queue of 32 slots and requests 32 elements up front to keep that queue filled. Whenever an inner source completes, FlatMapMain requests one more element from upstream to keep the initial request (maxConcurrency) afloat.

Note that asking the upstream publisher for 256 elements can potentially result in 256 x 32 = 8,192 values to process: each of the up to 256 concurrent inner subscribers asks for 32 elements, although the actual number depends on what the mapper function produces.
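The numbers above can be checked with a small model. DemandModel is hypothetical and single-threaded: it multiplies maxConcurrency by prefetch for the worst case, and replays the "request one more element each time an inner completes" rule described above to show that the number of active inners never exceeds maxConcurrency.

```java
// Hypothetical, single-threaded model of flatMap's demand; not Reactor code.
final class DemandModel {
    // Each of up to maxConcurrency inner publishers may be asked for prefetch values at once.
    static long worstCaseInFlight(int maxConcurrency, int prefetch) {
        return (long) maxConcurrency * prefetch;
    }

    // Replays the rule described above: request(maxConcurrency) upstream first, then
    // request(1) each time an inner completes. Returns the peak number of active inners.
    static int peakActiveInners(int maxConcurrency, int upstreamElements) {
        if (maxConcurrency <= 0) {
            throw new IllegalArgumentException("maxConcurrency must be positive");
        }
        long requested = maxConcurrency; // initial request to the upstream publisher
        int received = 0;
        int active = 0;
        int peak = 0;
        while (received < upstreamElements) {
            if (received < requested) {
                received++;              // onNext: the mapper creates and subscribes a new inner
                active++;
                peak = Math.max(peak, active);
            } else {
                active--;                // demand exhausted: wait until some inner completes...
                requested++;             // ...then ask upstream for exactly one more element
            }
        }
        return peak;
    }
}
```

For example, worstCaseInFlight(256, 32) returns 8192, and peakActiveInners(256, 1_000) returns 256 no matter how many upstream elements are processed.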

Having a single subscriber for all of these sources is not a good idea, because there would be no way to control the demand of each source individually. Instead, a FlatMapInner is introduced, which subscribes to each generated stream.

FlatMapInner

From FlatMapMain’s perspective, the FlatMapInner instances basically play the role of queues that it drains.

To avoid blocking while still producing serialized output, a queue-drain approach is used: the elements buffered from an inner stream are drained and emitted downstream, and then a new portion of data is requested. When the inner stream terminates, FlatMapInner notifies FlatMapMain from its onComplete or onError via innerComplete or innerError, respectively.
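A single-threaded sketch of the inner subscriber's role follows. InnerSketch and Parent are hypothetical stand-ins for FlatMapInner and FlatMapMain; the replenish threshold of three quarters of prefetch (prefetch - prefetch/4) is an assumption made for illustration, and request(n) only records the demand instead of calling a real subscription.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical, single-threaded stand-in for FlatMapInner; not Reactor's implementation.
final class InnerSketch<R> {
    // Stands in for the FlatMapMain callbacks (innerComplete / innerError).
    interface Parent<V> {
        void innerComplete(InnerSketch<V> inner);
        void innerError(InnerSketch<V> inner, Throwable error);
    }

    private final Parent<R> parent;
    private final Queue<R> queue = new ArrayDeque<>();
    private final int prefetch;
    private final int limit;     // replenish threshold (assumed: 3/4 of prefetch)
    private int consumed;
    long totalRequested;         // demand signalled to the inner publisher so far

    InnerSketch(Parent<R> parent, int prefetch) {
        this.parent = parent;
        this.prefetch = prefetch;
        this.limit = prefetch - (prefetch >> 2);
    }

    void onSubscribe() { request(prefetch); }                 // fill the queue up front
    void onNext(R value) { queue.offer(value); }               // buffer until the main drains us
    void onComplete() { parent.innerComplete(this); }          // notify the main
    void onError(Throwable error) { parent.innerError(this, error); }

    // Called from the main's drain loop: emit buffered values, then request a new portion.
    int drainTo(Consumer<? super R> downstream) {
        int emitted = 0;
        R value;
        while ((value = queue.poll()) != null) {
            downstream.accept(value);
            emitted++;
            if (++consumed == limit) {
                consumed = 0;
                request(limit);
            }
        }
        return emitted;
    }

    private void request(long n) {
        totalRequested += n;     // a real subscriber would call subscription.request(n)
    }
}
```

With prefetch set to 32, draining 32 buffered values triggers one replenishing request of 24, so the inner has asked for 56 values in total at that point.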

Let’s briefly see what happens behind the scenes.

When subscribe is invoked, the publisher hands the subscriber its subscription through the onSubscribe signal, and the subscriber requests a predefined number of elements in order to fill its queues.

Next comes onNext.

When the upstream publisher sends an element downstream, the mapper generates the corresponding inner publisher. If that publisher implements the Callable interface, we can switch to a simplified path, because such a reactive type contains either exactly one item or no items at all. We invoke its call method and check the returned value: if it is null, the reactive type has no value; if it is non-null, that is its only value, and we try to emit it using the tryEmitScalar method.
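The scalar shortcut can be sketched with java.util.concurrent.Callable. ScalarShortcut and tryScalar are hypothetical names; emitting through a Consumer stands in for tryEmitScalar, and an exception thrown by call() is simply wrapped here, whereas a real operator would route it to onError.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.function.Consumer;

// Hypothetical sketch of the scalar shortcut; not Reactor's code.
final class ScalarShortcut {
    // Returns true if `inner` was handled as a scalar source (zero or one value) without subscribing.
    static <R> boolean tryScalar(Object inner, Consumer<? super R> emit) {
        if (!(inner instanceof Callable)) {
            return false;                       // regular publisher: subscribe a FlatMapInner instead
        }
        @SuppressWarnings("unchecked")
        Callable<R> scalar = (Callable<R>) inner;
        R value;
        try {
            value = scalar.call();
        } catch (Exception e) {
            throw new IllegalStateException(e); // a real operator would signal onError instead
        }
        if (value != null) {
            emit.accept(value);                 // exactly one value: emit it directly
        }                                       // null: the source is empty, nothing to emit
        return true;
    }
}
```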

Serialized access

Reactor serializes access to ensure that certain methods and operations happen sequentially. It uses a non-blocking serialization approach called queue-drain: values are pushed to a queue and then drained from it.

Draining the available items from the queue can be attempted from different threads; to ensure that only one thread does the work at a time, an atomic integer, wip, is introduced.

wip (work in progress) indicates the amount of work to be done. A thread atomically retrieves the current wip count and increments it by one. The thread that increments it from 0 to 1 wins the drain right and enters the loop; any other thread just increments it further and exits the method.

Whenever a work item has been drained and processed, the draining thread atomically decrements and checks the wip counter (since only one thread performs the decrement, no signal can be lost in the process). If the counter reaches zero, the loop quits; if it was greater than 1 before the decrement, more work arrived in the meantime and the loop continues.
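The wip protocol can be written as a small, self-contained serializer using java.util.concurrent.atomic.AtomicInteger. QueueDrainSerializer and QueueDrainDemo are hypothetical names. The loop uses the common "missed" accounting variant: instead of decrementing once per item, it subtracts all the work it has observed in one step, which preserves the same guarantee.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical queue-drain serializer: any thread may call onNext, but only the thread
// that moves wip from 0 to 1 runs the drain loop, so downstream is never called concurrently.
final class QueueDrainSerializer<T> {
    private final Queue<T> queue = new ConcurrentLinkedQueue<>();
    private final AtomicInteger wip = new AtomicInteger();
    private final Consumer<? super T> downstream;

    QueueDrainSerializer(Consumer<? super T> downstream) {
        this.downstream = downstream;
    }

    void onNext(T value) {
        queue.offer(value); // publish the work item first...
        drain();            // ...then try to win the drain right
    }

    private void drain() {
        if (wip.getAndIncrement() != 0) {
            return;         // another thread is draining and will observe our increment
        }
        int missed = 1;
        for (;;) {
            T value;
            while ((value = queue.poll()) != null) {
                downstream.accept(value);
            }
            missed = wip.addAndGet(-missed); // remove the work accounted for so far
            if (missed == 0) {
                return;     // no new work arrived: release the drain right
            }
        }
    }
}

final class QueueDrainDemo {
    // Returns {delivered, overlaps}: overlaps counts consumer calls that ran concurrently.
    static int[] run(int threads, int perThread) {
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger overlaps = new AtomicInteger();
        AtomicInteger delivered = new AtomicInteger();
        QueueDrainSerializer<Integer> serializer = new QueueDrainSerializer<>(v -> {
            if (inside.incrementAndGet() != 1) overlaps.incrementAndGet();
            delivered.incrementAndGet();
            inside.decrementAndGet();
        });
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) serializer.onNext(i);
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return new int[] {delivered.get(), overlaps.get()};
    }
}
```

QueueDrainDemo.run(4, 10_000) pushes 40,000 values from four threads; every value is delivered exactly once and the overlap counter stays at zero, because only the wip winner ever calls the consumer.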

So, back to the tryEmitScalar method.

Let’s see how it works:

1- compareAndSet checks whether wip is zero and, if so, sets it to 1; the thread that manages to set it to 1 is in.

2- If the scalar queue happens to be empty and there is outstanding demand, we emit the value directly and, when the downstream runs in bounded mode (its requested amount is not Long.MAX_VALUE), decrement the requested amount. We also decrement the wip count: if it reaches zero, we simply return. If it is non-zero, a concurrent call (for example, a new request) arrived in the meantime and we need to deal with it.

3- we jump to the drain loop.

4- if we are in this block, we did not win the loop: the value is queued and we invoke the drain method, which simply increments wip to signal to the running drain loop that there is more work to do (or, if no loop is running anymore, enters the loop itself).
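The four steps can be modeled as follows. ScalarEmitter is a hypothetical simplification: it keeps only the wip counter, the requested amount and a scalar queue, omits any upstream replenishment, and does not cap requested at Long.MAX_VALUE on overflow. Queuing the value when there is no demand is part of this sketch's assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical, simplified model of tryEmitScalar's fast path; not Reactor's code.
final class ScalarEmitter<T> {
    private final AtomicInteger wip = new AtomicInteger();
    private final AtomicLong requested = new AtomicLong();
    private final Queue<T> scalarQueue = new ConcurrentLinkedQueue<>();
    private final Consumer<? super T> downstream;

    ScalarEmitter(Consumer<? super T> downstream) {
        this.downstream = downstream;
    }

    void request(long n) {
        requested.addAndGet(n); // no overflow capping in this sketch
        drain();
    }

    void tryEmitScalar(T value) {
        if (wip.get() == 0 && wip.compareAndSet(0, 1)) {  // 1. we own the loop
            long r = requested.get();
            if (r != 0 && scalarQueue.isEmpty()) {         // 2. demand and nothing queued
                downstream.accept(value);
                if (r != Long.MAX_VALUE) {
                    requested.decrementAndGet();           //    bounded mode: consume one unit
                }
            } else {
                scalarQueue.offer(value);                  //    no demand: keep order by queuing
            }
            if (wip.decrementAndGet() == 0) {
                return;                                    //    nothing happened meanwhile
            }
            drainLoop();                                   // 3. concurrent work arrived
        } else {
            scalarQueue.offer(value);                      // 4. lost the race: queue the value
            drain();                                       //    and signal the drainer
        }
    }

    private void drain() {
        if (wip.getAndIncrement() == 0) {
            drainLoop();
        }
    }

    private void drainLoop() {
        int missed = 1;
        for (;;) {
            long r = requested.get();
            long e = 0;
            T v;
            while (e != r && (v = scalarQueue.poll()) != null) {
                downstream.accept(v);
                e++;
            }
            if (e != 0 && r != Long.MAX_VALUE) {
                requested.addAndGet(-e);
            }
            missed = wip.addAndGet(-missed);
            if (missed == 0) {
                return;
            }
        }
    }
}
```

A value offered before any request is buffered; request(2) then drains it, the next value takes the fast path, and a third value waits in the queue until more demand arrives.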

Finally, the drainLoop method:

Let’s see how it works:

1- we first assume we only missed 1 drain call.

2- if the scalar queue is not empty, some elements need to be drained from it.

3- we iterate until the requested amount of elements is reached or no more values can be processed.

4- if the list of inner subscribers is not empty, we iterate over all of them, poll elements from their respective queues, and process them.

5- do some cleanup.

6- as discussed before, we need to check whether more work has arrived in the meantime.
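A single-threaded model of these six steps follows. DrainLoopSketch and Inner are hypothetical names, each inner subscriber is reduced to a queue plus a done flag, and treating step 5 as removing inners that are done and fully drained is an assumption about what the cleanup covers.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical single-threaded model of drainLoop; real inner queues would be concurrent.
final class DrainLoopSketch<T> {
    static final class Inner<V> {           // an inner subscriber reduced to its queue and state
        final Queue<V> queue = new ArrayDeque<>();
        boolean done;
    }

    final Queue<T> scalarQueue = new ArrayDeque<>();
    private final List<Inner<T>> inners = new ArrayList<>();
    private final AtomicInteger wip = new AtomicInteger();
    private final Consumer<? super T> downstream;
    private long requested;

    DrainLoopSketch(Consumer<? super T> downstream) {
        this.downstream = downstream;
    }

    Inner<T> addInner() {
        Inner<T> inner = new Inner<>();
        inners.add(inner);
        return inner;
    }

    int innerCount() {
        return inners.size();
    }

    void request(long n) {
        requested += n;
        drain();
    }

    void drain() {
        if (wip.getAndIncrement() == 0) {
            drainLoop();
        }
    }

    private void drainLoop() {
        int missed = 1;                                              // 1. assume one missed call
        for (;;) {
            long r = requested;
            long e = 0;
            T v;
            while (e != r && (v = scalarQueue.poll()) != null) {     // 2. scalar values first
                downstream.accept(v);
                e++;
            }
            for (Inner<T> inner : inners) {                          // 4. then every inner's queue
                while (e != r && (v = inner.queue.poll()) != null) { // 3. stop at the requested amount
                    downstream.accept(v);
                    e++;
                }
            }
            requested -= e;
            inners.removeIf(in -> in.done && in.queue.isEmpty());    // 5. cleanup
            missed = wip.addAndGet(-missed);                         // 6. did more work arrive?
            if (missed == 0) {
                return;
            }
        }
    }
}
```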

Example

Let’s illustrate what we discussed by writing some code. We create a list of integers, and for each element we generate a publisher (Flux.range).

This would output:

It shows exactly what we discussed: first [line 6] we request 256 (maxConcurrency) elements, then [lines 18–19] an inner subscriber is instantiated and requests 32 elements (prefetch) from its inner publisher, and once an inner subscriber completes [line 14] a new portion of data is requested.

Conclusion

In this post, we covered the FlatMap operator; in the next parts, I will cover Schedulers and Error Handling.
