Satya Pavan Kantamani

Different Ways to Create Observables in RxJava

How many ways to create an Observable are there?

Photo by Mika Korhonen on Unsplash

Rx stands for Reactive Extensions. RxJava is an awesome reactive library that we can easily integrate into our applications.

We can understand RxJava as data emitted by one component, called Observable, and the underlying structure provided by the Rx libraries will propagate changes to another component, Observer. Simply put, it’s an API for asynchronous programming with observable streams.

The article "RxJava — Multi-Threading in Android" covers the basics of Rx: Observables, Observers, Schedulers, and so on. Assuming you already know the basics of RxJava, let's start by discussing the Observable.

What is an Observable?

In RxJava, Observables are the sources that emit data to Observers. We can think of Observables as suppliers: they do some work, process data, and emit values to other components.

The following are the different types of Observables in RxJava:

  • Observable
  • Flowable
  • Single
  • Maybe
  • Completable

We’ll discuss each type in detail in the next post but just remember that there are different types of Observables for different purposes.

There are many methods provided by the RxJava library for Observable creation. Let's look at these methods and understand when to use each method:

  • create()
  • just()
  • defer()
  • empty()
  • never()
  • error()
  • range()
  • interval()
  • timer()
  • from()

create()

Create an Observable from scratch by means of a function:

val createObserver = Observable.create(ObservableOnSubscribe<String> { emitter ->
    emitter.onNext("Hello World")
    emitter.onComplete()
})

The create factory method is the preferred way to implement custom observable sequences. Essentially, this method allows you to specify a delegate that will be executed every time a subscription is made.
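To make the "executed every time a subscription is made" point concrete, here is a minimal sketch. It assumes RxJava 2.x (package io.reactivex; RxJava 3 uses io.reactivex.rxjava3.core instead), and the helper name createRunsPerSubscription is made up for illustration.

```kotlin
import io.reactivex.Observable
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: subscribes twice and records what each subscriber receives.
fun createRunsPerSubscription(): List<String> {
    val runs = AtomicInteger(0)
    val source = Observable.create<String> { emitter ->
        // This delegate runs once for every subscriber.
        val run = runs.incrementAndGet()
        if (!emitter.isDisposed) {
            emitter.onNext("Hello from run $run")
            emitter.onComplete()
        }
    }
    val received = mutableListOf<String>()
    source.subscribe { received.add(it) }
    source.subscribe { received.add(it) }
    return received
}
```

Calling createRunsPerSubscription() should return one greeting per subscription, each carrying a different run number, because the delegate is executed again for the second subscriber.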

Note: Flowable.create() must also specify the backpressure behavior to be applied when the user-provided function generates more items than the downstream consumer has requested.
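As a rough illustration of the note above, the sketch below passes a BackpressureStrategy as the second argument of Flowable.create(). It assumes RxJava 2.x; BUFFER is just one of the available strategies, and the function name bufferedFlowable is hypothetical.

```kotlin
import io.reactivex.BackpressureStrategy
import io.reactivex.Flowable

// Hypothetical helper: emits five items and buffers any that the
// downstream consumer has not requested yet.
fun bufferedFlowable(): List<Int> {
    val flowable = Flowable.create<Int>({ emitter ->
        for (i in 1..5) emitter.onNext(i)
        emitter.onComplete()
    }, BackpressureStrategy.BUFFER)
    // toList() collects every item; blockingGet() waits for completion.
    return flowable.toList().blockingGet()
}
```

Other strategies such as DROP or LATEST discard items instead of buffering them, so the right choice depends on whether losing items is acceptable.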

ObservableOnSubscribe is a functional interface whose subscribe() method receives an ObservableEmitter instance that allows pushing events in a cancellation-safe manner. Have a look at the interface:

/**
 * @param <T> the value type pushed
 */
public interface ObservableOnSubscribe<T> {

    /**
     * Called for each Observer that subscribes.
     * @param emitter the safe emitter instance, never null
     * @throws Exception on error
     */
    void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}

just()

This is one of the easiest and most convenient ways to create an Observable. just() constructs a reactive type by taking a pre-existing object and emitting that specific object to the downstream consumer upon subscription. In other words, the just operator converts an item into an Observable that emits that item.

val justObservable = Observable.just(4, 5, 6)

Remember that in RxJava 2 and later, just() does not accept null: passing null throws a NullPointerException instead of emitting null or returning an empty Observable. (Only RxJava 1 emitted null as an item.) Use Observable.empty() when you need a source with no items.

Observable.just() emits whatever is passed to it, in order. RxJava 2 provides overloads for one to ten arguments. If you pass a list or an array to just(), it emits the list or array itself as a single item rather than its elements.
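The difference between passing a list to just() and unpacking it can be sketched as follows. This assumes RxJava 2.x; both function names are hypothetical.

```kotlin
import io.reactivex.Observable

// just() treats the whole list as one item.
fun justEmissionCount(): Int =
    Observable.just(listOf(4, 5, 6)).toList().blockingGet().size

// fromIterable() emits each element of the list separately.
fun fromIterableEmissionCount(): Int =
    Observable.fromIterable(listOf(4, 5, 6)).toList().blockingGet().size
```

The first function counts a single emission (the list itself), while the second counts three, one per element.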

defer()

defer() does not create the Observable until the observer subscribes and creates a fresh Observable for each observer. The Defer operator waits until an observer subscribes to it, then it generates an Observable, typically with an Observable factory function. It does this creation for each subscriber — although each subscriber may think it’s subscribing to the same Observable, in fact, each subscriber gets its own individual sequence.

val observable = Observable.defer {
    val time = System.currentTimeMillis()
    Observable.just(time)
}

In some circumstances, waiting until the last minute (that is, until subscription time) to generate the Observable can ensure it contains the latest data.
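A small comparison may help here. The sketch below, assuming RxJava 2.x and using a hypothetical helper name, reads a counter once through just() and once through defer(), then subscribes twice while the counter changes.

```kotlin
import io.reactivex.Observable
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: returns (values seen via just, values seen via defer).
fun deferVersusJust(): Pair<List<Int>, List<Int>> {
    val counter = AtomicInteger(0)
    // just() reads the counter right now, at assembly time.
    val eager = Observable.just(counter.get())
    // defer() reads the counter again for every subscriber.
    val deferred = Observable.defer { Observable.just(counter.get()) }

    val eagerSeen = mutableListOf<Int>()
    val deferredSeen = mutableListOf<Int>()
    for (round in 1..2) {
        counter.incrementAndGet()
        eager.subscribe { eagerSeen.add(it) }
        deferred.subscribe { deferredSeen.add(it) }
    }
    return eagerSeen to deferredSeen
}
```

The just() source keeps replaying the value captured when it was built, while the deferred source picks up the current counter value at each subscription.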

empty()

empty() creates an Observable that emits no items but terminates normally. This type of source signals completion immediately upon subscription: it emits nothing to the Observer and immediately invokes its onComplete() method.

val empty = Observable.empty<String>()

empty.subscribe(
        { v -> println("This should never be printed!") },
        { error -> println("Or this!") },
        { println("Done will be printed.") })

never()

never() creates an Observable that emits no items and does not terminate. This type of source does not signal any onNext, onSuccess, onError or onComplete. It is useful for testing or for disabling certain sources in combinator operators.

val never = Observable.never<String>()

never.subscribe(
        { v -> println("This should never be printed!") },
        { error -> println("Or this!") },
        { println("This neither!") })
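One way to picture "disabling a source in a combinator" is a stop signal that never fires. The sketch below assumes RxJava 2.x; takeUntilWith is a hypothetical helper, not part of the library.

```kotlin
import io.reactivex.Observable

// Hypothetical helper: forwards 1, 2, 3 until the stop signal emits.
fun takeUntilWith(stop: Observable<Unit>): List<Int> =
    Observable.just(1, 2, 3)
        .takeUntil(stop)
        .toList()
        .blockingGet()

// Passing never() as the stop signal effectively switches the cut-off off.
fun withDisabledStopSignal(): List<Int> = takeUntilWith(Observable.never<Unit>())
```

Because never() emits nothing and does not terminate, takeUntil() has no reason to cut the sequence short, so all three items reach the consumer.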

error()

error() signals an error, either pre-existing or generated via a java.util.concurrent.Callable, to the consumer.

val error = Observable.error<String>(IOException())
error.subscribe(
        { v -> println("This should never be printed!") },
        { e -> e.printStackTrace() },
        { println("This neither!") })

onErrorResumeNext() instructs an ObservableSource to pass control to another ObservableSource, rather than invoking Observer.onError(), if it encounters an error anywhere in the sequence.

If you pass a resume sequence (another ObservableSource) to onErrorResumeNext() and the original ObservableSource encounters an error, it does not invoke its Observer's onError() method. Instead, it hands control to the resume sequence, which invokes the Observer's onNext() method if it is able to do so. In that case the Observer may never know that an error occurred. You can use this to prevent errors from propagating or to supply fallback data when errors are encountered.

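val observable = Observable.fromCallable<String> {
    // Fail with one of two exception types, chosen at random.
    if (Math.random() < 0.5) {
        throw IOException()
    }
    throw IllegalArgumentException()
}

// Complete quietly on IllegalArgumentException; pass every other error on.
// Function is io.reactivex.functions.Function; naming it explicitly avoids
// the overload ambiguity Kotlin reports for a bare lambda in RxJava 2.
val result = observable.onErrorResumeNext(Function<Throwable, ObservableSource<String>> { error ->
    if (error is IllegalArgumentException) Observable.empty<String>() else Observable.error<String>(error)
})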
for (i in 0..9) {
    result.subscribe(
            { v -> println("This should never be printed!") },
            { error -> error.printStackTrace() },
            { println("Done") })
}
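For the "supply fallback data" use case mentioned above, a deterministic sketch might look like this. It assumes RxJava 2.x; the function name and the "cached value" fallback are invented for illustration.

```kotlin
import io.reactivex.Observable
import io.reactivex.ObservableSource
import io.reactivex.functions.Function
import java.io.IOException

// Hypothetical helper: a source that always fails, with a fallback for IOException.
fun loadWithFallback(): List<String> {
    val failing = Observable.fromCallable<String> { throw IOException("network down") }
    val recovered = failing.onErrorResumeNext(
        Function<Throwable, ObservableSource<String>> { error ->
            if (error is IOException) {
                Observable.just("cached value")   // fallback data
            } else {
                Observable.error<String>(error)   // anything else still fails
            }
        })
    return recovered.toList().blockingGet()
}
```

The consumer only sees the fallback item followed by completion; the IOException itself never reaches onError().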

range()

range() creates an Observable that emits a particular range of sequential integers. The Range operator emits a range of sequential integers in order, where you select the start of the range and its length. It generates a sequence of values for each individual consumer. The range() method generates Integers, the rangeLong() generates Longs.

val greeting = "Hello World!"

val indexes = Observable.range(0, greeting.length)

val characters = indexes
        .map { index -> greeting[index] }

characters.subscribe({ character -> print(character) }, { error -> error.printStackTrace() },
        { println() })
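Note that the second argument of range() is a count, not an end value. A short sketch, assuming RxJava 2.x and hypothetical function names:

```kotlin
import io.reactivex.Observable

// Starts at 5 and emits 3 values.
fun rangeValues(): List<Int> =
    Observable.range(5, 3).toList().blockingGet()

// Same idea with Long values.
fun rangeLongValues(): List<Long> =
    Observable.rangeLong(5L, 3L).toList().blockingGet()
```

Both calls emit 5, 6 and 7; neither emits 3 as an upper bound.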

interval()

interval() creates an Observable that emits a sequence of Long values spaced by a given time interval. The Interval operator returns an Observable that emits an infinite sequence of ascending numbers, starting at 0, with a constant interval of time of your choosing between emissions.

val clock = Observable.interval(1, TimeUnit.SECONDS)

clock.subscribe { time ->
    if (time % 2 == 0L) {
        println("Tick")
    } else {
        println("Tock")
    }
}
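Since interval() never completes on its own, it is usually bounded or disposed at some point. One possible way to bound it, assuming RxJava 2.x and a hypothetical function name, is take():

```kotlin
import io.reactivex.Observable
import java.util.concurrent.TimeUnit

// Takes only the first three ticks, then completes.
fun firstThreeTicks(): List<Long> =
    Observable.interval(50, TimeUnit.MILLISECONDS)
        .take(3)
        .toList()
        .blockingGet()   // waits roughly 150 ms for the three ticks
```

The ticks are the Long values 0, 1 and 2; take(3) completes the sequence and stops the underlying timer.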

timer()

timer() creates an Observable that emits a single item (the Long value 0) after a delay that we specify, and then completes.

val eggTimer = Observable.timer(5, TimeUnit.MINUTES)

eggTimer.blockingSubscribe { v -> println("Egg is ready!") }
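To see what timer() actually emits, the following sketch collects its output with a short delay. It assumes RxJava 2.x; timerEmissions is a hypothetical name.

```kotlin
import io.reactivex.Observable
import java.util.concurrent.TimeUnit

// Collects everything the timer emits before it completes.
fun timerEmissions(): List<Long> =
    Observable.timer(100, TimeUnit.MILLISECONDS)
        .toList()
        .blockingGet()
```

The list contains exactly one element, 0L, which is why the egg timer example above ignores the value and only reacts to the emission itself.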

from

from is used to convert various other objects and data types into Observables. When we work with Observables, it can be more convenient if all the data you mean to work with can be represented as Observables, rather than as a mixture of Observables and other types. This allows you to use a single set of operators to govern the entire lifespan of the data stream.

fromIterable()

fromIterable() signals the items from a java.lang.Iterable source (such as Lists, Sets or Collections or custom Iterables) and then completes the sequence. In other words, it converts an Iterable sequence into an ObservableSource that emits the items in the sequence.

val numbers = ArrayList<Int>()
numbers.add(1)
numbers.add(2)
numbers.add(3)
numbers.add(4)
val fromObservable = Observable.fromIterable(numbers)
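Once a collection is wrapped in an Observable, the usual operators apply to it. A small sketch, assuming RxJava 2.x and a hypothetical function name:

```kotlin
import io.reactivex.Observable

// Keeps the even numbers and squares them.
fun evenSquares(): List<Int> =
    Observable.fromIterable(listOf(1, 2, 3, 4))
        .filter { it % 2 == 0 }
        .map { it * it }
        .toList()
        .blockingGet()
```

Here filter() and map() run on each element in order, and toList() gathers the results once the source completes.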

fromArray()

fromArray() converts an Array into an ObservableSource that emits the items in the Array.

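val array = arrayOf(1, 2, 3)
// The spread operator (*) passes the elements as varargs;
// without it, the array itself is emitted as a single item.
val observable = Observable.fromArray(*array)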

observable.subscribe(
        { item -> println(item) }, 
        { error -> error.printStackTrace() },
        { println("Done") })

Note: RxJava does not support primitive arrays, only (generic) reference arrays.
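In Kotlin, a primitive array such as IntArray can be converted to a reference array before handing it to fromArray(). The sketch below assumes RxJava 2.x; the function name is hypothetical.

```kotlin
import io.reactivex.Observable

// Boxes an IntArray into Array<Int> so fromArray() can emit its elements.
fun fromPrimitiveArray(): List<Int> {
    val primitives: IntArray = intArrayOf(1, 2, 3)
    val boxed: Array<Int> = primitives.toTypedArray()
    return Observable.fromArray(*boxed).toList().blockingGet()
}
```

toTypedArray() copies the values into a boxed array, so for large arrays it may be cheaper to use a List and fromIterable() instead.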

fromCallable()

When a consumer subscribes, the given java.util.concurrent.Callable is invoked and its returned value (or thrown exception) is relayed to that consumer.

In other words, it returns an Observable that, when an observer subscribes to it, invokes a function you specify and then emits the value returned from that function. This allows you to defer the execution of the function you specify until an observer subscribes to the ObservableSource. That’s to say, it makes the function “lazy.”

val callable = Callable {
    println("Hello World!")
    return@Callable ("Hello World!")
}
val observable = Observable.fromCallable(callable)

observable.subscribe(
        { item -> println(item) }, 
        { error -> error.printStackTrace() },
        { println("Done") })
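The laziness described above can be observed by counting how often the callable runs. This is a sketch assuming RxJava 2.x, with a hypothetical helper name:

```kotlin
import io.reactivex.Observable
import java.util.concurrent.atomic.AtomicInteger

// Returns the call count before any subscription and after two subscriptions.
fun callableInvocations(): List<Int> {
    val calls = AtomicInteger(0)
    val source = Observable.fromCallable { calls.incrementAndGet() }
    val before = calls.get()   // nothing has run yet
    source.blockingFirst()     // first subscription runs the callable
    source.blockingFirst()     // second subscription runs it again
    val after = calls.get()
    return listOf(before, after)
}
```

Creating the Observable does not invoke the callable; each subscription does, once.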

fromAction()

fromAction() returns a Completable instance that runs the given Action for each subscriber and emits either an unchecked exception or simply completes.

val action = Action { println("Hello World!") }

val completable = Completable.fromAction(action)

completable.subscribe(
        { println("Done") }, 
        { error -> error.printStackTrace() }
)

fromRunnable()

fromRunnable() returns a Completable instance that runs the given Runnable for each subscriber and then completes, or signals an error if the Runnable throws an unchecked exception.

val runnable = Runnable { println("Hello World!") }

val completable1 = Completable.fromRunnable(runnable)

completable1.subscribe(
        { println("Done") },
        { error -> error.printStackTrace() }
)

Note: The difference between fromAction and fromRunnable is that the Action interface allows throwing a checked exception while the java.lang.Runnable does not.
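Kotlin has no checked exceptions, so the distinction is mostly visible from Java. Still, the sketch below shows how an exception thrown inside an Action ends up in the error callback. It assumes RxJava 2.x; describeOutcome and the two wrapper functions are hypothetical.

```kotlin
import io.reactivex.Completable
import java.io.IOException

// Hypothetical helper: subscribes synchronously and reports how the Completable ended.
fun describeOutcome(completable: Completable): String {
    var outcome = "pending"
    completable.subscribe(
        { outcome = "completed" },
        { error -> outcome = "failed: ${error.message}" })
    return outcome
}

// An Action may throw a checked exception such as IOException.
fun failingAction(): String =
    describeOutcome(Completable.fromAction { throw IOException("disk full") })

// A Runnable that finishes normally simply completes.
fun succeedingRunnable(): String =
    describeOutcome(Completable.fromRunnable { println("Hello World!") })
```

In both cases the work only runs when subscribe() is called, and the result is delivered through the completion or error callback.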

fromFuture()

fromFuture() converts a java.util.concurrent.Future into an ObservableSource. We can convert any object that implements the Future interface into an ObservableSource that emits the return value of that object's Future.get() method, by passing the object into the fromFuture() method.

val executor = Executors.newSingleThreadScheduledExecutor()

// Wrapping the lambda in Callable makes schedule() return a ScheduledFuture<String>.
val future = executor.schedule(Callable { "Hello world!" }, 1, TimeUnit.SECONDS)

val observable = Observable.fromFuture<String>(future)

observable.subscribe(
        { item -> println(item) },
        { error -> error.printStackTrace() },
        { println("Done") })
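For a quick experiment without an executor, an already completed future works as well. The sketch below assumes RxJava 2.x and uses java.util.concurrent.CompletableFuture from the JDK; the function name is hypothetical.

```kotlin
import io.reactivex.Observable
import java.util.concurrent.CompletableFuture

// Wraps a future that already holds its value.
fun fromCompletedFuture(): String =
    Observable.fromFuture(CompletableFuture.completedFuture("Hello world!"))
        .blockingFirst()
```

Keep in mind that fromFuture() calls Future.get() when subscribed, which blocks the subscribing thread until the future finishes. With a future that is still pending, it is common to move the subscription to a background scheduler.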

That’s not everything there is to know about Observables — there’s much more.

Bonus

Eager to learn more about Rx? Please continue reading with the series Complete Guide on RxJava.

References

Creating Observables

Please let me know your suggestions and comments. Thank you for reading.
