The difference between RxJS combineLatest and withLatestFrom

When I started looking at ways to combine streams, one point of confusion for me was the subtle difference in behavior between combineLatest and withLatestFrom. Although both are used to combine streams, they differ in WHEN the combined stream delivers data to the subscriber.

combineLatest

When using combineLatest, data is delivered to the subscriber when EITHER of the streams produces data. Let's see an example.
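// Emits A-0, A-1, A-2, one value every 100 ms.
var streamA = Rx.Observable
    .interval(100 /* ms */)
    .take(3)
    .map(i => `A-${i}` );

// Emits B-0 through B-5, one value every 50 ms.
var streamB = Rx.Observable
    .interval(50 /* ms */)
    .take(6)
    .map(i => `B-${i}` );

// Logs the latest values of both streams whenever either one emits.
streamA
  .combineLatest(streamB)
  .subscribe( data => console.log(data) );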
Conceptually, a marble diagram of it would be as follows:
/* Marble diagram */
/*
    One: -----A-----A-----A|
    Two: ---B---B---B---B---B---B|

 Result: -----X-X---XX--X-X-X---X|
*/
Notice how data is emitted on the Result stream when ANY of the streams involved emits data. Equally important, both streams must have emitted at least once before any output is produced. In the above example, stream B starts emitting before stream A, so there is no output until stream A also starts emitting. Also notice that when both streams emit at the same time (as happens in the code example above), the result stream sees two data points in immediate succession.
With combineLatest, both streams carry equal weight in producing output, provided that data is present on both streams.
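
To make the emission rule concrete, here is a minimal, dependency-free sketch that models it over a hand-written timeline. It is not how RxJS implements combineLatest: the simulateCombineLatest function and the { source, value } event records are hypothetical names made up for this illustration, and placing A before B at the instant they share is an assumption.

```javascript
// Simplified model of combineLatest for two sources (not the RxJS implementation).
// `events` is a time-ordered list of { source: 'A' | 'B', value } records,
// a hypothetical representation used only for this illustration.
function simulateCombineLatest(events) {
  const latest = { A: undefined, B: undefined };
  const seen = { A: false, B: false };
  const output = [];

  for (const { source, value } of events) {
    latest[source] = value;
    seen[source] = true;

    // Every event triggers an emission, but only once both sources have a value.
    if (seen.A && seen.B) {
      output.push([latest.A, latest.B]);
    }
  }
  return output;
}

// Timeline in the order of the marble diagram above.
const combineTimeline = [
  { source: 'B', value: 'B-0' }, // no output yet: A has not emitted
  { source: 'A', value: 'A-0' },
  { source: 'B', value: 'B-1' },
  { source: 'A', value: 'A-1' }, // A and B share this instant (order assumed)
  { source: 'B', value: 'B-2' },
  { source: 'B', value: 'B-3' },
  { source: 'A', value: 'A-2' },
  { source: 'B', value: 'B-4' },
  { source: 'B', value: 'B-5' },
];

const combined = simulateCombineLatest(combineTimeline);
console.log(combined.length); // 8
console.log(combined[0]);     // [ 'A-0', 'B-0' ]
console.log(combined[7]);     // [ 'A-2', 'B-5' ]
```

The eight pairs line up with the eight X marks in the marble diagram. The first event, B-0, produces nothing on its own because A has not emitted yet.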

withLatestFrom

The withLatestFrom method behaves quite differently. You use a primary stream to control when data is emitted on the result.
Think of withLatestFrom as a way to use a primary stream to sample a secondary stream: each value from the primary stream is paired with the most recent value of the secondary stream.
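// Emits A-0, A-1, A-2, one value every 100 ms.
var primary = Rx.Observable
    .interval(100 /* ms */)
    .take(3)
    .map(i => `A-${i}` );

// Emits B-0 through B-5, one value every 50 ms.
var secondary = Rx.Observable
    .interval(50 /* ms */)
    .take(6)
    .map(i => `B-${i}` );

// Logs only when primary emits, along with the latest value from secondary.
primary
  .withLatestFrom(secondary)
  .subscribe( data => console.log(data) );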
Conceptually, a marble diagram of it would be as follows:
/* Marble diagram */
/*
    One: -----A-----A-----A|
    Two: ---B---B---B---B---B---B|

 Result: -----X-----X-----X|
*/
Notice how in this case data is emitted only 3 times on the result stream, because the primary observable emitted only 3 data points. As expected, each emission is combined with the latest value of the secondary stream, just like combineLatest does. Hope this helps a bit in case you had the same confusion.
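
For anyone who wants to poke at the rule without a library, below is a minimal, dependency-free sketch of the withLatestFrom behavior over a hand-written timeline. It is a simplified model rather than the RxJS implementation: simulateWithLatestFrom and the { source, value } event records are hypothetical names invented for this illustration, and the ordering of events that share an instant is an assumption.

```javascript
// Simplified model of withLatestFrom (not the RxJS implementation).
// `events` is a time-ordered list of { source: 'primary' | 'secondary', value }
// records, a hypothetical representation used only for this illustration.
function simulateWithLatestFrom(events) {
  let latestSecondary;
  let secondarySeen = false;
  const output = [];

  for (const { source, value } of events) {
    if (source === 'secondary') {
      // The secondary stream only updates the remembered value; it never emits.
      latestSecondary = value;
      secondarySeen = true;
    } else if (secondarySeen) {
      // The primary stream drives the output once the secondary has a value.
      output.push([value, latestSecondary]);
    }
    // A primary value that arrives before any secondary value is dropped.
  }
  return output;
}

// Timeline in the order of the marble diagram above.
const sampleTimeline = [
  { source: 'secondary', value: 'B-0' },
  { source: 'primary', value: 'A-0' },
  { source: 'secondary', value: 'B-1' },
  { source: 'primary', value: 'A-1' }, // shares an instant with B-2 (order assumed)
  { source: 'secondary', value: 'B-2' },
  { source: 'secondary', value: 'B-3' },
  { source: 'primary', value: 'A-2' },
  { source: 'secondary', value: 'B-4' },
  { source: 'secondary', value: 'B-5' },
];

const sampled = simulateWithLatestFrom(sampleTimeline);
console.log(sampled);
// [ [ 'A-0', 'B-0' ], [ 'A-1', 'B-1' ], [ 'A-2', 'B-3' ] ]

// If the primary stream fires first, that first value produces no output.
const primaryFirst = simulateWithLatestFrom([
  { source: 'primary', value: 'A-0' },
  { source: 'secondary', value: 'B-0' },
  { source: 'primary', value: 'A-1' },
]);
console.log(primaryFirst); // [ [ 'A-1', 'B-0' ] ]
```

The second call shows an edge case worth remembering: a primary value that arrives before the secondary stream has emitted anything is silently dropped.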


