Reactive Extensions, to write async, event-based programs with observables
Hello World the Reactive Way
Reactive programming is "a programming paradigm oriented around data flows and the propagation of change" (Wikipedia).
With Reactive Extensions (Rx), you can write asynchronous and event-based programs using observable sequences. Rx lets you represent asynchronous data streams with Observables (push-based notifications), query those streams using LINQ, and parameterize their concurrency using Schedulers. Simply put, "Rx = Observables + LINQ + Schedulers".
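To make the formula concrete, here is a minimal sketch that touches all three parts. It is not from the original post: the names RxFormulaSketch and EvenSquares are hypothetical, and passing Scheduler.Immediate is simply a choice that keeps the example synchronous.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

static class RxFormulaSketch // hypothetical name, for illustration only
{
    // Collects the squares of the even numbers in 1..10.
    public static List<int> EvenSquares()
    {
        var results = new List<int>();

        IObservable<int> query =
            from x in Observable.Range(1, 10, Scheduler.Immediate) // observable + scheduler
            where x % 2 == 0                                       // LINQ filter
            select x * x;                                          // LINQ projection

        // On the immediate scheduler the whole sequence is produced
        // inside Subscribe, so results is filled when the call returns.
        using (query.Subscribe(x => results.Add(x))) { }

        return results;
    }

    static void Main()
    {
        Console.WriteLine(string.Join(", ", EvenSquares())); // 4, 16, 36, 64, 100
    }
}
```

Swapping Scheduler.Immediate for another scheduler changes where the values are produced without touching the query itself, which is the point of keeping the three concerns separate.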
You can install the package via NuGet (from Rx 3.0 onward the package is published as System.Reactive rather than Rx-Main).
pm> Install-Package Rx-Main
Channel 9 has a concise introduction video: Rx Workshop Introduction. The simplest "Hello, World" can be written this way.
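using System;
using System.Reactive.Linq;

class Program
{
    static void Main(string[] args)
    {
        // A string is an IEnumerable<char>, so ToObservable() yields a stream of characters.
        var streamOfChars = "Hello, World".ToObservable();

        // Print each character as it is pushed to the observer.
        streamOfChars.Subscribe(c => Console.WriteLine(c));
    }
}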
Another simple example generates the integers from 1 to 10 and subscribes to the resulting sequence.
IObservable<int> source = Observable.Range(1, 10);

IDisposable subscription = source.Subscribe(
    x => Console.WriteLine("OnNext: {0}", x),
    ex => Console.WriteLine("OnError: {0}", ex.Message),
    () => Console.WriteLine("OnCompleted"));

Console.WriteLine("Press ENTER to unsubscribe...");
Console.ReadLine();
subscription.Dispose();
Cold vs. Hot Observables
Cold observables start running upon subscription; that is, the sequence starts pushing values to an observer only when Subscribe is called, and each observer gets its own run of the sequence. This doesn't fit real-world sources such as stock tickers, which should be producing values even before a subscription is active. Such sources are hot observables: an observer that subscribes to a hot observable sequence gets the current values in the stream, not the ones produced before it subscribed.
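The difference can be made visible by counting how often the producer runs. The following is a small sketch under stated assumptions, not code from the original post: ColdVsHotSketch, Run, and the log messages are hypothetical names, and Observable.Defer stands in for any cold source.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

static class ColdVsHotSketch // hypothetical name, for illustration only
{
    public static List<string> Run()
    {
        var log = new List<string>();
        int runs = 0;

        // Cold: the factory passed to Defer executes once per subscription.
        IObservable<int> cold = Observable.Defer(() =>
        {
            runs++;
            log.Add("producer run " + runs);
            return Observable.Return(runs, Scheduler.Immediate);
        });

        cold.Subscribe(x => log.Add("A got " + x)); // triggers run 1
        cold.Subscribe(x => log.Add("B got " + x)); // triggers run 2

        // Hot: Publish shares a single subscription to the source.
        IConnectableObservable<int> hot = cold.Publish();
        hot.Subscribe(x => log.Add("C got " + x)); // nothing happens yet
        hot.Subscribe(x => log.Add("D got " + x)); // nothing happens yet
        hot.Connect(); // triggers run 3; C and D both receive its value

        return log;
    }

    static void Main()
    {
        foreach (var line in Run()) Console.WriteLine(line);
    }
}
```

Observers A and B each cause their own producer run and see different values (1 and 2), while C and D share the single run started by Connect and both see 3.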
// Requires: using System; using System.Threading;
//           using System.Reactive.Linq; using System.Reactive.Subjects;
Console.WriteLine("Current Time: " + DateTime.Now);

// Create a sequence that ticks once per second.
var source = Observable.Interval(TimeSpan.FromSeconds(1));

// Convert the sequence into a hot sequence.
IConnectableObservable<long> hot = Observable.Publish<long>(source);

IDisposable subscription1 = hot.Subscribe(
    x => Console.WriteLine("Observer 1: OnNext: {0}", x),
    ex => Console.WriteLine("Observer 1: OnError: {0}", ex.Message),
    () => Console.WriteLine("Observer 1: OnCompleted"));
Console.WriteLine("Current Time after 1st subscription: " + DateTime.Now);
Thread.Sleep(3000); // idle for 3 seconds; nothing is pushed before Connect

// hot is connected to source and starts pushing values to subscribers.
hot.Connect();
Console.WriteLine("Current Time after Connect: " + DateTime.Now);
Thread.Sleep(3000); // idle for 3 seconds
Console.WriteLine("Current Time just before 2nd subscription: " + DateTime.Now);

// The 2nd observer joins the live stream: it receives the values produced
// from now on and misses those pushed before it subscribed.
IDisposable subscription2 = hot.Subscribe(
    x => Console.WriteLine("Observer 2: OnNext: {0}", x),
    ex => Console.WriteLine("Observer 2: OnError: {0}", ex.Message),
    () => Console.WriteLine("Observer 2: OnCompleted"));
Console.ReadKey();
An example from MSDN.