Source code for forest.rx

"""
Rx - Functional reactive programming
------------------------------------

The basic data structue in functional reactive
programming is a :class:`Stream`

.. autoclass:: Stream
    :members:

"""
from forest.observe import Observable


[docs]class Stream(Observable): """Sequence of events An event is a value passed to a callback or listener. Most data structures exist in space, e.g. in RAM or on disk, but Streams exist in time. Common operations on streams include :func:`~Stream.map` and :func:`~Stream.filter`. """
[docs] def listen_to(self, observable): """Re-transmit events on another observable :returns: current stream """ observable.add_subscriber(self.notify) return self
[docs] def map(self, f): """Make new stream by applying f to values :returns: new stream that emits f(x) """ stream = Stream() def callback(x): stream.notify(f(x)) self.add_subscriber(callback) return stream
[docs] def distinct(self, comparator=None): """Remove repeated items :param comparator: f(x, y) that returns True if x == y :returns: new stream with duplicate events removed """ stream = Stream() def closure(): y = None called = False def callback(x): nonlocal y, called if not called: y = x # Important: must be before notify() to prevent recursion stream.notify(x) called = True return if comparator is None: not_same = x != y else: not_same = not comparator(x, y) if not_same: y = x # Important: must be before notify() to prevent recursion stream.notify(x) return callback self.add_subscriber(closure()) return stream
[docs] def filter(self, f): """Emit items that pass a predicate test :param f: predicate function True keeps value False discards value """ stream = Stream() def callback(x): if f(x): stream.notify(x) self.add_subscriber(callback) return stream
@classmethod def combine_latest(cls, *input_streams): output = cls() payload = len(input_streams) * [None] def callback(i): def wrapper(x): nonlocal payload payload[i] = x output.notify(tuple(payload)) return wrapper for i, stream in enumerate(input_streams): stream.add_subscriber(callback(i)) return output