In my last post I defined a new type of subject that is useful for modelling snapshot/update style Publish/Subscribe communications. An alternative to building your own subject is to create an envelope type that represents an update delta & the current state of the world.
// Envelope
public class Update
{
public StockTick Delta { get; set; }
public StockTick Image { get; set; }
public Update Create(StockTick delta)
{
return new Update
{
Delta = delta,
Image = Image == null ? delta : StockTick.Merge(Image, delta)
};
}
public override string ToString()
{
return string.Format("\nDELTA: {0}\nIMAGE: {1}", Delta, Image);
}
}
public class StockTick
{
public string Symbol { get; set; }
public decimal? Bid { get; set; }
public decimal? Ask { get; set; }
public decimal? Last { get; set; }
public long? BidSize { get; set; }
public long? AskSize { get; set; }
public long? LastSize { get; set; }
public long? Volume { get; set; }
public DateTime? QuoteTime { get; set; }
public DateTime? TradeTime { get; set; }
public override string ToString()
{
return new { Symbol, Bid, Ask, Last, BidSize, AskSize, LastSize, Volume, QuoteTime, TradeTime }.ToString();
}
public static StockTick Merge(StockTick a, StockTick b)
{
return new StockTick
{
Bid = b.Bid ?? a.Bid,
Ask = b.Ask ?? a.Ask,
Last = b.Last ?? a.Last,
BidSize = b.BidSize ?? a.BidSize,
AskSize = b.AskSize ?? a.AskSize,
LastSize = b.LastSize ?? a.LastSize,
Volume = b.Volume ?? a.Volume,
QuoteTime = b.QuoteTime ?? a.QuoteTime,
TradeTime = b.TradeTime ?? a.TradeTime,
};
}
}
You can then use Scan + Replay to achieve similar behaviour.
source
.Scan(new Update(), (a, v) => a.Create(v))
.Replay(1);
Sample program;
public static void Main()
{
// underlying source
var ibm = new Subject();
// Scan into Update & then Replay(1)
var published = ibm
.Scan(new Update(), (a, v) => a.Create(v))
.Replay(1);
published.Connect();
// start Foo
published.Subscribe(x => Console.WriteLine("FOO {0}\n", x));
// publish initial state of the world
ibm.OnNext(new StockTick
{
Symbol = "IBM",
Bid = 89.02m,
Ask = 89.08m,
Last = 89.06m,
BidSize = 300,
AskSize = 1000,
LastSize = 200,
Volume = 7808,
QuoteTime = DateTime.Now,
TradeTime = DateTime.Now,
});
// and then publish a single update
ibm.OnNext(new StockTick { Symbol = "IBM", BidSize = 400 });
// start Bar (will receive state of the world automatically
published.Subscribe(x => Console.WriteLine("BAR {0}\n", x));
// publish some more updates
ibm.OnNext(new StockTick { Symbol = "IBM", Bid = 89.00m, Ask = 89.06m, BidSize = 500, AskSize = 600 });
ibm.OnNext(new StockTick { Symbol = "IBM", AskSize = 400 });
}
OUTPUT
In some ways I prefer this approach. It is a more flexible API in that the observer can choose between an update delta or the fully hydrated object. Interested to here what you think.
Comments