I learned about Reactive Extensions (Rx) about four years ago and have been using it since on several projects where appropriate, learning new stuff on the way. Yesterday I wanted to implement a sliding expiration algorithm using Rx and since other people might benefit from it, I thought I'd share my code.

First of all, what is a sliding expiration? The best example is a standard HTTP session timeout. Most web frameworks support sessions (via a cookie, for example) that are kept alive until a maximum period of inactivity has passed. So while you keep clicking through the website your session state is maintained, but if you stop interacting with the website for half an hour, your session expires.

My problem was actually the reverse: I'm interacting with a web API that supports sessions and I want to keep the session alive from my end. So when the session expiry time approaches because I've been inactive for a while, I'd like to send a 'ping' to the server to force it not to expire my session. The easy approach would of course be to run some code in a loop that sends a ping periodically. But, well, I didn't want to send a ping when there is regular session activity, only when there actually is a period of inactivity.

How does one model this problem using Rx? First of all, regular session activity can be modeled as a stream of events. Each time a request is sent to the server, an event can be generated. In my case, this event stream is an IObservable<DateTimeOffset>. Second of all, we want someone to notify us when the session is about to expire (or rather some time before that happens). This can also be modeled as a stream of events and again I use IObservable<DateTimeOffset> for that. By the way, DateTimeOffset could just as well have been something else, it just provides a nice opportunity to log when something happens.

So what we need is a function that transforms a stream of regular session activity events to a stream of session timeout notifications: Func<IObservable<DateTimeOffset>, IObservable<DateTimeOffset>>. This function looks as follows (note I use the task pool scheduler to schedule work, this could be any other scheduler):

using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;

const int MaxInactivityTime = 25; // In minutes.
var taskPoolScheduler = TaskPoolScheduler.Default;

Func<IObservable<DateTimeOffset>, IObservable<DateTimeOffset>> pendingSessionTimeouts =
  activityNotifications => Observable.Create<DateTimeOffset>(pendingSessionTimeoutHandler =>
  {
    // Initial CTS.
    var cts = new CancellationTokenSource();

    IDisposable scheduleAsync = taskPoolScheduler.ScheduleAsync(
      async (scheduler, token) =>
      {
        // Run a loop until the subscription is disposed of.
        while (!token.IsCancellationRequested)
        {
          try
          {
            // Pause scheduler and yield control to other threads. The scheduler pauses for
            // the maximum inactivity time we want to allow before we want a session refresh.
            // When regular session activity happens, the CTS is canceled and we reenter the
            // loop.
            await scheduler.Sleep(TimeSpan.FromMinutes(MaxInactivityTime), cts.Token);

            // Notify interested subscribers that the session is about to time out.
            pendingSessionTimeoutHandler.OnNext(scheduler.Now);
          }
          catch (OperationCanceledException)
          {
            // Expected behavior: while regular session activity is happening, we'd like to
            // slide our expiration to the future.
            cts = new CancellationTokenSource();
          }
          catch (Exception)
          {
            // This should never happen and be logged somewhere. To enable the loop to
            // continue, we create a new CTS here as well.
            cts = new CancellationTokenSource();
          }
        }
      });

    IDisposable activitySubscription = activityNotifications
      .ObserveOn(taskPoolScheduler)
      .Sample(TimeSpan.FromSeconds(30), taskPoolScheduler)
      .Subscribe(_ =>
      {
        // Cancel the current session inactivity sleep.
        cts.Cancel();
      });

    // Disposing of the result stops both the loop and the activity subscription.
    return new CompositeDisposable(scheduleAsync, activitySubscription);
  });

The code above runs a loop that starts a Sleep. When this operation ends after the allowed session inactivity time, we notify subscribers that a session refresh is needed. Moreover, the loop is reentered and a new Sleep is started. If the Sleep is canceled, we reenter the loop and start a new Sleep immediately. Cancellations come from regular session activity: while this happens, there is no need to refresh the session.

Note that the regular session activity notifications are observed on the task pool scheduler and sampled (in this case once every 30 seconds). We do not want regular session activity to be slowed down by a component that prevents session timeouts.
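The sampling step can be looked at in isolation. The sketch below is only an illustration: the ActivitySampling class and the SampleActivity method are hypothetical names, and the period and scheduler are passed in as parameters rather than fixed at 30 seconds and the task pool scheduler.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class ActivitySampling
{
    // Hypothetical helper: thins out a busy activity stream before it reaches the
    // keep-alive logic. At most one value is passed on per sampling period, and
    // only for periods in which at least one activity event occurred.
    public static IObservable<DateTimeOffset> SampleActivity(
        this IObservable<DateTimeOffset> activityNotifications,
        TimeSpan period,
        IScheduler scheduler)
    {
        return activityNotifications
            // Move the work off the thread that reports the activity.
            .ObserveOn(scheduler)
            // Keep only the most recent event of each period.
            .Sample(period, scheduler);
    }
}
```

One consequence of sampling is that the inactivity timer is reset up to one sampling period after the activity that caused the reset. With a 30 second period and a 25 minute inactivity time, that delay is small.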

How do we use this function? That has become rather simple, given that we have some observable that generates regular session activity events. In my actual scenario I use a Subject<DateTimeOffset> to receive and publish notifications.

IObservable<DateTimeOffset> sessionActivity = ...

IDisposable subscription = pendingSessionTimeouts(sessionActivity)
  .Subscribe(_ => {
    // Perform any action that prevents a session timeout, for example by sending
    // a small request to the web server.
  });
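To make the Subject<DateTimeOffset> idea more concrete, here is a minimal sketch of how regular session activity could be published. This is not the code from the actual project: SessionClient, SendRequest and Activity are hypothetical names, and the real request to the server is left out.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical wrapper around the web API; all names are illustrative only.
public sealed class SessionClient : IDisposable
{
    private readonly Subject<DateTimeOffset> _activity = new Subject<DateTimeOffset>();

    // Read-only view of the activity stream, so that callers can subscribe
    // but cannot push events themselves.
    public IObservable<DateTimeOffset> Activity => _activity.AsObservable();

    // Stand-in for a real call to the server: every request reports itself
    // as session activity.
    public void SendRequest(string path)
    {
        // The actual HTTP request for 'path' would be sent here.
        _activity.OnNext(DateTimeOffset.Now);
    }

    public void Dispose() => _activity.Dispose();
}
```

The Activity property of such a client is what would be passed in as sessionActivity. If requests can be sent from several threads at the same time, the OnNext calls need to be serialized (with a lock, for example), because a Subject expects its notifications one at a time.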

So, that's it. I'm not certain there isn't a better way to accomplish this. I'm now using exception handling (the OperationCanceledException) for control flow, which is usually not a good idea. If anyone has a better solution for the same problem, let me know.
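One possible alternative that avoids exceptions for control flow is to let Rx restart a timer on every activity event, by combining Select, Observable.Interval and Switch. This is a different technique from the loop shown earlier, sketched here under assumptions: the SlidingExpiration class and the PendingSessionTimeouts extension method are hypothetical names, and the inactivity time and scheduler are passed in as parameters.

```csharp
using System;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class SlidingExpiration
{
    // Hypothetical helper: emits a timestamp every time 'maxInactivity' passes
    // without any activity, and keeps doing so until activity resumes.
    public static IObservable<DateTimeOffset> PendingSessionTimeouts(
        this IObservable<DateTimeOffset> activityNotifications,
        TimeSpan maxInactivity,
        IScheduler scheduler)
    {
        return activityNotifications
            .Select(_ => Unit.Default)
            // Start the first timer on subscription, before any activity is seen.
            .StartWith(Unit.Default)
            // Every activity event produces a fresh repeating timer ...
            .Select(_ => Observable.Interval(maxInactivity, scheduler))
            // ... and Switch drops the previous timer when a new one arrives,
            // which is what slides the expiration into the future.
            .Switch()
            .Select(_ => scheduler.Now);
    }
}
```

With this sketch, something like sessionActivity.PendingSessionTimeouts(TimeSpan.FromMinutes(25), TaskPoolScheduler.Default) would take the place of the function above. Sampling of the activity stream is not included here and would have to be applied to the input separately if it is still wanted.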

