This guide provides background about the Java Reactive Streams driver and its asynchronous API. The guide also lists and explains sample custom subscriber implementations.
Note
For instructions on how to install the driver, see the Get Started guide.
Reactive Streams
This library is an implementation of the reactive streams specification. The reactive stream API consists of the following components:
A Publisher is a provider of a potentially
unbounded number of sequenced elements, published
according to the demand received from its Subscriber or multiple
instances of Subscriber.
In response to a call to Publisher.subscribe(Subscriber), the
possible invocation sequences for methods on the Subscriber class
are given by the following protocol:
onSubscribe onNext* (onError | onComplete)?
This means that onSubscribe is always
signaled, followed by a possibly unbounded number
of onNext signals, as requested by
Subscriber. This is followed by an onError signal
if there is a failure or an onComplete signal
when no more elements are available, as long as
the Subscription is not canceled.
Tip
To learn more about reactive streams, visit the Reactive Streams documentation.
Subscribers
The Java Reactive Streams driver API mirrors the Java Sync driver API and any methods that cause network
I/O to return a Publisher<T> type, where T is the type of response
for the operation.
Note
All Publisher types returned from the API are
cold,
meaning that nothing happens until they are subscribed to. So just
creating a Publisher won’t cause any network I/O. It’s not until
you call the Publisher.subscribe() method that the driver executes
the operation.
Publishers in this implementation are unicast. Each
Subscription to a Publisher relates to a single MongoDB
operation, and the Publisher instance's Subscriber receives its
own specific set of results.
Custom Subscriber Implementations
In the Java Reactive Streams documentation, we have implemented different Subscriber types. Although this is an
artificial scenario for reactive streams, we do
block on the results of one example before starting
the next to ensure the state of the database. To see the source code for all
the custom subscriber implementations, see SubscriberHelpers.java
in the driver source code.
ObservableSubscriber- The base subscriber class is the
ObservableSubscriber<T>,
a
Subscriberthat stores the results of thePublisher<T>. It also contains anawait()method so we can block for results to ensure the state of the database before going on to the next example.
OperationSubscriber- An implementation of the
ObservableSubscriberthat immediately callsSubscription.request()when it is subscribed to.
PrintSubscriber- An implementation of the
OperationSubscriberthat prints a message when theSubscriber.onComplete()method is called.
ConsumerSubscriber- An implementation of
OperationSubscriberthat takes aConsumerand callsConsumer.accept(result)whenSubscriber.onNext(T result)is called.
PrintToStringSubscriber- An implementation of
ConsumerSubscriberthat prints the string version of theresultwhen theSubscriber.onNext()method is called.
PrintDocumentSubscriber- An implementation of the
ConsumerSubscriberthat prints the JSON version of aDocumenttype when theSubscriber.onNext()method is called.
Blocking and Non-Blocking Examples
As our Subscriber types contain a latch that is only
released when the onComplete() method of the
Subscriber is called, we can use that latch
to block further actions by calling the await method.
The following two examples use our auto-requesting
PrintDocumentSubscriber.
The first is non-blocking and the second blocks
waiting for the Publisher to complete:
// Create a publisher Publisher<Document> publisher = collection.find(); // Non-blocking publisher.subscribe(new PrintDocumentSubscriber()); Subscriber<Document> subscriber = new PrintDocumentSubscriber(); publisher.subscribe(subscriber); subscriber.await(); // Block for the publisher to complete
Publishers, Subscribers, and Subscriptions
In general, Publisher, Subscriber and
Subscription types comprise a low level API and it’s
expected that users and libraries will build more
expressive APIs upon them rather than solely use
these interfaces. As a library solely implementing
these interfaces, users will benefit from this
growing ecosystem, which is a core design principle
of reactive streams.