Rxjs has such a variety of operators that it’s taken two years for me to write one which couldn’t be built using the lazy approach.
For anyone unfamiliar, rxjs operators are composable pieces of logic; basic building blocks which we combine to create our own data transformations. Operators are to rxjs as components are to React—with a big difference in that unlike React, rxjs comes with its own comprehensive pre-built component library!
Most custom operators can be built by reusing these existing operators. They cover so many different situations that most requirements can be expressed this way without hand-coding the logic.
As an example, here’s a simple custom operator which wraps
Math.pow for convenience.
It transforms every value passed into it, raising the numbers by the exponent specified when we call the operator. If we call
pow(2), it squares every number passed in; if we call
pow(3) it cubes them.
It is implemented here as an extension of the standard rxjs
We can also combine multiple existing operators using the
For example, what if we need to use our
pow operator somewhere else, and we’re unable to guarantee that every value we pass in is a number? If we try to call
Math.pow on something which is not a number we’ll get broken output, so our operator built using
map alone is not sufficient.
Instead, we need to ignore any non-numeric values passed in completely. We can do this by using the
filter operator in addition to
We can compose any operators together, including the custom ones we create.
This is a powerful approach, which lets us express complex functionality in a concise and consistent way, built on top of code which has been battle-tested in millions of apps for many years.
Beyond this type of operator, there’s another level of flexibility available if we write our operator using raw observables.
While experimenting with an API idea, I wanted to create some behaviour which I was unable to build by composing operators. My experiment felt a bit contrived, but it’s a use case which could exist in the real world, so let’s explore it.
I wanted to combine two things: an interval, and a counter. I wanted the delay of the interval to equal the current value of the counter. When we increase the counter, the time between each interval value should increase; when we decrease the counter, the time between each interval value should decrease. Ideally, when the counter changes, it should wait until the next iteration before changing the interval’s delay.
Let’s walk through some marble diagrams so we can visualise what’s happening, and understand the flawed behaviour which emerged from my first couple of attempts (built by composing existing operators).
We’ll use a simplified example. The
interval stream starts with a 2 second delay. When we emit an event in the
alter delay stream, the delay changes to 4 seconds. The
combination stream in the marble diagram shows how we want our operator to combine these events.
My first attempt looked like it did the job at first.
It’s slightly different to the original intent - when the
alter delay stream emits an event, the
interval immediately switches over to a new interval. This seemed to work, but it felt like the timing was a little off when transitioning from one delay to another.
After playing with it for a while, the flaw became clear.
When we alter the delay too quickly, we don’t get the expected number of events, since the interval is cleared and re-created each time. I was altering the delay via button clicks—when clicking quickly this flaw was painfully obvious.
I also created a similar version of this operator with tweaked behaviour to see if a simple workaround was possible.
It differs from the previous attempt in one way: when a new
interval stream is created, it immediately starts with an initial event. This meant that clicking quickly would no longer cause the
combination observable to appear frozen.
However, it’s still far from what I had in mind—it also breaks when the delay changes too quickly.
combination gets very noisy when the delay changes very often—no better a result than the previous broken version.
So how can we build this in a way which isn’t broken?
Here’s the marble diagram for the desired behaviour again:
alter delay: ------o---------->2s interval: -o-o-o-o|4s interval: ---o---o->combination: -o-o-o-o---o---o->
And the original intent:
Given an interval and a counter, the delay of the interval should equal the current value of the counter. When we increase the counter, the time between each interval value should increase; when we decrease the counter, the time between each interval value should decrease. Ideally, when the counter changes, it should wait until the next iteration before changing the interval’s delay.
The solution was to dive a little deeper and handle things manually. We’ll start by creating an operator which does not alter any events, as a kind of blank slate.
This code would be used as a pipeable operator, like
delay$.pipe(dynamicInterval()). The inner function takes a source observable as its only argument, and returns a new observable which will forward data on to its subscribers.
- We call
new Observableto create an observable
- We return a subscription to the
- Inside this subscription, we forward the
completesignals on to the
The standard rxjs
interval operator returns an observable which continually counts upwards. Since we want a custom variant of this, we’ll need to keep track of some state. We’ll add this into the inner closure, so that the state persists for each individual subscription.
We’ll also need to replace the
next function. For the
complete events we don’t want to do anything special, so we can leave them unchanged.
For starters, we’ll store a
count as our first piece of internal state.
Here, we increment the count every time the source emits a value, and forward the count on to the subscriber. The code so far essentially counts the number of events output by the source observable.
Let’s start working on our
next function by trying
setInterval, to see how it could easily go wrong.
Every time the source emits a new duration, we clear the previous interval, and create a new one with the new duration. Every time the interval runs, it increments the
count state and forwards it to the observer.
This results in the problem behaviour we encountered earlier where we clear the interval before it has a chance to emit the next value.
Instead, we need to completely decouple the interval from the source.
We’ll store the duration as another piece of state. Whenever the source sends the next duration, we’ll update this state, but we’ll handle the interval separately.
Rather than using
setInterval, we’ll call
setTimeout recursively to get the effect we’re looking for.
Every time we call our
loop function, the following steps occur:
- We set a timeout which waits for the correct
durationto pass, then fires the callback.
countstate is sent to the observer and incremented.
loopfunction is called again.
Note that this means the output always waits until the timeout has completed before it is sent. The
loop function always uses the most recent
duration from source observable when setting the next timeout.
This behaviour is exactly what we’re looking for! Now we just need to kickstart the loop.
For the final step, we want to kickstart the
loop when the source emits its first value, without unintentionally creating more than one loop.
To do this, we add one more piece of state: a reference to the current timeout id. If there is no timeout id when the
next function gets called, we start the loop. If a timeout exists, we let the existing loop continue.
This gives us exactly the functionality we wanted.
The subscriber receives a continuously updated count, and the delay in between each value dynamically changes to the latest value forwarded into the operator by the source observable.
This cleanly encapsulates the concept of a dynamic interval into one operator. We can now use it as easily as this:
Reactive programming has a reputation for being intimidating, but when we break it down to its building blocks it’s a lot less difficult than we might expect. We just have to take it one step at a time.
As for the API idea I’ve been experimenting with? Let’s just say I like it a lot more than hooks…