Back to basics: The Publish/Subscribe Model
Many banking middleware solutions allow a consumer (either another service in the bank or perhaps an external third party) to subscribe to information about a financial instrument. There are numerous ways of facilitating this; however, in my experience the following form of the Publish/Subscribe pattern is quite common.
1. The client sends a subscription request to the market data service, containing the “topic” that it is interested in.
2. The market data service, which is responsible for publishing this data, first returns an “initial image” or “snapshot” of the instrument’s state.
3. The market data service then publishes updates to the client.
If the systems involved were people, the conversation would go something like this.
From a technical point of view the communication goes like this.
Note that different middleware solutions might represent this in different ways. Using Nirvana, I’ve used a Queue to model the request stream and a Channel with “Delta Delivery” for the snapshot and updates. I will cover the details of this in a future blog post; for the time being, check out Olivier Deheurles’ Nirvana series. When implementing this pattern, I think it is important that the initial image is guaranteed to arrive before any updates. For this reason it is quite common to model the snapshot and updates using the same channel or queue.
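The ordering guarantee can be illustrated with a small in-process sketch. Everything here (the SnapshotThenUpdatesFeed class and its string messages) is hypothetical and is not Nirvana's API; it only shows why delivering the snapshot and the updates through one ordered path, guarded by one lock, stops an update from overtaking the initial image.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-process stand-in for a market data service (illustration only).
public sealed class SnapshotThenUpdatesFeed
{
    private readonly object gate = new object();
    private readonly List<Action<string>> subscribers = new List<Action<string>>();
    private string snapshot = "IBM bid=89.02 ask=89.08";

    // Steps 1 and 2: deliver the snapshot and register the subscriber under
    // the same lock, so no update can slip in between the two.
    public void Subscribe(Action<string> onMessage)
    {
        lock (gate)
        {
            onMessage("SNAPSHOT " + snapshot);
            subscribers.Add(onMessage);
        }
    }

    // Step 3: fold the update into the state and push it to current subscribers.
    public void Publish(string update)
    {
        lock (gate)
        {
            snapshot = snapshot + " | " + update; // naive state tracking
            foreach (var subscriber in subscribers)
            {
                subscriber("UPDATE " + update);
            }
        }
    }
}

public static class FeedDemo
{
    public static List<string> Run()
    {
        var received = new List<string>();
        var feed = new SnapshotThenUpdatesFeed();
        feed.Publish("bidSize=400");           // published before anyone subscribes
        feed.Subscribe(m => received.Add(m));  // the snapshot already includes it
        feed.Publish("askSize=600");           // arrives strictly after the snapshot
        return received;
    }
}
```

Invoking callbacks while holding a lock keeps the sketch short, but it would misbehave if a callback subscribed or published re-entrantly; a real service would need a more careful hand-off.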
This pattern is a pseudo industry standard, and systems like Reuters Market Data Service (RMDS) or Wombat MAMA support it out of the box. In short, what we have is an asynchronous messaging pattern that is a perfect use case for Reactive Extensions.
The Messages in Detail
Let’s look at the snapshot and update messages in more detail. It is very common for these services to only publish what has changed since the previous event.
This has two main advantages. First, it uses less bandwidth. Second, it actually makes it easier for the consumer to decide how to process the message. If the entire message were republished each time, the consumer might have to hold onto the previous message and perform a comparison when the new message arrives. It could all become rather ugly.
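As a rough illustration of why deltas are easy to consume, the sketch below models a message as a dictionary holding only the fields that changed. The field names and the Apply and PriceChanged helpers are assumptions made for this example, not part of any middleware API.

```csharp
using System;
using System.Collections.Generic;

public static class DeltaDemo
{
    // Fold a delta into the current state: only keys present in the delta change.
    public static Dictionary<string, decimal> Apply(
        IReadOnlyDictionary<string, decimal> state,
        IReadOnlyDictionary<string, decimal> delta)
    {
        var next = new Dictionary<string, decimal>();
        foreach (var field in state) next[field.Key] = field.Value;
        foreach (var field in delta) next[field.Key] = field.Value;
        return next;
    }

    // The delta's keys say what changed, so no comparison with the
    // previous message is needed.
    public static bool PriceChanged(IReadOnlyDictionary<string, decimal> delta)
    {
        return delta.ContainsKey("Bid") || delta.ContainsKey("Ask");
    }

    public static Dictionary<string, decimal> Run()
    {
        var snapshot = new Dictionary<string, decimal>
        {
            ["Bid"] = 89.02m,
            ["Ask"] = 89.08m,
            ["BidSize"] = 300m
        };
        var delta = new Dictionary<string, decimal> { ["BidSize"] = 400m };
        return Apply(snapshot, delta);
    }
}
```

Here a consumer that only cares about prices can skip the size-only delta by checking PriceChanged, which is the kind of decision that becomes awkward when every message carries the full state.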
Inside The Consuming Process
Let’s say we are calculating the price of the Foo index, and one of its main constituents is IBM.
Now let’s say the client requests a price for the Bar index, which also contains the IBM stock. We can multicast the notifications to both index calculations, sharing the same underlying subscription.
We have a problem. The Bar index will never receive the existing state of the world, as the snapshot was already sent when the Foo index calculation subscribed. What we require is a multicast subject that retains some state. Rx has ReplaySubject<T>, but that is only really useful for recording and replaying a sequence of messages. We just want to track the state of the world and notify each observer with a single notification when it subscribes. In effect, we want to mirror the underlying Publish/Subscribe model in our own process.
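To make the ReplaySubject<T> limitation concrete, here is a minimal sketch. It assumes the System.Reactive NuGet package is referenced, and the string messages are placeholders rather than real ticks.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects; // assumption: System.Reactive package is referenced

public static class ReplayDemo
{
    public static List<string> Run()
    {
        var seen = new List<string>();
        var replay = new ReplaySubject<string>(); // unbounded replay buffer

        replay.OnNext("snapshot");
        replay.OnNext("delta 1");
        replay.OnNext("delta 2");

        // A late subscriber has every buffered message replayed to it,
        // rather than one merged state-of-the-world notification.
        replay.Subscribe(message => seen.Add(message));
        return seen;
    }
}
```

The late subscriber receives all three messages and has to fold them itself, while the buffer keeps growing for as long as the feed runs. That is the behaviour a state-retaining subject is meant to avoid.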
MergeSubject<T>
To facilitate this, I’ve created a custom subject and an extension method to provide easy access. The subject’s constructor requires an accumulator function that folds each update into the original snapshot, maintaining the internal state of the world.
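Extension method:
// Namespaces as found in current System.Reactive; older Rx builds placed these types elsewhere.
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ObservableEx
{
    // Returns a connectable observable so that the caller can call Connect().
    public static IConnectableObservable<T> Publish<T>(this IObservable<T> source, Func<T, T, T> accumulator)
    {
        return source.Multicast(new MergeSubject<T>(accumulator));
    }
}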
Subject:
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Subjects;

public class MergeSubject<T> : ISubject<T>
{
    private readonly List<IObserver<T>> observers = new List<IObserver<T>>();
    private readonly Func<T, T, T> accumulator;
    private Exception exception;
    private bool isStopped;
    private T value;      // merged state of the world
    private bool hasValue;

    public MergeSubject(Func<T, T, T> accumulator)
    {
        this.accumulator = accumulator;
    }

    public void OnCompleted()
    {
        var observerArray = default(IObserver<T>[]);
        lock (observers)
        {
            if (!isStopped)
            {
                observerArray = observers.ToArray();
                observers.Clear();
                isStopped = true;
            }
        }
        // Notify outside the lock.
        if (observerArray != null)
        {
            foreach (var observer in observerArray)
            {
                observer.OnCompleted();
            }
        }
    }

    public void OnError(Exception exception)
    {
        if (exception == null)
        {
            throw new ArgumentNullException("exception");
        }
        var observerArray = default(IObserver<T>[]);
        lock (observers)
        {
            if (!isStopped)
            {
                observerArray = observers.ToArray();
                observers.Clear();
                isStopped = true;
                this.exception = exception;
            }
        }
        if (observerArray != null)
        {
            foreach (var observer in observerArray)
            {
                observer.OnError(exception);
            }
        }
    }

    public void OnNext(T value)
    {
        var observerArray = default(IObserver<T>[]);
        lock (observers)
        {
            if (!isStopped)
            {
                if (hasValue)
                {
                    // Fold the update into the retained state.
                    this.value = accumulator(this.value, value);
                }
                else
                {
                    // The first message is the snapshot.
                    this.value = value;
                    hasValue = true;
                }
                observerArray = observers.ToArray();
            }
        }
        if (observerArray != null)
        {
            foreach (var observer in observerArray)
            {
                // Existing observers receive the raw update, not the merged state.
                observer.OnNext(value);
            }
        }
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        if (observer == null)
        {
            throw new ArgumentNullException("observer");
        }
        lock (observers)
        {
            if (!isStopped)
            {
                observers.Add(observer);
                if (hasValue)
                {
                    // A late subscriber receives the merged state as its snapshot.
                    observer.OnNext(value);
                }
                return new Subscription(this, observer);
            }
        }
        // The subject has already terminated: replay the terminal notification.
        if (exception != null)
        {
            observer.OnError(exception);
        }
        else
        {
            observer.OnCompleted();
        }
        return Disposable.Empty;
    }

    private class Subscription : IDisposable
    {
        private IObserver<T> observer;
        private readonly MergeSubject<T> subject;

        public Subscription(MergeSubject<T> subject, IObserver<T> observer)
        {
            this.subject = subject;
            this.observer = observer;
        }

        public void Dispose()
        {
            if (observer != null)
            {
                lock (subject.observers)
                {
                    if (observer != null)
                    {
                        subject.observers.Remove(observer);
                        observer = null;
                    }
                }
            }
        }
    }
}
I’ve created a program that demonstrates how the Foo and Bar indices can now safely share the same underlying stream of updates. As an exercise for the reader, consider building a more advanced subject, MergeSubject<TSource, TResult>(Func<TResult, TSource, TResult> accumulator).
Our stock tick object is fairly straightforward.
class StockTick
{
    public string Symbol { get; set; }
    public decimal? Bid { get; set; }
    public decimal? Ask { get; set; }
    public decimal? Last { get; set; }
    public long? BidSize { get; set; }
    public long? AskSize { get; set; }
    public long? LastSize { get; set; }
    public long? Volume { get; set; }
    public DateTime? QuoteTime { get; set; }
    public DateTime? TradeTime { get; set; }

    public override string ToString()
    {
        return new { Symbol, Bid, Ask, Last, BidSize, AskSize, LastSize, Volume, QuoteTime, TradeTime }.ToString();
    }
}
We will also need a static method to merge two stock ticks together.
// Folds update b into state a: any field that b leaves null keeps its value from a.
public static StockTick Merge(StockTick a, StockTick b)
{
    return new StockTick
    {
        Symbol = b.Symbol ?? a.Symbol, // carry the symbol over so the merged state keeps it
        Bid = b.Bid ?? a.Bid,
        Ask = b.Ask ?? a.Ask,
        Last = b.Last ?? a.Last,
        BidSize = b.BidSize ?? a.BidSize,
        AskSize = b.AskSize ?? a.AskSize,
        LastSize = b.LastSize ?? a.LastSize,
        Volume = b.Volume ?? a.Volume,
        QuoteTime = b.QuoteTime ?? a.QuoteTime,
        TradeTime = b.TradeTime ?? a.TradeTime,
    };
}
Here is the test program.
static void Main()
{
    // underlying source
    var ibm = new Subject<StockTick>();

    // multicast via MergeSubject
    var published = ibm.Publish(Merge);
    published.Connect();

    // start Foo
    published.Subscribe(x => Console.WriteLine("FOO: {0}\n", x));

    // publish initial state of the world
    ibm.OnNext(new StockTick
    {
        Symbol = "IBM",
        Bid = 89.02m,
        Ask = 89.08m,
        Last = 89.06m,
        BidSize = 300,
        AskSize = 1000,
        LastSize = 200,
        Volume = 7808,
        QuoteTime = DateTime.Now,
        TradeTime = DateTime.Now,
    });

    // and then publish a single update
    ibm.OnNext(new StockTick { Symbol = "IBM", BidSize = 400 });

    // start Bar (will receive state of the world automatically)
    published.Subscribe(x => Console.WriteLine("BAR: {0}\n", x));

    // publish some more updates
    ibm.OnNext(new StockTick { Symbol = "IBM", Bid = 89.00m, Ask = 89.06m, BidSize = 500, AskSize = 600 });
    ibm.OnNext(new StockTick { Symbol = "IBM", AskSize = 400 });
}
OUTPUT
As you can see, when Bar subscribes it receives an up-to-date snapshot of the stock, and subsequently receives update deltas.
Conclusion
Rx is already a very useful technology in the world of investment banking. So many systems in banks rely on asynchronous messaging protocols like the Publish/Subscribe model. MergeSubject<T> is an elegant way of representing this in process. I hope you like the khaki green diagrams (perhaps the Visio product team are communists).