CodeDevStack

Zip a Mono with a Flux stream using Reactor

January 31, 2020

Let’s imagine we have a Flux and Mono stream we want to zip:

Mono<String> myMono = Mono.just("monoValue");
Flux<String> myFlux = Flux.just("fluxValue1", "fluxValue2");

Zipping them is actually pretty simple, but you need to consider that Flux.zip() will stop when the shortest stream is finished. In this case we are going to be zipping monoValue with fluxValue1 and then the zip() will stop.

Flux.zip(myMono, myFlux, (a,b) -> a + " " +  b)
//monoValue fluxValue1
//Done zipping

Of course, this is not what we want. We want to use the same Mono with every value in the myFlux stream. In order to achieve this we can use repeat() on the Mono. This way, the Mono will repeat itself indefinitely and the zip will run until the myFlux runs out of values.

Repeat will repeatedly and indefinitely subscribe to the source upon completion of the previous subscription.

The final snippet will look like this:

Flux.zip(myMono.repeat(), myFlux, (a,b) -> a + " " +  b)
//monoValue fluxValue1
//monoValue fluxValue2