Reactive systems using Reactor
Reactor 3.x is a Java library for writing reactive
applications using the Reactive Streams standard. Reactor Core
provides a reactive streams implementation similar in style to
RxJava 2. Because it is based on standard reactive streams, Reactor
can easily integrate with any other reactive streams library (particularly
RxJava 2). There are adapters available for RxJava 1 and the
CompleteableFuture
API as well as conversions for the new JDK 9
Flow
reactive stream interfaces. Reactor also provides an IPC
API for implementing reactive streams systems for general networking and off-JVM
communication. The main implementation of the IPC API is Reactor Netty
which provides TCP, UDP, and HTTP DSLs. There are other modules in the works,
but Reactor 3.x is still under heavy development.
Reactive Streams
Reactive streams are an API and pattern to handling streams of data asynchronously and non-blocking while providing backpressure to stream publishers so that stream subscribers do not get overwhelmed and require maintaining internal buffers or cause blocking. The reactive streams standard provides the minimal API to support this style of architecture which is also being introduced in Java 9.
The main classes in this API are Publisher<T>
and Subscriber<T>
. A
Publisher
is the source of events T
in the stream, and a Subscriber
is
the destination for those events. The basic idea of how these classes
interact is that a Subscriber
subscribes to a Publisher
which invokes
Subscriber::onSubscribe(Subscription)
. When the Subscriber
is ready to
start handling events, it signals this via a request to that Subscription
Upon receiving this signal, the Publisher
begins to invoke
Subscriber::onNext(T)
for each event T
. This continues until either
completion of the stream (Subscriber::onComplete()
) or an error occurs
during processing (Subscriber::onError(Throwable)
).
Reactor
Clearly, the reactive streams API is too low level to make practical use of it
in reactive applications. This is where Reactor Core (or RxJava) come into play
by providing a DSL for handling these streams. Reactor provides two main types
of publishers: Flux
and Mono
. A Flux
is a general purpose
publisher that can contain an unbounded number of events. A Mono
is a specialized
publisher that can contain only zero or one events. For a hands-on introduction
to these classes, take a look at this GitHub project and try to
implement all the test stubs. We’ll go over some of the tests below.
Trying Out Reactor
As a neat way to get familiar with the Reactor Core API, we’ll be implementing
the code necessary to pass these test stubs. These tests will cover Flux
,
Mono
, Scheduler
, StepVerifier
, and bridges to other APIs.
Flux
Our first task is to create an empty Flux
. Such a Flux
will publish no events
and will immediately complete without error. Flux
provides a static factory
method for creating such a Flux
:
Flux<String> emptyFlux() {
return Flux.empty();
}
Our next task is to create a Flux
that will stream through two strings and
complete. Flux
provides another static factory method for this:
Flux<String> fooBarFluxFromValues() {
return Flux.just("foo", "bar");
}
Next, we need to create a Flux
from a standard List
collection. Again, we
can use a factory method:
Flux<String> fooBarFluxFromList() {
return Flux.fromIterable(Arrays.asList("foo", "bar"));
}
Next, we want a Flux
that errs immediately instead of completing normally.
Did you guess that there’s a factory method for it?
Flux<String> errorFlux() {
return Flux.error(new IllegalStateException());
}
Finally, we start to get a little interesting by creating a Flux
that emits
the sequence 0 through 9, and each emission should be spaced out by 100 ms.
Here we’ll use the Flux.interval()
factory and limit it to only 10 items:
Flux<Long> counter() {
return Flux.interval(Duration.ofMillis(100))
.take(10);
}
Mono
The next test class revolves around basic Mono
APIs. As with Flux
, our
first task is to create an empty Mono
. This Mono
will complete immediately
as with an empty Flux
:
Mono<String> emptyMono() {
return Mono.empty();
}
Next, we want to create a Mono
that never signals an event, completion, or
error. This is available through a factory method:
Mono<String> monoWithNoSignal() {
return Mono.never();
}
The next task is to create a Mono
with a single value. This is available
through a similar factory as Flux.just(T...)
:
Mono<String> fooMono() {
return Mono.just("foo");
}
Finally, we create a Mono
that does not emit any events but instead emits
an error right away. This works the same as Flux.error(Throwable)
:
Mono<String> errorMono() {
return Mono.error(new IllegalStateException());
}
Transforming Streams
Now that we know the basics of creating Flux
and Mono
publishers, let’s do
something with them. There are two styles of mapping events to new objects:
synchronously and asynchronously. The synchronous style is rather simple and
works the same with both Flux
and Mono
:
Mono<String> toUpperCase(Mono<String> mono) {
return mono.map(String::toUpperCase);
}
However, suppose that our mapped function is asynchronous or could be made
asynchronous in order to prevent blocking in our stream. In this style mapping
function, instead of returning the same object type, we wrap it into a Mono
so that the computed value can be taken when ready:
Flux<String> toUpperCase(Flux<String> flux) {
return flux.flatMap(s -> Mono.just(s.toUpperCase()));
}
But wait, that’s still executing s.toUpperCase()
right away! What if we want
to wait until it’s consumed to make the computation? We can wrap the execution
into a deferred publisher:
Flux<String> asyncToUpperCase(Flux<String> flux) {
return flux.flatMap(s -> Mono.defer(() -> Mono.just(s.toUpperCase())));
}
Combining Streams
Suppose we have two streams and we wish to interleave their events together into a single stream. We can merge them like so:
Flux<String> interleave(Flux<String> flux1, Flux<String> flux2) {
return flux1.mergeWith(flux2);
}
Now suppose we wish to combine two streams, but we want events from one stream to all happen before events from the other stream. We can concatenate them:
Flux<String> combine(Flux<String> flux1, Flux<String> flux2) {
return flux1.concatWith(flux2);
}
Both of these operations can also be performed on Mono
publishers.
Handling Errors
One of the things that really make reactive streams more useful than the Java 8
streams is the ability to specify error handling at various levels. For example,
suppose we have a Mono
that we wish to provide a default value to return in
case of an error:
Mono<String> provideDefaultForError(Mono<String> mono) {
return mono.otherwiseReturn("error string");
}
Perhaps on error we wish to substitute our stream with a different stream:
Flux<String> provideFallbackStreamForError(Flux<String> flux) {
return flux.onErrorResumeWith(e -> Flux.just("foo", "bar"));
}
Other Operations
Suppose we have a Person
class:
public class Person {
public final String firstName;
public final String lastName;
public Person(String firstName, String lastName) {
this.firstName = firstName;
this.lastName = lastName;
}
}
Now suppose we have two streams of data, one corresponding to first names, and the other corresponding to last names. We can zip these streams together to form a new stream:
Flux<Person> zipIntoPersons(Flux<String> firstNames, Flux<String> lastNames) {
return Flux.zip(firstNames, lastNames)
.map(pair -> new Person(pair.getT1(), pair.getT2()));
}
Suppose we have two Mono
streams and we want to get an item from whichever one
can provide it faster:
Mono<String> getFasterStream(Mono<String> mono1, Mono<String> mono2) {
return mono1.or(mono2);
}
In a similar situation, suppose we have two Flux
streams and want to use
whichever stream can begin emitting items first:
Flux<String> getFasterStream(Flux<String> flux1, Flux<String> flux2) {
return flux1.firstEmittingWith(flux2);
}
Finally, we can take a Flux
and return a Mono<Void>
that is only used to
indicate when the stream has completed or erred:
Mono<Void> getCompletionOf(Flux<String> flux) {
return flux.then();
}
Reactive/Blocking Conversions
Suppose we have a Mono
or Flux
that we wish to make a blocking call to
obtain values from for use in a synchronous API. Using a Mono
, we do:
String getValue(Mono<String> mono) {
return mono.block();
}
Using a Flux
, we do:
Iterable<String> getValues(Flux<String> flux) {
return flux.toIterable();
}
Next, we go in the other direction. Suppose we wish to convert a collection
into a Flux
. Perhaps obtaining the collection is expensive, so we defer
looking it up until necessary. In this case, we’ll also specify a
Scheduler
so that we don’t block our main thread. This scenario is a fast
subscriber but slow publisher:
Flux<Person> asyncPersonLookup(PersonRepository repository) {
return Flux.defer(() -> Flux.fromIterable(repository.findAll()))
.subscribeOn(Schedulers.elastic());
}
In the reverse scenario, suppose we have a fast publisher and a slow subscriber
such as storing a record in a database. Using a Flux
for the publisher, we
can publish on a separate Scheduler
. Since we’ll be saving data, we’ll
just return a Mono<Void>
to indicate when the stream has finished
processing:
Mono<Void> asyncSavePersons(Flux<Person> flux, PersonRepository repository) {
return flux.publishOn(Schedulers.parallel())
.doOnNext(repository::save)
.then();
}
Blog Microservice Example
In the previous Lagom article, we implemented a blog microservice. Let’s do the same thing here, but we’ll use Reactor Core along with Spring 5. If you’re not already familiar with Lombok, take a look at the Lagom article for more information. The source code is on GitHub for this example.
The goal of this microservice is to provide a way to create, read, and update blog posts. We’ll begin with a data model for our blog posts:
@Value
@AllArgsConstructor(onConstructor = @__(@JsonCreator))
public class PostContent {
@NonNull
String title;
@NonNull
String author;
@NonNull
String body;
}
There are a bit less annotations to specify here than in the Lagom example, but
that’s because Spring does not have a strong opinion about mutability. This
data model will be used for our REST API and has the same structure as
PostContent
from the Lagom version.
Next, we’ll specify our REST controller which is equivalent to our Lagom service. Since we’re working with reactive microservices, we’ll be using reactive streams rather than directly using our data model.
@RestController
@RequestMapping(produces = MediaType.APPLICATION_JSON_VALUE)
@AllArgsConstructor(onConstructor = @__(@Autowired))
public class BlogController {
private final BlogService service;
@GetMapping("/api/blog/{id}")
public Mono<PostContent> getPost(@PathVariable final UUID id) {
return service.getPost(id);
}
@PostMapping("/api/blog")
public Mono<UUID> addPost(@RequestBody Mono<PostContent> content) {
return service.addPost(content);
}
@PutMapping("/api/blog/{id}")
public Mono<Void> updatePost(@PathVariable final UUID id, @RequestBody final Mono<PostContent> content) {
return service.updatePost(id, content);
}
}
Already we can see that Spring 5 has added support for using Mono
and Flux
for both request bodies and response bodies. For our API, we only use single
objects, so we only use Mono
. Also note that Mono
has replaced our use of
Optional
as it functions similarly to a reactive Optional
.
Next, we’ll define our entity class. You may notice that we are no longer using the CQRS pattern; this is because Lagom made it easy to follow by implementing a bit of the tedium involved in tracking snapshots of state along with doing most of the work of implementing the read side of things. Instead of using an event log directly, we’ll create a keyspace and table in Cassandra manually and use that. Note that Cassandra itself does implement its storage engine using an event log, so updates are just as fast as creates are regardless of our pattern.
@Value
@AllArgsConstructor(onConstructor = @__(@JsonCreator))
@Table
public class BlogPost {
@PrimaryKey
@NonNull
UUID id;
@NonNull
String title;
@NonNull
String author;
@NonNull
String body;
}
The only new annotations here are @Table
and @PrimaryKey
which are both
from the spring-data-cassandra API. We’ll also define a spring-data repository
interface for this class:
public interface BlogRepository extends CrudRepository<BlogPost, UUID> {
}
Since we don’t have any special querying needs, we don’t need to create any custom repository methods.
To set up our Cassandra data store, we’ll create a single keyspace and table for our data. In the example repository, this is handled on startup using an embedded Cassandra instance, but it could be easily refactored to use an external one that was set up ahead of time.
CREATE KEYSPACE blogs WITH REPLICATION = { 'class' : 'SimpleStrategy' };
CREATE TABLE blogs.blogpost (
id uuid PRIMARY KEY,
title text,
author text,
body text
);
Finally, we get to the interesting part: bridging the non-reactive Spring Data API into our reactive REST API. Using the reactive stream concepts from earlier, we implement our service class:
@Service
@AllArgsConstructor(onConstructor = @__(@Autowired))
public class BlogService {
private final BlogRepository repository;
public Mono<PostContent> getPost(final UUID id) {
return Mono.defer(() -> Mono.justOrEmpty(repository.findOne(id)))
.subscribeOn(Schedulers.elastic())
.map(post -> new PostContent(
post.getTitle(), post.getAuthor(), post.getBody()));
}
public Mono<UUID> addPost(final Mono<PostContent> contentMono) {
return contentMono
.map(content -> new BlogPost(UUID.randomUUID(),
content.getTitle(), content.getAuthor(), content.getBody()))
.publishOn(Schedulers.parallel())
.doOnNext(repository::save)
.map(BlogPost::getId);
}
public Mono<Void> updatePost(final UUID id, final Mono<PostContent> mono) {
return mono
.map(content -> new BlogPost(id,
content.getTitle(), content.getAuthor(), content.getBody()))
.publishOn(Schedulers.parallel())
.doOnNext(repository::save)
.then();
}
}
Notice that our getPost()
method uses Mono.defer()
to use a blocking API
(in this case, the Spring Data CrudRepository
API) and fetch an item which
may be null
, then maps it to our desired data format. Our addPost()
method
works in reverse by mapping to our entity format and then publishes on a
scheduler to save the repository item. Finally, our updatePost()
method works
essentially the same way.
As can be seen, Spring 5 is making reactive streams programming much easier by building in direct support for Reactor and RxJava into Spring Web. However, there is still room for improvement in further Spring releases to add first class reactive stream support to other Spring projects, particularly Spring Data.
Edit (December 2016): Spring Data has made the first milestone release of the next generation of Data that enables native Reactor support. More info is available on their blog.