Observer
The Observer pattern lets an object, called the subject, keep track of other objects, called observers, that are interested in the subject's state. When that state changes, the subject notifies its observers. The mechanics behind this are simple.
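To make the mechanics concrete, here is a minimal hand-rolled sketch in TypeScript. It is only an illustration of the idea, not how RxJS implements its Subject; the names SimpleSubject, ObserverFn, sortChanges and received are made up for this example.

```typescript
// A minimal sketch of the subject/observer relationship (illustration only).
type ObserverFn<T> = (value: T) => void;

class SimpleSubject<T> {
  private observers: ObserverFn<T>[] = [];

  // Register an observer and return a function that removes it again.
  subscribe(observer: ObserverFn<T>): () => void {
    this.observers.push(observer);
    return () => {
      this.observers = this.observers.filter((o) => o !== observer);
    };
  }

  // Notify every registered observer about a new value.
  next(value: T): void {
    // Iterate over a copy so observers may unsubscribe while being notified.
    for (const observer of this.observers.slice()) {
      observer(value);
    }
  }
}

// Hypothetical usage: one observer records every sort field it is told about.
const sortChanges = new SimpleSubject<string>();
const received: string[] = [];

const unsubscribe = sortChanges.subscribe((field) => {
  received.push(field);
});

sortChanges.next("created_at"); // the observer is notified
unsubscribe();
sortChanges.next("name");       // no observer is registered any more
```

After the second call to next, received still holds only the first value, because the observer removed itself before the subject changed again.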
The code below re-fetches data from the API whenever the sort order changes. It is not only for sorting: the same stream also listens for changes of the paginator. The idea is that whenever we want to react to changes of an object, we add it to the set of observed sources and subscribe to the result.
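// Imports used by the component members below.
import { ViewChild } from '@angular/core';
import { MatPaginator } from '@angular/material/paginator';
import { MatSort } from '@angular/material/sort';
import { Subject, Subscription, interval, merge } from 'rxjs';
import { debounce, switchMap, tap } from 'rxjs/operators';

// Members of the component class. dataChange, dataService, filterInfo and the
// other fields used here are declared elsewhere in the component.
private sortChangeSubscription: Subscription;
private dataChangeSubscription: Subscription;
changes = new Subject();
// On Angular 8+, pass { static: true } so these queries resolve before ngOnInit.
@ViewChild(MatPaginator) paginator: MatPaginator;
@ViewChild(MatSort) sort: MatSort;

ngOnInit() {
  // Go back to the first page whenever the sort order changes.
  this.sortChangeSubscription = this.sort.sortChange.subscribe(() => this.paginator.firstPage());

  // Reload the data whenever the data, the page, or the sort order changes.
  this.dataChangeSubscription = merge(
    this.dataChange,
    this.paginator.page,
    this.sort.sortChange
  ).pipe(
    debounce(() => interval(300)),              // wait for 300 ms of silence
    tap(() => this.isLoadingResults = true),    // show the loading indicator
    switchMap(() => this.dataService.fetchAll({
      pageSize: this.paginator.pageSize,
      pageIndex: this.paginator.pageIndex + 1,  // the paginator index is zero-based
      sortField: this.sort.active || 'created_at',
      sortDirection: this.sort.direction || 'desc',
      ...this.filterInfo
    })),
    tap(() => this.isLoadingResults = false)    // hide it once a response arrives
  ).subscribe((resp: JobApplicationCollectionResult) => {
    this.collection = resp.jobApplicationCollection;
    this.ability = resp.ability;
  });
}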
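The merge step above can be pictured with a small hand-rolled sketch: several sources are combined into one stream, and a single subscriber reacts to all of them. SimpleEmitter and mergeEmitters are hypothetical stand-ins written for this example; they are not the RxJS implementation and ignore values, errors and completion.

```typescript
// Hand-rolled stand-ins for illustration only; RxJS provides the real Subject and merge.
type ChangeListener = () => void;

class SimpleEmitter {
  private listeners: ChangeListener[] = [];

  subscribe(listener: ChangeListener): void {
    this.listeners.push(listener);
  }

  emit(): void {
    for (const listener of this.listeners) {
      listener();
    }
  }
}

// Returns a new emitter that fires whenever any of the sources fires.
function mergeEmitters(...sources: SimpleEmitter[]): SimpleEmitter {
  const merged = new SimpleEmitter();
  for (const source of sources) {
    source.subscribe(() => merged.emit());
  }
  return merged;
}

// Hypothetical stand-ins for dataChange, paginator.page and sort.sortChange.
const dataChange = new SimpleEmitter();
const pageChange = new SimpleEmitter();
const sortChange = new SimpleEmitter();

let reloadCount = 0;
mergeEmitters(dataChange, pageChange, sortChange).subscribe(() => {
  reloadCount += 1; // the component would call the API here
});

pageChange.emit();
sortChange.emit();
dataChange.emit();
```

Each of the three sources triggers the same reload callback, which is why the component needs only one subscription for data, page and sort changes.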
Handle sortChange: go back to the first page when the user clicks the sort icon in a column header.
this.sortChangeSubscription = this.sort.sortChange.subscribe(() => this.paginator.firstPage());
Combine the sources we want to observe with merge, then process the merged stream with the pipe method.
// Creates an output Observable which concurrently emits all values from every
// given input Observable.
this.dataChangeSubscription = merge(
this.dataChange,
this.paginator.page,
this.sort.sortChange
// pipe() chains the operators that process the merged stream.
// This is the Observable's pipe method, not Angular's AsyncPipe.
/**
* Used to stitch together functional operators into a chain.
* @method pipe
* @return {Observable} the Observable result of all of the operators having
* been called in the order they were passed in.
*
* ### Example
* ```javascript
* import { map, filter, scan } from 'rxjs/operators';
*
* Rx.Observable.interval(1000)
* .pipe(
* filter(x => x % 2 === 0),
* map(x => x + x),
* scan((acc, x) => acc + x)
* )
* .subscribe(x => console.log(x))
* ```
*/
).pipe(
debounce waits for 300 ms without a new value before it emits. If a new value arrives within that window, the pending one is dropped.
/**
* Emits a value from the source Observable only after a particular time span
* determined by another Observable has passed without another source emission.
*
* <span class="informal">It's like {@link debounceTime}, but the time span of
* emission silence is determined by a second Observable.</span>
*
* ![](debounce.png)
*
* `debounce` delays values emitted by the source Observable, but drops previous
* pending delayed emissions if a new value arrives on the source Observable.
* This operator keeps track of the most recent value from the source
* Observable, and spawns a duration Observable by calling the
* `durationSelector` function. The value is emitted only when the duration
* Observable emits a value or completes, and if no other value was emitted on
* the source Observable since the duration Observable was spawned. If a new
* value appears before the duration Observable emits, the previous value will
* be dropped and will not be emitted on the output Observable.
*
* Like {@link debounceTime}, this is a rate-limiting operator, and also a
* delay-like operator since output emissions do not necessarily occur at the
* same time as they did on the source Observable.
*
* ## Example
* Emit the most recent click after a burst of clicks
* ```javascript
* const clicks = fromEvent(document, 'click');
* const result = clicks.pipe(debounce(() => interval(1000)));
* result.subscribe(x => console.log(x));
* ```
*
* @see {@link audit}
* @see {@link debounceTime}
* @see {@link delayWhen}
* @see {@link throttle}
*
* @param {function(value: T): SubscribableOrPromise} durationSelector A function
* that receives a value from the source Observable, for computing the timeout
* duration for each source value, returned as an Observable or a Promise.
* @return {Observable} An Observable that delays the emissions of the source
* Observable by the specified duration Observable returned by
* `durationSelector`, and may drop some values if they occur too frequently.
* @method debounce
* @owner Observable
*/
debounce(() => interval(300)),
/**
* Perform a side effect for every emission on the source Observable, but return
* an Observable that is identical to the source.
*
* <span class="informal">Intercepts each emission on the source and runs a
* function, but returns an output which is identical to the source as long as errors don't occur.</span>
*
* ![](do.png)
*
* Returns a mirrored Observable of the source Observable, but modified so that
* the provided Observer is called to perform a side effect for every value,
* error, and completion emitted by the source. Any errors that are thrown in
* the aforementioned Observer or handlers are safely sent down the error path
* of the output Observable.
*
* This operator is useful for debugging your Observables for the correct values
* or performing other side effects.
*
* Note: this is different to a `subscribe` on the Observable. If the Observable
* returned by `tap` is not subscribed, the side effects specified by the
* Observer will never happen. `tap` therefore simply spies on existing
* execution, it does not trigger an execution to happen like `subscribe` does.
*
* ## Example
* Map every click to the clientX position of that click, while also logging the click event
* ```javascript
* const clicks = fromEvent(document, 'click');
* const positions = clicks.pipe(
* tap(ev => console.log(ev)),
* map(ev => ev.clientX),
* );
* positions.subscribe(x => console.log(x));
* ```
*
* @see {@link map}
* @see {@link Observable#subscribe}
*
* @param {Observer|function} [nextOrObserver] A normal Observer object or a
* callback for `next`.
* @param {function} [error] Callback for errors in the source.
* @param {function} [complete] Callback for the completion of the source.
* @return {Observable} An Observable identical to the source, but runs the
* specified Observer or callback(s) for each item.
* @name tap
*/
tap(() => this.isLoadingResults = true),
/**
* Projects each source value to an Observable which is merged in the output
* Observable, emitting values only from the most recently projected Observable.
*
* <span class="informal">Maps each value to an Observable, then flattens all of
* these inner Observables.</span>
*
* ![](switchMap.png)
*
* Returns an Observable that emits items based on applying a function that you
* supply to each item emitted by the source Observable, where that function
* returns an (so-called "inner") Observable. Each time it observes one of these
* inner Observables, the output Observable begins emitting the items emitted by
* that inner Observable. When a new inner Observable is emitted, `switchMap`
* stops emitting items from the earlier-emitted inner Observable and begins
* emitting items from the new one. It continues to behave like this for
* subsequent inner Observables.
*
* ## Example
* Rerun an interval Observable on every click event
* ```javascript
* const clicks = fromEvent(document, 'click');
* const result = clicks.pipe(switchMap((ev) => interval(1000)));
* result.subscribe(x => console.log(x));
* ```
*
* @see {@link concatMap}
* @see {@link exhaustMap}
* @see {@link mergeMap}
* @see {@link switchAll}
* @see {@link switchMapTo}
*
* @param {function(value: T, ?index: number): ObservableInput} project A function
* that, when applied to an item emitted by the source Observable, returns an
* Observable.
* @return {Observable} An Observable that emits the result of applying the
* projection function (and the optional deprecated `resultSelector`) to each item
* emitted by the source Observable and taking only the values from the most recently
* projected inner Observable.
* @method switchMap
* @owner Observable
*/
switchMap(() => this.dataService.fetchAll({
pageSize: this.paginator.pageSize,
pageIndex: this.paginator.pageIndex + 1,
sortField: this.sort.active || 'created_at',
sortDirection: this.sort.direction || 'desc',
...this.filterInfo
})),
tap(() => this.isLoadingResults = false)
/**
* Invokes an execution of an Observable and registers Observer handlers for notifications it will emit.
*
* <span class="informal">Use it when you have all these Observables, but still nothing is happening.</span>
*
* `subscribe` is not a regular operator, but a method that calls Observable's internal `subscribe` function. It
* might be for example a function that you passed to Observable's constructor, but most of the time it is
* a library implementation, which defines what will be emitted by an Observable, and when it will be emitted. This means
* that calling `subscribe` is actually the moment when Observable starts its work, not when it is created, as is often
* thought.
*
* Apart from starting the execution of an Observable, this method allows you to listen for values
* that an Observable emits, as well as for when it completes or errors. You can achieve this in two
* of the following ways.
*
* The first way is creating an object that implements {@link Observer} interface. It should have methods
* defined by that interface, but note that it should be just a regular JavaScript object, which you can create
* yourself in any way you want (ES6 class, classic function constructor, object literal etc.). In particular do
* not attempt to use any RxJS implementation details to create Observers - you don't need them. Remember also
* that your object does not have to implement all methods. If you find yourself creating a method that doesn't
* do anything, you can simply omit it. Note however, if the `error` method is not provided, all errors will
* be left uncaught.
*
* The second way is to give up on Observer object altogether and simply provide callback functions in place of its methods.
* This means you can provide three functions as arguments to `subscribe`, where the first function is equivalent
* of a `next` method, the second of an `error` method and the third of a `complete` method. Just as in case of Observer,
* if you do not need to listen for something, you can omit a function, preferably by passing `undefined` or `null`,
* since `subscribe` recognizes these functions by where they were placed in function call. When it comes
* to `error` function, just as before, if not provided, errors emitted by an Observable will be thrown.
*
* Whichever style of calling `subscribe` you use, in both cases it returns a Subscription object.
* This object allows you to call `unsubscribe` on it, which in turn will stop the work that an Observable does and will clean
* up all resources that an Observable used. Note that cancelling a subscription will not call `complete` callback
* provided to `subscribe` function, which is reserved for a regular completion signal that comes from an Observable.
*
* Remember that callbacks provided to `subscribe` are not guaranteed to be called asynchronously.
* It is an Observable itself that decides when these functions will be called. For example {@link of}
* by default emits all its values synchronously. Always check documentation for how given Observable
* will behave when subscribed and if its default behavior can be modified with a `scheduler`.
*
* ## Example
* ### Subscribe with an Observer
* ```javascript
* const sumObserver = {
* sum: 0,
* next(value) {
* console.log('Adding: ' + value);
* this.sum = this.sum + value;
* },
* error() { // We actually could just remove this method,
* }, // since we do not really care about errors right now.
* complete() {
* console.log('Sum equals: ' + this.sum);
* }
* };
*
* Rx.Observable.of(1, 2, 3) // Synchronously emits 1, 2, 3 and then completes.
* .subscribe(sumObserver);
*
* // Logs:
* // "Adding: 1"
* // "Adding: 2"
* // "Adding: 3"
* // "Sum equals: 6"
* ```
*
* ### Subscribe with functions
* ```javascript
* let sum = 0;
*
* Rx.Observable.of(1, 2, 3)
* .subscribe(
* function(value) {
* console.log('Adding: ' + value);
* sum = sum + value;
* },
* undefined,
* function() {
* console.log('Sum equals: ' + sum);
* }
* );
*
* // Logs:
* // "Adding: 1"
* // "Adding: 2"
* // "Adding: 3"
* // "Sum equals: 6"
* ```
*
* ### Cancel a subscription
* ```javascript
* const subscription = Rx.Observable.interval(1000).subscribe(
* num => console.log(num),
* undefined,
* () => console.log('completed!') // Will not be called, even
* ); // when cancelling subscription
*
*
* setTimeout(() => {
* subscription.unsubscribe();
* console.log('unsubscribed!');
* }, 2500);
*
* // Logs:
* // 0 after 1s
* // 1 after 2s
* // "unsubscribed!" after 2.5s
* ```
*
* @param {Observer|Function} observerOrNext (optional) Either an observer with methods to be called,
* or the first of three possible handlers, which is the handler for each value emitted from the subscribed
* Observable.
* @param {Function} error (optional) A handler for a terminal event resulting from an error. If no error handler is provided,
* the error will be thrown as unhandled.
* @param {Function} complete (optional) A handler for a terminal event resulting from successful completion.
* @return {ISubscription} a subscription reference to the registered handlers
* @method subscribe
*/
).subscribe((resp: JobApplicationCollectionResult) => {
this.collection = resp.jobApplicationCollection;
this.ability = resp.ability;
});
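The request parameters built inside switchMap can also be pulled out into a small pure function, which makes the defaults easy to check in isolation. This is a sketch under assumptions: PageState, SortState and buildFetchParams are hypothetical names, the status filter is made up, and the + 1 assumes the API counts pages from 1 while the paginator counts from 0.

```typescript
// Hypothetical shapes: only the fields the pipeline reads are modelled here.
interface PageState {
  pageSize: number;
  pageIndex: number; // zero-based, as reported by the paginator
}

interface SortState {
  active: string;                 // id of the sorted column
  direction: 'asc' | 'desc' | ''; // '' when no sort is applied
}

// Builds the argument passed to fetchAll in the pipeline above.
function buildFetchParams(
  page: PageState,
  sort: SortState,
  filterInfo: Record<string, unknown> = {}
) {
  return {
    pageSize: page.pageSize,
    pageIndex: page.pageIndex + 1,           // assumed: the API counts pages from 1
    sortField: sort.active || 'created_at',  // default sort column
    sortDirection: sort.direction || 'desc', // default sort direction
    ...filterInfo,                           // filters are spread last, as in the original
  };
}

// First page, no sort chosen yet, one made-up filter.
const params = buildFetchParams(
  { pageSize: 25, pageIndex: 0 },
  { active: '', direction: '' },
  { status: 'open' }
);
```

With no sort selected, the call falls back to created_at in descending order and asks the API for page 1.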
This pattern is very popular in JavaScript frameworks such as Angular and ReactJS. In ReactJS we call the setState function to change a component's state, that is, the component's data. Angular is similar: a component is re-rendered when its data (properties) change. There is no useless pattern. Something becomes a pattern because it appears and is applied so many times.
More about implementing this pattern in Ruby: https://medium.com/@dljerome/design-patterns-in-ruby-observer-a6e8fe2e5c0a