Getting started with Rxjs and streams

When I first started learning about RxJS observables and streams, I found it really difficult to understand the flow of data across the system. Not because it is difficult, but mainly because it is simple yet different. In this article, I will try to explain how I think about streams and observables. Hopefully, it will help you understand and use this powerful library.

What's in a stream?

The simplest way to think about a stream is as a timeline on which events occur.
To visualize a stream, we usually draw a marble diagram such as the following:
Stream: ---x------x-----x--------x--x---
In the above diagram, the x markers are data points. A stream is therefore nothing more than data points separated by time. Now think about that for a moment. What can a stream represent in the real world? The answer is quite simple: a stream can represent asynchronous data arriving from a source. Good examples are users clicking on a button, ajax responses, and a timed interval.
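To make the "data points separated by time" idea concrete, here is a minimal sketch in plain JavaScript. The parseMarbles helper is hypothetical (it is not part of RxJS), and it assumes that every character in the diagram stands for one unit of time.

```javascript
// Hypothetical helper (not an RxJS API): turn a marble diagram into data points.
// Assumption: each character in the diagram is one unit of time (a "frame").
function parseMarbles(diagram) {
  const points = [];
  for (let frame = 0; frame < diagram.length; frame++) {
    const marker = diagram[frame];
    if (marker !== '-') {
      // Anything that is not a dash is a data point on the timeline.
      points.push({ frame: frame, value: marker });
    }
  }
  return points;
}

const marblePoints = parseMarbles('---x------x-----x--------x--x---');
console.log(marblePoints.length); // number of data points on the timeline
console.log(marblePoints[0]);     // the first data point and the frame it occurred at
```

Each entry pairs a value with the moment it occurred, which is all a stream really is.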

How to create a stream in Rxjs?

To create a stream, you have to do two things: a) define the source of its data, and b) emit the data. RxJS provides several utility functions to create streams from common event sources such as button clicks, mouse scrolls, and timed intervals. In the following example, we will create an observable stream using the Observable.create function, which creates a stream based on a function that acts as the data source. (In RxJS 6 and later, Observable.create is deprecated in favor of the new Observable(...) constructor.)
function dataSource(observer) {
  observer.next(1); // Emit this value instantaneously

  setTimeout(function () {
    // Emit this value after some time
    observer.next(2);
    observer.complete(); // Indicate that there will be no more data
  }, 1000);
}

let numberStream$ = Rx.Observable.create(dataSource);
As you can see in the above example, the function dataSource must have a certain signature in order to produce data points on the stream:
- The first argument it receives is the observer, i.e. the receiver of the data.
- To produce data, you must call the next() method on the observer.
- When there is no more data, the data source can indicate completion by invoking the complete() method.
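The contract described above can be sketched without the library. The createStream function below is a simplified, hypothetical stand-in for Observable.create, written only to show how a data source is handed an observer. It is not how RxJS is implemented, and it leaves out error handling and unsubscription.

```javascript
// Simplified, hypothetical stand-in for Observable.create (not RxJS itself).
function createStream(dataSource) {
  return {
    subscribe: function (onNext, onComplete) {
      let completed = false;
      // The observer is the object the data source talks to.
      const observer = {
        next: function (value) {
          if (!completed) onNext(value); // values after completion are dropped
        },
        complete: function () {
          if (completed) return;
          completed = true;
          if (onComplete) onComplete();
        }
      };
      dataSource(observer); // the data source starts producing on subscribe
    }
  };
}

// A synchronous data source, so the result can be inspected immediately.
function syncDataSource(observer) {
  observer.next(1);
  observer.next(2);
  observer.complete();
  observer.next(3); // ignored: the stream has already completed
}

const received = [];
createStream(syncDataSource).subscribe(
  function (value) { received.push(value); },
  function () { received.push('done'); }
);
console.log(received); // [1, 2, 'done']
```

The point to notice is that nothing is emitted until subscribe is called, and that complete() closes the stream for good.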
TIP: The $ suffix in the variable numberStream$ is just a convention for naming streams.
Now that we have a stream, all we have to do is subscribe to it. You do so with, guess what, the subscribe() function, which is available on the stream itself.
function logger(data) { console.log(data); }
numberStream$.subscribe(logger);
The above code can be read as: when data is produced on numberStream$, the logger is interested in receiving that data. The logger function is therefore the observer.
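In RxJS, subscribe() accepts either a plain function, as above, or an observer object with next and complete methods. The sketch below imitates that behaviour in plain JavaScript. The toObserver helper and letterStream$ are hypothetical names made up for illustration, and error handling is left out.

```javascript
// Hypothetical helper: accept either a plain function or an observer object
// and always hand back an object with next() and complete().
function toObserver(observerOrNext) {
  if (typeof observerOrNext === 'function') {
    return { next: observerOrNext, complete: function () {} };
  }
  return {
    next: function (value) {
      if (observerOrNext.next) observerOrNext.next(value);
    },
    complete: function () {
      if (observerOrNext.complete) observerOrNext.complete();
    }
  };
}

// A tiny hand-written stream that emits two letters and then completes.
const letterStream$ = {
  subscribe: function (observerOrNext) {
    const observer = toObserver(observerOrNext);
    observer.next('a');
    observer.next('b');
    observer.complete();
  }
};

const log = [];
// Subscribing with a function: it only hears about data.
letterStream$.subscribe(function (data) { log.push('fn:' + data); });
// Subscribing with an observer object: it also hears about completion.
letterStream$.subscribe({
  next: function (data) { log.push('obj:' + data); },
  complete: function () { log.push('obj:complete'); }
});
console.log(log);
// ['fn:a', 'fn:b', 'obj:a', 'obj:b', 'obj:complete']
```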

Creating new streams from existing streams

The other interesting thing about data streams is that you can create new streams out of existing ones. That is, whenever a data point arrives on a stream, you can create another stream from it by writing a transformation function that produces a corresponding data point. Let's see an example.
// Create a stream of clicks.
var source$ = Rx.Observable.fromEvent(document.querySelector('body'), 'click');

// Create another stream of the x coordinate of the clicks.
var xCoordinate$ = source$.map(function(e) { return e.x; });

// logs the click event object
source$.subscribe(data => console.log(data) );

// logs the x coordinates
xCoordinate$.subscribe(data => console.log(data) );
A marble diagram of the above streams would look as follows:
/*
source$:      ---e---e----e-e----

xCoordinate$: ---x---x----x-x----
*/
As seen in the example above, we used an RxJS operator called map to create a new stream from an existing stream. The map operator takes a function as an argument and invokes it whenever data arrives on the underlying stream (source$). It then creates a stream that contains the values returned by our transformation function. In our case, the transformation function returns the x coordinate of the event. The interesting part here is that the original stream source$ is left intact and, as seen in the example above, can be subscribed to independently of the newly created xCoordinate$ stream.
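To see why the source is left intact, it helps to sketch what a map-like operator might do internally. The mapStream and createClickSource functions below are hypothetical and heavily simplified; they are not RxJS's implementation, and the fake event objects stand in for real click events.

```javascript
// Hypothetical stand-in for a click stream: emits two fake event objects.
function createClickSource() {
  return {
    subscribe: function (next) {
      next({ x: 10, y: 5 });
      next({ x: 42, y: 7 });
    }
  };
}

// Simplified map: returns a NEW stream and never modifies the source.
function mapStream(source, transform) {
  return {
    subscribe: function (next) {
      // Subscribe to the underlying stream and pass on transformed values.
      source.subscribe(function (value) {
        next(transform(value));
      });
    }
  };
}

const fakeSource$ = createClickSource();
const fakeX$ = mapStream(fakeSource$, function (e) { return e.x; });

const xs = [];
const events = [];
fakeX$.subscribe(function (x) { xs.push(x); });          // transformed values
fakeSource$.subscribe(function (e) { events.push(e); }); // original events

console.log(xs);            // [10, 42]
console.log(events.length); // 2
```

Because mapStream only wraps the source's subscribe function, both streams stay usable side by side.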
Now that you know how to create simple streams, in the next article, I will cover the nature of execution of streams to lay the groundwork for more advanced concepts like subjects and stream combination.



Also Read:

  1. Rxjs Observable publish refcount vs share
  2. Understanding the Rx multicast operator
  3. Difference between Rxjs Subject and Observable
  4. The difference between Rxjs combineLatest and withLatestFrom
  5. The difference between switchMap and flatMap or mergeMap