So in the observable extensions class there is a nice little extension method that stands out from the rest. RefCount() is different from all the other kids in the class
in that it's for IConnectableObservable<T> rather than IObservable<T>.
RefCount, "Returns an observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence."
This is particularly useful in the real world, when combined with the publish extension method.
Publish, "Returns a connectable observable sequence that shares a single subscription to the underlying source."
Lets say we have an observable that does something like this;
var priceUpdates = Observable.Create<PriceUpdate>(o =>
{
// Connect to server & subscribe to stream
Subscribe(raceId);
// mechanism for receiving push notifications -> o.OnNext(..)
() => Unsubscribe(raceId); // when disposed tell the server we don't care anymore
});
Now if you have multiple consumers of this observable in the client you have a problem. For example;
var priceUpdates = Observable.Create<int>(o =>
{
Console.WriteLine("Connect to server & subscribe to stream");
// mechanism for recieving push notifications -> o.OnNext(..)
return () => Console.WriteLine("when disposed tell the server we don't care anymore");
});
Console.WriteLine("+ first");
using (priceUpdates.Subscribe(u => { }))
{
Console.WriteLine("+ second");
using (priceUpdates.Subscribe(u => { }))
{
Console.WriteLine("- second");
}
Console.WriteLine("- first");
}
OUTPUT
+ first
Connect to server & subscribe to stream
+ second
Connect to server & subscribe to stream
- second
when disposed tell the server we don't care anymore
- first
when disposed tell the server we don't care anymore
In fact there are a variety of problems here.
- We sent a second (pointless) subscription request to the server.
- We told the server we didn't care anymore when we still had an outstanding subscription on the client.
- We sent a second (pointless) subscription request to the server.
Luckily the Rx framework has us covered here.
var priceUpdates = Observable.Create<int>(o =>
{
Console.WriteLine("Connect to server & subscribe to stream");
// mechanism for recieving push notifications -> o.OnNext(..)
return () => Console.WriteLine("when disposed tell the server we don't care anymore");
});
// just add this!
priceUpdates = priceUpdates.Publish().RefCount();
Console.WriteLine("+ first");
using (priceUpdates.Subscribe(u => { }))
{
Console.WriteLine("+ second");
using (priceUpdates.Subscribe(u => { }))
{
Console.WriteLine("- second");
}
Console.WriteLine("- first");
}
OUTPUT
+ first
Connect to server & subscribe to stream
+ second
- second
- first
when disposed tell the server we don't care anymore
I think will become a really common pattern for reactive client server communications. I hope you find this as useful as I did.
James
First post!
Comments