I've been playing around with different ways of expressing the timeout described in my last post. To refresh your memory, I originally defined it like this;
// Once we've told the consumer that we are connected, we will start a timer that will yield false if it ever expires.
Observable.Timer(TimeSpan.FromSeconds(2).Select(_ => false)
There are lots of way you can define this behaviour in Rx, for example;
Observable.Never<bool>().Timeout(TimeSpan.FromSeconds(2), Observable.Return(false))
Observable.Return(false).Delay(TimeSpan.FromSeconds(2))
I'm settling on the latter in this case. Less characters is better :)
To recap, I've now generalized the solution described in the last post into the following method.
private IObservable<bool> IsConnected<T>(IObservable<T> heartbeats, TimeSpan timeout)
{
var connected = Observable.Return(true);
var disconnected = Observable.Return(false).Delay(timeout);
return Observable.Switch
(
from hb in heartbeats
select connected.Concat(disconnected)
).DistinctUntilChanged();
}
It takes a stream of heartbeats for a session & introduces a true/false switch, telling you if the session is active or not.
This will be useful in my next post, "Monitoring cluster nodes using Rx".
Comments