What is RxJava? It is an extended implementation of the Observer pattern:
It extends the observer pattern to support sequences of data and/or events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety, concurrent data structures, and non-blocking I/O.
http://reactivex.io/intro.html
1. Observable
In RxJava, an Observable is the component that emits data, and a Subscriber is the consumer of that data. To "send" data to a subscriber, the observable calls the "onNext" method on the subscriber. When no more data is available, the observable calls the "onComplete" method. If an exception occurs, it calls the "onError" method.
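The callback contract described above can be sketched in plain Java without any library. This is only an illustration: the names ContractSketch, SimpleObserver, emitAll and record are hypothetical and are not RxJava's actual types.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ContractSketch {

    // Hypothetical stand-in for a subscriber; NOT RxJava's actual interface.
    interface SimpleObserver<T> {
        void onNext(T item);
        void onError(Throwable error);
        void onComplete();
    }

    // Plays the role of the observable: pushes every item, then signals the end.
    static <T> void emitAll(Iterable<T> items, SimpleObserver<T> observer) {
        try {
            for (T item : items) {
                observer.onNext(item);   // one call per emitted value
            }
            observer.onComplete();       // no more data
        } catch (Throwable ex) {
            observer.onError(ex);        // something went wrong while emitting
        }
    }

    // Records every callback so the order of calls can be inspected.
    static List<String> record(Iterable<String> items) {
        List<String> log = new ArrayList<>();
        emitAll(items, new SimpleObserver<String>() {
            @Override public void onNext(String item) { log.add("next:" + item); }
            @Override public void onError(Throwable error) { log.add("error:" + error.getMessage()); }
            @Override public void onComplete() { log.add("complete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        // Normal case: two values, then completion.
        System.out.println(record(Arrays.asList("a", "b")));
        // Failing source: only the error callback fires.
        System.out.println(record(() -> { throw new IllegalStateException("boom"); }));
    }
}
```

In the normal case the log is [next:a, next:b, complete]. For the failing source it is [error:boom], with no "complete" entry.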
Let's create a simple class that takes JSON received from a REST server, parses it, and emits each portion of data to the subscriber. Sample JSON was taken from here. I used Groovy because JSON parsing in Java is much more complicated:
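    import groovy.json.JsonSlurper
    // RxJava 2.x package; in RxJava 3.x use io.reactivex.rxjava3.core.Observable
    import io.reactivex.Observable

    class UsersParser {
        static Observable<String> getObservable(String json) {
            Observable<String> result = Observable.create { subscriber ->
                try {
                    // Parse the JSON array of user objects
                    def jsonSlurper = new JsonSlurper()
                    def usersList = jsonSlurper.parseText(json)
                    usersList.each({ e ->
                        def val = "[$e.email] $e.username"
                        subscriber.onNext((String) val)   // emit one value per user
                    })
                    subscriber.onComplete()               // no more data
                } catch (Throwable ex) {
                    subscriber.onError(ex)                // report the failure
                }
            }
            return result
        }
    }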
The JSON contains an array of user objects. For every element in the array we build a result value and "send" it to the subscriber with: subscriber.onNext((String) val)
When all the data has been processed, we call: subscriber.onComplete()
And if something goes wrong: subscriber.onError(ex)
2. Subscriber
Now, to "receive" (consume) the data that was "sent" (emitted) by the Observable, we need to subscribe (which in fact creates a Subscriber):

    Observable<String> userObservable = UsersParser.getObservable(json);
    userObservable.subscribe(
        t -> System.out.println(t),
        e -> e.printStackTrace(),
        () -> System.out.println("done")
    );
The parameters of the "subscribe" method correspond to the methods called in the Observable above:
t -> System.out.println(t) - corresponds to subscriber.onNext((String) val) - here we process each received object (in this example we just print it)
e -> e.printStackTrace() - corresponds to subscriber.onError(ex) - error handling
() -> System.out.println("done") - corresponds to subscriber.onComplete() - receiving is over (in this example we just print "done")
3. Operations
We can easily transform one observable into another with operators. They are very similar to the Streams API from Java 8:

    Observable<String> filteredObservable = userObservable
        .map(e -> e.toUpperCase())
        .filter(e -> e.contains("BIZ"));
    filteredObservable.subscribe(e -> System.out.println(e));
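For comparison, the same map and filter steps can be written with the Java 8 Streams API. This is a minimal sketch: the sample strings are made up (in the "[email] username" format produced earlier), and the class name StreamComparison and the method upperCaseBiz are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class StreamComparison {

    // Same map/filter steps as the Observable chain, applied to a list.
    static List<String> upperCaseBiz(List<String> users) {
        return users.stream()
                .map(String::toUpperCase)
                .filter(e -> e.contains("BIZ"))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Made-up sample values in the "[email] username" format.
        List<String> users = Arrays.asList(
                "[alice@example.biz] alice",
                "[bob@example.org] bob");
        System.out.println(upperCaseBiz(users));
    }
}
```

The operator names match, but the model differs: a stream is pulled from a collection by the caller, while an Observable pushes each value to its subscribers as it is emitted.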
Also, there are different ways to create an Observable: from an array, a callable, a future, and so on.
    Observable.just(1, 2, 3, 4, 2, 1, 3)
        .distinct()
        .subscribe(e -> System.out.print(e));