Sunday, June 4, 2017

RxJava simple example



What is RxJava? It's extended implementation of Observer pattern:
It extends the observer pattern to support sequences of data and/or events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety, concurrent data structures, and non-blocking I/O.
http://reactivex.io/intro.html


1. Observable

In RxJava, Observable - component which emits data.  Subscriber - consumer of these emitted data.
To "send" data to subscriber, observable has to execute "onNext" method on subscriver. When no more available data - observable execute "onComplete" method. If exception has happen - "onError" method.

Let's create a simple class which is taking JSON from rest server, parsing it and on every portion of data - emits it to subscriber. Sample JSON was taken from here. I used GROOVY, because JSON parsing in JAVA much more complicated:

class UsersParser {

    static Observable getObservable(String json) {

        Observable result = Observable.create { subscriber ->

            try {
                def jsonSlurper = new JsonSlurper()
                def usersList = jsonSlurper.parseText(json)
                usersList.each({ e ->
                    def val = "[$e.email] $e.username"                    subscriber.onNext((String) val)
                })
                subscriber.onComplete()
            } catch (Throwable ex) {
                subscriber.onError(ex)
            }
        }

        return result    }

}


JSON contains array of USER objects. For every element in array we are producing result value and "sending" it to subscriber by:  subscriber.onNext((String) val)
When all data are processed, we are executing: subscriber.onComplete()
And if something went wrong: subscriber.onError(ex)


2. Subscriber

Now, to "receive"(consume) data which was "sent"(emitted) by Observable, we need to subscribe (in fact, create Subscriber):   

Observable<String> userObservable = UsersParser.getObservable(json);
userObservable.subscribe(
        t -> System.out.println(t),        e -> e.printStackTrace(),        () -> System.out.println("done")
);


Parameters in "subscribe" method corresponds to methods described above in Observable:

 t -> System.out.println(t)
- corresponds to subscriber.onNext((String) val) - here we just process received object (in current example we just print them)

e -> e.printStackTrace()
- corresponds to subscriber.onError(ex)  - error processing

() -> System.out.println("done")
- corresponds to subscriber.onComplete() - receiving is over(in current example we just print "done")


3. Operations

We can easily transform one observable to another by operator. Why are very similar to STREAMS api from Java 8:

Observable<String> filteredObservable = userObservable        .map(e -> e.toUpperCase())
        .filter(e -> e.indexOf("BIZ") > 0);filteredObservable.subscribe(e -> System.out.println(e));

Also, where are different ways to create Observable from array, callable, future......

Observable.just(1, 2, 3, 4, 2, 1, 3)
        .distinct()
        .subscribe(e -> System.out.print(e));


4. The end

After release of Java 8, RxJava is not really actual for regular java developers, but for Android developers it can be very useful. Full source code can be downloaded from here.