0

It’s very easy to overlook the fact that Delphi Prism fully supports .NET 3.5 and parts of .NET 4 and with it, the amazing range of Frameworks and Libraries that the .NET eco-system contains. One such framework that I’ve been waiting to get working with is the Microsoft Reactive Extensions for .NET (aka Rx Framework).

So what is the Rx Framework?

It’s a framework for asynchronous programming from Microsoft Research (including Erik Meijer & Wes Dyer amongst others) which could potentially be para-phrased as LINQ to Events. The basic problem is that Asynchronous programming is hard, the exacerbating issue for this problem is that it is entirely necessary for modern development. It is pretty common for Desktop applications to need to chain a series of events together with the correct order and sometimes correct time but doing so at present can land you in a spaghetti-like sea of event callbacks. Enter the Microsoft Rx Framework.

What do I need?

The Framework requires a minimum .NET Framework version of .NET 3.5 SP1 or .NET 4 but downloads for both versions are available on the Microsoft Research site. I couldn’t work out why but Delphi Prism appears to have a problem with the Libraries use of Extension Methods but this issue appears to be fixed in the latest Beta of Delphi Prism. I now suspect it might have started working as a result of the updated support for Extension methods which marc hoffman recently posted about.

How does it work?

In Brief: The Rx Framework introduces a of a pair of interfaces, the IObserver & IObservable, which allow you to represent push-based observable collections, along with a library of extension methods (the bit that wasn’t previously working in Delphi Prims) that implement the Standard LINQ Query Operators.

And…

I always find that the easiest way to understand it is to view some examples. First you’ll need to make sure reference the System.CoreEx and System.Reactive assemblies.

As with always, we’ll start with an entirely useless example just to demonstrate a basic example which uses an Observable Timespan interval to print out a number which doubles every 2 seconds:

class method ConsoleApp.Main(args: array of string);
begin
  var oneNumberPerSecond := Observable.Interval(TimeSpan.FromSeconds(2));

  var numbersTimesTwo := from n in oneNumberPerSecond
                        select n * 2;

  Console.WriteLine('Numbers * 2:');

  numbersTimesTwo.Subscribe(num -> begin Console.WriteLine(num) end);

  Console.ReadKey();
end;

Clearly, this is a contrived example but I think this hints at a few of the more powerful features and versatility of the Rx Framework. The first thing to note is that this is an Asynchronous process, imagine the same code using the standard .NET Timer class. This is a demonstration of the power of the Observable Interfaces and how it can allow standard interfaces to concepts that would otherwise complicate our code such time windows to be easily expressed. The second lines to notice is the highlighted LINQ To Events code on lines 5 and 6 which show how easily you can filter specific events from any events. This is a truly powerful and important concept because it is fairly common to generate a stream of events, only some of which we want to react to and the Linq extensions allow us to express this in a succinct way.

This is only a basic console application but it’s not hard to imagine how this could make the complex ordering and timing of GUI events in many applications much easier. As another example I have translated the C# ISubject ping pong example which mimics the classic Ping Pong Actor example that the Scala folks get so excited about.

First of all the Ping ISubject and Pong ISubject classes:

namespace PingPongExample;

interface

uses
  System.Collections.Generic,
  System.Linq,
  System.Text;

type
  Pong = public class (ISubject<Ping, Pong>)
  private
  protected
  public
    method OnError(exception: System.Exception);
    method OnCompleted;
    method Dispose;
    method Subscribe(observer: System.IObserver<PingPongExample.Pong>): System.IDisposable;
    method OnNext(value: PingPongExample.Ping);
  end;

implementation

method Pong.Dispose;
begin
  OnCompleted();
end;

method Pong.OnCompleted;
begin
  Console.WriteLine('Pong finished Ponging');
end;

method Pong.OnError(exception: System.Exception);
begin
  Console.WriteLine('Exception');
end;

method Pong.OnNext(value: PingPongExample.Ping);
begin
  Console.WriteLine('Pong received Ping..');
end;

method Pong.Subscribe(observer: System.IObserver<PingPongExample.Pong>): System.IDisposable;
begin
  result := Observable.Interval(TimeSpan.FromSeconds(2.5))
                .Where(n -> n < 10)
                .Select(n -> Self)
                .Subscribe(observer);
end;

end.
namespace PingPongExample;

interface

uses
  System.Collections.Generic,
  System.Linq,
  System.Text;

type
  Ping = public class (ISubject<Pong, Ping>)
  private
  protected
  public
    method OnNext(value: Pong);
    method OnError(error: Exception);
    method OnFinished;
    method Subscribe(observer: System.IObserver<PingPongExample.Ping>): System.IDisposable;
    method OnCompleted;
    method Dispose;
  end;

implementation

method Ping.OnNext(value: Pong);
begin
  Console.WriteLine('Ping received Pong..');
end;

method Ping.OnError(error: Exception);
begin
  Console.WriteLine('Exception');
end;

method Ping.OnFinished;
begin
  Console.WriteLine('Completed Ping Ponging');
end;

method Ping.Dispose;
begin
  OnFinished();
end;

method Ping.OnCompleted;
begin

end;

method Ping.Subscribe(observer: System.IObserver<PingPongExample.Ping>): System.IDisposable;
begin
  result := Observable.Interval(TimeSpan.FromSeconds(2))
                .Where(n -> n < 10)
                .Select(n -> Self)
                .Subscribe(observer);
end;

end.

And then our setup code:

class method ConsoleApp.Main(args: array of string);
begin
  var ping := new Ping();
  var pong := new Pong();

  Console.WriteLine('Any key to stop..');

  var pingSubscription := ping.Subscribe(pong);
  var pongSubscription := pong.Subscribe(ping);

  Console.ReadKey();

  pongSubscription.Dispose();
  pingSubscription.Dispose();

  Console.WriteLine('Ping Pong Completed..');
end;

You can embrace IObservable in your own business objects quite easily and begin pushing events throughout your application asynchronously:

type
  Invoice = class

  private
    paidDate: DateTime;
    paidSubject: Subject<Invoice>;
  public
    constructor;
    method MarkPaid(paidDate: DateTime);
    property Paid: IObservable<Invoice> read paidSubject.Hide();
  end;

  ConsoleApp = class
  public
    class method Main(args: array of string);
  end;

implementation

class method ConsoleApp.Main(args: array of string);
begin
  var myInvoice := new Invoice();
  myInvoice.Paid.Subscribe(n -> Console.WriteLine('Paid'));
  myInvoice.MarkPaid(DateTime.Now);
  Console.ReadLine();
end;

constructor Invoice;
begin
  Self.paidSubject := new Subject<Invoice>;
end;

method Invoice.MarkPaid(paidDate: DateTime);
begin
  Self.paidDate := paidDate;
  Self.paidSubject.OnNext(Self);
end;

It’s true that the examples above can be replicated quite easily without using the Rx Framework but within a short time of experimenting with the Rx Framework, I would hope that you should be able to see that it allows asynchronous event handling patterns to be expressed in a standard and more expressive manner.

I realise that this post is a little short on more practical examples but in my next post I’ll expand on some slightly more practical examples and some examples which demonstrate the Frameworks Power in handling GUI events in WPF Applications. It is also worth noting that the Rx Team recently released the RxJs framework which brings the Reactive Programming environment to Javascript.

Further Reading

For further information I would recommend:

Does the framework appeal to you? As always, if you have any thoughts or questions about using the Rx Framework with Delphi Prism then let me know in the comments below.

Tags: , ,

Leave a Comment