So sánh rxjava 1 và 2 năm 2024

Hẳn các bạn vẫn còn nhớ trong một số bài trước chúng ta có nói về Observable trong ứng dụng Angular, vậy Observable là gì, nó có quan hệ gì với Angular, làm thế nào để sử dụng Observable hiệu quả trong ứng dụng của bạn. Trong bài này chúng ta sẽ cùng tìm hiểu về Observable, về Rxjs, Reactive Programming.

1. Giới thiệu

Angular đi kèm với một dependency là Rxjs giúp cho nó trở nên reactive, một ứng dụng Angular là một reactive system. Dễ thấy nhất ở đây chính là EventEmitter, hay Reactive Forms mà chúng ta đã tìm hiểu trong các bài học trước.

Vậy Reactive Programming (RP) là gì? Điều gì khiến nó trở thành một chủ đề hot như vậy…

Hiện tại, có cả tá định nghĩa về RP, nhưng mình thấy định nghĩa sau đây là bao quát tốt vấn đề:

Reactive programming is programming with asynchronous data streams

Vâng đúng vậy, đây là phương pháp lập trình xoay quanh data streams và nó deal với các vấn đề của asynchronous. Nhưng bạn đừng hiểu lầm, nó có thể deal với cả synchronous nữa.

Bạn có thể tưởng tượng data streams như hình sau, với

function getDetail() {
  console.log('tiepphan.com');
  return 100;
}

const observable = Rx.Observable.create(function (observer) {
  console.log('Rxjs và Reactive Programming');
  observer.next(1);
  observer.next(2);
  observer.next(3);
  setTimeout(() => {
    observer.next(4);
    observer.complete();
  }, 1000);
});

4 được gửi đến trong suốt dòng thời gian của một stream (over time), giống như một array có các phần tử được gửi đến lần lượt theo thời gian.

So sánh rxjava 1 và 2 năm 2024

Và chúng ta có thể coi mọi thứ là stream: single value, array, event, etc.

So sánh rxjava 1 và 2 năm 2024

Không những thế, khi thao tác với stream, chúng ta có thể có

function getDetail() {
  console.log('tiepphan.com');
  return 100;
}

const observable = Rx.Observable.create(function (observer) {
  console.log('Rxjs và Reactive Programming');
  observer.next(1);
  observer.next(2);
  observer.next(3);
  setTimeout(() => {
    observer.next(4);
    observer.complete();
  }, 1000);
});

5,

function getDetail() {
  console.log('tiepphan.com');
  return 100;
}

const observable = Rx.Observable.create(function (observer) {
  console.log('Rxjs và Reactive Programming');
  observer.next(1);
  observer.next(2);
  observer.next(3);
  setTimeout(() => {
    observer.next(4);
    observer.complete();
  }, 1000);
});

6, hay

function getDetail() {
  console.log('tiepphan.com');
  return 100;
}

const observable = Rx.Observable.create(function (observer) {
  console.log('Rxjs và Reactive Programming');
  observer.next(1);
  observer.next(2);
  observer.next(3);
  setTimeout(() => {
    observer.next(4);
    observer.complete();
  }, 1000);
});

7 signals. Đây là điều mà các API trước đây của các hệ thống event trong JavaScript còn thiếu, chúng có qua nhiều interface khác nhau cho các loại event khác nhau, Observable sinh ra để tổng quát hóa các interface đó lại.

So sánh rxjava 1 và 2 năm 2024

Và Rxjs giúp chúng ta có được reactive trong lập trình ứng dụng JavaScript:

Rxjs is a library for composing asynchronous and event-based programs by using observable sequences.

Think of Rxjs as Lodash (ultility for array/object) for events/streams.

ReactiveX combines the Observer pattern with the Iterator pattern and functional programming with collections to fill the need for an ideal way of managing sequences of events.

Các Concepts nền tảng của Rxjs bao gồm:

