Dynamic Interval Operator
December 30, 2019 • 11 min read
Rxjs has such a variety of operators that it’s taken two years for me to write one which couldn’t be built using the lazy approach.
For anyone unfamiliar, rxjs operators are composable pieces of logic; basic building blocks which we combine to create our own data transformations. Operators are to rxjs as components are to React—with a big difference in that unlike React, rxjs comes with its own comprehensive pre-built component library!
Most custom operators can be built by reusing these existing operators. They cover so many different situations that most requirements can be expressed this way without hand-coding the logic.
As an example, here’s a simple custom operator which wraps `Math.pow` for convenience.
It transforms every value passed into it, raising the numbers by the exponent specified when we call the operator. If we call `pow(2)`, it squares every number passed in; if we call `pow(3)` it cubes them.
It is implemented here as an extension of the standard rxjs `map` operator.
We can also combine multiple existing operators using the `pipe` method.
For example, what if we need to use our `pow` operator somewhere else, and we’re unable to guarantee that every value we pass in is a number? If we try to call `Math.pow` on something which is not a number we’ll get broken output, so our operator built using `map` alone is not sufficient.
Instead, we need to ignore any non-numeric values passed in completely. We can do this by using the `filter` operator in addition to `map`.
We can compose any operators together, including the custom ones we create.
This is a powerful approach, which lets us express complex functionality in a concise and consistent way, built on top of code which has been battle-tested in millions of apps for many years.
Beyond this type of operator, there’s another level of flexibility available if we write our operator using raw observables.
While experimenting with an API idea, I wanted to create some behaviour which I was unable to build by composing operators. My experiment felt a bit contrived, but it’s a use case which could exist in the real world, so let’s explore it.
I wanted to combine two things: an interval, and a counter. I wanted the delay of the interval to equal the current value of the counter. When we increase the counter, the time between each interval value should increase; when we decrease the counter, the time between each interval value should decrease. Ideally, when the counter changes, it should wait until the next iteration before changing the interval’s delay.
Let’s walk through some marble diagrams so we can visualise what’s happening, and understand the flawed behaviour which emerged from my first couple of attempts (built by composing existing operators).
We’ll use a simplified example. The `interval` stream starts with a 2 second delay. When we emit an event in the `alter delay` stream, the delay changes to 4 seconds. The `combination` stream in the marble diagram shows how we want our operator to combine these events.
My first attempt looked like it did the job at first.
It’s slightly different to the original intent: when the `alter delay` stream emits an event, the `interval` immediately switches over to a new interval. This seemed to work, but it felt like the timing was a little off when transitioning from one delay to another.
After playing with it for a while, the flaw became clear.
When we alter the delay too quickly, we don’t get the expected number of events, since the interval is cleared and re-created each time. I was altering the delay via button clicks—when clicking quickly this flaw was painfully obvious.
I also created a similar version of this operator with tweaked behaviour to see if a simple workaround was possible.
It differs from the previous attempt in one way: when a new `interval` stream is created, it immediately starts with an initial event. This meant that clicking quickly would no longer cause the `combination` observable to appear frozen.
However, it’s still far from what I had in mind—it also breaks when the delay changes too quickly.
The `combination` stream gets very noisy when the delay changes very often, which is no better than the previous broken version.
So how can we build this in a way which isn’t broken?
Here’s the marble diagram for the desired behaviour again:
alter delay: ------o---------->
2s interval: -o-o-o-o|
4s interval:         ---o---o->
combination: -o-o-o-o---o---o->
And the original intent:
Given an interval and a counter, the delay of the interval should equal the current value of the counter. When we increase the counter, the time between each interval value should increase; when we decrease the counter, the time between each interval value should decrease. Ideally, when the counter changes, it should wait until the next iteration before changing the interval’s delay.
The solution was to dive a little deeper and handle things manually. We’ll start by creating an operator which does not alter any events, as a kind of blank slate.
This code would be used as a pipeable operator, like `delay$.pipe(dynamicInterval())`. The inner function takes a source observable as its only argument, and returns a new observable which will forward data on to its subscribers.
- We call `new Observable` to create an observable
- We return a subscription to the `source` observable
- Inside this subscription, we forward the `next`, `error`, and `complete` signals on to the `observer`.
The standard rxjs `interval` operator returns an observable which continually counts upwards. Since we want a custom variant of this, we’ll need to keep track of some state. We’ll add this into the inner closure, so that the state persists for each individual subscription.
We’ll also need to replace the `next` function. For the `error` or `complete` events we don’t want to do anything special, so we can leave them unchanged.
For starters, we’ll store a `count` as our first piece of internal state.
Here, we increment the count every time the source emits a value, and forward the count on to the subscriber. The code so far essentially counts the number of events output by the source observable.
Let’s start working on our `next` function by trying `setInterval`, to see how it could easily go wrong.
Every time the source emits a new duration, we clear the previous interval, and create a new one with the new duration. Every time the interval runs, it increments the `count` state and forwards it to the observer.
This results in the problem behaviour we encountered earlier where we clear the interval before it has a chance to emit the next value.
Instead, we need to completely decouple the interval from the source.
We’ll store the duration as another piece of state. Whenever the source sends the next duration, we’ll update this state, but we’ll handle the interval separately.
Rather than using `setInterval`, we’ll call `setTimeout` recursively to get the effect we’re looking for.
Every time we call our `loop` function, the following steps occur:
- We set a timeout which waits for the correct `duration` to pass, then fires the callback.
- The `count` state is sent to the observer and incremented.
- The `loop` function is called again.
Note that this means the output always waits until the timeout has completed before it is sent. The `loop` function always uses the most recent `duration` from the source observable when setting the next timeout.
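The loop can be tried out on its own in plain JavaScript, without any observables. In this sketch the `emitted` array stands in for the observer, and `timeoutId` is only kept so that the demo can stop itself. Durations are in milliseconds.

```javascript
// State that the operator would keep in its closure.
let duration = 20; // would be updated whenever the source emits
let count = 0;
let timeoutId; // only here so that this demo can stop itself
const emitted = [];

const loop = () => {
  timeoutId = setTimeout(() => {
    emitted.push(count++); // stand-in for observer.next(count++)
    loop(); // the next timeout reads whatever `duration` is by then
  }, duration);
};

loop();

// Changing the duration does not touch the pending timeout;
// it only affects the timeouts scheduled after it.
setTimeout(() => { duration = 40; }, 50);

setTimeout(() => {
  clearTimeout(timeoutId);
  console.log(emitted);
}, 300);
```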
This behaviour is exactly what we’re looking for! Now we just need to kickstart the loop.
For the final step, we want to kickstart the `loop` when the source emits its first value, without unintentionally creating more than one loop.
To do this, we add one more piece of state: a reference to the current timeout id. If there is no timeout id when the `next` function gets called, we start the loop. If a timeout exists, we let the existing loop continue.
This gives us exactly the functionality we wanted.
The subscriber receives a continuously updated count, and the delay in between each value dynamically changes to the latest value forwarded into the operator by the source observable.
This cleanly encapsulates the concept of a dynamic interval into one operator. We can now use it as easily as this:
duration$.pipe(dynamicInterval())
Reactive programming has a reputation for being intimidating, but when we break it down to its building blocks it’s a lot less difficult than we might expect. We just have to take it one step at a time.
As for the API idea I’ve been experimenting with? Let’s just say I like it a lot more than hooks…