It’s very easy to overlook the fact that Delphi Prism fully supports .NET 3.5 and parts of .NET 4, and with that support comes the amazing range of frameworks and libraries that the .NET ecosystem contains. One such framework that I’ve been waiting to get working with is the Microsoft Reactive Extensions for .NET (aka the Rx Framework).
So what is the Rx Framework?
It’s a framework for asynchronous programming from Microsoft Research (including Erik Meijer and Wes Dyer, amongst others), which could be paraphrased as LINQ to Events. The basic problem is that asynchronous programming is hard, and the problem is exacerbated by the fact that it is entirely necessary for modern development. It is pretty common for desktop applications to need to chain a series of events together in the correct order, and sometimes with the correct timing, but doing so at present can land you in a spaghetti-like sea of event callbacks. Enter the Microsoft Rx Framework.
What do I need?
The framework requires a minimum of .NET 3.5 SP1 or .NET 4, and downloads for both versions are available on the Microsoft Research site. Delphi Prism previously had a problem with the library’s use of extension methods, which I couldn’t work out, but the issue appears to be fixed in the latest beta of Delphi Prism. I now suspect it started working as a result of the updated support for extension methods which marc hoffman recently posted about.
How does it work?
In brief: the Rx Framework introduces a pair of interfaces, IObserver and IObservable, which allow you to represent push-based observable collections, along with a library of extension methods (the bit that wasn’t previously working in Delphi Prism) that implement the standard LINQ query operators.
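For orientation, the two interfaces have roughly the shape sketched below. This is for reading only and is not meant to be compiled alongside the Rx assemblies, which already define both types; the parameter names are my own choice.

```oxygene
type
  // The consumer: receives values pushed to it by an observable.
  IObserver<T> = public interface
    method OnNext(value: T);           // called once per value
    method OnError(error: Exception);  // called if the source fails
    method OnCompleted;                // called when the source has finished
  end;

  // The producer: hands back an IDisposable that ends the subscription.
  IObservable<T> = public interface
    method Subscribe(observer: IObserver<T>): IDisposable;
  end;
```

The observer never asks for the next value; the observable pushes it, which is the mirror image of IEnumerable and IEnumerator.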
And…
I always find that the easiest way to understand it is to look at some examples. First you’ll need to make sure you reference the System.CoreEx and System.Reactive assemblies.
As always, we’ll start with an entirely useless example, just to demonstrate the basics: it uses an Observable TimeSpan interval to print a counter multiplied by two every 2 seconds:
class method ConsoleApp.Main(args: array of string);
begin
  // Emits 0, 1, 2, ... with one value every 2 seconds.
  var oneNumberEveryTwoSeconds := Observable.Interval(TimeSpan.FromSeconds(2));
  // LINQ query over the event stream: double each value as it arrives.
  var numbersTimesTwo := from n in oneNumberEveryTwoSeconds
                         select n * 2;
  Console.WriteLine('Numbers * 2:');
  numbersTimesTwo.Subscribe(num -> begin Console.WriteLine(num) end);
  Console.ReadKey();
end;
Clearly, this is a contrived example, but I think it hints at a few of the more powerful features and the versatility of the Rx Framework. The first thing to note is that this is an asynchronous process; imagine the same code using the standard .NET Timer class. This demonstrates the power of the observable interfaces: they give a standard interface to concepts that would otherwise complicate our code, such as time windows, so that they can be expressed easily. The second thing to notice is the LINQ to Events query (the from ... select expression), which shows how easily you can query a stream of events. Here it transforms each value, and the same syntax can filter specific events out of a stream. This is a truly powerful and important concept because it is fairly common to generate a stream of events, only some of which we want to react to, and the LINQ extensions allow us to express this in a succinct way.
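Since the example above only transforms values, here is a minimal sketch of the filtering case. It assumes the same assembly references as before; the names ticks and evenTicks are hypothetical, and it uses the Where extension method in the same style as the ping pong classes.

```oxygene
class method ConsoleApp.Main(args: array of string);
begin
  // Interval counts up from 0, emitting one value per second.
  var ticks := Observable.Interval(TimeSpan.FromSeconds(1));
  // Keep only the even values; odd ticks never reach the subscriber.
  var evenTicks := ticks.Where(n -> n mod 2 = 0);
  evenTicks.Subscribe(n -> Console.WriteLine(n));
  Console.ReadKey();
end;
```

The subscriber contains no filtering logic of its own; the decision about which events matter lives in the query.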
This is only a basic console application but it’s not hard to imagine how this could make the complex ordering and timing of GUI events in many applications much easier. As another example I have translated the C# ISubject ping pong example which mimics the classic Ping Pong Actor example that the Scala folks get so excited about.
First of all, the Pong ISubject and Ping ISubject classes:
namespace PingPongExample;

interface

uses
  System.Collections.Generic,
  System.Linq,
  System.Text;

type
  Pong = public class (ISubject<Ping, Pong>)
  private
  protected
  public
    method OnError(exception: System.Exception);
    method OnCompleted;
    method Dispose;
    method Subscribe(observer: System.IObserver<PingPongExample.Pong>): System.IDisposable;
    method OnNext(value: PingPongExample.Ping);
  end;

implementation

method Pong.Dispose;
begin
  OnCompleted();
end;

method Pong.OnCompleted;
begin
  Console.WriteLine('Pong finished Ponging');
end;

method Pong.OnError(exception: System.Exception);
begin
  Console.WriteLine('Exception');
end;

method Pong.OnNext(value: PingPongExample.Ping);
begin
  Console.WriteLine('Pong received Ping..');
end;

method Pong.Subscribe(observer: System.IObserver<PingPongExample.Pong>): System.IDisposable;
begin
  // Every 2.5 seconds, for the first ten ticks (n = 0..9),
  // hand this Pong instance to the observer.
  result := Observable.Interval(TimeSpan.FromSeconds(2.5))
    .Where(n -> n < 10)
    .Select(n -> Self)
    .Subscribe(observer);
end;

end.
namespace PingPongExample;

interface

uses
  System.Collections.Generic,
  System.Linq,
  System.Text;

type
  Ping = public class (ISubject<Pong, Ping>)
  private
  protected
  public
    method OnNext(value: Pong);
    method OnError(error: Exception);
    method OnCompleted;
    method Subscribe(observer: System.IObserver<PingPongExample.Ping>): System.IDisposable;
    method Dispose;
  end;

implementation

method Ping.OnNext(value: Pong);
begin
  Console.WriteLine('Ping received Pong..');
end;

method Ping.OnError(error: Exception);
begin
  Console.WriteLine('Exception');
end;

// The completion message lives in OnCompleted, mirroring Pong,
// so a completion notification is reported rather than ignored.
method Ping.OnCompleted;
begin
  Console.WriteLine('Completed Ping Ponging');
end;

method Ping.Dispose;
begin
  OnCompleted();
end;

method Ping.Subscribe(observer: System.IObserver<PingPongExample.Ping>): System.IDisposable;
begin
  // Every 2 seconds, for the first ten ticks (n = 0..9),
  // hand this Ping instance to the observer.
  result := Observable.Interval(TimeSpan.FromSeconds(2))
    .Where(n -> n < 10)
    .Select(n -> Self)
    .Subscribe(observer);
end;

end.
And then our setup code:
class method ConsoleApp.Main(args: array of string);
begin
  var ping := new Ping();
  var pong := new Pong();
  Console.WriteLine('Any key to stop..');
  var pingSubscription := ping.Subscribe(pong);
  var pongSubscription := pong.Subscribe(ping);
  Console.ReadKey();
  pongSubscription.Dispose();
  pingSubscription.Dispose();
  Console.WriteLine('Ping Pong Completed..');
end;
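The two Dispose calls in the setup code are what stop the game: Subscribe returns an IDisposable, and disposing it detaches the observer. A minimal sketch of that pattern on its own, assuming the same assembly references (the names ticks and subscription are hypothetical):

```oxygene
class method ConsoleApp.Main(args: array of string);
begin
  var ticks := Observable.Interval(TimeSpan.FromSeconds(1));
  // Subscribe returns an IDisposable that represents this one subscription.
  var subscription := ticks.Subscribe(n -> Console.WriteLine(n));
  Console.WriteLine('Any key to unsubscribe..');
  Console.ReadKey();
  // Disposing detaches the observer, so no further values are printed.
  subscription.Dispose();
  Console.WriteLine('Unsubscribed. Any key to exit..');
  Console.ReadKey();
end;
```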
You can embrace IObservable in your own business objects quite easily and begin pushing events throughout your application asynchronously:
type
  Invoice = class
  private
    paidDate: DateTime;
    paidSubject: Subject<Invoice>;
  public
    constructor;
    method MarkPaid(paidDate: DateTime);
    // Hide() exposes the subject as a plain IObservable,
    // so subscribers cannot push values into it themselves.
    property Paid: IObservable<Invoice> read paidSubject.Hide();
  end;

  ConsoleApp = class
  public
    class method Main(args: array of string);
  end;

implementation

class method ConsoleApp.Main(args: array of string);
begin
  var myInvoice := new Invoice();
  myInvoice.Paid.Subscribe(n -> Console.WriteLine('Paid'));
  myInvoice.MarkPaid(DateTime.Now);
  Console.ReadLine();
end;

constructor Invoice;
begin
  Self.paidSubject := new Subject<Invoice>;
end;

method Invoice.MarkPaid(paidDate: DateTime);
begin
  Self.paidDate := paidDate;
  // Notify every subscriber of Paid that this invoice has been paid.
  Self.paidSubject.OnNext(Self);
end;
It’s true that the examples above can be replicated quite easily without the Rx Framework, but after a short time experimenting with it I hope you will see that it allows asynchronous event handling patterns to be expressed in a standard and more expressive manner.
I realise that this post is a little short on practical examples, but in my next post I’ll expand on some slightly more practical ones, including some which demonstrate the framework’s power in handling GUI events in WPF applications. It is also worth noting that the Rx team recently released the RxJS framework, which brings the reactive programming environment to JavaScript.
Further Reading
For further information I would recommend:
- The Rx Wiki
- Wes Dyer’s introduction to the Rx Framework
- Wes Dyer and Erik Meijer from MS Research explaining the Rx Framework
- 101 Rx Samples (not quite 101 yet!)
Does the framework appeal to you? As always, if you have any thoughts or questions about using the Rx Framework with Delphi Prism then let me know in the comments below.

