So sánh rxjava 1 và 2 năm 2024
Hẳn các bạn vẫn còn nhớ trong một số bài trước chúng ta có nói về Observable trong ứng dụng Angular, vậy Observable là gì, nó có quan hệ gì với Angular, làm thế nào để sử dụng Observable hiệu quả trong ứng dụng của bạn. Trong bài này chúng ta sẽ cùng tìm hiểu về Observable, về Rxjs, Reactive Programming. Show
1. Giới thiệuAngular đi kèm với một dependency là Rxjs giúp cho nó trở nên reactive, một ứng dụng Angular là một reactive system. Dễ thấy nhất ở đây chính là EventEmitter, hay Reactive Forms mà chúng ta đã tìm hiểu trong các bài học trước. Vậy Reactive Programming (RP) là gì? Điều gì khiến nó trở thành một chủ đề hot như vậy… Hiện tại, có cả tá định nghĩa về RP, nhưng mình thấy định nghĩa sau đây là bao quát tốt vấn đề: Reactive programming is programming with asynchronous data streams Vâng đúng vậy, đây là phương pháp lập trình xoay quanh data streams và nó deal với các vấn đề của asynchronous. Nhưng bạn đừng hiểu lầm, nó có thể deal với cả synchronous nữa. Bạn có thể tưởng tượng data streams như hình sau, với
4 được gửi đến trong suốt dòng thời gian của một stream (over time), giống như một array có các phần tử được gửi đến lần lượt theo thời gian. Và chúng ta có thể coi mọi thứ là stream: single value, array, event, etc. Không những thế, khi thao tác với stream, chúng ta có thể có
5,
6, hay
7 signals. Đây là điều mà các API trước đây của các hệ thống event trong JavaScript còn thiếu, chúng có qua nhiều interface khác nhau cho các loại event khác nhau, Observable sinh ra để tổng quát hóa các interface đó lại. Và Rxjs giúp chúng ta có được reactive trong lập trình ứng dụng JavaScript: Rxjs is a library for composing asynchronous and event-based programs by using observable sequences. Các Concepts nền tảng của Rxjs bao gồm: An Observable is a collection that arrives over time
2. Array Trong JavaScriptTrước khi bắt đầu với Observable, chúng ta sẽ ôn lại một số kiến thức về Array sẽ giúp ích trong việc tiếp cận Observable. 2.1 Array forEachArray
01 là một trong các cách để ta có thể lặp qua lần lượt từng phần tử trong mảng.
Kết quả nhận được của chúng ta như sau:
Ngoại trừ một điểm chúng ta cần lưu ý khi các phần tử là kiểu reference thay vì kiểu primitive, thì
01 có thể khiến các phần tử của array ban đầu thay đổi giá trị.
2.2 Array mapArray
03 cho phép chúng ta lặp qua tất cả các phần tử trong mảng, áp dụng một function nào đó lên các phần tử để biến đổi, sau đó trả về một mảng các giá trị sau khi thực hiện function.
2.3 Array filterArray
04 cho phép chúng ta lặp qua tất cả các phần tử trong mảng, áp dụng một function nào đó lên các phần tử để kiểm tra, sau đó trả về một mảng các giá trị sau khi thực hiện hàm kiểm tra mà thỏa mãn điều kiện (
2.4 Array reduceMethod
06 cho phép chúng ta lặp qua tất cả các phần tử và áp dụng một function nào đó vào mỗi phần tử, function này có các tham số:
Ngoài ra, chúng ta còn có thể cung cấp giá trị ban đầu
11 sau tham số function đầu tiên.
2.5 Flatten ArrayTrong nhiều tình huống, chúng ta có các array, bên trong mỗi phần tử có thể là các array khác, lúc này chúng ta có nhiệm vụ làm giảm số chiều (flatten) đi chẳng hạn, chúng ta có thể có đoạn code xử lý sau trong JavaScript.
Như ở ví dụ trên, chúng ta flat mảng con 2 chiều thành 1 chiều, và chúng ta có thể flat nhiều lần để mỗi lần sẽ giảm đi 1 chiều. Điều này các bạn sẽ hay gặp khi làm việc với Observable trả về Observable trong các phần tiếp theo. 3. Producer vs Consumer, Push vs PullPull and Push are two different protocols how a data Producer can communicate with a data Consumer. OK, chúng ta lại có một số khái niệm mới: Producer: là nguồn sản sinh ra data. Consumer: là nơi
12 data sản sinh từ Producer. Pull systems: Consumer sẽ quyết định khi nào lấy data từ Producer. Producer không quan tâm khi nào data sẽ được gửi đến cho Consumer. Các
13 trong JavaScript là một Pull system. Khi nào lời gọi hàm thì khi đó mới xử lý. Gọi
14 lần thì xử lý
14 lần. Lưu ý: function chỉ trả về 1 giá trị sau khi lời gọi hàm được thực hiện. (một mảng cũng chỉ coi là 1 giá trị, vì nó được trả về 1 lần). Push systems: Producer sẽ quyết định khi nào gửi dữ liệu cho Consumer. Consumer không quan tâm khi nào nhận được data. Promise, DOM events là các Push systems. Chúng ta register các callbacks và khi event phát sinh, các callbacks sẽ được gọi với dữ liệu từ Producer truyền vào. Chúng ta có một bảng so sánh như sau: ProducerConsumerPullPassive: produces data when requested.Active: decides when data is requested.PushActive: produces data at its own pace.Passive: reacts to received data. Ví dụ: Pull
Push
4. ObservableLưu ý, để code theo các ví dụ trong bài học này, bạn có thể clone bin sau: Rxjs starter - jsbin Vậy Observable là gì? Observable chỉ là một function (class) mà nó có một số yêu cầu đặc biệt. Nó nhận đầu vào là một Function, mà Function này nhận đầu vào là một Observer và trả về một function để có thể thực hiện việc cancel quá trình xử lý. Thông thường (Rxjs 5) chúng ta đặt tên function đó là
16. Observer: một object có chứa các phương thức
8,
6 và
7 để xử lý dữ liệu tương ứng với các
20 được gửi từ Observable. Observables are functions that tie an observer to a producer. That’s it. They don’t necessarily set up the producer, they just set up an observer to listen to the producer, and generally return a teardown mechanism to remove that listener. The act of subscription is the act of “calling” the observable like a function, and passing it an observer. Nói theo cách lý thuyết hơn: Observables are lazy Push collections of multiple values. Như vậy, chúng ta có thể thấy Observable là lazy computation, giống như function, nếu chúng ta tạo chúng ra mà không gọi, thì không có gì thực thi cả. Tạm thời bỏ qua các chi tiết về hàm
21 dưới đây, chúng ta sẽ gặp lại nó khi tìm hiểu về Operator:
Đến đây, nếu chúng ta không gọi hàm
22, hoặc không
23 đến
24 thì chẳng có gì xảy ra cả. Để thực thi, chúng ta sẽ làm như sau:
0 Và sau đây là kết quả chúng ta nhận được:
1 Observable có thể deal với cả sync và async. Observables are able to deliver values either synchronously or asynchronously. 5. Làm Quen Với ObservableVới Observable chúng ta sẽ quan tâm đến các thao tác như sau:
5.1 Creating Observables
25 là một operator, nó chỉ là một alias cho
26 constructor, chúng ta hoàn toàn có thể thay thế tương ứng bằng cách gọi constructor cũng cho kết quả tương tự. Đầu vào của
27 yêu cầu một hàm gọi là
28 mà hàm này có đầu vào là một
29 object.
2 Bạn hoàn toàn có thể sử dụng
30 hoặc
25. Ngoài operator như
21, Rxjs mang đến cho bạn nhiều lựa chọn khác nhau để tạo mới một Observable như các operators:
33,
34,
35, etc. Chúng được đặt trong nhóm creation operators. Ví dụ, bạn muốn tạo Observable cho một mảng các giá trị, lúc này bạn không cần dùng
25 rồi lặp qua các phần tử, xong gọi
8 nữa. Rxjs có cách dùng khác, vì đây là một usecase rất hay dùng và
21 là một low-level API.
3 5.2 Subscribing to ObservablesSau khi đã tạo xong một Observable, chúng ta cần
23 bằng cách
28 vào như sau:
4 Subscribing to an Observable is like calling a function, providing callbacks where the data will be delivered to. Vậy nên chúng ta call một function
14 lần, chúng ta sẽ có
14 lần thực thi. Tương tự như thế, khi chúng ta
28 vào một Observable
44 lần, thì có
44 lần thực thi, một lời gọi
28 giống như một cách để Observable bắt đầu thực thi. 5.3 Executing ObservablesPhần code khi chúng ta khởi tạo Observable
47 chính là “Observable execution”. Giống như khai báo một function, phần code này để thực hiện một số hành động, xử lý nào đó; chúng là
48 - chỉ thực thi khi Observer thực hiện subscribe. Có ba kiểu giá trị mà một Observable Execution có thể gửi đi:
Next notifications thường được sử dụng rộng rãi, nó cực kỳ quan trọng, vì nó gửi đi dữ liệu cần thiết cho một Observer. Error và Complete notifications có thể chỉ xảy ra duy nhất một lần trong một Observable Execution. Lưu ý rằng, chỉ có 1 trong 2 loại tín hiệu trên được gửi đi, nếu đã complete thì không có error, nếu có error thì không có complete. (Chúng không thuộc về nhau :D). Và nếu đã gửi đi complete, hoặc error signal, thì sau đó không có dữ liệu nào được gửi đi nữa. Tức là stream đã close. In an Observable Execution, zero to infinite Next notifications may be delivered. If either an Error or Complete notification is delivered, then nothing else can be delivered afterwards. Ví dụ:
5 Khi Subscribe vào observable được tạo ở trên, bạn có thể thấy được kết quả như sau:
6 Lưu ý rằng trong ví dụ trên, tôi đã
23 Observable bằng cách
28 với 3 callbacks riêng biệt cho 3 loại signals tương ứng, về mặt xử lý phía sâu bên trong sẽ convert về Observer object có dạng.
7 5.4 Disposing Observable ExecutionsBởi vì quá trình thực thi Observable có thể lặp vô hạn, hoặc trong trường hợp nào đó bạn muốn thực hiện hủy việc thực thi vì việc này không còn cần thiết nữa - dữ liệu đã lỗi thời, có dữ liệu khác thay thế. Các bạn có thể liên tưởng tới việc close websocket stream, removeEvenListener cho một element nào đó đã bị loại bỏ khỏi DOM chẳng hạn. Observable có cơ chế tương ứng, cho phép chúng ta hủy việc thực thi. Đó là khi
28 được gọi, một Observer sẽ bị gắn với một Observable execution mới được tạo, sau đó nó sẽ trả về một object thuộc type Subscription. Kiểu dữ liệu này có một method
16 khi chúng ta gọi đến, nó sẽ thực hiện cơ chế để hủy việc thực thi. Lưu ý: nếu bạn tạo Observable bằng
21 hoặc
30 thì bạn phải tự thiết lập cơ chế để hủy. When you subscribe, you get back a Subscription, which represents the ongoing execution. Just call unsubscribe() to cancel the execution. Ví dụ:
8 6. ObserverObserver là một Consumer những dữ liệu được gửi bởi Observable. Observer là một object chứa một tập 3 callbacks tương ứng cho mỗi loại notification được gửi từ Observable:
8,
6,
7. Một Observer có dạng như sau:
9 Observer được cung cấp là tham số đầu vào của
28 để kích hoạt Observable execution.
0 Observers are just objects with three callbacks, one for each type of notification that an Observable may deliver. Observe có thể chỉ có một số callbacks trong bộ 3 callbacks kể trên (có thể là một object không có callback nào trong bộ kể trên, trường hợp này ít dùng đến). Như tôi đã đề cập từ trước,
59 sẽ chuẩn hóa các callbacks thành Observer object tương ứng, bạn có thể truyền vào các hàm rời rạc nhau, nhưng cần lưu ý truyền đúng thứ tự callback.
1 Lưu ý: Nếu bạn không muốn truyền error handler function vào, hãy truyền
60/
61:
2 7. SubscriptionSubscription là một object đại diện cho một nguồn tài nguyên có khả năng hủy được, thông thường trong Rxjs là hủy Observable execution. Subscription có chứa một method quan trọng
16 (từ Rxjs 5), khi method này được gọi, execution sẽ bị hủy. Ví dụ: chúng ta có một đồng hồ đếm thời gian, mỗi giây sẽ gửi đi một giá trị, giả sử sau khi chạy 5s chúng ta cần hủy phần thực thi này.
3 A Subscription essentially just has an unsubscribe() function to release resources or cancel Observable executions. Một Subscription có thể chứa trong nó nhiều Subscriptions con, khi Subscription
16, các Subscriptions con cũng sẽ
16. Ở Subscription cha, chúng ta có thể gọi method
65 để thêm các Subscriptions con mà phụ thuộc Subscription cha này.
4 8. Cold Observables vs Hot ObservablesCold Observables: Producers created inside Một observable là “cold” nếu Producer được tạo ra và quản lý bên trong nó. Ví dụ:
5 Kết quả là sau 1s thì lần
28 đầu tiên có in ra 15, và lần
28 thứ 2 in ra 5. Chúng không có cùng giá trị của
68. Giờ thay đổi chút bằng việc move khai báo biến
68 ra ngoài
21:
6 Lần này, sau 1s thì cả 2 execution đều in ra giá trị là 15. Đây chính là ví dụ về Hot Observables. Hot Observables: Producers created outside 9. SubjectGiả sử chúng ta có đoạn code sau đây: Ở đây mình mong muốn sau khi
71 chạy 1.5s thì
72 sẽ nhận được giá trị hiện tại mà
71 đang nhận.
7 Kết quả khi chạy chương trình:
8 Oh well,
71 và
72 tạo ra các execution khác nhau của riêng chúng, chúng ta không kết nối chúng đến với nhau theo cách trên được. Có cách nào để share execution giữa nhiều Observers không? Lúc này, chúng ta cần đến một object làm cầu nối ở giữa để làm nhiệm vụ tạo ra Observable execution và share dữ liệu ra cho những Observers khác. OK, tiến hành thêm một số code nữa.
9 Object
76 có chưa method
77 đây chính là khuôn mẫu của “Observer-Pattern”. Giờ đây chúng ta có thể có được kết quả như mong muốn.
0 Bây giờ, nếu chúng ta thay đổi một chút:
1 Giờ bạn có thể thấy,
76 trông giống như một Observable, nó lại còn giống như Observer, hơn nữa nó có thể gửi signals cho nhiều Observers mà nó đang quản lý. Đây chính là cấu trúc hybrid của Subject. A Subject is like an Observable, but can multicast to many Observers. Subjects are like EventEmitters: they maintain a registry of many listeners. Mỗi Subject là một Observable: bạn có thể
28 vào nó, cung cấp cho nó một Observer và bạn có thể nhận data tương ứng. Mỗi Subject là một Observer: bên trong Subject có chứa các method
8,
6,
7 tương ứng để bạn có thể
28 vào Observable chẳng hạn. Khi cần gửi dữ liệu cho các Observers mà Subject đang quản lý, bạn chỉ cần gọi hàm tương ứng. Ví dụ:
2 Hoặc truyền vào một Observable:
3 Với phương pháp kể trên, chúng ta đã cơ bản chuyển đổi từ một unicast Observable execution sang multicast, bằng cách sử dụng Subject. unicast: giống như bạn vào Youtube, mở video nào đó đã được thu sẵn và xem, nó play từ đầu đến cuối video. Một người khác vào xem, Youtube cũng sẽ phát từ đầu đến cuối như thế, hai người không có liên quan gì về thời gian hiện tại của video mà mình đang xem. multicast: cũng là hai người (có thể nhiều hơn) vào xem video ở Youtube, nhưng video đó đang phát Live (theo dõi một show truyền hình, hay một trận bóng đá Live chẳng hạn). Lúc này Youtube sẽ phát video Live, và những người vào xem video đó sẽ có cùng một thời điểm của video đó (cùng thời điểm của trận đấu đang diễn ra chẳng hạn). 9.1 BehaviorSubjectMột trong các biến thể của Subject đó là BehaviorSubject, nó là biến thế có khái niệm về “the current value”. BehaviorSubject lưu trữ lại giá trị mới emit gần nhất để khi một Observer mới subscribe vào, nó sẽ emit giá trị đó ngay lập tức cho Observer vừa rồi. BehaviorSubjects are useful for representing “values over time”. For instance, an event stream of birthdays is a Subject, but the stream of a person’s age would be a BehaviorSubject. Hay như sử dụng BehaviorSubject để chia sẻ thông tin user hiện tại đang đăng nhập hệ thống cho các component khác nhau trong Angular chẳng hạn. Lưu ý: BehaviorSubject yêu cầu phải có giá trị khởi tạo khi tạo ra subject.
4 9.2 ReplaySubjectMột ReplaySubject tương tự như một BehaviorSubject khi nó có thể gửi những dữ liệu trước đó cho Observer mới subscribe, nhưng nó có thể lưu giữ nhiều giá trị (có thể là toàn bộ giá trị của stream từ thời điểm ban đầu). Tham số đầu vào của ReplaySubject có thể là: buffer: là số lượng phần tử tối đa có thể lưu trữ. windowTime: (ms) thời gian tối đa tính đến thời điểm gần nhất emit value.
5 Hoặc kết hợp buffer với windowTime:
6 Trong ví dụ trên sau 1s chỉ có giá trị 3, 4 và 5 là được emit trong 500ms gần nhất và nằm trong buffer nên được replay lại cho
76. 9.3 AsyncSubjectĐây là biến thể mà chỉ emit giá trị cuối cùng của Observable execution cho các observers, và chỉ khi execution complete. Lưu ý: Nếu stream không complete thì không có gì được emit cả.
7 9.4 Subject CompleteKhi BehaviorSubject complete, thì các Observers subscribe vào sau đó sẽ chỉ nhận được
7 signal. Khi ReplaySubject complete, thì các Observers subscribe vào sau đó sẽ được emit tất cả các giá trị lưu trữ trong buffer, sau đó mới thực thi
7 của Observer. Kể cả khi AsyncSubject complete rồi, Observer vẫn có thể subscribe vào được và vẫn nhận giá trị cuối cùng.
8 10. OperatorsTrong Rxjs chúng ta thấy có vô số các method/static method được cung cấp, khiến nó trở nên vô cùng mạnh mẽ và hữu dụng. Vậy Operator là gì? An Operator is a function which creates a new Observable based on the current Observable. This is a pure operation: the previous Observable stays unmodified. Operator là một pure function. Với cùng một giá trị đầu vào, chúng ta sẽ luôn có cùng một giá trị ở đầu ra. Ví dụ:
9 Operator nhận đầu vào là một Observable, sau đó xử lý và tạo mới mội Observable để trả về, và giữ Observable đầu vào không bị thay đổi gì. Ví dụ, chúng ta tạo một Operator nhận đầu vào là một function, giống như Array
03 chẳng hạn.
0 Bây giờ chúng ta có thể tạo một Observable như sau:
1 10.1 Instance operators vs static operatorsThông thường static operators là các operators được gắn với
88 Observable, chúng được dùng phổ biến để tạo mới Observable. Ví dụ như các operator
33,
34,
35,
92,
93, etc. Instance operators: các operators này gắn với một instance của
88 Observable. Giả sử chúng ta tạo operator
95 như sau, thì kết quả cũng được tương tự như trên:
2 Instance operators are functions that use the this keyword to infer what is the input Observable. Static operators are pure functions attached to the Observable class, and usually are used to create Observables from scratch. Operators được chia vào nhiều nhóm khác nhau, mỗi nhóm có một mục đích sử dụng:
Sau đây chúng ta sẽ tìm hiểu một số operators chính hay dùng đến. 10.2 Creation OperatorsTừ đầu đến giờ chúng ta đã quen với sử dụng
21 operator để tạo ra Observable, ngoài ra chúng ta còn có một số operators khác để tạo Observable từ những common usecase. Ví dụ để tạo Observable từ một số static values, chúng ta có thể dùng
33:
3 Hoặc có thể dùng
33 với nhiều tham số đầu vào:
4 Lưu ý: với các static values như ví dụ trên, chúng được gửi đi sync. (mặc đinh nếu không có tác động của Scheduler). Sau khi gửi đi hết dữ liệu, nó sẽ gửi
7 signal. Một số operators tương tự:
Tổng quát cho các loại event thì chúng ta có
01: Converts any addHandler/removeHandler API to an Observable. Ví dụ:
5 Để làm việc với DOM event chúng ta có
02 operator, nó là bản chi tiết của
01:
6 Khi click vào trang web, chúng ta có thể thấy console hiển thị tọa độ X ra. Để convert từ Promise sang Observable chúng ta có
92 operator:
7 Tạo mới một Observable từ Array, array-like object, Promise, iterable object, hoặc Observable-like object chúng ta dùng
34 operator:
8 repeat:
06 Repeat khi nào Observable complete, có thể giới hạn số lần repeat.
9 Còn nhiều các operators khác nữa như
07, các bạn vào trang chủ của ReactiveX để xem thêm. 10.3 Transformation OperatorsTransformation Operators dùng để chuyển đổi giá trị của Observable từ dạng này sang sang dạng khác. Hay gặp nhất có lẽ là
03, nó tương tự như
95 operator mà chúng ta đã tạo ở trên (tạo để tìm hiểu cơ chế hoạt động).
0 Các bạn có thể dùng
03 để chuyển đổi theo bất kỳ yêu cầu nào của các bạn. Ví dụ khi làm việc với Response object của HTTP, lúc này các bạn có thể convert từ JSON thành object như sau:
1 Khi bạn muốn luôn luôn trả về 1 giá trị cho các giá trị được emit, lúc này bạn có thể dùng
11 operator.
2 scan:
12 Giống như Array
06.
3 buffer:
14 Lưu trữ giá trị được emit ra và đợi đến khi
15 emit thì emit những giá trị đó thành 1 array.
4 bufferTime:
16 Tương tự như
17, nhưng emit values mỗi khoảng thời gian
18 ms.
5 Ngoài ra còn rất nhiều operators khác như
17, và một số Operators chúng ta sẽ tìm hiểu ở phần sau “Higher Order Observables”. 10.4 Filtering OperatorsFiltering Operators mục đích để filter các giá trị được emit thỏa mãn điều kiện nào đó. filter:
20 Giống như Array filter, operator này chỉ emit các value thỏa mãn
21 function. Nếu không có phần tử nào thỏa mãn thì không emit giá trị
8 nào cả.
6 take:
23 Emit tối đa
24 giá trị, rồi complete.
7 takeUntil:
25 Emit value cho đến khi
26 emit thì dừng. Rất hữu ích khi làm việc với Angular, các bạn có thể sử dụng để tạo ra trình auto-unsubscribe cho Observable. Vì khi complete thì nó tự
16.
8 takeWhile:
28 Còn emit value khi mà
29 còn trả về
30. Một khi
29 trả về
32 thì dừng lại, complete.
9 skip:
33 Skip số lượng phần tử từ đầu stream đến
34, các giá trị emit sau đó mới được emit.
0 Tương tự như
35,
36, nhưng hành động skip thay vì take, chúng ta cũng có
37,
38
39 chúng ta có một cách viết khác
40:
41 Nếu không truyền vào
29 thì nó sẽ lấy phần tử đầu tiên, ngược lại sẽ lấy phần tử đầu tiên thỏa mãn
29.
1 Ngược lại với
40 chúng ta sẽ có
45:
46.
2 throttleTime:
47 Emit giá trị mới nhất khi một khoảng thời gian
48 trôi qua.
3 Rất hữu ích khi làm việc với các event fire liên tục như
49, hay
50 chẳng hạn.
4 Sử dụng với
51 để hạn chế event fire quá nhiều. Cứ sau 300ms thì emit giá trị mới nhất có được.
5 debounceTime:
52 Discard emitted values that take less than the specified time between output Hay dùng khi bạn muốn đảm bảo người dùng thôi thao tác sau một khoảng thời gian
53 thì emit giá trị. Giống như đảm bảo người dùng ngừng di chuyển chuột, hay ngừng gõ vào input chẳng hạn, để tránh phát sinh nhiều hành động không cần thiết.
6 distinctUntilChanged:
54 Chỉ emit value khi giá trị hiện tại khác với giá trị đã emit trước đó. Hay dùng để đảm bảo dữ liệu phải khác nhau thì mới thực thi hành động. Kết hợp cùng
55 hoặc
51 để làm việc khi người dùng nhập vào input, sau đó xử lý giá trị, và chỉ xử lý khi dữ liệu hiện tại khác với dữ liệu đã xử lý trước đó.
7 Khi bạn không di chuyển chuột mà click liên tục thì không có gì được emit cả. Trên đây chỉ là một số Filter Operators hay dùng, các bạn có thể vào trang chủ của ReactiveX để tìm hiểu thêm những operators như
57,
58, etc. 10.5 Combination OperatorsCombination Operators: sử dụng để kết hợp các Observables lại với nhau. merge: có thể sử dụng cả instance và static operator. Dùng để merge nhiều Observables thành một Observable.
59
60
61: số lượng stream được phép chạy đồng thời.
8 Lúc này cả 3 cùng chạy đồng thời, nếu bạn muốn dùng instance operator thì thay như sau:
9 Giả sử bạn muốn chỉ 2 stream được chạy đồng thời:
0
62 sẽ đợi cho
63,
64 complete rồi mới bắt đầu chạy. concat: trong trường hợp bạn chỉ muốn 1 stream được chạy, các stream khác phải đợi stream trước complete. Tương đương với
65. Giống như Array
66 vậy.
1 Trong nhiều trường hợp bạn muốn tạo ra 1 Observable cho 1 giá trị, rồi dùng nó
66 với một số Observables khác, lúc này chúng ta có một cách viết gọn hơn sử dụng
68 với cú pháp
69.
2 combineLatest:
70 combineLatest Operator có thể dùng cả static và instance. Mỗi khi một Observable emit value, emit giá trị mới nhất từ tất cả
71, nhưng phải thỏa mãn các Observable khác cũng đã emit value. Như trong ví dụ sau, khi
72 emit value, nó emit giá trị của
63 lúc này là
74, nhưng
62 lúc này mới emit value đầu tiên.
3 Hoặc trường hợp bạn muốn xử lý giá trị trước bằng
76 function:
4 withLatestFrom:
77 Lưu ý: chỉ có thể sử dụng instance operator với withLatestFrom. Sử dụng để emit value của Observable này
78 kết hợp với latest value của
79 Observable, nếu
79 chưa emit gì thì
78 có emit value cũng không emit gì cả.
5 Stream
63 bị phụ thuộc vào
64 trong giai đoạn đầu tiên, vì khi đó
64 chưa emit gì, nên
63 cũng không emit, đến khi
64 bắt đầu emit thì những lần emit sau đó của
63 sẽ kết hợp với giá trị mới nhất của
64. Nếu bạn muốn mỗi khi có giá trị của stream emit thì emit giá trị mới nhất của các stream khác, lúc này bạn nên dùng
89. Chẳng hạn, bạn có search box với
14 fields để thiết lập điều kiện, và khi mỗi field thay đổi thì sẽ lấy danh sách mới dựa theo list các điều kiện kia chẳng hạn. forkJoin:
91 Khi tất cả các Observables complete, emit giá trị cuối cùng của mỗi Observable. Lưu ý: nếu còn một stream không complete, hoặc 1 stream không emit value nào
93 thì không có gì được emit cả. Chỉ có thể sử dụng static operator.
6 Không emit gì cả ngoài
7:
7 zip:
94 Sau khi tất cả Observables emit value, thì emit 1 mảng các giá trị tương ứng (cùng index). Nếu một phần tử không emit gì cả
95, hoặc complete ngay
93, thì không có gì emit cả. Trường hợp
93 thì chỉ send
7 signal. The zip operator will subscribe to all inner observables, waiting for each to emit a value. Once this occurs, all values with the corresponding index will be emitted. This will continue until at least one inner observable completes.
8 10.5 Error Handling OperatorsError Handling Operators sử dụng để handle error trong ứng dụng của bạn. catch:
99 Thường được sử dụng rộng rãi để handle error. Ví dụ:
9 retry:
00 Sử dụng để restart lại stream trong trường hợp stream bị văng ra lỗi. Có thể giới hạn số lần retry bằng cách truyền vào tham số. Nếu không truyền vào, sẽ luôn retry mỗi khi lỗi văng ra. Chẳng hạn khi bạn reqest lên server để lấy dữ liệu, giả sử trong quá trình này bạn muốn request lại 2 lần nếu bị văng ra lỗi.
0 retryWhen:
01 Retry một Observable dựa vào một
02 Observable emit value.
02 Observable này chỉ emit
8 khi
72 Observable văng ra error, khi đó chúng ta có thể xác định khi nào lại subscribe vào
72 Observable lần nữa.
1 10.6 Utility Observablesdo:
07 Dùng để thực hiện một hành động nào đó, và đảm bảo side-effect không ảnh hưởng tới source. Như ví dụ dưới đây, chúng ta đã thực hiện update value cho
08 trong
09, nhưng không bị ảnh hưởng gì.
2 delay:
10 Delay emit value theo một khoảng thời gian cho trước. Ví dụ chúng ta muốn delay 1s rồi mới bắt đầu emit value cho stream sau:
3 Và còn nhiều operators khác như
11 trên trang chủ của ReactiveX. 10.7 Higher Order ObservablesHigher Order Observable (HOO) là một Observable trả về một Observable, nó giống như mảng nhiều chiều vậy - mảng 2 chiều chứa trong nó các mảng 1 chiều. Ví dụ:
4 Oh, nhìn thế này không ổn, phải có cách nào khác. Hãy quay lại ví dụ flatten Array ở trên, Observable cũng cung cấp cho chúng ta giải pháp để thực hiện flatten HOO. mergeMap:
12
5 Hay có một cách viết ngắn gọn hơn, là alias của cách viết ở trên:
6 Bây giờ chúng ta thay đổi 1 chút, cứ mỗi lần click vào
13, chúng ta sẽ start stream
35:
7 Khi bạn click 3 lần chẳng hạn, sẽ tạo ra 3 stream chạy đồng thời (nếu cái trước chưa complete thì cái mới vẫn start), lúc này tương tự như
15, chúng ta có thể control có bao nhiêu stream được chạy đồng thời. Ví dụ chúng ta giới hạn có 2 stream chạy đồng thời.
8 Tham số thứ 2 của
16 được đặt tên là
17, tham số này cho phép chúng ta truyền vào 1 hàm để xử lý inner và outer Observable value. Ví dụ, mình muốn lấy về tọa độ
18,
19 của stream
20 kèm theo giá trị emit của
35:
9 Lưu ý:
22 là một alias cho
16 concatMap:
24 Khi chỉ cho phép 1 stream được chạy, các stream khác phải đợi stream trước complete mới được chạy - tương tự
66 thì chúng ta có operator
26.
0 Khi bạn click vào document nhiều lần thì chỉ có 1 stream
35 chạy tại 1 thời điểm, những stream khác phải đợi stream trước complete mới thực thi. Tham số
17 cũng giống như
16. switchMap:
30 Chỉ cho phép 1 stream được chạy, nếu stream trước chưa
7 thì
16.
33 rất hữu ích khi làm việc với HTTP request, giả sử request cũ chưa trả về giá trị mà người dùng đã request dữ liệu mới thì chúng ta có thể cancel để tránh việc dư thừa hoặc xử lý sai dữ liệu (dữ liệu cũ hơn response sau dữ liệu mới hơn). Khi cần đảm bảo chỉ có stream mới nhất đang chạy.
1 Khi bạn click vào document nhiều lần, nếu để ý bạn sẽ thấy những stream trước đó chưa complete sẽ bị cancel và start stream mới. Ngoài ra nó còn hữu ích để tránh việc bị leak. Giả sử stream
35 kia chạy vô hạn, nếu bạn không cancel trước đó thì chẳng mấy chốc ứng dụng của bạn sẽ crash khi người dùng click càng ngày càng nhiều lần.
2
33 cũng có
17 cho phép chúng ta tùy chọn dữ liệu cần emit. Lưu ý: Trong hầu hết các trường hợp nếu bạn không có ý định gì đặc biệt thì nên dùng
33 thay vì
16. 11. Tổng kếtTrong bài này chúng ta đã tìm hiểu rất nhiều vấn đề về Reactive Programming với Rxjs, nhưng vẫn còn thiếu phần Multicasting, hi vọng mình sẽ làm xong phần đó trong thời gian tới. Marble diagrams các bạn có thể theo dõi trên trang chủ ReactiveX. Observable vs Promise: Khả năng mở rộng của Observable là rất lớn, nó bao quát được nhiều vấn đề mà Promise còn thiếu, như emit multiple values, hay cancel execution. Hơn nữa, nó đang trong giai đoạn phát triển để trở thành standard của EcmaScript (hiện tại đang ở stage-1), nên việc sử dụng Observable thay vì Promise có lẽ là điều chúng ta nên cân nhắc. Các bạn quan tâm về Rxjs có thể theo dõi thêm các phần khác ở trong link tham khảo cuối bài. Sau đây là một phần bonus để các bạn tạo ra một
39 mà nó chịu trách nhiệm tự
16 các subscription trong ứng dụng của bạn. |