Struct TimingWheels

This structure implements scheme 6.2 thom the http://www.cs.columbia.edu/~nahum/w6998/papers/sosp87-timing-wheels.pdf and supports several primitives:

Each operation take O(1) time.

struct TimingWheels(T) ;

Methods

NameDescription
advance Adnvance wheel and return all timers expired during wheel turn.
cancel Cancel timer
currStdTime Return internal view on current time - it is time at the call to init plus total number of steps multiplied by tick duration.
schedule Schedule timer to ticks ticks forward from internal 'now'.
ticksToCatchUp Number of ticks to rotate wheels until internal wheel 'now' catch up with real world realTime. Calculation based on time when wheels were stared and total numer of ticks pasded.
timeUntilNextEvent Time until next scheduled timer event. You provide tick size and current real world time. This function find ticks until next event and use time of the start and total steps executed to calculate time delta from realNow to next event.

Example

import std;
globalLogLevel = LogLevel.info;
auto rnd = Random(142);

/// track execution
int  counter;
SysTime last;

/// this is our Timer
class Timer
{
    static ulong __id;
    private ulong _id;
    private string _name;
    this(string name)
    {
        _id = __id++;
        _name = name;
    }
    /// must provide id() method
    ulong id()
    {
        return _id;
    }
}

enum IOWakeUpInterval = 100; // to simulate random IO wakeups in interval 0 - 100.msecs

// each tick span 5 msecs - this is our link with time in reality
auto Tick = getValue!Duration();
TimingWheels!Timer w;
w.init();
auto durationToTicks(Duration d)
{
    // we have to adjust w.now and realtime 'now' before scheduling timer
    auto real_now = Clock.currStdTime;
    auto tw_now = w.currStdTime(Tick);
    auto delay = (real_now - tw_now).hnsecs;
    return (d + delay)/Tick;
}
void process_timer(Timer t)
{
    switch(t._name)
    {
        case "periodic":
            if ( last.stdTime == 0)
            {
                // initialize tracking
                last = Clock.currTime - 50.msecs;
            }
            auto delta = Clock.currTime - last;
            shouldApproxEqual((1e0*delta.split!"msecs".msecs), 50e0,1e-1);
            writefln("@ %s - delta: %sms (should be 50ms)", t._name, (Clock.currTime - last).split!"msecs".msecs);
            last = Clock.currTime;
            counter++;
            w.schedule(t, durationToTicks(50.msecs)); // rearm
            break;
        default:
            writefln("@ %s", t._name);
            break;
    }
}
// some random initial delay
auto randomInitialDelay = uniform(0, 500, rnd).msecs;
Thread.sleep(randomInitialDelay);
//
// start one arbitrary timer and one periodic timer
//
auto some_timer = new Timer("some");
auto periodic_timer = new Timer("periodic");
w.schedule(some_timer, durationToTicks(32.msecs));
w.schedule(periodic_timer, durationToTicks(50.msecs));

while(counter < 10)
{
    auto realNow = Clock.currStdTime;
    auto randomIoInterval = uniform(0, IOWakeUpInterval, rnd).msecs;
    auto nextTimerEvent = max(w.timeUntilNextEvent(Tick, realNow), 0.msecs);
    // wait for what should happen earlier
    auto time_to_sleep = min(randomIoInterval, nextTimerEvent);
    writefln("* sleep until timer event or random I/O for %s", time_to_sleep);
    Thread.sleep(time_to_sleep);
    // if we waked up early by the IO event then timeUntilNextEvent will be positive
    // otherwise it will be <= 0 and we have something to process.
    realNow = Clock.currStdTime;
    int ticks = w.ticksToCatchUp(Tick, realNow);
    if (ticks  > 0)
    {
        auto wr = w.advance(ticks);
        foreach(t; wr.timers)
        {
            process_timer(t);
        }
    }
    // some random processing time
    Thread.sleep(uniform(0, 5, rnd).msecs);
}

Example

import std;
globalLogLevel = LogLevel.info;
auto rnd = Random(142);

/// track execution
int  counter;
SysTime last;

/// this is our Timer
class Timer
{
    static ulong __id;
    private ulong _id;
    private string _name;
    this(string name)
    {
        _id = __id++;
        _name = name;
    }
    /// must provide id() method
    ulong id()
    {
        return _id;
    }
}

enum IOWakeUpInterval = 100; // to simulate random IO wakeups in interval 0 - 100.msecs

// each tick span 5 msecs - this is our link with time in reality
auto Tick = getValue!Duration();
TimingWheels!Timer w;
w.init();
auto durationToTicks(Duration d)
{
    // we have to adjust w.now and realtime 'now' before scheduling timer
    auto real_now = Clock.currStdTime;
    auto tw_now = w.currStdTime(Tick);
    auto delay = (real_now - tw_now).hnsecs;
    return (d + delay)/Tick;
}
void process_timer(Timer t)
{
    switch(t._name)
    {
        case "periodic":
            if ( last.stdTime == 0)
            {
                // initialize tracking
                last = Clock.currTime - 50.msecs;
            }
            auto delta = Clock.currTime - last;
            shouldApproxEqual((1e0*delta.split!"msecs".msecs), 50e0,1e-1);
            writefln("@ %s - delta: %sms (should be 50ms)", t._name, (Clock.currTime - last).split!"msecs".msecs);
            last = Clock.currTime;
            counter++;
            w.schedule(t, durationToTicks(50.msecs)); // rearm
            break;
        default:
            writefln("@ %s", t._name);
            break;
    }
}
// some random initial delay
auto randomInitialDelay = uniform(0, 500, rnd).msecs;
Thread.sleep(randomInitialDelay);
//
// start one arbitrary timer and one periodic timer
//
auto some_timer = new Timer("some");
auto periodic_timer = new Timer("periodic");
w.schedule(some_timer, durationToTicks(32.msecs));
w.schedule(periodic_timer, durationToTicks(50.msecs));

while(counter < 10)
{
    auto realNow = Clock.currStdTime;
    auto randomIoInterval = uniform(0, IOWakeUpInterval, rnd).msecs;
    auto nextTimerEvent = max(w.timeUntilNextEvent(Tick, realNow), 0.msecs);
    // wait for what should happen earlier
    auto time_to_sleep = min(randomIoInterval, nextTimerEvent);
    writefln("* sleep until timer event or random I/O for %s", time_to_sleep);
    Thread.sleep(time_to_sleep);
    // if we waked up early by the IO event then timeUntilNextEvent will be positive
    // otherwise it will be <= 0 and we have something to process.
    realNow = Clock.currStdTime;
    int ticks = w.ticksToCatchUp(Tick, realNow);
    if (ticks  > 0)
    {
        auto wr = w.advance(ticks);
        foreach(t; wr.timers)
        {
            process_timer(t);
        }
    }
    // some random processing time
    Thread.sleep(uniform(0, 5, rnd).msecs);
}