Compartir a través de


Crear y suscribirse a secuencias observables simples

No es necesario implementar manualmente la interfaz de T> IObservable<para crear secuencias observables. Del mismo modo, no es necesario implementar IObserver<T> para suscribirse a una secuencia. Al instalar los ensamblados de extensión reactiva, puede aprovechar el tipo observable que proporciona muchos operadores LINQ estáticos para crear una secuencia simple con cero, uno o varios elementos. Además, Rx proporciona métodos de extensión Subscribe que toman varias combinaciones de controladores OnNext, OnError y OnCompleted en términos de delegados.

Creación y suscripción a una secuencia sencilla

En el ejemplo siguiente se usa el operador Range del tipo Observable para crear una colección observable simple de números. El observador se suscribe a esta colección mediante el método Subscribe de la clase Observable y proporciona acciones que son delegados que controlan OnNext, OnError y OnCompleted.

El operador Range tiene varias sobrecargas. En nuestro ejemplo, crea una secuencia de enteros que comienza con x y genera números secuenciales y después. 

En cuanto se produzca la suscripción, los valores se envían al observador. A continuación, el delegado OnNext imprime los valores.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

namespace SimpleSequence
{
    class Program
    {
        static void Main(string[] args)
        {
            IObservable<int> source = Observable.Range(1, 10);
            IDisposable subscription = source.Subscribe(
                x => Console.WriteLine("OnNext: {0}", x),
                ex => Console.WriteLine("OnError: {0}", ex.Message),
                () => Console.WriteLine("OnCompleted"));
            Console.WriteLine("Press ENTER to unsubscribe...");
            Console.ReadLine();
            subscription.Dispose();
        }
    }
}

Cuando un observador se suscribe a una secuencia observable, el subproceso que llama al método Subscribe puede ser diferente del subproceso en el que se ejecuta la secuencia hasta su finalización. Por lo tanto, la llamada Subscribe es asincrónica en que el autor de la llamada no se bloquea hasta que se completa la observación de la secuencia. Esto se tratará en más detalles en el tema Uso de programadores .

Observe que el método Subscribe devuelve un IDisposable, para que pueda cancelar la suscripción a una secuencia y eliminarlo fácilmente. Al invocar el método Dispose en la secuencia observable, el observador dejará de escuchar a los observables para los datos.  Normalmente, no es necesario llamar explícitamente a Dispose a menos que tenga que cancelar la suscripción anticipada o cuando la secuencia observable de origen tenga un período de vida mayor que el observador. Las suscripciones de Rx están diseñadas para escenarios de fuego y olvido sin el uso de un finalizador. Cuando el recolector de elementos no utilizados recopila la instancia IDisposable, Rx no elimina automáticamente la suscripción. Sin embargo, tenga en cuenta que el comportamiento predeterminado de los operadores observables es eliminar la suscripción lo antes posible (es decir, cuando se publica un mensaje OnCompleted o OnError). Por ejemplo, el código var x = Observable.Zip(a,b).Subscribe(); se suscribirá x a ambas secuencias a y b. Si se produce un error, x se cancelará inmediatamente de b.

También puede ajustar el ejemplo de código para usar el operador Create del tipo Observable , que crea y devuelve un observador de los delegados de acción OnNext, OnError y OnCompleted especificados. A continuación, puede pasar este observador al método Subscribe del tipo Observable . En el ejemplo siguiente se muestra cómo hacerlo.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

namespace SimpleSequence
{
    class Program
    {
        static void Main(string[] args)
        {
            IObservable<int> source = Observable.Range(1, 10);
            IObserver<int> obsvr = Observer.Create<int>(
                x => Console.WriteLine("OnNext: {0}", x),
                ex => Console.WriteLine("OnError: {0}", ex.Message),
                () => Console.WriteLine("OnCompleted"));
            IDisposable subscription = source.Subscribe(obsvr);
            Console.WriteLine("Press ENTER to unsubscribe...");
            Console.ReadLine();
            subscription.Dispose();
       }
    }
}

Además de crear una secuencia observable desde cero, puede convertir enumeradores existentes, eventos de .NET y patrones asincrónicos en secuencias observables. Los demás temas de esta sección le mostrarán cómo hacerlo.

Tenga en cuenta que en este tema solo se muestran algunos operadores que pueden crear una secuencia observable desde cero. Para obtener más información sobre otros operadores LINQ, consulte Consulta de secuencias observables mediante operadores LINQ.

Uso de un temporizador

En el ejemplo siguiente se usa el operador Timer para crear una secuencia. La secuencia insertará el primer valor después de que haya transcurrido 5 segundos y, a continuación, insertará los valores posteriores cada 1 segundo. Con fines ilustrativos, encadenamos el operador Timestamp a la consulta para que cada valor insertado se anexe en el momento en que se publique. Al hacerlo, al suscribirse a esta secuencia de origen, podemos recibir su valor y marca de tiempo.

Console.WriteLine(“Current Time: “ + DateTime.Now);

var source = Observable.Timer(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(1))
                       .Timestamp();
using (source.Subscribe(x => Console.WriteLine("{0}: {1}", x.Value, x.Timestamp)))
      {
           Console.WriteLine("Press any key to unsubscribe");
           Console.ReadKey();
      }
Console.WriteLine("Press any key to exit");
Console.ReadKey();

La salida debe ser similar a esta:

Current Time: 5/31/2011 5:35:08 PM

Press any key to unsubscribe

0: 5/31/2011 5:35:13 PM -07:00

1: 5/31/2011 5:35:14 PM -07:00

2: 5/31/2011 5:35:15 PM -07:00

Mediante el uso del operador Timestamp, hemos comprobado que el primer elemento se inserta realmente 5 segundos después de que se haya iniciado la secuencia y cada elemento se publica 1 segundo más tarde.

