← Back to all articles

Modelling Market Data Services in Rx with Scan + Replay

In my last post I defined a new type of subject that is useful for modelling snapshot/update style Publish/Subscribe communications. An alternative to building your own subject is to create an envelope type that represents an update delta & the current state of the world.

    // Envelope
    public class Update
    {
        public StockTick Delta { get; set; }
        public StockTick Image { get; set; }

        public Update Create(StockTick delta)
        {
            return new Update
            {
                Delta = delta,
                Image = Image == null ? delta : StockTick.Merge(Image, delta)
            };
        }

        public override string ToString()
        {
            return string.Format("\nDELTA: {0}\nIMAGE: {1}", Delta, Image);
        }
    }

    public class StockTick
    {
        public string Symbol { get; set; }

        public decimal? Bid { get; set; }
        public decimal? Ask { get; set; }
        public decimal? Last { get; set; }

        public long? BidSize { get; set; }
        public long? AskSize { get; set; }
        public long? LastSize { get; set; }
        public long? Volume { get; set; }

        public DateTime? QuoteTime { get; set; }
        public DateTime? TradeTime { get; set; }

        public override string ToString()
        {
            return new { Symbol, Bid, Ask, Last, BidSize, AskSize, LastSize, Volume, QuoteTime, TradeTime }.ToString();
        }

        public static StockTick Merge(StockTick a, StockTick b)
        {
            return new StockTick
            {
                Bid = b.Bid ?? a.Bid,
                Ask = b.Ask ?? a.Ask,
                Last = b.Last ?? a.Last,
                BidSize = b.BidSize ?? a.BidSize,
                AskSize = b.AskSize ?? a.AskSize,
                LastSize = b.LastSize ?? a.LastSize,
                Volume = b.Volume ?? a.Volume,
                QuoteTime = b.QuoteTime ?? a.QuoteTime,
                TradeTime = b.TradeTime ?? a.TradeTime,
            };
        }
    }

You can then use Scan + Replay to achieve similar behaviour.

    source
    .Scan(new Update(), (a, v) => a.Create(v))
    .Replay(1);

image

Sample program;

        public static void Main()
        {
            // underlying source
            var ibm = new Subject();

            // Scan into Update & then Replay(1)
            var published = ibm
                .Scan(new Update(), (a, v) => a.Create(v))
                .Replay(1);
            published.Connect();

            // start Foo
            published.Subscribe(x => Console.WriteLine("FOO {0}\n", x));

            // publish initial state of the world 
            ibm.OnNext(new StockTick
            {
                Symbol = "IBM",
                Bid = 89.02m,
                Ask = 89.08m,
                Last = 89.06m,
                BidSize = 300,
                AskSize = 1000,
                LastSize = 200,
                Volume = 7808,
                QuoteTime = DateTime.Now,
                TradeTime = DateTime.Now,
            });

            // and then publish a single update
            ibm.OnNext(new StockTick { Symbol = "IBM", BidSize = 400 });

            // start Bar (will receive state of the world automatically
            published.Subscribe(x => Console.WriteLine("BAR {0}\n", x));

            // publish some more updates
            ibm.OnNext(new StockTick { Symbol = "IBM", Bid = 89.00m, Ask = 89.06m, BidSize = 500, AskSize = 600 });
            ibm.OnNext(new StockTick { Symbol = "IBM", AskSize = 400 });
        }

OUTPUT

image

In some ways I prefer this approach. It is a more flexible API in that the observer can choose between an update delta or the fully hydrated object. Interested to here what you think.

Comments