It’s very easy to overlook the fact that Delphi Prism fully supports .NET 3.5 and parts of .NET 4 and with it, the amazing range of Frameworks and Libraries that the .NET eco-system contains. One such framework that I’ve been waiting to get working with is the Microsoft Reactive Extensions for .NET (aka Rx Framework).
So what is the Rx Framework?
It’s a framework for asynchronous programming from Microsoft Research (including Erik Meijer & Wes Dyer amongst others) which could potentially be para-phrased as LINQ to Events. The basic problem is that Asynchronous programming is hard, the exacerbating issue for this problem is that it is entirely necessary for modern development. It is pretty common for Desktop applications to need to chain a series of events together with the correct order and sometimes correct time but doing so at present can land you in a spaghetti-like sea of event callbacks. Enter the Microsoft Rx Framework.
What do I need?
The Framework requires a minimum .NET Framework version of .NET 3.5 SP1 or .NET 4 but downloads for both versions are available on the Microsoft Research site. I couldn’t work out why but Delphi Prism appears to have a problem with the Libraries use of Extension Methods but this issue appears to be fixed in the latest Beta of Delphi Prism. I now suspect it might have started working as a result of the updated support for Extension methods which marc hoffman recently posted about.
How does it work?
In Brief: The Rx Framework introduces a of a pair of interfaces, the IObserver & IObservable, which allow you to represent push-based observable collections, along with a library of extension methods (the bit that wasn’t previously working in Delphi Prims) that implement the Standard LINQ Query Operators.
I always find that the easiest way to understand it is to view some examples. First you’ll need to make sure reference the System.CoreEx and System.Reactive assemblies.
As with always, we’ll start with an entirely useless example just to demonstrate a basic example which uses an Observable Timespan interval to print out a number which doubles every 2 seconds:
class method ConsoleApp.Main(args: array of string); begin var oneNumberPerSecond := Observable.Interval(TimeSpan.FromSeconds(2)); var numbersTimesTwo := from n in oneNumberPerSecond select n * 2; Console.WriteLine('Numbers * 2:'); numbersTimesTwo.Subscribe(num -> begin Console.WriteLine(num) end); Console.ReadKey(); end;
Clearly, this is a contrived example but I think this hints at a few of the more powerful features and versatility of the Rx Framework. The first thing to note is that this is an Asynchronous process, imagine the same code using the standard .NET Timer class. This is a demonstration of the power of the Observable Interfaces and how it can allow standard interfaces to concepts that would otherwise complicate our code such time windows to be easily expressed. The second lines to notice is the highlighted LINQ To Events code on lines 5 and 6 which show how easily you can filter specific events from any events. This is a truly powerful and important concept because it is fairly common to generate a stream of events, only some of which we want to react to and the Linq extensions allow us to express this in a succinct way.
This is only a basic console application but it’s not hard to imagine how this could make the complex ordering and timing of GUI events in many applications much easier. As another example I have translated the C# ISubject ping pong example which mimics the classic Ping Pong Actor example that the Scala folks get so excited about.
First of all the Ping ISubject and Pong ISubject classes:
namespace PingPongExample; interface uses System.Collections.Generic, System.Linq, System.Text; type Pong = public class (ISubject<Ping, Pong>) private protected public method OnError(exception: System.Exception); method OnCompleted; method Dispose; method Subscribe(observer: System.IObserver<PingPongExample.Pong>): System.IDisposable; method OnNext(value: PingPongExample.Ping); end; implementation method Pong.Dispose; begin OnCompleted(); end; method Pong.OnCompleted; begin Console.WriteLine('Pong finished Ponging'); end; method Pong.OnError(exception: System.Exception); begin Console.WriteLine('Exception'); end; method Pong.OnNext(value: PingPongExample.Ping); begin Console.WriteLine('Pong received Ping..'); end; method Pong.Subscribe(observer: System.IObserver<PingPongExample.Pong>): System.IDisposable; begin result := Observable.Interval(TimeSpan.FromSeconds(2.5)) .Where(n -> n < 10) .Select(n -> Self) .Subscribe(observer); end; end.
namespace PingPongExample; interface uses System.Collections.Generic, System.Linq, System.Text; type Ping = public class (ISubject<Pong, Ping>) private protected public method OnNext(value: Pong); method OnError(error: Exception); method OnFinished; method Subscribe(observer: System.IObserver<PingPongExample.Ping>): System.IDisposable; method OnCompleted; method Dispose; end; implementation method Ping.OnNext(value: Pong); begin Console.WriteLine('Ping received Pong..'); end; method Ping.OnError(error: Exception); begin Console.WriteLine('Exception'); end; method Ping.OnFinished; begin Console.WriteLine('Completed Ping Ponging'); end; method Ping.Dispose; begin OnFinished(); end; method Ping.OnCompleted; begin end; method Ping.Subscribe(observer: System.IObserver<PingPongExample.Ping>): System.IDisposable; begin result := Observable.Interval(TimeSpan.FromSeconds(2)) .Where(n -> n < 10) .Select(n -> Self) .Subscribe(observer); end; end.
And then our setup code:
class method ConsoleApp.Main(args: array of string); begin var ping := new Ping(); var pong := new Pong(); Console.WriteLine('Any key to stop..'); var pingSubscription := ping.Subscribe(pong); var pongSubscription := pong.Subscribe(ping); Console.ReadKey(); pongSubscription.Dispose(); pingSubscription.Dispose(); Console.WriteLine('Ping Pong Completed..'); end;
You can embrace IObservable in your own business objects quite easily and begin pushing events throughout your application asynchronously:
type Invoice = class private paidDate: DateTime; paidSubject: Subject<Invoice>; public constructor; method MarkPaid(paidDate: DateTime); property Paid: IObservable<Invoice> read paidSubject.Hide(); end; ConsoleApp = class public class method Main(args: array of string); end; implementation class method ConsoleApp.Main(args: array of string); begin var myInvoice := new Invoice(); myInvoice.Paid.Subscribe(n -> Console.WriteLine('Paid')); myInvoice.MarkPaid(DateTime.Now); Console.ReadLine(); end; constructor Invoice; begin Self.paidSubject := new Subject<Invoice>; end; method Invoice.MarkPaid(paidDate: DateTime); begin Self.paidDate := paidDate; Self.paidSubject.OnNext(Self); end;
It’s true that the examples above can be replicated quite easily without using the Rx Framework but within a short time of experimenting with the Rx Framework, I would hope that you should be able to see that it allows asynchronous event handling patterns to be expressed in a standard and more expressive manner.
For further information I would recommend:
- The Rx Wiki
- Wes Dyer’s introduction to the Rx Framework
- Wes Dyer and Erik Meijer from MS Research explaining the Rx Framework
- 101 Rx Samples (not quite 101 yet!)
Does the framework appeal to you? As always, if you have any thoughts or questions about using the Rx Framework with Delphi Prism then let me know in the comments below.