Convertir una colección enumerable en una secuencia observable

Con el operador ToObservable, puede convertir una colección enumerable genérica en una secuencia observable y suscribirse a ella.

IEnumerable<int> e = new List<int> { 1, 2, 3, 4, 5 };

IObservable<int> source = e.ToObservable();
IDisposable subscription = source.Subscribe(
                            x => Console.WriteLine("OnNext: {0}", x),
                            ex => Console.WriteLine("OnError: {0}", ex.Message),
                            () => Console.WriteLine("OnCompleted"));
Console.ReadKey();

Observables fríos frente a calientes

Los observables inactivos comienzan a ejecutarse en la suscripción, es decir, la secuencia observable solo comienza a insertar valores en los observadores cuando se llama a Subscribe. Los valores tampoco se comparten entre los suscriptores. Esto es diferente de los observables activos, como los eventos de movimiento del mouse o los tickers de acciones que ya producen valores incluso antes de que una suscripción esté activa. Cuando un observador se suscribe a una secuencia observable activa, obtendrá el valor actual en la secuencia. La secuencia observable activa se comparte entre todos los suscriptores y cada suscriptor se inserta el siguiente valor en la secuencia. Por ejemplo, incluso si nadie se ha suscrito a un ticker de acciones determinado, el ticker seguirá actualizando su valor en función del movimiento del mercado. Cuando un suscriptor registra interés en este ticker, obtendrá automáticamente el tic más reciente.

En el ejemplo siguiente se muestra una secuencia observable inactiva. En este ejemplo, usamos el operador Interval para crear una secuencia observable simple de números bombeados a intervalos específicos, en este caso, cada 1 segundo.

Después, dos observadores se suscriben a esta secuencia e imprimen sus valores. Observará que la secuencia se restablece para cada suscriptor, en la que la segunda suscripción reiniciará la secuencia desde el primer valor.

IObservable<int> source = Observable.Interval(TimeSpan.FromSeconds(1));   

IDisposable subscription1 = source.Subscribe(
                x => Console.WriteLine("Observer 1: OnNext: {0}", x),
                ex => Console.WriteLine("Observer 1: OnError: {0}", ex.Message),
                () => Console.WriteLine("Observer 1: OnCompleted"));

IDisposable subscription2 = source.Subscribe(
                x => Console.WriteLine("Observer 2: OnNext: {0}", x),
                ex => Console.WriteLine("Observer 2: OnError: {0}", ex.Message),
                () => Console.WriteLine("Observer 2: OnCompleted"));

Console.WriteLine("Press any key to unsubscribe");
Console.ReadLine();
subscription1.Dispose();
subscription2.Dispose();

En el ejemplo siguiente, convertimos la secuencia source observable inactiva anterior en una activa mediante el operador Publish, que devuelve una instancia IConnectableObservable que denominamos hot. El operador Publish proporciona un mecanismo para compartir suscripciones mediante la difusión de una sola suscripción a varios suscriptores. hot actúa como proxy y se suscribe a source, a medida que recibe valores de source, los inserta en sus propios suscriptores. Para establecer una suscripción a la copia de seguridad source y empezar a recibir valores, usamos el método IConnectableObservable.Connect(). Puesto que IConnectableObservable hereda IObservable, podemos usar Subscribe para suscribirse a esta secuencia activa incluso antes de empezar a ejecutarse. Observe que, en el ejemplo, la secuencia activa no se ha iniciado cuando subscription1 se suscribe a ella. Por lo tanto, no se inserta ningún valor en el suscriptor. Después de llamar a Connect, los valores se insertan en subscription1. Después de un retraso de 3 segundos, subscription2 se suscribe a y comienza a hot recibir los valores inmediatamente desde la posición actual (3 en este caso) hasta el final. La salida es similar a esta:

Current Time: 6/1/2011 3:38:49 PM

Current Time after 1st subscription: 6/1/2011 3:38:49 PM

Current Time after Connect: 6/1/2011 3:38:52 PM

Observer 1: OnNext: 0

Observer 1: OnNext: 1

Current Time just before 2nd subscription: 6/1/2011 3:38:55 PM 

Observer 1: OnNext: 2

Observer 1: OnNext: 3

Observer 2: OnNext: 3

Observer 1: OnNext: 4

Observer 2: OnNext: 4
       
Console.WriteLine("Current Time: " + DateTime.Now);
var source = Observable.Interval(TimeSpan.FromSeconds(1));            //creates a sequence

IConnectableObservable<long> hot = Observable.Publish<long>(source);  // convert the sequence into a hot sequence

IDisposable subscription1 = hot.Subscribe(                        // no value is pushed to 1st subscription at this point
                            x => Console.WriteLine("Observer 1: OnNext: {0}", x),
                            ex => Console.WriteLine("Observer 1: OnError: {0}", ex.Message),
                            () => Console.WriteLine("Observer 1: OnCompleted"));
Console.WriteLine("Current Time after 1st subscription: " + DateTime.Now);
Thread.Sleep(3000);  //idle for 3 seconds
hot.Connect();       // hot is connected to source and starts pushing value to subscribers 
Console.WriteLine("Current Time after Connect: " + DateTime.Now);
Thread.Sleep(3000);  //idle for 3 seconds
Console.WriteLine("Current Time just before 2nd subscription: " + DateTime.Now);

IDisposable subscription2 = hot.Subscribe(     // value will immediately be pushed to 2nd subscription
                            x => Console.WriteLine("Observer 2: OnNext: {0}", x),
                            ex => Console.WriteLine("Observer 2: OnError: {0}", ex.Message),
                            () => Console.WriteLine("Observer 2: OnCompleted"));
Console.ReadKey();