Reactive Extensions, to write async, event-based programs with observables

Hello World the Reactive Way

Reactive programming is "a programming paradigm oriented around data flows and the propagation of change" (Wikipedia).

With Reactive Extensions (Rx), you can write asynchronous and event-based programs using observable sequences. Rx lets you represent asynchronous data streams as Observables (push-based notifications), query those streams using LINQ, and control concurrency with Schedulers. Simply put, "Rx = Observables + LINQ + Schedulers".
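To make the "Observables + LINQ" part concrete, here is a minimal sketch that filters and projects an observable sequence with the standard LINQ operators. The class and method names (RxLinqSketch, EvenSquares) are made up for illustration, and schedulers are left at their defaults.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

static class RxLinqSketch
{
    // Hypothetical helper (not from the original post): collects the squares
    // of the even numbers in 1..10 from an observable pipeline.
    public static List<int> EvenSquares()
    {
        var results = new List<int>();

        Observable.Range(1, 10)                  // observable source: 1, 2, ..., 10
            .Where(x => x % 2 == 0)              // LINQ filter: keep even values
            .Select(x => x * x)                  // LINQ projection: square each value
            .Subscribe(x => results.Add(x));     // observer: collect each pushed value

        return results;
    }

    static void Main()
    {
        Console.WriteLine(string.Join(", ", EvenSquares()));
    }
}
```

The query reads like LINQ over a collection, but the values are pushed to the subscriber rather than pulled by a foreach loop.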

You can install the package via NuGet. (Releases from Rx 3.0 onward are published as System.Reactive.)

pm> Install-Package Rx-Main

Channel9 has a concise introduction video: Rx Workshop Introduction. The simplest "Hello, World" can be written like this:

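using System;
using System.Reactive.Linq; // provides the ToObservable() extension method
class Program {
    static void Main(string[] args) {
        var streamOfChars = "Hello, World".ToObservable(); // each char becomes one OnNext value
        streamOfChars.Subscribe(c => Console.WriteLine(c)); // prints one character per line
    }
}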

Another simple example generates the integers from 1 to 10 and subscribes to the sequence with OnNext, OnError, and OnCompleted handlers.

IObservable<int> source = Observable.Range(1, 10);
IDisposable subscription = source.Subscribe(
   x => Console.WriteLine("OnNext: {0}", x),
   ex => Console.WriteLine("OnError: {0}", ex.Message),
   () => Console.WriteLine("OnCompleted"));

Console.WriteLine("Press ENTER to unsubscribe...");
Console.ReadLine();
subscription.Dispose();  // unsubscribe: stop receiving notifications
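Range typically finishes before the prompt appears, so the effect of unsubscribing is hard to see in that snippet. The sketch below uses a Subject<int> as a manually driven source (an assumption for illustration, not part of the original example) to show that values pushed after Dispose no longer reach the observer. UnsubscribeSketch and ReceivedBeforeDispose are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

static class UnsubscribeSketch
{
    // Hypothetical helper: returns the values the observer saw before it unsubscribed.
    public static List<int> ReceivedBeforeDispose()
    {
        var received = new List<int>();
        var subject = new Subject<int>();   // source we can push values into by hand

        IDisposable subscription = subject.Subscribe(x => received.Add(x));

        subject.OnNext(1);                  // delivered: the subscription is active
        subscription.Dispose();             // unsubscribe
        subject.OnNext(2);                  // not delivered: no observer is attached

        return received;
    }

    static void Main()
    {
        Console.WriteLine(string.Join(", ", ReceivedBeforeDispose()));
    }
}
```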

Cold vs. Hot Observables

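Cold observables start running on subscription: the sequence begins pushing values to an observer only when Subscribe is called, and each observer gets its own run of the sequence. This doesn't fit real-world sources such as stock tickers, which produce values whether or not a subscription is active. Those sources are hot observables. An observer that subscribes to a hot observable sequence receives values from the current point in the stream onward and misses anything produced earlier. In the example below, Publish converts a cold sequence into a hot one.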

Console.WriteLine("Current Time: " + DateTime.Now);
var source = Observable.Interval(TimeSpan.FromSeconds(1));
//creates a sequence

IConnectableObservable<long> hot = Observable.Publish<long>(source);
// convert the sequence into a hot sequence

IDisposable subscription1 = hot.Subscribe(
    x => Console.WriteLine("Observer 1: OnNext: {0}", x),
    ex => Console.WriteLine("Observer 1: OnError: {0}", ex.Message),
    () => Console.WriteLine("Observer 1: OnCompleted"));
Console.WriteLine("Current Time after 1st subscription: " + DateTime.Now);
Thread.Sleep(3000);  //idle for 3 seconds
hot.Connect();       // hot is connected to source and starts pushing values to subscribers

Console.WriteLine("Current Time after Connect: " + DateTime.Now);
Thread.Sleep(3000);  //idle for 3 seconds
Console.WriteLine("Current Time just before 2nd subscription: " + DateTime.Now);

// value will immediately be pushed to 2nd subscription
IDisposable subscription2 = hot.Subscribe(
    x => Console.WriteLine("Observer 2: OnNext: {0}", x),
    ex => Console.WriteLine("Observer 2: OnError: {0}", ex.Message),
    () => Console.WriteLine("Observer 2: OnCompleted"));

The example above is from MSDN.
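The timing-based example can be hard to follow from console output alone. As a rough, deterministic contrast, the sketch below subscribes twice to a cold Range and twice to a published (hot) version of it. It assumes Range delivers its values synchronously on the calling thread under its default scheduler. ColdVsHotSketch and its methods are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

static class ColdVsHotSketch
{
    // Cold: every subscription replays the sequence from the start.
    public static (List<int> First, List<int> Second) Cold()
    {
        var first = new List<int>();
        var second = new List<int>();
        var cold = Observable.Range(1, 3);

        cold.Subscribe(x => first.Add(x));
        cold.Subscribe(x => second.Add(x));   // gets its own full run

        return (first, second);
    }

    // Hot: values flow once, starting at Connect; late subscribers miss them.
    public static (List<int> First, List<int> Second) Hot()
    {
        var first = new List<int>();
        var second = new List<int>();
        var hot = Observable.Range(1, 3).Publish();

        hot.Subscribe(x => first.Add(x));     // subscribed before Connect
        hot.Connect();                        // source starts pushing values now
        hot.Subscribe(x => second.Add(x));    // subscribed after the values were pushed

        return (first, second);
    }

    static void Main()
    {
        var cold = Cold();
        var hot = Hot();
        Console.WriteLine("Cold: {0} and {1} values", cold.First.Count, cold.Second.Count);
        Console.WriteLine("Hot:  {0} and {1} values", hot.First.Count, hot.Second.Count);
    }
}
```

Under that assumption, both cold observers see all three values, while the second hot observer sees none because it subscribed after the published sequence had already run.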