- References
- Out-of-the-box Reactive Streams with Java 9
- Java Streams vs Reactive Streams: Which, When, How, and Why? by Venkat Subramaniam
- Reactive Programming by Venkat Subramaniam
- From Functional to Reactive Programming, Venkat Subramaniam
- WJUG #239 - Jacek Kunicki: Jak (nie) używać Reactive Streams w Javie 9+
- GOTO 2018 • Real-world Reactive Programming in Java: The Definitive Guide • Erwin de Gier
- https://www.manning.com/books/akka-in-action
- https://github.com/rucek/reactive-streams-java9
- https://github.com/reactive-streams/reactive-streams-jvm
- https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/Flow.html
- https://blog.softwaremill.com/how-not-to-use-reactive-streams-in-java-9-7a39ea9c2cb3
- https://thepracticaldeveloper.com/2018/01/31/reactive-programming-java-9-flow/
- https://www.quora.com/Whats-the-difference-between-push-and-pull-protocols
- http://blog.amitinside.com/Java-Iterator-Pattern-vs-Stream-vs-RxObservable/
- http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/BackpressureStrategy.html
- http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/schedulers/Schedulers.html
The main goal of this project is to explore the basic features of reactive streams introduced in Java 9:
- Publisher
- Subscriber
- Subscription (Backpressure)
- Processor (SubmissionPublisher)
- reactive programming (idea formulated by Erik Meijer)
- the applications we develop must be really responsive and able to react to stimuli in the system
- the main objective of reactive programming is NOT to be as fast as possible but to use resources (CPU, memory, ...) in the most efficient manner
- the core idea behind reactive is to release resources whenever possible
- reactive programming is functional programming+
- before java 8: completely imperative + object oriented
- please refer to: https://github.com/mtumilowicz/java12-introduction-to-functional-programming-workshop
- imperative: tell me what and tell me how
- declarative: tell me what and NOT how
- functional (style): declarative + higher-order function
- functional programming: function composition + lazy evaluation
- functional programming and exceptions are mutually exclusive
- exceptions + java 8 streams: if you are driving a car and have a problem with the radio, the most illogical thing to do is to turn back
- you should turn off the radio and continue the journey
- Michael Feathers: OO makes code understandable by encapsulating moving parts. FP makes code understandable by minimizing moving parts.
- moving part I: mutability (minimized through immutability)
- moving part II: control flow (in imperative code we go up and down to follow the flow)
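The imperative vs declarative distinction above can be illustrated with a small sketch. The class and method names are hypothetical, picked only for this example; both methods compute the same result, but only the first one spells out how to iterate and accumulate.

```java
import java.util.List;

class ImperativeVsDeclarative {

    // Imperative: we say what we want AND how to get it (loop, branch, mutable sum).
    static int sumOfDoubledEvensImperative(List<Integer> numbers) {
        int sum = 0;
        for (int n : numbers) {
            if (n % 2 == 0) {
                sum += n * 2;
            }
        }
        return sum;
    }

    // Functional style: declarative pipeline built from higher-order functions,
    // no mutable state and no explicit control flow.
    static int sumOfDoubledEvensDeclarative(List<Integer> numbers) {
        return numbers.stream()
                .filter(n -> n % 2 == 0)
                .mapToInt(n -> n * 2)
                .sum();
    }

    public static void main(String[] args) {
        List<Integer> numbers = List.of(1, 2, 3, 4, 5, 6);
        System.out.println(sumOfDoubledEvensImperative(numbers));  // 24
        System.out.println(sumOfDoubledEvensDeclarative(numbers)); // 24
    }
}
```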
- https://www.reactivemanifesto.org/
- OOP four pillars:
- abstraction (programming is based on making abstractions),
- encapsulation (best practice for every programming style),
- inheritance (golang has no inheritance),
- polymorphism (makes actually OOP)
- reactive four pillars
- responsive
- infinite-scrolling
- providing responsiveness
- efficiency is attained not by doing tasks faster, but by avoiding those that shouldn't be done in the first place
- resilient - make failure a first-class citizen (it is okay to fail)
- elastic - the only reasonable direction to scale is horizontally
- message driven - do not expose your database; export your data instead
- reactive manifesto conclusions
- blocking I/O limits opportunities for parallelism, so nonblocking I/O is preferred
- synchronous interaction limits opportunities for parallelism - asynchronous interaction is preferred
- polling reduces opportunity to use fewer resources, so an event-driven style is preferred
- if one node can bring down all other nodes, that’s a waste of resources
- you need isolation of errors (resilience) to avoid losing all your work
- systems need to be elastic: if there’s less demand - use fewer resources
- if there’s more demand - use more resources, but never more than required
- shared mutability
```java
Thread th = new Thread(new Runnable() {
    public void run() {
        // we don't produce anything and don't consume anything
        // it could be useful only by shared mutability
        // how to work with threads if shared mutability is dangerous
        // and thread API forces us to use it
    }
});
```
- in the past the structure of sequential code was very different from the structure of concurrent code
- with streams: the structure of sequential code is the same as the structure of concurrent code
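A minimal sketch of that last point, assuming a simple sum-of-squares task (the class and method names are made up for illustration): the pipeline is identical in both modes, only the stream source differs.

```java
import java.util.List;
import java.util.stream.Stream;

class SameStructure {

    // The only difference between the sequential and the parallel version
    // is how the stream is obtained; the pipeline itself does not change.
    static int sumOfSquares(List<Integer> numbers, boolean parallel) {
        Stream<Integer> stream = parallel ? numbers.parallelStream() : numbers.stream();
        return stream
                .mapToInt(n -> n * n)
                .sum();
    }

    public static void main(String[] args) {
        List<Integer> numbers = List.of(1, 2, 3, 4, 5);
        System.out.println(sumOfSquares(numbers, false)); // 55
        System.out.println(sumOfSquares(numbers, true));  // 55
    }
}
```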
- how many threads should you create?
- computation intensive: <= # of cores (RxJava scheduler for such work: `Schedulers.computation()`)
- IO intensive: NCPU * UCPU * (1 + W/C) (RxJava scheduler for such work: `Schedulers.io()`)
- NCPU is the number of cores, available through `Runtime.getRuntime().availableProcessors()`
- UCPU is the target CPU utilization (between 0 and 1)
- W/C is the ratio of wait time to compute time
- number of threads is therefore strictly limited
- by memory also
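The sizing formula for IO intensive work can be written down as a small helper. This is only a sketch: the class name, the method name, the rounding and the sample utilization and wait/compute values are assumptions made for illustration, and real numbers should come from measurements.

```java
class ThreadPoolSizing {

    // NCPU * UCPU * (1 + W/C), rounded to the nearest integer, never less than one thread.
    static int ioIntensiveThreads(int ncpu, double ucpu, double waitToComputeRatio) {
        if (ncpu < 1 || ucpu <= 0 || ucpu > 1 || waitToComputeRatio < 0) {
            throw new IllegalArgumentException("ncpu >= 1, 0 < ucpu <= 1, W/C >= 0 expected");
        }
        return Math.max(1, (int) Math.round(ncpu * ucpu * (1 + waitToComputeRatio)));
    }

    public static void main(String[] args) {
        int ncpu = Runtime.getRuntime().availableProcessors();
        // hypothetical workload: 80% target utilization, tasks wait 9x longer than they compute
        System.out.println(ioIntensiveThreads(ncpu, 0.8, 9.0));
    }
}
```

For example, with 4 cores, full utilization and W/C = 9 the helper returns 40; with W/C = 0 (pure computation) it falls back to the number of cores.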
- examples of reactive applications
- Excel: if you modify one cell, the change propagates to other cells
- Google Docs - hundreds of people use it simultaneously
- a stream is not a data structure; it is actually a collection of functions (with a data source: network, file, etc.)
- Martin Fowler: Collection Pipeline Pattern
- limitations
- stream cannot be reused
- single pipeline (a single terminal operation)
- cannot split into two
- no exception handling
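The "stream cannot be reused" limitation is easy to observe. In the sketch below (the class and method names are hypothetical) the second terminal operation on the same stream is rejected with an `IllegalStateException`.

```java
import java.util.List;
import java.util.stream.Stream;

class StreamReuse {

    // Returns true when the second terminal operation on the same stream is rejected.
    static boolean secondTerminalOperationFails() {
        Stream<String> stream = List.of("a", "b", "c").stream();
        stream.forEach(s -> { });     // first (and only allowed) terminal operation
        try {
            stream.forEach(s -> { }); // the pipeline has already been consumed
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(secondTerminalOperationFails()); // true
    }
}
```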
- vs java 8 streams

| java streams | reactive streams |
|---|---|
| pipeline | pipeline |
| push data | push/pull data |
| lazy | lazy |
| 0, 1, oo | 0, 1, oo |
| data only | 3 channels: data, error, complete |
| exceptions: good luck | deal with it downstream (error is just another form of data) |
| sequential vs parallel | sync vs async |
| single pipeline (one terminal operation) | multiple subscribers |

- vs CompletableFuture

| reactive streams | CompletableFuture/Promises |
|---|---|
| 0, 1, oo | 0, 1 |
| 3 channels | 2 channels (data, error) |

- there are three key factors that make a stream reactive:
- the data is processed asynchronously
- the backpressure mechanism (a strategy for coping with a very fast producer) is non-blocking
- the fact that the downstream can be slower than the upstream is somehow represented in the domain model
- e.g. the Twitter streaming API, where you can be disconnected if you consume too slowly
- Observable (producer) vs Observer pattern
- it's the Observer pattern, plus:
- signal end of data stream
- propagate error
- evaluation may be synchronous, asynchronous or lazy
- nonblocking backpressure (strategies from RxJava's `BackpressureStrategy`)
- BUFFER - buffers all onNext values until the downstream consumes them
- DROP - drops the most recent onNext value if the downstream can't keep up
- ERROR - signals a MissingBackpressureException in case the downstream can't keep up
- LATEST - keeps only the latest onNext value, overwriting any previous value if the downstream can't keep up
- MISSING - onNext events are written without any buffering or dropping
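The strategies above come from RxJava, which is an external library. As a rough JDK-only analog of DROP, the sketch below uses `SubmissionPublisher.offer` with a drop handler: once the subscriber's buffer is full, further items are dropped instead of blocking the producer. The class name, the stalled subscriber and the item count are assumptions made for illustration, not the RxJava implementation.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicInteger;

class DropOnOverflow {

    // A subscriber that never requests anything, i.e. an extremely slow consumer.
    static class StalledSubscriber implements Flow.Subscriber<Integer> {
        @Override public void onSubscribe(Flow.Subscription subscription) { }
        @Override public void onNext(Integer item) { }
        @Override public void onError(Throwable throwable) { }
        @Override public void onComplete() { }
    }

    // Offers `count` items without blocking and returns how many of them were dropped.
    static int offerAndCountDrops(int count) {
        AtomicInteger dropped = new AtomicInteger();
        // default constructor: maximum buffer capacity is Flow.defaultBufferSize()
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new StalledSubscriber());
            for (int i = 0; i < count; i++) {
                publisher.offer(i, (subscriber, item) -> {
                    dropped.incrementAndGet();
                    return false; // do not retry, the item is simply dropped
                });
            }
        }
        return dropped.get();
    }

    public static void main(String[] args) {
        int count = 4 * Flow.defaultBufferSize();
        System.out.println("offered: " + count + ", dropped: " + offerAndCountDrops(count));
    }
}
```

Returning `true` from the handler would ask the publisher to retry the offer once, which is one way to sketch a "give the consumer a second chance" policy.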
- hot vs cold
- cold = every subscriber starts a fresh subscription
- like an iterator: if you start again, you start from the beginning
- hot = start from a point in time, like a live football broadcast
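A small JDK-only sketch of the difference, under the assumption that an iterator stands in for a cold source and a `SubmissionPublisher` for a hot one (the class name and the sample items are made up): every new iterator replays everything, while a late subscriber misses whatever was published before it subscribed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class HotVsCold {

    // Cold: every new iterator replays the source from the beginning.
    static List<String> coldRead(List<String> source) {
        List<String> seen = new ArrayList<>();
        source.iterator().forEachRemaining(seen::add);
        return seen;
    }

    // Hot: a subscriber only sees what is published after it subscribed.
    static List<String> hotLateSubscriber() {
        List<String> seen = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.submit("kick-off"); // nobody is subscribed yet, so this item is lost
            publisher.subscribe(new Flow.Subscriber<String>() {
                @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
                @Override public void onNext(String item) { seen.add(item); }
                @Override public void onError(Throwable t) { done.countDown(); }
                @Override public void onComplete() { done.countDown(); }
            });
            publisher.submit("goal");
        } // close() signals onComplete after the buffered items were delivered
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(seen);
    }

    public static void main(String[] args) {
        List<String> match = List.of("kick-off", "goal");
        System.out.println(coldRead(match));     // [kick-off, goal]
        System.out.println(coldRead(match));     // [kick-off, goal] again, from the start
        System.out.println(hotLateSubscriber()); // [goal]
    }
}
```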
- visualizations: https://rxmarbles.com/
- times when you had to go to a bank and talk with a person
- times when you had to go to a travel agency to buy tickets
- in the past, companies built products for their employees to use and made those employees (nobody cared what they thought) available to us, the customers
- now companies build products for real users, IoT
- summary
- push protocols:
- the client opens a connection to the server and keeps it constantly active
- the server will send (push) all new events to the client using that single always-on connection
- in other words, the server PUSHes the new events to the client
- pull protocols:
- the client periodically connects to the server, checks for and gets (pulls) recent events, then closes the connection and disconnects from the server
- the client repeats this whole procedure to get updated about new events
- in this mode, the client periodically PULLs the new events from the server
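- code example

```java
import java.util.Iterator;
import java.util.List;

import com.google.common.collect.Lists; // Guava (assumed from Lists.newArrayList)
import rx.Observable;                   // RxJava 1.x (assumed from Observable.from)

// pull: the consumer drives the iteration and asks for each next element
private static void pullExample() {
    final List<String> list = Lists.newArrayList("Java", "C", "C++", "PHP", "Go");
    final Iterator<String> iterator = list.iterator();
    while (iterator.hasNext()) {
        System.out.println(iterator.next());
    }
}

// push: the observable drives the flow and calls back with data, error or completion
private static void pushExample() {
    final List<String> list = Lists.newArrayList("Java", "C", "C++", "PHP", "Go");
    final Observable<String> observable = Observable.from(list);
    observable.subscribe(System.out::println, System.out::println, () -> System.out.println("We are done!"));
}
```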
- `Flow.Publisher` - source of data
- `Flow.Subscriber` - destination of data
- `Flow.Subscription` - message control linking a `Flow.Publisher` and a `Flow.Subscriber` (the `Subscriber` signals demand to the `Publisher` using the `Subscription`)
- `Flow.Processor` - a component that acts as both a `Subscriber` and a `Publisher` (can consume input and produce output)
- `SubmissionPublisher` (`java.util.concurrent`) - the only implementation (in the JDK) of `Flow.Publisher`; it asynchronously issues submitted (non-null) items to current subscribers until it is closed

PUBLISHER -> PROCESSOR -> PROCESSOR -> SUBSCRIBER
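
A minimal sketch of such a pipeline with a single processor stage, built only from the JDK types listed above. The class names, the mapping function and the "mapped: " prefix are made up for this example; it is not the workshop's solution, just one way the pieces can fit together.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

class PipelineSketch {

    // A processor subscribes to the upstream and publishes to the downstream.
    static class MappingProcessor<T, R> extends SubmissionPublisher<R>
            implements Flow.Processor<T, R> {
        private final Function<? super T, ? extends R> mapper;
        private Flow.Subscription subscription;

        MappingProcessor(Function<? super T, ? extends R> mapper) { this.mapper = mapper; }

        @Override public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1);
        }
        @Override public void onNext(T item) {
            submit(mapper.apply(item)); // forward the transformed item downstream
            subscription.request(1);    // then ask the upstream for the next one
        }
        @Override public void onError(Throwable throwable) { closeExceptionally(throwable); }
        @Override public void onComplete() { close(); }
    }

    // Hypothetical end subscriber: collects items and signals when the stream ends.
    static class CollectingSubscriber<T> implements Flow.Subscriber<T> {
        final List<T> items = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1);
        }
        @Override public void onNext(T item) {
            items.add(item);
            subscription.request(1);
        }
        @Override public void onError(Throwable throwable) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }
    }

    static List<String> run(List<Integer> input) {
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
        MappingProcessor<Integer, String> processor = new MappingProcessor<>(i -> "mapped: " + i * 2);
        CollectingSubscriber<String> subscriber = new CollectingSubscriber<>();
        processor.subscribe(subscriber); // PROCESSOR -> SUBSCRIBER
        publisher.subscribe(processor);  // PUBLISHER -> PROCESSOR
        input.forEach(publisher::submit);
        publisher.close();               // completion travels down the pipeline
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(subscriber.items);
    }

    public static void main(String[] args) {
        System.out.println(run(List.of(1, 2, 3))); // [mapped: 2, mapped: 4, mapped: 6]
    }
}
```

Only the source publisher is closed explicitly; the processor closes itself when it receives `onComplete`, so nothing downstream is cut off before the last item has been forwarded.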
We have two scenarios:
- `Publisher` is slow, `Subscriber` is fast (best scenario)
- `Publisher` is fast, `Subscriber` is slow (the `Subscriber` must deal with excessive data - the most naive approach is just to drop all excessive data - so the data will be lost)

Note that if we have multiple subscribers and one publisher, they all receive elements in the same order.
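
The ordering note can be checked with a short sketch, assuming `SubmissionPublisher` as the publisher and a hypothetical collecting subscriber: two subscribers attached to the same publisher end up with the same sequence.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class SameOrderForEverySubscriber {

    // Hypothetical helper: stores items in arrival order and signals completion.
    static class Collector implements Flow.Subscriber<Integer> {
        final List<Integer> items = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);

        @Override public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(Long.MAX_VALUE); // no backpressure, only the order matters here
        }
        @Override public void onNext(Integer item) { items.add(item); }
        @Override public void onError(Throwable throwable) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }

        List<Integer> awaitItems() {
            try {
                done.await(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return new ArrayList<>(items);
        }
    }

    // Publishes the input to two subscribers and returns what each of them received.
    static List<List<Integer>> publishToTwo(List<Integer> input) {
        Collector first = new Collector();
        Collector second = new Collector();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(first);
            publisher.subscribe(second);
            input.forEach(publisher::submit);
        }
        return List.of(first.awaitItems(), second.awaitItems());
    }

    public static void main(String[] args) {
        System.out.println(publishToTwo(List.of(1, 2, 3, 4, 5))); // [[1, 2, 3, 4, 5], [1, 2, 3, 4, 5]]
    }
}
```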
- implement `Flow.Publisher` (using, for example, `SubmissionPublisher<T>`) and `Flow.Subscriber`
- the subscriber attempts to subscribe to the publisher by calling the `subscribe(Flow.Subscriber<? super T> subscriber)` method of the publisher
- take a look at `SubmissionPublisher.subscribe(Subscriber<? super T> subscriber)`
- success: the publisher asynchronously calls the `onSubscribe(Flow.Subscription subscription)` method of the subscriber
- failure: the `onError(Throwable throwable)` method of the subscriber is called with an `IllegalStateException`, and the interaction ends
- the subscriber sends a request to the publisher for `N` items by calling `request(N)` on the `Subscription`
- multiple requests are sent regardless of whether earlier ones have already been fulfilled (non-blocking)
- the publisher calls the `onNext(T item)` method of the subscriber and sends an item in each call
- if there are no more items to send, the publisher calls the `onComplete()` method of the subscriber to signal the end of stream, and the interaction ends
- note that if the subscriber requests `Long.MAX_VALUE` items, the stream becomes not reactive - it is effectively a push stream
- if the publisher encounters an error, it calls `onError(Throwable throwable)` on the subscriber
- the subscriber can cancel its subscription by calling the `cancel()` method on its subscription
- if a subscription is cancelled, the interaction ends
- it is possible for the subscriber to receive items after cancellation if there were pending requests before
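
The steps above can be sketched end to end with a subscriber that requests one item at a time and cancels after a fixed number of items. The class name, the `limit` parameter and the latch are assumptions made for this example. Because demand never exceeds one item here, nothing arrives after the cancellation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class RequestAndCancel {

    // Takes at most `limit` items (limit >= 1), one request at a time, then cancels.
    static class TakeSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new CopyOnWriteArrayList<>();
        final CountDownLatch finished = new CountDownLatch(1);
        private final int limit;
        private Flow.Subscription subscription;

        TakeSubscriber(int limit) { this.limit = limit; }

        @Override public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1);   // signal demand for the first item
        }
        @Override public void onNext(Integer item) {
            received.add(item);
            if (received.size() >= limit) {
                subscription.cancel(); // no outstanding demand, so the interaction ends here
                finished.countDown();
            } else {
                subscription.request(1);
            }
        }
        @Override public void onError(Throwable throwable) { finished.countDown(); }
        @Override public void onComplete() { finished.countDown(); }
    }

    // Publishes 1..published and returns what a subscriber limited to `limit` items received.
    static List<Integer> takeFirst(int limit, int published) {
        TakeSubscriber subscriber = new TakeSubscriber(limit);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= published; i++) {
                publisher.submit(i);
            }
        }
        try {
            subscriber.finished.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(subscriber.received);
    }

    public static void main(String[] args) {
        System.out.println(takeFirst(3, 10)); // [1, 2, 3]
    }
}
```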
A correctly overridden `onSubscribe` method looks as follows:

```java
@Override
public void onSubscribe(Flow.Subscription subscription) {
    if (this.subscription == null) {
        this.subscription = subscription;
        this.subscription.request(1); // we handle backpressure through subscription
    } else {
        subscription.cancel(); // we handle cancellation through subscription
    }
}
```
- because we want our `Subscriber` to talk to only one `Publisher`
- a `Subscription` represents a link between a single `Subscriber` and a single `Publisher`, so you have to cancel the incoming one (if we already have one, we don't accept any further ones)
- think about the subscriber as a radio receiver, subscriptions as radio waves, and the publisher as a radio station
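
The effect of this guard can be checked without any publisher or threads. The sketch below uses a hypothetical recording subscription (a test double, not a JDK class) and calls `onSubscribe` twice: the first subscription gets a `request(1)`, the second one is cancelled.

```java
import java.util.concurrent.Flow;

class SingleSubscriptionDemo {

    // Test double: records what the subscriber asked of the subscription.
    static class RecordingSubscription implements Flow.Subscription {
        long requested;
        boolean cancelled;
        @Override public void request(long n) { requested += n; }
        @Override public void cancel() { cancelled = true; }
    }

    // Subscriber with the same guard as in onSubscribe above.
    static class OneLinkSubscriber implements Flow.Subscriber<String> {
        private Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription subscription) {
            if (this.subscription == null) {
                this.subscription = subscription;
                this.subscription.request(1);
            } else {
                subscription.cancel(); // we already have a link to a publisher
            }
        }
        @Override public void onNext(String item) { subscription.request(1); }
        @Override public void onError(Throwable throwable) { }
        @Override public void onComplete() { }
    }

    // Returns true if the first subscription is kept and the second one is cancelled.
    static boolean secondSubscriptionIsRejected() {
        OneLinkSubscriber subscriber = new OneLinkSubscriber();
        RecordingSubscription first = new RecordingSubscription();
        RecordingSubscription second = new RecordingSubscription();
        subscriber.onSubscribe(first);
        subscriber.onSubscribe(second);
        return first.requested == 1 && !first.cancelled
                && second.requested == 0 && second.cancelled;
    }

    public static void main(String[] args) {
        System.out.println(secondSubscriptionIsRejected()); // true
    }
}
```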
- the actual goal of having these interfaces included in the JDK is to provide something called a Service Provider Interface (or SPI) layer
- this should eventually serve as a unification layer for different components that have reactive and streaming nature, but may expose their own custom APIs, and thus not be able to interoperate with other similar implementations
We test it by running:
RunAnswer
RunWorkshop
The output should be:
onNext, item: new mapping: 2
onNext, item: new mapping: 4
onNext, item: new mapping: 6
onNext, item: new mapping: 8
onNext, item: new mapping: 10
onNext, item: new mapping: 12
...