Rx Reactive Framework 101 (Part I)

Introduction 

Un peu par hasard j’ai entendu parler du Reactive Framework pour .NET. J’étais intrigué et le moins qu’on puisse dire c’est que je n’ai pas été déçu ! C?est un modèle Push utilisant IObservable, a l’opposé du bon vieux modèle Pull d’IEnumerable .

Pour installer le framework, vos pouvez utiliser NuGet ou directement du site du Rx team.

Un outil pratique pour voir les fonctions est Rx Sandbox

J’ai utilisé la version 1.0.2838, ça ne marchera peut-être pas si vous utilisez une autre version.

Push model

Mais quel est la différence entre un modèle Push et Pull ?

Un schéma vaut mieux qu’un long discour (© Napoléon) :

 

 

Pour faire simple, dans un modèle pull (IEnumerable) il faut aller chercher soi-même les données tandis que dans un modèle push (IObservable) les données vous sont envoyés et vous réagissez (D’où le nom Reactive Framework).Pensez aux évènements en .NET. Maintenant imaginez LINQ pour évènements. Et voilà, vous venez de penser à Rx Framework !

Exemples

Un exemple vaut mieux qu’un long schéma (© Moi).

Simple exemple :[/fr]

using System;
using System.Collections.Generic;
using System.Linq;

namespace RxTest1
{
    class Program
    {
        static void Main(string[] args)
        {
            // create a IObservable fed every second.
            var timedstream = Observable.Interval(TimeSpan.FromSeconds(1));
            // subscribe to the IObservable
            timedstream.Subscribe(v => Console.WriteLine(v));

            Console.ReadKey();
        }
    }
}
  • Ligne 12: Je crée une collection IObservable. Elle est remplie chaque seconde avec un nombre incrémentant.
  • Ligne 14 : Je souscris à cette collection, et passe une lambda expression type Action.

Résultat :

Il écrit simplement le nombre envoyé par IObservable.

Fusionner:

using System;
using System.Collections.Generic;
using System.Linq;

namespace RxTest1
{
    class Program
    {
        static void Main(string[] args)
        {
            // data pushed every 3 seconds
            var timedstream1 = Observable.Interval(TimeSpan.FromSeconds(3)).Select(v => "   Stream 1 : " + v);
            // data pushed every 0.5 second
            var timedstream2 = Observable.Interval(TimeSpan.FromSeconds(0.5)).Select(v => "Stream 2 : " + v);
            // Merge the two IObservable and subscribe to the merged collection
            Observable.Merge(timedstream1, timedstream2).Subscribe(v => Console.WriteLine(v));

            Console.ReadKey();
        }
    }
}

 

  •   
    • Line 12 : Je crée une collection IObservable remplie toutes les 3 secondes.
    • Line 14 : Je crée une collection IObservable remplie toutes les 0.5 secondes.
    • Line 16 : Je fusionne les deux dans une nouvelle collection IObservable. Et je souscris à cette nouvelle collection. Voila le diagramme Marble de la fonction Merge :
  •  

Résultat :

 

