avatarSatya Pavan Kantamani

Summarize

Master Different Types of Observables in RxJava

Pick the right Observable for your requirements

If you don’t have any basic idea about RxJava, please go through the post RxJava: Multi-Threading in Android. If you have a basic idea of RxJava you would know that Observable is a component that emits values to Observer. This post is all about exploring the different types of Observables available in RxJava.

Different Types of Observables

An observable can deliver multiple values of any type — literals, messages, or events, depending on the context. Depending on how the Observable emits the data they were categorized as follows:

  1. Observable
  2. Single
  3. Completable
  4. Maybe
  5. Flowable

For each Observable, we have the following Observers.

  • Observer
  • SingleObservable
  • CompletableObserver
  • MaybeObservable

As in the post RxJava: Multi-Threading in Android we have already explored Observable. So let us understand other types in detail.

Single

Single is an observable that emits only one item or throws an error. In other words, Single is a reactive base type that can emit a single onSuccess or onError. The simplest usage of Single in Android is when we make a network call to consume some data. Single is one of the most frequently used Observable because we have Network calls in most of the apps. It is an implementation of the interface io.reactivex.SingleObserver<T> that has three methods.

interface SingleObserver<T> {
    void onSubscribe(Disposable d);
    void onSuccess(T value);
    void onError(Throwable error);
}
  • onSubscribe: will be called whenever an observer is subscribed
  • onError: will be called whenever an error is thrown at some point of the stream
  • onSuccess: will be called if we get a response from the stream, in this case, we can handle the response

Let’s assume that there is a TvShows App with a TvShowsRepo which is a Retrofit interface with methods to get show details.

interface TvShowsRepo {
@Get(Constants.BaseUrl +"/show-detaisl/")
fun getShowDetails(showId: Int):Single<SeriesResponse>
}

Here we use Single because it’s a one-time task. We will inject the repo to the ViewModel and consume the data as shown below

This method getShowDetails() gets executed and returns either the response from the server or the error. This is how we generally use the Single Observable for one-time usage. Check out more about Single.

Completable

Completable is only concerned with execution completion without emitting a value. Completable observable won’t emit any data instead it notifies the status of the task either success or failure.

interface CompletableObserver<T> {
    void onSubscribe(Disposable d);
    void onComplete();    
    void onError(Throwable error);
}
  • onSubscribe will be called once by the Completable to set a Disposable on this instance, which then can be used to cancel the subscription at any time in the future
  • onError will be called once if the deferred computation throws an exception
  • onComplete will be called once the deferred computation completes normally without any error

We generally use Completable to store the values in preferences or SQLite local database, etc where the response is not required.

@Singleton
class CachedRepoImpl @Inject constructor(private val cacheService: CacheService) : CacheRepo {
fun saveUserData(userModel: String): Completable {
    return Completable.fromAction {
        cacheService.saveProfile(userModel)
    }
}
}

In the above cacheService is a class where it has an injection of Shared preferences to save and retrieve data.

We consume this as fallowing

The method saveUserDetails() method gets executed and returns success or failure callback of task execution. This is how we generally use the Completable Observable for success or failure callbacks. Check out more about Completable.

Maybe

Maybe observable may or may not emit a value. This observable can be used in a case where you are expecting an item to be emitted optionally.

interface MaybeObserver<T> {
    void onSubscribe(Disposable d);
    void onSuccess(T value);
    void onError(Throwable error);
    void onComplete();
}

onSubscribe() provides the MaybeObserver with the means of canceling or disposing of the connection with the Maybe in both synchronous and asynchronous manner.

onSuccess() notifies the MaybeObserver with one item and that the has finished sending push-based notifications.

onError notifies the MaybeObserver that has experienced an error condition.

onComplete called once the deferred computation completes normally.

Let’s look at the following snippet

When it is executed the output would be:

Hello

Next, let's have a look at another snippet

When the above snippet is executed the output would be:

Completed. No items.

The second one emits onComplete Completed. No items because there are no items to emit. The first one outputs Hello inside onSuccess and does not emit an onComplete after that emission. Maybe is, to some extent. similar to Single Observable except that it’s an optional thing in Maybe to emit a value. Explore more about Maybe.

Flowable

Flowable is an observable that should be used when an Observable is generating huge amounts of data that the Observer is not able to handle these data emissions. Observable sources don’t support backpressure. There are two main aspects where we use Flowable:

  1. Complex cases where huge amounts of data flow that UI can’t handle
  2. To get notified when there is a change in data already observed

The second usage is common in chatting apps to update the UI frequently

public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}

Subscribe method requests to start streaming data. This is a factory method and can be called multiple times, each time starting a new Subscription. Each Subscription will work for only a single Subscriber. A Subscriber should only subscribe once to a single Publisher. If the Publisher rejects the subscription attempt or otherwise fails it will signal the error via onError callback.

Flowable.fromArray(1, 2, 3, 4).subscribe(
        { i: Int? -> Timber.v("Entry %d\n", i) },
        { e: Throwable? ->Timber.v("Failed to process: %s\n", e) }
) { Timber.v("Done") }

The above was a simple usage of Flowable which prints numbers from the array.

When Flowable is used, the overflown emissions have to be handled using a strategy called Backpressure. If not it throws an exception such as MissingBackpressureException or OutOfMemoryError. We will discuss this in detail in our upcoming posts.

Explore more about Flowable.

Bonus

To learn more about RxJava, read the previous parts of this from Basic to Advanced series on Rx:

References

https://github.com/ReactiveX/RxJava

Android
Programming
Mobile
Java
Reactive Programming
Recommended from ReadMedium