Coroutines and RxJava – Part 2: Cancelling Execution

Tram Ho

In Part 1, we learned how to perform heavy computing tasks in the background. What if we wanted to interrupt that computation? Part 2 is about Canceling Execution.

What does Canceling Execution mean?

We want to be able to execute a computation thread that was created using RxJava or Coroutines. This computation flow may or may not be synchronous.

This is important in different use cases when developing Android – the most common one could be when the View is destroyed. If that happens, we may want to cancel ongoing executions like network request, heavy object initialization, etc.


As in Part 1, we will ignore the possibility of passing element streams. How can we cancel execution with RxJava?

Imagine we create a timer using the interval operator.

When you subscribe to this observable, the timer starts and it sends an event to the subscriber every second after subscribing.

How can you cancel that timer?

When you subscribe to the timer (called .subscribe() ), it returns a Disposable object.

You can call dispose() on the Disposable object to cancel execution. Observable will end emitting items.

Thus, we have to cancel the asynchronous calculation that Observable created.


If you create your own Observable manually without using any constructor (like interval ), you don’t need to handle the computation cancel yourself.

This Observable is not ready to be destroyed. If we want that to happen, we need to check if the emitter is still registered before calling it.

If the subscriber is no longer available, we can skip releasing the remaining items. If we don’t do that, the code will keep running and ignore the emitter.onNext(i) under the Observable.create source code.


Coroutine is an instance of computation. Canceling a coroutine means stopping its lambda suspending.

We can cancel the implementation with Coroutine Job, which is part of the Coroutine Context.

Coroutine Job shows a method to cancel the Coroutine execution. As we might expect, that method is called cancel() .

For example, the Coroutine Builder launch returns the Coroutine Job it creates.

We can assign that Job to a variable and call cancel.

That is an example of taking a Job from Coroutine and destroying it. Can we do it in a different way? Yes, it is also possible to specify a Coroutine Job. We can do that in many ways.

Some Coroutine Builders (eg launch and async ) take a parent parameter where you can set the Job for the Coroutine to be created.

One of the benefits of this method is that you can share that parentJob with multiple Coroutines, so when you call parentJob.cancel (), you will cancel the execution of coroutines whose parentJob is their Job.

This approach is similar to RxJava CompositeDisposable in that you can cancel multiple registers at once.

NOTE: You should be careful when sharing Jobs between different Coroutines. When you cancel a Job, you need to reassign it. You cannot start another Coroutine with that Job, you will have to create a new Coroutine.

When you cancel a Job, you need to reassign it.

Another way to do this is to incorporate Coroutine Contexts. You can use the plus operator to do this.

In this case, Coroutine’s Coroutine Context result is a combination of parentJob and CommonPool. The threading policy will be defined from CommonPool and the Job value from parentJob.

If you want to learn more about combining contexts, you can read this section of the Kotlin Coroutines documentation.


Just like RxJava, you must consider canceling in Coroutines.

If we tried to execute this code, it would repeat the heavy computation 5 times because the code was not ready to be destroyed.

How can we improve it?

Much like how we check if the subscriber is present in RxJava, we need to check if Coroutine is working or not.

isActive is an internal variable that can be accessed inside Coroutine ( coroutineContext is another variable).

Some suspending functions available in the Standard Coroutines library handle the cancellation for us. Consider delay.

A delay is a suspending function that can handle cancellations for us. However, if you use Thread.sleep instead of delay, since it is blocking the thread and not suspending coroutine, it won’t cancel.

Thread.sleep does not cancel the execution for us. It’s not even a suspending function! That coroutine will not be canceled even if we call job.cancel ().

Thread.sleep is not something you should use in this case. If you really need to, one way to deactivate coroutine is to check if it works just before and after blocking the thread.

Next part

The third part of this series will cover streaming elements.

What is the difference between Observable and Channels? Subjects and Broadcast Channels?


Share the news now

Source : Viblo