Dynamic Interval Operator

December 30, 2019

Rxjs has such a variety of operators that it’s taken two years for me to write one which couldn’t be built using the lazy approach.

For anyone unfamiliar, rxjs operators are composable pieces of logic; basic building blocks which we combine to create our own data transformations. Operators are to rxjs as components are to React—with a big difference in that unlike React, rxjs comes with its own comprehensive pre-built component library!

Most custom operators can be built by reusing these existing operators. They cover so many different situations that most requirements can be expressed this way without hand-coding the logic.

As an example, here’s a simple custom operator which wraps Math.pow for convenience.

    import { from } from 'rxjs'
    import { filter, map } from 'rxjs/operators'

    const pow = exponent => map(x => Math.pow(x, exponent))

It transforms every value passed into it, raising each number to the exponent specified when we call the operator. If we call pow(2), it squares every number passed in; if we call pow(3), it cubes them.

    const input = [1, 2, 3, 4, 5]

    // from() emits each element separately; of(input) would emit the whole array as a single value
    from(input).pipe(pow(2))
    // -> 1, 4, 9, 16, 25

    from(input).pipe(pow(3))
    // -> 1, 8, 27, 64, 125

It is implemented here as an extension of the standard rxjs map operator.

We can also combine multiple existing operators using the pipe method.

For example, what if we need to use our pow operator somewhere else, and we’re unable to guarantee that every value we pass in is a number? If we try to call Math.pow on something which is not a number we’ll get broken output, so our operator built using map alone is not sufficient.

Instead, we need to ignore any non-numeric values passed in completely. We can do this by using the filter operator in addition to map.

    const pow = exponent => source =>
      source.pipe(
        // Drop anything which is not a number before it reaches Math.pow
        filter(value => typeof value === 'number'),
        map(x => Math.pow(x, exponent))
      )

We can compose any operators together, including the custom ones we create.

This is a powerful approach, which lets us express complex functionality in a concise and consistent way, built on top of code which has been battle-tested in millions of apps for many years.

Beyond this type of operator, there’s another level of flexibility available if we write our operator using raw observables.

While experimenting with an API idea, I wanted to create some behaviour which I was unable to build by composing operators. My experiment felt a bit contrived, but it’s a use case which could exist in the real world, so let’s explore it.

I wanted to combine two things: an interval, and a counter. I wanted the delay of the interval to equal the current value of the counter. When we increase the counter, the time between each interval value should increase; when we decrease the counter, the time between each interval value should decrease. Ideally, when the counter changes, it should wait until the next iteration before changing the interval’s delay.

Let’s walk through some marble diagrams so we can visualise what’s happening, and understand the flawed behaviour which emerged from my first couple of attempts (built by composing existing operators).

We’ll use a simplified example. The interval stream starts with a 2 second delay. When we emit an event in the alter delay stream, the delay changes to 4 seconds. The combination stream in the marble diagram shows how we want our operator to combine these events.

    alter delay: ------o---------->
    2s interval: -o-o-o-o|
    4s interval:         ---o---o->
    combination: -o-o-o-o---o---o->
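To make the target timing concrete, here is a small simulation of the rule rather than the operator itself. The helper name emissionTimes and its arguments are assumptions made for this sketch. Times are in seconds, and a change made during an iteration only takes effect once that iteration has fired.

```javascript
// Hypothetical helper: computes when a dynamic interval would emit.
// `changes` is a list of { at, delay } entries sorted by time (in seconds).
function emissionTimes(initialDelay, changes, until) {
  const times = []
  let delay = initialDelay
  let nextChange = 0
  let now = 0

  while (now + delay <= until) {
    // The iteration already in progress keeps the delay it started with
    now += delay
    times.push(now)

    // Changes which arrived during that iteration apply from the next one
    while (nextChange < changes.length && changes[nextChange].at <= now) {
      delay = changes[nextChange].delay
      nextChange++
    }
  }

  return times
}

console.log(emissionTimes(2, [{ at: 7, delay: 4 }], 16))
// -> [ 2, 4, 6, 8, 12, 16 ]
```

Reading each dash in the diagram as roughly one second, this has the same shape as the combination stream: four emissions two seconds apart, then four second gaps, with the change landing between the third and fourth emissions.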

My first attempt initially looked like it did the job.

It’s slightly different to the original intent: when the alter delay stream emits an event, the combination immediately switches over to a new interval. This seemed to work, but it felt like the timing was a little off when transitioning from one delay to another.

After playing with it for a while, the flaw became clear.

When we alter the delay too quickly, we don’t get the expected number of events, since the interval is cleared and re-created each time. I was altering the delay via button clicks—when clicking quickly this flaw was painfully obvious.

I also created a similar version of this operator with tweaked behaviour to see if a simple workaround was possible.

It differs from the previous attempt in one way: when a new interval stream is created, it immediately starts with an initial event. This meant that clicking quickly would no longer cause the combination observable to appear frozen.

However, it’s still far from what I had in mind—it also breaks when the delay changes too quickly.

The combination gets very noisy when the delay changes very often—no better a result than the previous broken version.

So how can we build this in a way which isn’t broken?

Here’s the marble diagram for the desired behaviour again:

    alter delay: ------o---------->
    2s interval: -o-o-o-o|
    4s interval:         ---o---o->
    combination: -o-o-o-o---o---o->

And the original intent:

Given an interval and a counter, the delay of the interval should equal the current value of the counter. When we increase the counter, the time between each interval value should increase; when we decrease the counter, the time between each interval value should decrease. Ideally, when the counter changes, it should wait until the next iteration before changing the interval’s delay.

The solution was to dive a little deeper and handle things manually. We’ll start by creating an operator which does not alter any events, as a kind of blank slate.

    import { Observable } from 'rxjs'

    const dynamicInterval = () => source =>
      new Observable(observer => {
        // Forward every signal from the source to the subscriber unchanged
        return source.subscribe({
          next: value => observer.next(value),
          error: err => observer.error(err),
          complete: () => observer.complete(),
        })
      })

This code would be used as a pipeable operator, like delay$.pipe(dynamicInterval()). The inner function takes a source observable as its only argument, and returns a new observable which will forward data on to its subscribers.

  1. We call new Observable to create an observable.
  2. We return a subscription to the source observable.
  3. Inside this subscription, we forward the next, error, and complete signals on to the observer.

The standard rxjs interval function returns an observable which continually counts upwards. Since we want a custom variant of this, we’ll need to keep track of some state. We’ll add this into the inner closure, so that each individual subscription gets its own copy of the state.

We’ll also need to replace the next function. For the error or complete events we don’t want to do anything special, so we can leave them unchanged.

For starters, we’ll store a count as our first piece of internal state.

Here, we increment the count every time the source emits a value, and forward the count on to the subscriber. The code so far essentially counts the number of events output by the source observable.
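A minimal sketch of this counting stage might look like the following. To keep it runnable on its own, it uses a tiny stand-in Observable class rather than the real rxjs one (with rxjs installed, importing Observable from 'rxjs' would replace it), and the source$ stream is a hypothetical example.

```javascript
// Minimal stand-in for the rxjs Observable class, so this sketch runs on its own
class Observable {
  constructor(subscribe) {
    this._subscribe = subscribe
  }
  subscribe(observer) {
    const teardown = this._subscribe(observer)
    return {
      unsubscribe: () => {
        if (typeof teardown === 'function') teardown()
        else if (teardown && teardown.unsubscribe) teardown.unsubscribe()
      },
    }
  }
  pipe(...operators) {
    return operators.reduce((source, operator) => operator(source), this)
  }
}

const dynamicInterval = () => source =>
  new Observable(observer => {
    // State lives inside this closure, so each subscription gets its own count
    let count = 0

    return source.subscribe({
      // Ignore the incoming value and forward the running count instead
      next: () => observer.next(count++),
      error: err => observer.error(err),
      complete: () => observer.complete(),
    })
  })

// Hypothetical source which synchronously emits three durations
const source$ = new Observable(observer => {
  observer.next(500)
  observer.next(250)
  observer.next(1000)
  observer.complete()
})

const received = []
source$.pipe(dynamicInterval()).subscribe({
  next: value => received.push(value),
  error: () => {},
  complete: () => console.log(received),
})
// -> [ 0, 1, 2 ]
```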

Let’s start working on our next function by trying setInterval, to see how it could easily go wrong.

Every time the source emits a new duration, we clear the previous interval, and create a new one with the new duration. Every time the interval runs, it increments the count state and forwards it to the observer.

This results in the problem behaviour we encountered earlier where we clear the interval before it has a chance to emit the next value.
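The flaw can be reproduced without any observables at all. The sketch below is a plain JavaScript stand-in for the next handler just described. The names createNaiveInterval and timers are assumptions for illustration, and timers is only injectable so the behaviour can be checked without waiting in real time.

```javascript
// Hypothetical helper mirroring the setInterval-based next handler.
// `timers` defaults to the global timer functions.
function createNaiveInterval(emit, timers = { setInterval, clearInterval }) {
  let count = 0
  let intervalId

  return {
    // Called whenever the source emits a new duration
    next(duration) {
      // Restarting here throws away the progress of the pending tick
      timers.clearInterval(intervalId)
      intervalId = timers.setInterval(() => emit(count++), duration)
    },
    stop() {
      timers.clearInterval(intervalId)
    },
  }
}

// Usage: a new duration arrives every 30ms, always before the pending
// tick can fire, so nothing is logged until the changes stop.
const naive = createNaiveInterval(value => console.log('tick', value))
naive.next(100)
setTimeout(() => naive.next(90), 30)
setTimeout(() => naive.next(80), 60)
setTimeout(() => naive.next(70), 90)
setTimeout(() => naive.stop(), 250)
```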

Instead, we need to completely decouple the interval from the source.

We’ll store the duration as another piece of state. Whenever the source sends the next duration, we’ll update this state, but we’ll handle the interval separately.

Rather than using setInterval, we’ll call setTimeout recursively to get the effect we’re looking for.

Every time we call our loop function, the following steps occur:

  1. We set a timeout which waits for the correct duration to pass, then fires the callback.
  2. The count state is sent to the observer and incremented.
  3. The loop function is called again.

Note that this means the output always waits until the timeout has completed before it is sent. The loop function always uses the most recent duration from the source observable when setting the next timeout.
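The three steps above can be sketched in plain JavaScript, separately from the operator. The names startLoop, getDuration, and timers are assumptions for this illustration. As before, timers is injectable purely so the timing can be verified without real delays.

```javascript
// Hypothetical standalone version of the recursive setTimeout loop.
// `getDuration` returns the most recent duration each time it is called.
function startLoop(getDuration, emit, timers = { setTimeout, clearTimeout }) {
  let count = 0
  let timeoutId

  const loop = () => {
    // 1. Wait for whichever duration is current when this iteration starts
    timeoutId = timers.setTimeout(() => {
      // 2. Send the count to the consumer, then increment it
      emit(count++)
      // 3. Schedule the next iteration, re-reading the duration
      loop()
    }, getDuration())
  }

  loop()

  // Returns a function which cancels the pending iteration
  return () => timers.clearTimeout(timeoutId)
}

// Usage: the duration changes mid-iteration, but the pending timeout is
// left alone; only the following iteration picks up the new value.
let duration = 100
const stop = startLoop(() => duration, value => console.log('tick', value))
setTimeout(() => { duration = 50 }, 30)
setTimeout(stop, 400)
```

Because the duration is read only when a timeout is being set, changing it many times in quick succession costs nothing: the loop simply sees the latest value at the start of the next iteration.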

This behaviour is exactly what we’re looking for! Now we just need to kickstart the loop.

For the final step, we want to kickstart the loop when the source emits its first value, without unintentionally creating more than one loop.

To do this, we add one more piece of state: a reference to the current timeout id. If there is no timeout id when the next function gets called, we start the loop. If a timeout exists, we let the existing loop continue.
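Putting the pieces together, one way the finished operator could look is sketched below. It again uses a small stand-in Observable class so it runs without rxjs installed. The timers parameter and the teardown function are additions for this sketch rather than something described above, and duration$ is a hypothetical source.

```javascript
// Minimal stand-in for the rxjs Observable class, so this sketch runs on its own
class Observable {
  constructor(subscribe) {
    this._subscribe = subscribe
  }
  subscribe(observer) {
    const teardown = this._subscribe(observer)
    return {
      unsubscribe: () => {
        if (typeof teardown === 'function') teardown()
        else if (teardown && teardown.unsubscribe) teardown.unsubscribe()
      },
    }
  }
  pipe(...operators) {
    return operators.reduce((source, operator) => operator(source), this)
  }
}

// `timers` is a hypothetical extra parameter, injectable for testing
const dynamicInterval = (timers = { setTimeout, clearTimeout }) => source =>
  new Observable(observer => {
    let count = 0
    let duration
    let currentIteration // id of the pending timeout, if any

    const loop = () => {
      currentIteration = timers.setTimeout(() => {
        observer.next(count++)
        loop()
      }, duration)
    }

    const subscription = source.subscribe({
      next: value => {
        // Always remember the latest duration for the next iteration
        duration = value
        // Start the loop once, when the first duration arrives
        if (currentIteration === undefined) loop()
      },
      error: err => observer.error(err),
      complete: () => observer.complete(),
    })

    // Teardown (an addition in this sketch): stop the loop and the source
    return () => {
      timers.clearTimeout(currentIteration)
      subscription.unsubscribe()
    }
  })

// Hypothetical source: starts at 100ms, then switches to 50ms shortly after
const duration$ = new Observable(observer => {
  observer.next(100)
  const id = setTimeout(() => observer.next(50), 30)
  return () => clearTimeout(id)
})

const ticks = duration$.pipe(dynamicInterval()).subscribe({
  next: value => console.log('tick', value),
})
setTimeout(() => ticks.unsubscribe(), 400)
```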

This gives us exactly the functionality we wanted.

The subscriber receives a continuously updated count, and the delay in between each value dynamically changes to the latest value forwarded into the operator by the source observable.

This cleanly encapsulates the concept of a dynamic interval into one operator. We can now use it as easily as this:

duration$.pipe(dynamicInterval())

Reactive programming has a reputation for being intimidating, but when we break it down to its building blocks it’s a lot less difficult than we might expect. We just have to take it one step at a time.

As for the API idea I’ve been experimenting with? Let’s just say I like it a lot more than hooks…

