Source code for forest.rx

"""
Rx - Functional reactive programming
------------------------------------

The basic data structue in functional reactive
programming is a :class:`Stream`

.. autoclass:: Stream
    :members:

"""
from forest.observe import Observable


[docs]class Stream(Observable): """Sequence of events An event is a value passed to a callback or listener. Most data structures exist in space, e.g. in RAM or on disk, but Streams exist in time. Common operations on streams include :func:`~Stream.map` and :func:`~Stream.filter`. """
[docs] def listen_to(self, observable): """Re-transmit events on another observable :returns: current stream """ observable.add_subscriber(self.notify) return self
[docs] def map(self, f): """Make new stream by applying f to values :returns: new stream that emits f(x) """ stream = Stream() def callback(x): stream.notify(f(x)) self.add_subscriber(callback) return stream
[docs] def distinct(self): """Remove repeated items :returns: new stream with duplicate events removed """ stream = Stream() def closure(): y = None called = False def callback(x): nonlocal y, called if not called: y = x # Important: must be before notify() to prevent recursion stream.notify(x) called = True if x != y: y = x # Important: must be before notify() to prevent recursion stream.notify(x) return callback self.add_subscriber(closure()) return stream
[docs] def filter(self, f): """Emit items that pass a predicate test :param f: predicate function True keeps value False discards value """ stream = Stream() def callback(x): if f(x): stream.notify(x) self.add_subscriber(callback) return stream