An Observable is a collection that arrives over time
  • Observable: đại diện cho khái niệm về một tập hợp các giá trị hoặc các sự kiện trong tương lai. Khi các giá trị hoặc sự kiện phát sinh trong tương lai, Observable sẽ điều phối nó đến Observer.
  • Observer: là một tập hợp các callbacks tương ứng cho việc lắng nghe các giá trị (

    function getDetail() { console.log('tiepphan.com'); return 100; } const observable = Rx.Observable.create(function (observer) { console.log('Rxjs và Reactive Programming'); observer.next(1); observer.next(2); observer.next(3); setTimeout(() => {

    observer.next(4);  
    observer.complete();  
    
    }, 1000); });

    8,

    function getDetail() { console.log('tiepphan.com'); return 100; } const observable = Rx.Observable.create(function (observer) { console.log('Rxjs và Reactive Programming'); observer.next(1); observer.next(2); observer.next(3); setTimeout(() => {

    observer.next(4);  
    observer.complete();  
    
    }, 1000); });

    6, hay

    function getDetail() { console.log('tiepphan.com'); return 100; } const observable = Rx.Observable.create(function (observer) { console.log('Rxjs và Reactive Programming'); observer.next(1); observer.next(2); observer.next(3); setTimeout(() => {

    observer.next(4);  
    observer.complete();  
    
    }, 1000); });

  • được gửi đến bởi Observable.
  • Subscription: là kết quả có được sau khi thực hiện một Observable, nó thường dùng cho việc hủy việc tiếp tục xử lý.
  • Operators: là các pure functions cho phép lập trình functional với Observable.
  • Subject: để thực hiện việc gửi dữ liệu đến nhiều Observers (multicasting).
  • Schedulers: một scheduler sẽ điều khiển khi nào một subscription bắt đầu thực thi, và khi nào sẽ gửi tín hiệu đi. (Trong bài này chúng ta sẽ không nói về phần này).

2. Array Trong JavaScript

Trước khi bắt đầu với Observable, chúng ta sẽ ôn lại một số kiến thức về Array sẽ giúp ích trong việc tiếp cận Observable.

2.1 Array forEach

Array

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

01 là một trong các cách để ta có thể lặp qua lần lượt từng phần tử trong mảng.

const arr = [1, 2, 3, 4, 5];

arr.forEach((item, index) => {
  console.log(index + ' => ' + item);
});

Kết quả nhận được của chúng ta như sau:

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

Ngoại trừ một điểm chúng ta cần lưu ý khi các phần tử là kiểu reference thay vì kiểu primitive, thì

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

01 có thể khiến các phần tử của array ban đầu thay đổi giá trị.

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

2.2 Array map

Array

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

03 cho phép chúng ta lặp qua tất cả các phần tử trong mảng, áp dụng một function nào đó lên các phần tử để biến đổi, sau đó trả về một mảng các giá trị sau khi thực hiện function.

const arr = [1, 2, 3, 4, 5];

const amp = arr.map((item, index) => {
  return item + 5 + index;
});

console.log(arr, amp);

// result

[1, 2, 3, 4, 5]
[6, 8, 10, 12, 14]

2.3 Array filter

Array

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

04 cho phép chúng ta lặp qua tất cả các phần tử trong mảng, áp dụng một function nào đó lên các phần tử để kiểm tra, sau đó trả về một mảng các giá trị sau khi thực hiện hàm kiểm tra mà thỏa mãn điều kiện (

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

  1. và giữ nguyên mảng cũ không bị ảnh hưởng.

const arr = [1, 2, 3, 4, 5];

const amp = arr.filter((item, index) => {
  return (item + index) % 3 == 0;
});

console.log(arr, amp);

// result

[1, 2, 3, 4, 5]
[2, 5]

2.4 Array reduce

Method

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

06 cho phép chúng ta lặp qua tất cả các phần tử và áp dụng một function nào đó vào mỗi phần tử, function này có các tham số:

  • "0 => 1" "1 => 2" "2 => 3" "3 => 4" "4 => 5"

    07: giá trị trả về từ các lần call callback trước.
  • "0 => 1" "1 => 2" "2 => 3" "3 => 4" "4 => 5"

    08: giá trị của phần tử hiện tại trong array.
  • "0 => 1" "1 => 2" "2 => 3" "3 => 4" "4 => 5"

    09: index của phần tử hiện tại.
  • "0 => 1" "1 => 2" "2 => 3" "3 => 4" "4 => 5"

    10: chính là mảng hiện tại.

Ngoài ra, chúng ta còn có thể cung cấp giá trị ban đầu

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

11 sau tham số function đầu tiên.

const arr = [1, 2, 3, 4, 5];

const val = arr.reduce((acc, current) => acc * current, 1);

console.log(val);

// result
120

2.5 Flatten Array

Trong nhiều tình huống, chúng ta có các array, bên trong mỗi phần tử có thể là các array khác, lúc này chúng ta có nhiệm vụ làm giảm số chiều (flatten) đi chẳng hạn, chúng ta có thể có đoạn code xử lý sau trong JavaScript.

Array.prototype.concatAll = function() {
  return [].concat.apply([], this);
};

const arr = [1, [2, 3], [4, 8, 0], [5]];

const flatten = arr.concatAll();

console.log(arr, flatten);

// result
[1, [2, 3], [4, 8, 0], [5]]
[1, 2, 3, 4, 8, 0, 5]

Như ở ví dụ trên, chúng ta flat mảng con 2 chiều thành 1 chiều, và chúng ta có thể flat nhiều lần để mỗi lần sẽ giảm đi 1 chiều.

Điều này các bạn sẽ hay gặp khi làm việc với Observable trả về Observable trong các phần tiếp theo.

3. Producer vs Consumer, Push vs Pull

Pull and Push are two different protocols how a data Producer can communicate with a data Consumer.

OK, chúng ta lại có một số khái niệm mới:

Producer: là nguồn sản sinh ra data.

Consumer: là nơi

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

12 data sản sinh từ Producer.

Pull systems: Consumer sẽ quyết định khi nào lấy data từ Producer. Producer không quan tâm khi nào data sẽ được gửi đến cho Consumer.

Các

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

13 trong JavaScript là một Pull system. Khi nào lời gọi hàm thì khi đó mới xử lý. Gọi

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

14 lần thì xử lý

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

14 lần.

Lưu ý: function chỉ trả về 1 giá trị sau khi lời gọi hàm được thực hiện. (một mảng cũng chỉ coi là 1 giá trị, vì nó được trả về 1 lần).

Push systems: Producer sẽ quyết định khi nào gửi dữ liệu cho Consumer. Consumer không quan tâm khi nào nhận được data.

Promise, DOM events là các Push systems. Chúng ta register các callbacks và khi event phát sinh, các callbacks sẽ được gọi với dữ liệu từ Producer truyền vào.

Chúng ta có một bảng so sánh như sau:

ProducerConsumerPullPassive: produces data when requested.Active: decides when data is requested.PushActive: produces data at its own pace.Passive: reacts to received data.

Ví dụ:

Pull

const arr = [1, 2, 3, 4];
const iter = arr[Symbol.iterator]();

iter.next();
{value: 1, done: false}
iter.next();
{value: 2, done: false}
iter.next();
{value: 3, done: false}
iter.next();
{value: 4, done: false}
iter.next();
{value: undefined, done: true}

Push

const button = document.querySelector('button');
button.addEventListener('click', () => console.log('Clicked!'));

4. Observable

Lưu ý, để code theo các ví dụ trong bài học này, bạn có thể clone bin sau:

Rxjs starter - jsbin

Vậy Observable là gì?

Observable chỉ là một function (class) mà nó có một số yêu cầu đặc biệt. Nó nhận đầu vào là một Function, mà Function này nhận đầu vào là một Observer và trả về một function để có thể thực hiện việc cancel quá trình xử lý. Thông thường (Rxjs 5) chúng ta đặt tên function đó là

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

16.

Observer: một object có chứa các phương thức

function getDetail() {
  console.log('tiepphan.com');
  return 100;
}

const observable = Rx.Observable.create(function (observer) {
  console.log('Rxjs và Reactive Programming');
  observer.next(1);
  observer.next(2);
  observer.next(3);
  setTimeout(() => {
    observer.next(4);
    observer.complete();
  }, 1000);
});

8,

function getDetail() {
  console.log('tiepphan.com');
  return 100;
}

const observable = Rx.Observable.create(function (observer) {
  console.log('Rxjs và Reactive Programming');
  observer.next(1);
  observer.next(2);
  observer.next(3);
  setTimeout(() => {
    observer.next(4);
    observer.complete();
  }, 1000);
});

6 và

function getDetail() {
  console.log('tiepphan.com');
  return 100;
}

const observable = Rx.Observable.create(function (observer) {
  console.log('Rxjs và Reactive Programming');
  observer.next(1);
  observer.next(2);
  observer.next(3);
  setTimeout(() => {
    observer.next(4);
    observer.complete();
  }, 1000);
});

7 để xử lý dữ liệu tương ứng với các

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

20 được gửi từ Observable.

Observables are functions that tie an observer to a producer. That’s it. They don’t necessarily set up the producer, they just set up an observer to listen to the producer, and generally return a teardown mechanism to remove that listener. The act of subscription is the act of “calling” the observable like a function, and passing it an observer.

Nói theo cách lý thuyết hơn:

Observables are lazy Push collections of multiple values.

Như vậy, chúng ta có thể thấy Observable là lazy computation, giống như function, nếu chúng ta tạo chúng ra mà không gọi, thì không có gì thực thi cả.

Tạm thời bỏ qua các chi tiết về hàm

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

21 dưới đây, chúng ta sẽ gặp lại nó khi tìm hiểu về Operator:

function getDetail() {
  console.log('tiepphan.com');
  return 100;
}

const observable = Rx.Observable.create(function (observer) {
  console.log('Rxjs và Reactive Programming');
  observer.next(1);
  observer.next(2);
  observer.next(3);
  setTimeout(() => {
    observer.next(4);
    observer.complete();
  }, 1000);
});

Đến đây, nếu chúng ta không gọi hàm

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

22, hoặc không

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

23 đến

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

24 thì chẳng có gì xảy ra cả.

Để thực thi, chúng ta sẽ làm như sau:

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

0

Và sau đây là kết quả chúng ta nhận được:

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

1

Observable có thể deal với cả sync và async.

Observables are able to deliver values either synchronously or asynchronously.

5. Làm Quen Với Observable

Với Observable chúng ta sẽ quan tâm đến các thao tác như sau:

  • Creating Observables
  • Subscribing to Observables
  • Executing the Observable
  • Disposing Observables

5.1 Creating Observables

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

25 là một operator, nó chỉ là một alias cho

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

26 constructor, chúng ta hoàn toàn có thể thay thế tương ứng bằng cách gọi constructor cũng cho kết quả tương tự.

Đầu vào của

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

27 yêu cầu một hàm gọi là

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

28 mà hàm này có đầu vào là một

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

29 object.

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

2

Bạn hoàn toàn có thể sử dụng

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

30 hoặc

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

25.

Ngoài operator như

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

21, Rxjs mang đến cho bạn nhiều lựa chọn khác nhau để tạo mới một Observable như các operators:

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

33,

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

34,

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

35, etc. Chúng được đặt trong nhóm creation operators.

Ví dụ, bạn muốn tạo Observable cho một mảng các giá trị, lúc này bạn không cần dùng

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

25 rồi lặp qua các phần tử, xong gọi

function getDetail() {
  console.log('tiepphan.com');
  return 100;
}

const observable = Rx.Observable.create(function (observer) {
  console.log('Rxjs và Reactive Programming');
  observer.next(1);
  observer.next(2);
  observer.next(3);
  setTimeout(() => {
    observer.next(4);
    observer.complete();
  }, 1000);
});

8 nữa. Rxjs có cách dùng khác, vì đây là một usecase rất hay dùng và

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

21 là một low-level API.

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

3

5.2 Subscribing to Observables

Sau khi đã tạo xong một Observable, chúng ta cần

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

23 bằng cách

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

28 vào như sau:

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

4

Subscribing to an Observable is like calling a function, providing callbacks where the data will be delivered to.

Vậy nên chúng ta call một function

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

14 lần, chúng ta sẽ có

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

14 lần thực thi. Tương tự như thế, khi chúng ta

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

28 vào một Observable

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

44 lần, thì có

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

44 lần thực thi, một lời gọi

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

28 giống như một cách để Observable bắt đầu thực thi.

5.3 Executing Observables

Phần code khi chúng ta khởi tạo Observable

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

47 chính là “Observable execution”. Giống như khai báo một function, phần code này để thực hiện một số hành động, xử lý nào đó; chúng là

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

48 - chỉ thực thi khi Observer thực hiện subscribe.

Có ba kiểu giá trị mà một Observable Execution có thể gửi đi:

  • “Next” notification: gửi đi một giá trị, có thể là bất kỳ kiểu dữ liệu nào như Number, a String, an Object, etc.
  • “Error” notification: gửi đi một JavaScript Error hoặc exception.
  • “Complete” notification: không gửi đi một giá trị nào, nhưng nó gửi đi một tín hiệu để báo rằng stream này đã completed, mục đích để Observer có thể thực hiện một hành động nào đó khi stream completed.

Next notifications thường được sử dụng rộng rãi, nó cực kỳ quan trọng, vì nó gửi đi dữ liệu cần thiết cho một Observer.

Error và Complete notifications có thể chỉ xảy ra duy nhất một lần trong một Observable Execution. Lưu ý rằng, chỉ có 1 trong 2 loại tín hiệu trên được gửi đi, nếu đã complete thì không có error, nếu có error thì không có complete. (Chúng không thuộc về nhau :D). Và nếu đã gửi đi complete, hoặc error signal, thì sau đó không có dữ liệu nào được gửi đi nữa. Tức là stream đã close.

In an Observable Execution, zero to infinite Next notifications may be delivered. If either an Error or Complete notification is delivered, then nothing else can be delivered afterwards.

Ví dụ:

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

5

Khi Subscribe vào observable được tạo ở trên, bạn có thể thấy được kết quả như sau:

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

6

Lưu ý rằng trong ví dụ trên, tôi đã

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

23 Observable bằng cách

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

28 với 3 callbacks riêng biệt cho 3 loại signals tương ứng, về mặt xử lý phía sâu bên trong sẽ convert về Observer object có dạng.

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

7

5.4 Disposing Observable Executions

Bởi vì quá trình thực thi Observable có thể lặp vô hạn, hoặc trong trường hợp nào đó bạn muốn thực hiện hủy việc thực thi vì việc này không còn cần thiết nữa - dữ liệu đã lỗi thời, có dữ liệu khác thay thế. Các bạn có thể liên tưởng tới việc close websocket stream, removeEvenListener cho một element nào đó đã bị loại bỏ khỏi DOM chẳng hạn.

Observable có cơ chế tương ứng, cho phép chúng ta hủy việc thực thi. Đó là khi

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

28 được gọi, một Observer sẽ bị gắn với một Observable execution mới được tạo, sau đó nó sẽ trả về một object thuộc type Subscription. Kiểu dữ liệu này có một method

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

16 khi chúng ta gọi đến, nó sẽ thực hiện cơ chế để hủy việc thực thi.

Lưu ý: nếu bạn tạo Observable bằng

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

21 hoặc

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

30 thì bạn phải tự thiết lập cơ chế để hủy.

When you subscribe, you get back a Subscription, which represents the ongoing execution. Just call unsubscribe() to cancel the execution.

Ví dụ:

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

8

6. Observer

Observer là một Consumer những dữ liệu được gửi bởi Observable. Observer là một object chứa một tập 3 callbacks tương ứng cho mỗi loại notification được gửi từ Observable:

function getDetail() {
  console.log('tiepphan.com');
  return 100;
}

const observable = Rx.Observable.create(function (observer) {
  console.log('Rxjs và Reactive Programming');
  observer.next(1);
  observer.next(2);
  observer.next(3);
  setTimeout(() => {
    observer.next(4);
    observer.complete();
  }, 1000);
});

8,

function getDetail() {
  console.log('tiepphan.com');
  return 100;
}

const observable = Rx.Observable.create(function (observer) {
  console.log('Rxjs và Reactive Programming');
  observer.next(1);
  observer.next(2);
  observer.next(3);
  setTimeout(() => {
    observer.next(4);
    observer.complete();
  }, 1000);
});

6,

function getDetail() {
  console.log('tiepphan.com');
  return 100;
}

const observable = Rx.Observable.create(function (observer) {
  console.log('Rxjs và Reactive Programming');
  observer.next(1);
  observer.next(2);
  observer.next(3);
  setTimeout(() => {
    observer.next(4);
    observer.complete();
  }, 1000);
});

7.

Một Observer có dạng như sau:

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

9

Observer được cung cấp là tham số đầu vào của

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

28 để kích hoạt Observable execution.

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

0

Observers are just objects with three callbacks, one for each type of notification that an Observable may deliver.

Observe có thể chỉ có một số callbacks trong bộ 3 callbacks kể trên (có thể là một object không có callback nào trong bộ kể trên, trường hợp này ít dùng đến).

Như tôi đã đề cập từ trước,

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

59 sẽ chuẩn hóa các callbacks thành Observer object tương ứng, bạn có thể truyền vào các hàm rời rạc nhau, nhưng cần lưu ý truyền đúng thứ tự callback.

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

1

Lưu ý: Nếu bạn không muốn truyền error handler function vào, hãy truyền

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

60/

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

61:

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

2

7. Subscription

Subscription là một object đại diện cho một nguồn tài nguyên có khả năng hủy được, thông thường trong Rxjs là hủy Observable execution. Subscription có chứa một method quan trọng

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

16 (từ Rxjs 5), khi method này được gọi, execution sẽ bị hủy.

Ví dụ: chúng ta có một đồng hồ đếm thời gian, mỗi giây sẽ gửi đi một giá trị, giả sử sau khi chạy 5s chúng ta cần hủy phần thực thi này.

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

3

A Subscription essentially just has an unsubscribe() function to release resources or cancel Observable executions.

Một Subscription có thể chứa trong nó nhiều Subscriptions con, khi Subscription

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

16, các Subscriptions con cũng sẽ

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

16.

Ở Subscription cha, chúng ta có thể gọi method

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

65 để thêm các Subscriptions con mà phụ thuộc Subscription cha này.

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

4

8. Cold Observables vs Hot Observables

Cold Observables: Producers created inside

Một observable là “cold” nếu Producer được tạo ra và quản lý bên trong nó.

Ví dụ:

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

5

Kết quả là sau 1s thì lần

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

28 đầu tiên có in ra 15, và lần

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

28 thứ 2 in ra 5. Chúng không có cùng giá trị của

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

68.

Giờ thay đổi chút bằng việc move khai báo biến

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

68 ra ngoài

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

21:

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

6

Lần này, sau 1s thì cả 2 execution đều in ra giá trị là 15.

Đây chính là ví dụ về Hot Observables.

Hot Observables: Producers created outside

9. Subject

Giả sử chúng ta có đoạn code sau đây:

Ở đây mình mong muốn sau khi

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

71 chạy 1.5s thì

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

72 sẽ nhận được giá trị hiện tại mà

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

71 đang nhận.

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

7

Kết quả khi chạy chương trình:

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

8

Oh well,

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

71 và

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

72 tạo ra các execution khác nhau của riêng chúng, chúng ta không kết nối chúng đến với nhau theo cách trên được.

Có cách nào để share execution giữa nhiều Observers không?

Lúc này, chúng ta cần đến một object làm cầu nối ở giữa để làm nhiệm vụ tạo ra Observable execution và share dữ liệu ra cho những Observers khác.

OK, tiến hành thêm một số code nữa.

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

9

Object

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

76 có chưa method

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

77 đây chính là khuôn mẫu của “Observer-Pattern”.

Giờ đây chúng ta có thể có được kết quả như mong muốn.

const arr = [1, 2, 3, 4, 5];

const amp = arr.map((item, index) => {
  return item + 5 + index;
});

console.log(arr, amp);

// result

[1, 2, 3, 4, 5]
[6, 8, 10, 12, 14]

0

Bây giờ, nếu chúng ta thay đổi một chút:

const arr = [1, 2, 3, 4, 5];

const amp = arr.map((item, index) => {
  return item + 5 + index;
});

console.log(arr, amp);

// result

[1, 2, 3, 4, 5]
[6, 8, 10, 12, 14]

1

Giờ bạn có thể thấy,

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

76 trông giống như một Observable, nó lại còn giống như Observer, hơn nữa nó có thể gửi signals cho nhiều Observers mà nó đang quản lý. Đây chính là cấu trúc hybrid của Subject.

A Subject is like an Observable, but can multicast to many Observers. Subjects are like EventEmitters: they maintain a registry of many listeners.

Mỗi Subject là một Observable: bạn có thể

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

28 vào nó, cung cấp cho nó một Observer và bạn có thể nhận data tương ứng.

Mỗi Subject là một Observer: bên trong Subject có chứa các method

function getDetail() {
  console.log('tiepphan.com');
  return 100;
}

const observable = Rx.Observable.create(function (observer) {
  console.log('Rxjs và Reactive Programming');
  observer.next(1);
  observer.next(2);
  observer.next(3);
  setTimeout(() => {
    observer.next(4);
    observer.complete();
  }, 1000);
});

8,

function getDetail() {
  console.log('tiepphan.com');
  return 100;
}

const observable = Rx.Observable.create(function (observer) {
  console.log('Rxjs và Reactive Programming');
  observer.next(1);
  observer.next(2);
  observer.next(3);
  setTimeout(() => {
    observer.next(4);
    observer.complete();
  }, 1000);
});

6,

function getDetail() {
  console.log('tiepphan.com');
  return 100;
}

const observable = Rx.Observable.create(function (observer) {
  console.log('Rxjs và Reactive Programming');
  observer.next(1);
  observer.next(2);
  observer.next(3);
  setTimeout(() => {
    observer.next(4);
    observer.complete();
  }, 1000);
});

7 tương ứng để bạn có thể

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

28 vào Observable chẳng hạn. Khi cần gửi dữ liệu cho các Observers mà Subject đang quản lý, bạn chỉ cần gọi hàm tương ứng.

Ví dụ:

const arr = [1, 2, 3, 4, 5];

const amp = arr.map((item, index) => {
  return item + 5 + index;
});

console.log(arr, amp);

// result

[1, 2, 3, 4, 5]
[6, 8, 10, 12, 14]

2

Hoặc truyền vào một Observable:

const arr = [1, 2, 3, 4, 5];

const amp = arr.map((item, index) => {
  return item + 5 + index;
});

console.log(arr, amp);

// result

[1, 2, 3, 4, 5]
[6, 8, 10, 12, 14]

3

Với phương pháp kể trên, chúng ta đã cơ bản chuyển đổi từ một unicast Observable execution sang multicast, bằng cách sử dụng Subject.

unicast: giống như bạn vào Youtube, mở video nào đó đã được thu sẵn và xem, nó play từ đầu đến cuối video. Một người khác vào xem, Youtube cũng sẽ phát từ đầu đến cuối như thế, hai người không có liên quan gì về thời gian hiện tại của video mà mình đang xem.

multicast: cũng là hai người (có thể nhiều hơn) vào xem video ở Youtube, nhưng video đó đang phát Live (theo dõi một show truyền hình, hay một trận bóng đá Live chẳng hạn). Lúc này Youtube sẽ phát video Live, và những người vào xem video đó sẽ có cùng một thời điểm của video đó (cùng thời điểm của trận đấu đang diễn ra chẳng hạn).

9.1 BehaviorSubject

Một trong các biến thể của Subject đó là BehaviorSubject, nó là biến thế có khái niệm về “the current value”. BehaviorSubject lưu trữ lại giá trị mới emit gần nhất để khi một Observer mới subscribe vào, nó sẽ emit giá trị đó ngay lập tức cho Observer vừa rồi.

BehaviorSubjects are useful for representing “values over time”. For instance, an event stream of birthdays is a Subject, but the stream of a person’s age would be a BehaviorSubject.

Hay như sử dụng BehaviorSubject để chia sẻ thông tin user hiện tại đang đăng nhập hệ thống cho các component khác nhau trong Angular chẳng hạn.

Lưu ý: BehaviorSubject yêu cầu phải có giá trị khởi tạo khi tạo ra subject.

const arr = [1, 2, 3, 4, 5];

const amp = arr.map((item, index) => {
  return item + 5 + index;
});

console.log(arr, amp);

// result

[1, 2, 3, 4, 5]
[6, 8, 10, 12, 14]

4

9.2 ReplaySubject

Một ReplaySubject tương tự như một BehaviorSubject khi nó có thể gửi những dữ liệu trước đó cho Observer mới subscribe, nhưng nó có thể lưu giữ nhiều giá trị (có thể là toàn bộ giá trị của stream từ thời điểm ban đầu).

Tham số đầu vào của ReplaySubject có thể là:

buffer: là số lượng phần tử tối đa có thể lưu trữ.

windowTime: (ms) thời gian tối đa tính đến thời điểm gần nhất emit value.

const arr = [1, 2, 3, 4, 5];

const amp = arr.map((item, index) => {
  return item + 5 + index;
});

console.log(arr, amp);

// result

[1, 2, 3, 4, 5]
[6, 8, 10, 12, 14]

5

Hoặc kết hợp buffer với windowTime:

const arr = [1, 2, 3, 4, 5];

const amp = arr.map((item, index) => {
  return item + 5 + index;
});

console.log(arr, amp);

// result

[1, 2, 3, 4, 5]
[6, 8, 10, 12, 14]

6

Trong ví dụ trên sau 1s chỉ có giá trị 3, 4 và 5 là được emit trong 500ms gần nhất và nằm trong buffer nên được replay lại cho

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

76.

9.3 AsyncSubject

Đây là biến thể mà chỉ emit giá trị cuối cùng của Observable execution cho các observers, và chỉ khi execution complete.

Lưu ý:

Nếu stream không complete thì không có gì được emit cả.

const arr = [1, 2, 3, 4, 5];

const amp = arr.map((item, index) => {
  return item + 5 + index;
});

console.log(arr, amp);

// result

[1, 2, 3, 4, 5]
[6, 8, 10, 12, 14]

7

9.4 Subject Complete

Khi BehaviorSubject complete, thì các Observers subscribe vào sau đó sẽ chỉ nhận được

function getDetail() {
  console.log('tiepphan.com');
  return 100;
}

const observable = Rx.Observable.create(function (observer) {
  console.log('Rxjs và Reactive Programming');
  observer.next(1);
  observer.next(2);
  observer.next(3);
  setTimeout(() => {
    observer.next(4);
    observer.complete();
  }, 1000);
});

7 signal.

Khi ReplaySubject complete, thì các Observers subscribe vào sau đó sẽ được emit tất cả các giá trị lưu trữ trong buffer, sau đó mới thực thi

function getDetail() {
  console.log('tiepphan.com');
  return 100;
}

const observable = Rx.Observable.create(function (observer) {
  console.log('Rxjs và Reactive Programming');
  observer.next(1);
  observer.next(2);
  observer.next(3);
  setTimeout(() => {
    observer.next(4);
    observer.complete();
  }, 1000);
});

7 của Observer.

Kể cả khi AsyncSubject complete rồi, Observer vẫn có thể subscribe vào được và vẫn nhận giá trị cuối cùng.

const arr = [1, 2, 3, 4, 5];

const amp = arr.map((item, index) => {
  return item + 5 + index;
});

console.log(arr, amp);

// result

[1, 2, 3, 4, 5]
[6, 8, 10, 12, 14]

8

10. Operators

Trong Rxjs chúng ta thấy có vô số các method/static method được cung cấp, khiến nó trở nên vô cùng mạnh mẽ và hữu dụng.

Vậy Operator là gì?

An Operator is a function which creates a new Observable based on the current Observable. This is a pure operation: the previous Observable stays unmodified.

Operator là một pure function. Với cùng một giá trị đầu vào, chúng ta sẽ luôn có cùng một giá trị ở đầu ra.

Ví dụ:

const arr = [1, 2, 3, 4, 5];

const amp = arr.map((item, index) => {
  return item + 5 + index;
});

console.log(arr, amp);

// result

[1, 2, 3, 4, 5]
[6, 8, 10, 12, 14]

9

Operator nhận đầu vào là một Observable, sau đó xử lý và tạo mới mội Observable để trả về, và giữ Observable đầu vào không bị thay đổi gì.

Ví dụ, chúng ta tạo một Operator nhận đầu vào là một function, giống như Array

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

03 chẳng hạn.

const arr = [1, 2, 3, 4, 5];

const amp = arr.filter((item, index) => {
  return (item + index) % 3 == 0;
});

console.log(arr, amp);

// result

[1, 2, 3, 4, 5]
[2, 5]

0

Bây giờ chúng ta có thể tạo một Observable như sau:

const arr = [1, 2, 3, 4, 5];

const amp = arr.filter((item, index) => {
  return (item + index) % 3 == 0;
});

console.log(arr, amp);

// result

[1, 2, 3, 4, 5]
[2, 5]

1

10.1 Instance operators vs static operators

Thông thường static operators là các operators được gắn với

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

88 Observable, chúng được dùng phổ biến để tạo mới Observable.

Ví dụ như các operator

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

33,

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

34,

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

35,

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

92,

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

93, etc.

Instance operators: các operators này gắn với một instance của

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

88 Observable.

Giả sử chúng ta tạo operator

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

95 như sau, thì kết quả cũng được tương tự như trên:

const arr = [1, 2, 3, 4, 5];

const amp = arr.filter((item, index) => {
  return (item + index) % 3 == 0;
});

console.log(arr, amp);

// result

[1, 2, 3, 4, 5]
[2, 5]

2

Instance operators are functions that use the this keyword to infer what is the input Observable. Static operators are pure functions attached to the Observable class, and usually are used to create Observables from scratch.

Operators được chia vào nhiều nhóm khác nhau, mỗi nhóm có một mục đích sử dụng:

  • Creation Operators.
  • Transformation Operators.
  • Filtering Operators.
  • Combination Operators.
  • Error Handling Operators.
  • Utility Operators.
  • Multicasting Operators.
  • Conditional and Boolean Operators.
  • Mathematical and Aggregate Operators

Sau đây chúng ta sẽ tìm hiểu một số operators chính hay dùng đến.

10.2 Creation Operators

Từ đầu đến giờ chúng ta đã quen với sử dụng

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

21 operator để tạo ra Observable, ngoài ra chúng ta còn có một số operators khác để tạo Observable từ những common usecase.

Ví dụ để tạo Observable từ một số static values, chúng ta có thể dùng

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

33:

const arr = [1, 2, 3, 4, 5];

const amp = arr.filter((item, index) => {
  return (item + index) % 3 == 0;
});

console.log(arr, amp);

// result

[1, 2, 3, 4, 5]
[2, 5]

3

Hoặc có thể dùng

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

33 với nhiều tham số đầu vào:

const arr = [1, 2, 3, 4, 5];

const amp = arr.filter((item, index) => {
  return (item + index) % 3 == 0;
});

console.log(arr, amp);

// result

[1, 2, 3, 4, 5]
[2, 5]

4

Lưu ý: với các static values như ví dụ trên, chúng được gửi đi sync. (mặc đinh nếu không có tác động của Scheduler).

Sau khi gửi đi hết dữ liệu, nó sẽ gửi

function getDetail() {
  console.log('tiepphan.com');
  return 100;
}

const observable = Rx.Observable.create(function (observer) {
  console.log('Rxjs và Reactive Programming');
  observer.next(1);
  observer.next(2);
  observer.next(3);
  setTimeout(() => {
    observer.next(4);
    observer.complete();
  }, 1000);
});

7 signal.

Một số operators tương tự:

  • interval: thay vì tự tạo interval với setInterval, chúng ta dùng operator này cho mục đích tương tự.
  • timer: giống như setTimeout, sau một khoảng thời gian nào đó thì emit value.
  • throw: để bắn ra error.
  • empty: không gửi gì khác ngoài

    function getDetail() { console.log('tiepphan.com'); return 100; } const observable = Rx.Observable.create(function (observer) { console.log('Rxjs và Reactive Programming'); observer.next(1); observer.next(2); observer.next(3); setTimeout(() => {

    observer.next(4);  
    observer.complete();  
    
    }, 1000); });

    7 signal.
  • never: không gửi bất kỳ loại signals nào.

Tổng quát cho các loại event thì chúng ta có

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

01:

Converts any addHandler/removeHandler API to an Observable.

Ví dụ:

const arr = [1, 2, 3, 4, 5];

const amp = arr.filter((item, index) => {
  return (item + index) % 3 == 0;
});

console.log(arr, amp);

// result

[1, 2, 3, 4, 5]
[2, 5]

5

Để làm việc với DOM event chúng ta có

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

02 operator, nó là bản chi tiết của

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

01:

const arr = [1, 2, 3, 4, 5];

const amp = arr.filter((item, index) => {
  return (item + index) % 3 == 0;
});

console.log(arr, amp);

// result

[1, 2, 3, 4, 5]
[2, 5]

6

Khi click vào trang web, chúng ta có thể thấy console hiển thị tọa độ X ra.

Để convert từ Promise sang Observable chúng ta có

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

92 operator:

const arr = [1, 2, 3, 4, 5];

const amp = arr.filter((item, index) => {
  return (item + index) % 3 == 0;
});

console.log(arr, amp);

// result

[1, 2, 3, 4, 5]
[2, 5]

7

Tạo mới một Observable từ Array, array-like object, Promise, iterable object, hoặc Observable-like object chúng ta dùng

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

34 operator:

const arr = [1, 2, 3, 4, 5];

const amp = arr.filter((item, index) => {
  return (item + index) % 3 == 0;
});

console.log(arr, amp);

// result

[1, 2, 3, 4, 5]
[2, 5]

8

repeat:

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

06 Repeat khi nào Observable complete, có thể giới hạn số lần repeat.

const arr = [1, 2, 3, 4, 5];

const amp = arr.filter((item, index) => {
  return (item + index) % 3 == 0;
});

console.log(arr, amp);

// result

[1, 2, 3, 4, 5]
[2, 5]

9

Còn nhiều các operators khác nữa như

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

07, các bạn vào trang chủ của ReactiveX để xem thêm.

10.3 Transformation Operators

Transformation Operators dùng để chuyển đổi giá trị của Observable từ dạng này sang sang dạng khác.

Hay gặp nhất có lẽ là

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

03, nó tương tự như

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

95 operator mà chúng ta đã tạo ở trên (tạo để tìm hiểu cơ chế hoạt động).

const arr = [1, 2, 3, 4, 5];

const val = arr.reduce((acc, current) => acc * current, 1);

console.log(val);

// result
120

0

Các bạn có thể dùng

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

03 để chuyển đổi theo bất kỳ yêu cầu nào của các bạn. Ví dụ khi làm việc với Response object của HTTP, lúc này các bạn có thể convert từ JSON thành object như sau:

const arr = [1, 2, 3, 4, 5];

const val = arr.reduce((acc, current) => acc * current, 1);

console.log(val);

// result
120

1

Khi bạn muốn luôn luôn trả về 1 giá trị cho các giá trị được emit, lúc này bạn có thể dùng

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

11 operator.

const arr = [1, 2, 3, 4, 5];

const val = arr.reduce((acc, current) => acc * current, 1);

console.log(val);

// result
120

2

scan:

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

12

Giống như Array

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

06.

const arr = [1, 2, 3, 4, 5];

const val = arr.reduce((acc, current) => acc * current, 1);

console.log(val);

// result
120

3

buffer:

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

14

Lưu trữ giá trị được emit ra và đợi đến khi

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

15 emit thì emit những giá trị đó thành 1 array.

const arr = [1, 2, 3, 4, 5];

const val = arr.reduce((acc, current) => acc * current, 1);

console.log(val);

// result
120

4

bufferTime:

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

16

Tương tự như

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

17, nhưng emit values mỗi khoảng thời gian

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

18 ms.

const arr = [1, 2, 3, 4, 5];

const val = arr.reduce((acc, current) => acc * current, 1);

console.log(val);

// result
120

5

Ngoài ra còn rất nhiều operators khác như

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

17, và một số Operators chúng ta sẽ tìm hiểu ở phần sau “Higher Order Observables”.

10.4 Filtering Operators

Filtering Operators mục đích để filter các giá trị được emit thỏa mãn điều kiện nào đó.

filter:

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

20

Giống như Array filter, operator này chỉ emit các value thỏa mãn

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

21 function. Nếu không có phần tử nào thỏa mãn thì không emit giá trị

function getDetail() {
  console.log('tiepphan.com');
  return 100;
}

const observable = Rx.Observable.create(function (observer) {
  console.log('Rxjs và Reactive Programming');
  observer.next(1);
  observer.next(2);
  observer.next(3);
  setTimeout(() => {
    observer.next(4);
    observer.complete();
  }, 1000);
});

8 nào cả.

const arr = [1, 2, 3, 4, 5];

const val = arr.reduce((acc, current) => acc * current, 1);

console.log(val);

// result
120

6

take:

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

23

Emit tối đa

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

24 giá trị, rồi complete.

const arr = [1, 2, 3, 4, 5];

const val = arr.reduce((acc, current) => acc * current, 1);

console.log(val);

// result
120

7

takeUntil:

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

25

Emit value cho đến khi

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

26 emit thì dừng.

Rất hữu ích khi làm việc với Angular, các bạn có thể sử dụng để tạo ra trình auto-unsubscribe cho Observable. Vì khi complete thì nó tự

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

16.

const arr = [1, 2, 3, 4, 5];

const val = arr.reduce((acc, current) => acc * current, 1);

console.log(val);

// result
120

8

takeWhile:

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

28

Còn emit value khi mà

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

29 còn trả về

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

30. Một khi

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

29 trả về

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

32 thì dừng lại, complete.

const arr = [1, 2, 3, 4, 5];

const val = arr.reduce((acc, current) => acc * current, 1);

console.log(val);

// result
120

9

skip:

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

33

Skip số lượng phần tử từ đầu stream đến

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

34, các giá trị emit sau đó mới được emit.

Array.prototype.concatAll = function() {
  return [].concat.apply([], this);
};

const arr = [1, [2, 3], [4, 8, 0], [5]];

const flatten = arr.concatAll();

console.log(arr, flatten);

// result
[1, [2, 3], [4, 8, 0], [5]]
[1, 2, 3, 4, 8, 0, 5]

0

Tương tự như

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

35,

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

36, nhưng hành động skip thay vì take, chúng ta cũng có

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

37,

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

38

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

39 chúng ta có một cách viết khác

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

40:

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

41

Nếu không truyền vào

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

29 thì nó sẽ lấy phần tử đầu tiên, ngược lại sẽ lấy phần tử đầu tiên thỏa mãn

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

29.

Array.prototype.concatAll = function() {
  return [].concat.apply([], this);
};

const arr = [1, [2, 3], [4, 8, 0], [5]];

const flatten = arr.concatAll();

console.log(arr, flatten);

// result
[1, [2, 3], [4, 8, 0], [5]]
[1, 2, 3, 4, 8, 0, 5]

1

Ngược lại với

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

40 chúng ta sẽ có

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

45:

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

46.

Array.prototype.concatAll = function() {
  return [].concat.apply([], this);
};

const arr = [1, [2, 3], [4, 8, 0], [5]];

const flatten = arr.concatAll();

console.log(arr, flatten);

// result
[1, [2, 3], [4, 8, 0], [5]]
[1, 2, 3, 4, 8, 0, 5]

2

throttleTime:

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

47

Emit giá trị mới nhất khi một khoảng thời gian

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

48 trôi qua.

Array.prototype.concatAll = function() {
  return [].concat.apply([], this);
};

const arr = [1, [2, 3], [4, 8, 0], [5]];

const flatten = arr.concatAll();

console.log(arr, flatten);

// result
[1, [2, 3], [4, 8, 0], [5]]
[1, 2, 3, 4, 8, 0, 5]

3

Rất hữu ích khi làm việc với các event fire liên tục như

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

49, hay

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

50 chẳng hạn.

Array.prototype.concatAll = function() {
  return [].concat.apply([], this);
};

const arr = [1, [2, 3], [4, 8, 0], [5]];

const flatten = arr.concatAll();

console.log(arr, flatten);

// result
[1, [2, 3], [4, 8, 0], [5]]
[1, 2, 3, 4, 8, 0, 5]

4

Sử dụng với

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

51 để hạn chế event fire quá nhiều. Cứ sau 300ms thì emit giá trị mới nhất có được.

Array.prototype.concatAll = function() {
  return [].concat.apply([], this);
};

const arr = [1, [2, 3], [4, 8, 0], [5]];

const flatten = arr.concatAll();

console.log(arr, flatten);

// result
[1, [2, 3], [4, 8, 0], [5]]
[1, 2, 3, 4, 8, 0, 5]

5

debounceTime:

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

52

Discard emitted values that take less than the specified time between output

Hay dùng khi bạn muốn đảm bảo người dùng thôi thao tác sau một khoảng thời gian

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

53 thì emit giá trị. Giống như đảm bảo người dùng ngừng di chuyển chuột, hay ngừng gõ vào input chẳng hạn, để tránh phát sinh nhiều hành động không cần thiết.

Array.prototype.concatAll = function() {
  return [].concat.apply([], this);
};

const arr = [1, [2, 3], [4, 8, 0], [5]];

const flatten = arr.concatAll();

console.log(arr, flatten);

// result
[1, [2, 3], [4, 8, 0], [5]]
[1, 2, 3, 4, 8, 0, 5]

6

distinctUntilChanged:

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

54

Chỉ emit value khi giá trị hiện tại khác với giá trị đã emit trước đó. Hay dùng để đảm bảo dữ liệu phải khác nhau thì mới thực thi hành động. Kết hợp cùng

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

55 hoặc

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

51 để làm việc khi người dùng nhập vào input, sau đó xử lý giá trị, và chỉ xử lý khi dữ liệu hiện tại khác với dữ liệu đã xử lý trước đó.

Array.prototype.concatAll = function() {
  return [].concat.apply([], this);
};

const arr = [1, [2, 3], [4, 8, 0], [5]];

const flatten = arr.concatAll();

console.log(arr, flatten);

// result
[1, [2, 3], [4, 8, 0], [5]]
[1, 2, 3, 4, 8, 0, 5]

7

Khi bạn không di chuyển chuột mà click liên tục thì không có gì được emit cả.

Trên đây chỉ là một số Filter Operators hay dùng, các bạn có thể vào trang chủ của ReactiveX để tìm hiểu thêm những operators như

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

57,

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

58, etc.

10.5 Combination Operators

Combination Operators: sử dụng để kết hợp các Observables lại với nhau.

merge: có thể sử dụng cả instance và static operator. Dùng để merge nhiều Observables thành một Observable.

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

59

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

60

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

61: số lượng stream được phép chạy đồng thời.

So sánh rxjava 1 và 2 năm 2024
Array.prototype.concatAll = function() {
  return [].concat.apply([], this);
};

const arr = [1, [2, 3], [4, 8, 0], [5]];

const flatten = arr.concatAll();

console.log(arr, flatten);

// result
[1, [2, 3], [4, 8, 0], [5]]
[1, 2, 3, 4, 8, 0, 5]

8

Lúc này cả 3 cùng chạy đồng thời, nếu bạn muốn dùng instance operator thì thay như sau:

Array.prototype.concatAll = function() {
  return [].concat.apply([], this);
};

const arr = [1, [2, 3], [4, 8, 0], [5]];

const flatten = arr.concatAll();

console.log(arr, flatten);

// result
[1, [2, 3], [4, 8, 0], [5]]
[1, 2, 3, 4, 8, 0, 5]

9

Giả sử bạn muốn chỉ 2 stream được chạy đồng thời:

const arr = [1, 2, 3, 4];
const iter = arr[Symbol.iterator]();

iter.next();
{value: 1, done: false}
iter.next();
{value: 2, done: false}
iter.next();
{value: 3, done: false}
iter.next();
{value: 4, done: false}
iter.next();
{value: undefined, done: true}

0

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

62 sẽ đợi cho

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

63,

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

64 complete rồi mới bắt đầu chạy.

concat: trong trường hợp bạn chỉ muốn 1 stream được chạy, các stream khác phải đợi stream trước complete.

Tương đương với

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

65. Giống như Array

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

66 vậy.

const arr = [1, 2, 3, 4];
const iter = arr[Symbol.iterator]();

iter.next();
{value: 1, done: false}
iter.next();
{value: 2, done: false}
iter.next();
{value: 3, done: false}
iter.next();
{value: 4, done: false}
iter.next();
{value: undefined, done: true}

1

Trong nhiều trường hợp bạn muốn tạo ra 1 Observable cho 1 giá trị, rồi dùng nó

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

66 với một số Observables khác, lúc này chúng ta có một cách viết gọn hơn sử dụng

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

68 với cú pháp

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

69.

const arr = [1, 2, 3, 4];
const iter = arr[Symbol.iterator]();

iter.next();
{value: 1, done: false}
iter.next();
{value: 2, done: false}
iter.next();
{value: 3, done: false}
iter.next();
{value: 4, done: false}
iter.next();
{value: undefined, done: true}

2

combineLatest:

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

70

combineLatest Operator có thể dùng cả static và instance.

Mỗi khi một Observable emit value, emit giá trị mới nhất từ tất cả

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

71, nhưng phải thỏa mãn các Observable khác cũng đã emit value.

Như trong ví dụ sau, khi

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

72 emit value, nó emit giá trị của

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

63 lúc này là

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

74, nhưng

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

62 lúc này mới emit value đầu tiên.

const arr = [1, 2, 3, 4];
const iter = arr[Symbol.iterator]();

iter.next();
{value: 1, done: false}
iter.next();
{value: 2, done: false}
iter.next();
{value: 3, done: false}
iter.next();
{value: 4, done: false}
iter.next();
{value: undefined, done: true}

3

Hoặc trường hợp bạn muốn xử lý giá trị trước bằng

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

76 function:

const arr = [1, 2, 3, 4];
const iter = arr[Symbol.iterator]();

iter.next();
{value: 1, done: false}
iter.next();
{value: 2, done: false}
iter.next();
{value: 3, done: false}
iter.next();
{value: 4, done: false}
iter.next();
{value: undefined, done: true}

4

So sánh rxjava 1 và 2 năm 2024

withLatestFrom:

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

77

Lưu ý: chỉ có thể sử dụng instance operator với withLatestFrom.

Sử dụng để emit value của Observable này

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

78 kết hợp với latest value của

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

79 Observable, nếu

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

79 chưa emit gì thì

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

78 có emit value cũng không emit gì cả.

So sánh rxjava 1 và 2 năm 2024
const arr = [1, 2, 3, 4];
const iter = arr[Symbol.iterator]();

iter.next();
{value: 1, done: false}
iter.next();
{value: 2, done: false}
iter.next();
{value: 3, done: false}
iter.next();
{value: 4, done: false}
iter.next();
{value: undefined, done: true}

5

Stream

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

63 bị phụ thuộc vào

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

64 trong giai đoạn đầu tiên, vì khi đó

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

64 chưa emit gì, nên

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

63 cũng không emit, đến khi

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

64 bắt đầu emit thì những lần emit sau đó của

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

63 sẽ kết hợp với giá trị mới nhất của

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

64.

Nếu bạn muốn mỗi khi có giá trị của stream emit thì emit giá trị mới nhất của các stream khác, lúc này bạn nên dùng

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

89. Chẳng hạn, bạn có search box với

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

14 fields để thiết lập điều kiện, và khi mỗi field thay đổi thì sẽ lấy danh sách mới dựa theo list các điều kiện kia chẳng hạn.

forkJoin:

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

91

Khi tất cả các Observables complete, emit giá trị cuối cùng của mỗi Observable.

Lưu ý: nếu còn một stream không complete, hoặc 1 stream không emit value nào

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

93 thì không có gì được emit cả.

Chỉ có thể sử dụng static operator.

const arr = [1, 2, 3, 4];
const iter = arr[Symbol.iterator]();

iter.next();
{value: 1, done: false}
iter.next();
{value: 2, done: false}
iter.next();
{value: 3, done: false}
iter.next();
{value: 4, done: false}
iter.next();
{value: undefined, done: true}

6

Không emit gì cả ngoài

function getDetail() {
  console.log('tiepphan.com');
  return 100;
}

const observable = Rx.Observable.create(function (observer) {
  console.log('Rxjs và Reactive Programming');
  observer.next(1);
  observer.next(2);
  observer.next(3);
  setTimeout(() => {
    observer.next(4);
    observer.complete();
  }, 1000);
});

7:

const arr = [1, 2, 3, 4];
const iter = arr[Symbol.iterator]();

iter.next();
{value: 1, done: false}
iter.next();
{value: 2, done: false}
iter.next();
{value: 3, done: false}
iter.next();
{value: 4, done: false}
iter.next();
{value: undefined, done: true}

7

zip:

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

94

Sau khi tất cả Observables emit value, thì emit 1 mảng các giá trị tương ứng (cùng index). Nếu một phần tử không emit gì cả

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

95, hoặc complete ngay

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

93, thì không có gì emit cả. Trường hợp

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

93 thì chỉ send

function getDetail() {
  console.log('tiepphan.com');
  return 100;
}

const observable = Rx.Observable.create(function (observer) {
  console.log('Rxjs và Reactive Programming');
  observer.next(1);
  observer.next(2);
  observer.next(3);
  setTimeout(() => {
    observer.next(4);
    observer.complete();
  }, 1000);
});

7 signal.

The zip operator will subscribe to all inner observables, waiting for each to emit a value. Once this occurs, all values with the corresponding index will be emitted. This will continue until at least one inner observable completes.

So sánh rxjava 1 và 2 năm 2024
const arr = [1, 2, 3, 4];
const iter = arr[Symbol.iterator]();

iter.next();
{value: 1, done: false}
iter.next();
{value: 2, done: false}
iter.next();
{value: 3, done: false}
iter.next();
{value: 4, done: false}
iter.next();
{value: undefined, done: true}

8

10.5 Error Handling Operators

Error Handling Operators sử dụng để handle error trong ứng dụng của bạn.

catch:

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

99

Thường được sử dụng rộng rãi để handle error.

Ví dụ:

const arr = [1, 2, 3, 4];
const iter = arr[Symbol.iterator]();

iter.next();
{value: 1, done: false}
iter.next();
{value: 2, done: false}
iter.next();
{value: 3, done: false}
iter.next();
{value: 4, done: false}
iter.next();
{value: undefined, done: true}

9

retry:

const arr = [1, 2, 3, 4, 5];

const amp = arr.map((item, index) => {
  return item + 5 + index;
});

console.log(arr, amp);

// result

[1, 2, 3, 4, 5]
[6, 8, 10, 12, 14]

00

Sử dụng để restart lại stream trong trường hợp stream bị văng ra lỗi. Có thể giới hạn số lần retry bằng cách truyền vào tham số. Nếu không truyền vào, sẽ luôn retry mỗi khi lỗi văng ra.

Chẳng hạn khi bạn reqest lên server để lấy dữ liệu, giả sử trong quá trình này bạn muốn request lại 2 lần nếu bị văng ra lỗi.

const button = document.querySelector('button');
button.addEventListener('click', () => console.log('Clicked!'));

0

retryWhen:

const arr = [1, 2, 3, 4, 5];

const amp = arr.map((item, index) => {
  return item + 5 + index;
});

console.log(arr, amp);

// result

[1, 2, 3, 4, 5]
[6, 8, 10, 12, 14]

01

Retry một Observable dựa vào một

const arr = [1, 2, 3, 4, 5];

const amp = arr.map((item, index) => {
  return item + 5 + index;
});

console.log(arr, amp);

// result

[1, 2, 3, 4, 5]
[6, 8, 10, 12, 14]

02 Observable emit value.

const arr = [1, 2, 3, 4, 5];

const amp = arr.map((item, index) => {
  return item + 5 + index;
});

console.log(arr, amp);

// result

[1, 2, 3, 4, 5]
[6, 8, 10, 12, 14]

02 Observable này chỉ emit

function getDetail() {
  console.log('tiepphan.com');
  return 100;
}

const observable = Rx.Observable.create(function (observer) {
  console.log('Rxjs và Reactive Programming');
  observer.next(1);
  observer.next(2);
  observer.next(3);
  setTimeout(() => {
    observer.next(4);
    observer.complete();
  }, 1000);
});

8 khi

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

72 Observable văng ra error, khi đó chúng ta có thể xác định khi nào lại subscribe vào

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

72 Observable lần nữa.

const button = document.querySelector('button');
button.addEventListener('click', () => console.log('Clicked!'));

1

10.6 Utility Observables

do:

const arr = [1, 2, 3, 4, 5];

const amp = arr.map((item, index) => {
  return item + 5 + index;
});

console.log(arr, amp);

// result

[1, 2, 3, 4, 5]
[6, 8, 10, 12, 14]

07

Dùng để thực hiện một hành động nào đó, và đảm bảo side-effect không ảnh hưởng tới source. Như ví dụ dưới đây, chúng ta đã thực hiện update value cho

const arr = [1, 2, 3, 4, 5];

const amp = arr.map((item, index) => {
  return item + 5 + index;
});

console.log(arr, amp);

// result

[1, 2, 3, 4, 5]
[6, 8, 10, 12, 14]

08 trong

const arr = [1, 2, 3, 4, 5];

const amp = arr.map((item, index) => {
  return item + 5 + index;
});

console.log(arr, amp);

// result

[1, 2, 3, 4, 5]
[6, 8, 10, 12, 14]

09, nhưng không bị ảnh hưởng gì.

const button = document.querySelector('button');
button.addEventListener('click', () => console.log('Clicked!'));

2

delay:

const arr = [1, 2, 3, 4, 5];

const amp = arr.map((item, index) => {
  return item + 5 + index;
});

console.log(arr, amp);

// result

[1, 2, 3, 4, 5]
[6, 8, 10, 12, 14]

10

Delay emit value theo một khoảng thời gian cho trước.

Ví dụ chúng ta muốn delay 1s rồi mới bắt đầu emit value cho stream sau:

const button = document.querySelector('button');
button.addEventListener('click', () => console.log('Clicked!'));

3

Và còn nhiều operators khác như

const arr = [1, 2, 3, 4, 5];

const amp = arr.map((item, index) => {
  return item + 5 + index;
});

console.log(arr, amp);

// result

[1, 2, 3, 4, 5]
[6, 8, 10, 12, 14]

11 trên trang chủ của ReactiveX.

10.7 Higher Order Observables

Higher Order Observable (HOO) là một Observable trả về một Observable, nó giống như mảng nhiều chiều vậy - mảng 2 chiều chứa trong nó các mảng 1 chiều.

Ví dụ:

const button = document.querySelector('button');
button.addEventListener('click', () => console.log('Clicked!'));

4

Oh, nhìn thế này không ổn, phải có cách nào khác. Hãy quay lại ví dụ flatten Array ở trên, Observable cũng cung cấp cho chúng ta giải pháp để thực hiện flatten HOO.

mergeMap:

const arr = [1, 2, 3, 4, 5];

const amp = arr.map((item, index) => {
  return item + 5 + index;
});

console.log(arr, amp);

// result

[1, 2, 3, 4, 5]
[6, 8, 10, 12, 14]

12

const button = document.querySelector('button');
button.addEventListener('click', () => console.log('Clicked!'));

5

Hay có một cách viết ngắn gọn hơn, là alias của cách viết ở trên:

const button = document.querySelector('button');
button.addEventListener('click', () => console.log('Clicked!'));

6

Bây giờ chúng ta thay đổi 1 chút, cứ mỗi lần click vào

const arr = [1, 2, 3, 4, 5];

const amp = arr.map((item, index) => {
  return item + 5 + index;
});

console.log(arr, amp);

// result

[1, 2, 3, 4, 5]
[6, 8, 10, 12, 14]

13, chúng ta sẽ start stream

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

35:

const button = document.querySelector('button');
button.addEventListener('click', () => console.log('Clicked!'));

7

Khi bạn click 3 lần chẳng hạn, sẽ tạo ra 3 stream chạy đồng thời (nếu cái trước chưa complete thì cái mới vẫn start), lúc này tương tự như

const arr = [1, 2, 3, 4, 5];

const amp = arr.map((item, index) => {
  return item + 5 + index;
});

console.log(arr, amp);

// result

[1, 2, 3, 4, 5]
[6, 8, 10, 12, 14]

15, chúng ta có thể control có bao nhiêu stream được chạy đồng thời.

Ví dụ chúng ta giới hạn có 2 stream chạy đồng thời.

const button = document.querySelector('button');
button.addEventListener('click', () => console.log('Clicked!'));

8

Tham số thứ 2 của

const arr = [1, 2, 3, 4, 5];

const amp = arr.map((item, index) => {
  return item + 5 + index;
});

console.log(arr, amp);

// result

[1, 2, 3, 4, 5]
[6, 8, 10, 12, 14]

16 được đặt tên là

const arr = [1, 2, 3, 4, 5];

const amp = arr.map((item, index) => {
  return item + 5 + index;
});

console.log(arr, amp);

// result

[1, 2, 3, 4, 5]
[6, 8, 10, 12, 14]

17, tham số này cho phép chúng ta truyền vào 1 hàm để xử lý inner và outer Observable value.

Ví dụ, mình muốn lấy về tọa độ

const arr = [1, 2, 3, 4, 5];

const amp = arr.map((item, index) => {
  return item + 5 + index;
});

console.log(arr, amp);

// result

[1, 2, 3, 4, 5]
[6, 8, 10, 12, 14]

18,

const arr = [1, 2, 3, 4, 5];

const amp = arr.map((item, index) => {
  return item + 5 + index;
});

console.log(arr, amp);

// result

[1, 2, 3, 4, 5]
[6, 8, 10, 12, 14]

19 của stream

const arr = [1, 2, 3, 4, 5];

const amp = arr.map((item, index) => {
  return item + 5 + index;
});

console.log(arr, amp);

// result

[1, 2, 3, 4, 5]
[6, 8, 10, 12, 14]

20 kèm theo giá trị emit của

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

35:

const button = document.querySelector('button');
button.addEventListener('click', () => console.log('Clicked!'));

9

Lưu ý:

const arr = [1, 2, 3, 4, 5];

const amp = arr.map((item, index) => {
  return item + 5 + index;
});

console.log(arr, amp);

// result

[1, 2, 3, 4, 5]
[6, 8, 10, 12, 14]

22 là một alias cho

const arr = [1, 2, 3, 4, 5];

const amp = arr.map((item, index) => {
  return item + 5 + index;
});

console.log(arr, amp);

// result

[1, 2, 3, 4, 5]
[6, 8, 10, 12, 14]

16

concatMap:

const arr = [1, 2, 3, 4, 5];

const amp = arr.map((item, index) => {
  return item + 5 + index;
});

console.log(arr, amp);

// result

[1, 2, 3, 4, 5]
[6, 8, 10, 12, 14]

24

Khi chỉ cho phép 1 stream được chạy, các stream khác phải đợi stream trước complete mới được chạy - tương tự

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

66 thì chúng ta có operator

const arr = [1, 2, 3, 4, 5];

const amp = arr.map((item, index) => {
  return item + 5 + index;
});

console.log(arr, amp);

// result

[1, 2, 3, 4, 5]
[6, 8, 10, 12, 14]

26.

function getDetail() {
  console.log('tiepphan.com');
  return 100;
}

const observable = Rx.Observable.create(function (observer) {
  console.log('Rxjs và Reactive Programming');
  observer.next(1);
  observer.next(2);
  observer.next(3);
  setTimeout(() => {
    observer.next(4);
    observer.complete();
  }, 1000);
});

0

Khi bạn click vào document nhiều lần thì chỉ có 1 stream

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

35 chạy tại 1 thời điểm, những stream khác phải đợi stream trước complete mới thực thi.

Tham số

const arr = [1, 2, 3, 4, 5];

const amp = arr.map((item, index) => {
  return item + 5 + index;
});

console.log(arr, amp);

// result

[1, 2, 3, 4, 5]
[6, 8, 10, 12, 14]

17 cũng giống như

const arr = [1, 2, 3, 4, 5];

const amp = arr.map((item, index) => {
  return item + 5 + index;
});

console.log(arr, amp);

// result

[1, 2, 3, 4, 5]
[6, 8, 10, 12, 14]

16.

switchMap:

const arr = [1, 2, 3, 4, 5];

const amp = arr.map((item, index) => {
  return item + 5 + index;
});

console.log(arr, amp);

// result

[1, 2, 3, 4, 5]
[6, 8, 10, 12, 14]

30

Chỉ cho phép 1 stream được chạy, nếu stream trước chưa

function getDetail() {
  console.log('tiepphan.com');
  return 100;
}

const observable = Rx.Observable.create(function (observer) {
  console.log('Rxjs và Reactive Programming');
  observer.next(1);
  observer.next(2);
  observer.next(3);
  setTimeout(() => {
    observer.next(4);
    observer.complete();
  }, 1000);
});

7 thì

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

16.

const arr = [1, 2, 3, 4, 5];

const amp = arr.map((item, index) => {
  return item + 5 + index;
});

console.log(arr, amp);

// result

[1, 2, 3, 4, 5]
[6, 8, 10, 12, 14]

33 rất hữu ích khi làm việc với HTTP request, giả sử request cũ chưa trả về giá trị mà người dùng đã request dữ liệu mới thì chúng ta có thể cancel để tránh việc dư thừa hoặc xử lý sai dữ liệu (dữ liệu cũ hơn response sau dữ liệu mới hơn). Khi cần đảm bảo chỉ có stream mới nhất đang chạy.

function getDetail() {
  console.log('tiepphan.com');
  return 100;
}

const observable = Rx.Observable.create(function (observer) {
  console.log('Rxjs và Reactive Programming');
  observer.next(1);
  observer.next(2);
  observer.next(3);
  setTimeout(() => {
    observer.next(4);
    observer.complete();
  }, 1000);
});

1

Khi bạn click vào document nhiều lần, nếu để ý bạn sẽ thấy những stream trước đó chưa complete sẽ bị cancel và start stream mới.

Ngoài ra nó còn hữu ích để tránh việc bị leak. Giả sử stream

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

35 kia chạy vô hạn, nếu bạn không cancel trước đó thì chẳng mấy chốc ứng dụng của bạn sẽ crash khi người dùng click càng ngày càng nhiều lần.

function getDetail() {
  console.log('tiepphan.com');
  return 100;
}

const observable = Rx.Observable.create(function (observer) {
  console.log('Rxjs và Reactive Programming');
  observer.next(1);
  observer.next(2);
  observer.next(3);
  setTimeout(() => {
    observer.next(4);
    observer.complete();
  }, 1000);
});

2

const arr = [1, 2, 3, 4, 5];

const amp = arr.map((item, index) => {
  return item + 5 + index;
});

console.log(arr, amp);

// result

[1, 2, 3, 4, 5]
[6, 8, 10, 12, 14]

33 cũng có

const arr = [1, 2, 3, 4, 5];

const amp = arr.map((item, index) => {
  return item + 5 + index;
});

console.log(arr, amp);

// result

[1, 2, 3, 4, 5]
[6, 8, 10, 12, 14]

17 cho phép chúng ta tùy chọn dữ liệu cần emit.

Lưu ý: Trong hầu hết các trường hợp nếu bạn không có ý định gì đặc biệt thì nên dùng

const arr = [1, 2, 3, 4, 5];

const amp = arr.map((item, index) => {
  return item + 5 + index;
});

console.log(arr, amp);

// result

[1, 2, 3, 4, 5]
[6, 8, 10, 12, 14]

33 thay vì

const arr = [1, 2, 3, 4, 5];

const amp = arr.map((item, index) => {
  return item + 5 + index;
});

console.log(arr, amp);

// result

[1, 2, 3, 4, 5]
[6, 8, 10, 12, 14]

16.

11. Tổng kết

Trong bài này chúng ta đã tìm hiểu rất nhiều vấn đề về Reactive Programming với Rxjs, nhưng vẫn còn thiếu phần Multicasting, hi vọng mình sẽ làm xong phần đó trong thời gian tới.

Marble diagrams các bạn có thể theo dõi trên trang chủ ReactiveX.

Observable vs Promise: Khả năng mở rộng của Observable là rất lớn, nó bao quát được nhiều vấn đề mà Promise còn thiếu, như emit multiple values, hay cancel execution. Hơn nữa, nó đang trong giai đoạn phát triển để trở thành standard của EcmaScript (hiện tại đang ở stage-1), nên việc sử dụng Observable thay vì Promise có lẽ là điều chúng ta nên cân nhắc.

Các bạn quan tâm về Rxjs có thể theo dõi thêm các phần khác ở trong link tham khảo cuối bài.

Sau đây là một phần bonus để các bạn tạo ra một

const arr = [1, 2, 3, 4, 5];

const amp = arr.map((item, index) => {
  return item + 5 + index;
});

console.log(arr, amp);

// result

[1, 2, 3, 4, 5]
[6, 8, 10, 12, 14]

39 mà nó chịu trách nhiệm tự

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

16 các subscription trong ứng dụng của bạn.