I’m finally returning to my blog after a 2 month hiatus. In a previous series of posts, I was looking into various heart beating patterns in Rx. I generally don’t like “multi part blog series” as there is a pressure to finish them off and I have a tendency to get distracted.
1. http://enumeratethis.com/2010/10/19/heart-beats-keep-alives-rx/
2. http://enumeratethis.com/2010/10/21/refining-the-heat-beat-timeout/
3. http://enumeratethis.com/2010/10/22/monitoring-cluster-nodes-in-rx/
4. Does my cluster have quorum. (You’re looking at it)
Last time we ended up with a query that tells us which nodes in a cluster are currently sending us heart beats;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace ConsoleApplication1
{
class Program
{
static void Main()
{
//..
// query tells us which nodes are up or down.
var query =
from nodeId in source
group nodeId by nodeId into grp
from isConnected in IsConnected(grp, timeout)
select new { nodeId = grp.Key, isConnected };
//...
}
// takes a stream of heart beats & a timeout period.
// transforms into an observable bool that tells us if we are connected or not.
static IObservable IsConnected(IObservable heartbeats, TimeSpan timeout)
{
var connected = Observable.Return(true);
var disconnected = Observable.Return(false).Delay(timeout);
return Observable.Switch
(
from hb in heartbeats
select connected.Concat(disconnected)
).DistinctUntilChanged();
}
}
}
However in my case, what I’m actually interested in is the clusters “quorum”;
Quorum – “The minimal number of officers and members of a committee or organization, usually a majority, who must be present for valid transaction of business.”
Let us assume that quorum >= 1/2 the nodes in the cluster. Really, we just need a reactive “counter”, that is incremented & decremented as nodes go on & offline. Scan is perfect for reactive counters.
query.Scan(0, (a, v) => v.isConnected ? a + 1 : a - 1)
We can then transform this into a reactive Boolean telling us if the cluster has quorum or not.
var hasQuorum =
from count in query.Scan(0, (a, v) => v.isConnected ? a + 1 : a - 1)
select count >= 2;
I hope you find this useful. Here is a complete sample program that covers everything discussed in this series. As a TODO I’d like to revisit all of this with the new join/group join/window operators in Rx, it’s possible they could be applied here.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace ConsoleApplication1
{
class Program
{
static void Main()
{
// timeout period
var timeout = TimeSpan.FromSeconds(1);
// test source
var source = new Subject();
// query tells us which nodes are up or down.
var query =
from nodeId in source
group nodeId by nodeId into grp
from isConnected in IsConnected(grp, timeout)
select new { nodeId = grp.Key, isConnected };
query.Subscribe(Console.WriteLine);
// query tells us if the cluster has quorum.
var hasQuorum =
from count in query.Scan(0, (a, v) => v.isConnected ? a + 1 : a - 1)
select count >= 2;
hasQuorum
.DistinctUntilChanged()
.Subscribe(b => Console.WriteLine("has quorum: " + b));
source.OnNext("martyn");
source.OnNext("warne");
source.OnNext("ponting");
Console.ReadLine();
}
// takes a stream of heart beats & a timeout period.
// transforms into an observable bool that tells us if we are connected or not.
static IObservable IsConnected(IObservable heartbeats, TimeSpan timeout)
{
var connected = Observable.Return(true);
var disconnected = Observable.Return(false).Delay(timeout);
return Observable.Switch
(
from hb in heartbeats
select connected.Concat(disconnected)
).DistinctUntilChanged();
}
}
}
Comments