Writing Custom Rx Operators Easily with Kotlin

/ Development Tips / 3 min read

ReactiveX or Rx is famous for it's arsenal of powerful operators that makes working with data a piece of cake with just a few lines of code.

However, there comes situations when you need to go the extra mile to spin up your own custom operators that suit your application's needs. Although, the situation is pretty rare, you might find yourself lurking in that alley sometime during your development career.

The usecase of writing your own operator is not limited to writing operators that are totally different from the bundled ones. Sometimes, you might need to aggregate two or more standard operators to a single one or have a specific config for a particular operator defined for easy reuse.

Take this case as an example:

Preventing multiple taps

Users are notorious. Sometimes, they like to shower tap love on those fancy buttons in your app. This seemingly harmless behaviour can screw up your app's behaviour if not handled properly.

The good news is that handling this behaviour is quite simple with Rx if you can transform the button click events into emissions of an Observable.

You can either create your own Observable for this purpose or use Jake Wharton's fantastic library called RxBindings to do the job for you.

We'll be using the latter in here.

Have a look at how you can prevent multiple taps within 1 second using the RxBindings library and some Rx magic:

          .throttleFirst(1, TimeUnit.SECONDS)
          .subscribe { _ -> Toast.makeText(this, "Button clicked!", Toast.LENGTH_LONG).show() }

The magic in here is being done by the throttleFirst() operator. throttleFirst(1, TimeUnit.SECONDS) allows the first tap to pass downstream within a second, cutting off the rest within the window.

Simple, isn't it?

But wait a second. That's already an one-liner. How can we make it simpler? Also, why do we need to?

The answer is obvious.

You'll be using this throttling operator with probably the same configuration over and over again within your app. Therefore, it only makes sense that you define the config in an easy to call function saving you from some unnecessary keystrokes.


For Rx to work, you need to add the following dependency in your app's build.gradle file located within your Android project:

implementation 'io.reactivex.rxjava2:rxkotlin:2.2.0'

For RxBindings to work, you need to add this dependency:

implementation 'com.jakewharton.rxbinding2:rxbinding-kotlin:2.1.1'

Now let's get back to making things simpler.

Method 1: Utility functions

The first and the most obvious option is to write your own utility function which takes an input Observable, adds the throttling logic to it and returns it back to the caller.

Something like this:

fun <T> filterRapidClicks(observable: Observable<T>) =
            observable.throttleFirst(1, TimeUnit.SECONDS)

Then you can easily reuse this function wherever you need to prevent rapid clicks by using the compose{} operator in your Observable chain, like this:

          .compose { ObservableUtil.filterRapidClicks(it) }
          .subscribe { _ -> Toast.makeText(this, "Button clicked!", Toast.LENGTH_LONG).show() }

This little trick saved us from our initial hassle of defining the throttle duration on every tappable element in our app. Sweet!

Also, if you need to change the window size from 1 second to 500 milliseconds, you just need to change it once within this function instead to taking a cruise through your project and changing it everywhere.


However, this is Kotlin. And with Kotlin, you don't use the average solution. You put your average solution on steroids and make them exceptional.

That's why, the next method is the Upcurve recommended approach to writing custom operators in Kotlin.

Method 2: Extension functions

Kotlin's extensions functions are like armour upgrades. You take your regular classes and add more features to them without altering the actual class.

How cool is that?

You can read more about extension functions in this article here.

Now, back to the million dollar question. How do we make the existing function that we have created in the previous step simpler using extension functions?

We create an extension of the Observable<T> class and add the logic to it. That's all.

Here's how you can define such an extension function:

fun <T> Observable<T>.filterRapidClicks() = throttleFirst(1, TimeUnit.SECONDS)

Assuming that you've declared the function globally in a .kt file, you can use this function as a regular Rx operator on all of your observables, like this:

          .subscribe { _ -> Toast.makeText(this, "Button clicked!", Toast.LENGTH_LONG).show() }

And, bam! You have created your very first custom operator in record time.

Explore your codebase further to find out repeated operations and start creating your very own arsenal of custom operators.

Wrapping up

Whenever you're solving a problem, use Occam's razor:

More things should not be used than are necessary.

Don't write extra stuff or make things complicated. Use your language's powerful tools to make your coding life simpler and easier.

Loved this article? Share it and spread some love and yeah, knowledge too.