[Comme prévu, il écrit les nombres envoyés par chacune des deux collections aussitôt qu’ils arrivent.

ZIP:

La signature et le diagramme Marble de la fonction ZIP :

Zip(IObservable, IEnumerable, Func)

 

 

Elle fusionne aussi 2 collections. Mais elle attend que des données soient prêtes dans les 2 collections avant d’appeler la fonction Func passée. La collection résultat est complète lorsqu?une des deux collection est complète, comme montré dans cet exemple : 

using System;
using System.Collections.Generic;
using System.Linq;

namespace RxTest1
{
    class Program
    {
        static void Main(string[] args)
        {
            // data pushed every 2 seconds, and take only first 3 data
            var timedstream1 = Observable.Interval(TimeSpan.FromSeconds(2)).Take(3);
            // data pushed every 0.5 second
            var timedstream2 = Observable.Interval(TimeSpan.FromSeconds(0.5));
            // Zip the two IObservable and subscribe to the zipped collection
            Observable.Zip(timedstream1, timedstream2, (x, y) => string.Format("Stream 1 : {0} - Stream 2 : {1}", x, y)).Subscribe(v => Console.WriteLine(v), () => Console.WriteLine("Completed."));

            Console.ReadKey();
        }
    }
}

 

  • Line 12 : Je crée une collection IObservable remplie toutes les 3 secondes. Mais je me limite aux 3 premiers nombres seulement.
  • Line 14 : Je crée une collection IObservable remplie toutes les 0.5 secondes.
  • Line 16 : Je zip ces 2 collections en une nouvelle collection IObservable en utilisant une fonction Func. Je souscris à cette dernière. J’écris un message quand la collection est complète. [/fr]

Comme vous voyez, la collection zippée est complète après les 3 premiers nombres car la première collection est complète.

Exceptions:

using System;
using System.Collections.Generic;
using System.Linq;

namespace RxTest1
{
    class Program
    {
        static void Main(string[] args)
        {
            // data pushed every 0.5 seconds, and take only first 4 data
            var timedstream1 = Observable.Interval(TimeSpan.FromSeconds(0.5)).Take(4).Select(v => "Stream 1 : " + v);
            // raise exception
            var timedstream2 = Observable.Throw(new Exception("Exception thrown!"));
            // Concat the 2 collection. Write exception message is exception raised.
            Observable.Concat(timedstream1, timedstream2).Subscribe(v => Console.WriteLine(v), e => Console.WriteLine(e.Message), () => Console.WriteLine("Completed."));

            Console.ReadKey();
        }
    }
}
  • Line 12 : Je crée une collection IObservable remplie toutes les 0.5 secondes mais je ne prends que les 4 premiers nombres.
  • Line 14 : Je crée une collection IObservable qui lève une exception.
  • Line 16 : Je concatène les deux dans une nouvelle collection IObservable et je souscris à celle-ci. J’écris un message quand la collection est complète. Si une exception est levée, j’écris le message de l’exception.

 

 

Comme prévu, il affiche d’abord le contenu de la 1ere collection, puis affiche le message d’exception de la seconde.

Publication

Essayons de souscrire plusieurs fois à une collection IObservable :

using System;
using System.Collections.Generic;
using System.Linq;

namespace RxTest1
{
    class Program
    {
        static void Main(string[] args)
        {
            // create a IObservable fed every second.
            var timedstream = Observable.Interval(TimeSpan.FromSeconds(1)).Select(v =>
            {
                Console.WriteLine("Select evaluated!");
                return v;
            });
            // Subscription 1
            timedstream.Subscribe(v => Console.WriteLine("Observer 1 : " + v));
            // Subscription 2
            timedstream.Subscribe(v => Console.WriteLine("Observer 2 : " + v));

            Console.ReadKey();
        }
    }
}

Résultat:

Surprise ! Le Select est evalué pour chaque subscription ! Pour eviter cela, il faut publier la collection via une autre et souscrire à celle-ci.

using System;
using System.Collections.Generic;
using System.Linq;

namespace RxTest1
{
    class Program
    {
        static void Main(string[] args)
        {
            // create a IObservable fed every second.
            var timedstream = Observable.Interval(TimeSpan.FromSeconds(1)).Select(v =>
            {
                Console.WriteLine(
                    "Select evaluated!");
                return v;
            });
            // create a published version of the collection.
            var publishedtimedstream = timedstream.Publish(new Subject());

            // Subscription 1
            publishedtimedstream.Subscribe(v => Console.WriteLine("Observer 1 : " + v));
            // Subscription 2
            publishedtimedstream.Subscribe(v => Console.WriteLine("Observer 2 : " + v));

            Console.ReadKey();
        }
    }
}
  • Ligne 19 : Je crée une nouvelle collection observableen publiant la collection originale. Subject est un objet qui est à la fois observable et observé. C?est une implémentation de l’interface ISubject.
  • Ligne 22-24 : Je souscris deux fois à cette nouvelle collection.

Résultat:

Maintenant le select n’est évalué qu’une fois, quel que soit le nombre de souscris.

Hot Vs. Cold

Un hot streams publie ses données qu’il y ait des souscriptions ou pas. (Exemples : évènements timer ou souris)

Un cold streams ne commence à publier que lorsqu’il y a une souscription, et recommence à zéro pour chaque nouvelle souscription. (Exemple : collection Interval comme dans les exemples).

Exemple :

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

namespace RxTest1
{
    class Program
    {
        static void Main(string[] args)
        {
            // create a IObservable fed every second.
            var timedstream = Observable.Interval(TimeSpan.FromSeconds(1));
            // create a published version of the collection.
            var publishedtimedstream = timedstream.Publish(new Subject());

            // Sleep 3 seconds
            Thread.Sleep(3000);
            // Subscription 
            publishedtimedstream.Subscribe(v => Console.WriteLine("Observer 1 : " + v));


            Console.ReadKey();
        }
    }
}

Comme vous voyez, le 1er résultat affiché est 3, preuve que la collection a publié même lorsqu’il n’y avait aucune souscription. Publish fait d’un stream cold un stream hot. Defer fait d’un stream hot un stream cold.

Now cold stream example :

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

namespace RxTest1
{
    class Program
    {
        static void Main(string[] args)
        {
            // create a IObservable fed every second.
            var timedstream = Observable.Interval(TimeSpan.FromSeconds(1));
            // create a published version of the collection.

            // Sleep 3 seconds
            Thread.Sleep(3000);
            // Subscription 
            timedstream.Subscribe(v => Console.WriteLine("Observer 1 : " + v));
            // Sleep 3 seconds
            Thread.Sleep(3000);
            // Subscription 
            timedstream.Subscribe(v => Console.WriteLine("Observer 2 : " + v));

            Console.ReadKey();
        }
    }
}

Il recommence à zéro pour chaque souscription, montrant que la collection a commencé à publier lors de la souscription.

Conclusion

Ce ne sont que des exemples triviaux. Je montrerais plus dans la partie II, avec souscription à des évènements entre autre.

C’est un framework très impressionnant. Un très bon outil pour développeur, et qui rend certains scenarios asynchrones plus faciles. Ce sera certainement un acteur important de .NET prochainement et je pense que chaque développeur .NET sérieux devrait y jeter un oeil.

La seule chose est qu’il change souvent, et beaucoup d’exemples trouvés sur le net ne marchent plus. Mais c’est le prix à payer pour avoir un team très réactif (Gag !) qui est à l’écoute de la communauté. 

The following two tabs change content below.
Olivier

Olivier

Mobile Engineer chez Arcana Studio
Développeur Freelance. Passionné par le développement mobile et l'internet des objets. Expert en WinRT et Xamarin. MVP Windows Platform Development Nokia Developer Champion