Rxjs Observables
3 min readRapid overview
RxJS & Observables in Angular
Observable Basics
import { Observable } from 'rxjs';
// Create observable
const observable = new Observable(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
});
// Subscribe
observable.subscribe({
next: value => console.log(value),
error: err => console.error(err),
complete: () => console.log('Done')
});
Common RxJS Operators
import { of, from, interval } from 'rxjs';
import { map, filter, take, debounceTime, switchMap, catchError } from 'rxjs/operators';
// map - transform values
of(1, 2, 3).pipe(
map(x => x * 2)
).subscribe(console.log); // 2, 4, 6
// filter - filter values
of(1, 2, 3, 4).pipe(
filter(x => x % 2 === 0)
).subscribe(console.log); // 2, 4
// take - take first n values
interval(1000).pipe(
take(3)
).subscribe(console.log); // 0, 1, 2 then complete
// debounceTime - delay emissions
searchInput$.pipe(
debounceTime(300)
).subscribe(query => search(query));
// switchMap - switch to new observable
searchInput$.pipe(
debounceTime(300),
switchMap(query => this.http.get(`/api/search?q=${query}`))
).subscribe(results => console.log(results));
// catchError - handle errors
this.http.get('/api/data').pipe(
catchError(error => {
console.error(error);
return of([]);
})
).subscribe(data => console.log(data));
Subject Types
import { Subject, BehaviorSubject, ReplaySubject, AsyncSubject } from 'rxjs';
// Subject - no initial value
const subject = new Subject<number>();
subject.next(1);
subject.subscribe(console.log); // Only receives future values
subject.next(2); // Logs: 2
// BehaviorSubject - requires initial value, emits latest
const behaviorSubject = new BehaviorSubject<number>(0);
behaviorSubject.subscribe(console.log); // Logs: 0
behaviorSubject.next(1); // Logs: 1
// ReplaySubject - replays n values to new subscribers
const replaySubject = new ReplaySubject<number>(2);
replaySubject.next(1);
replaySubject.next(2);
replaySubject.next(3);
replaySubject.subscribe(console.log); // Logs: 2, 3
// AsyncSubject - only emits last value on complete
const asyncSubject = new AsyncSubject<number>();
asyncSubject.next(1);
asyncSubject.next(2);
asyncSubject.subscribe(console.log); // Nothing yet
asyncSubject.complete(); // Logs: 2
Hot vs Cold Observables
| Feature | Cold | Hot |
|---|---|---|
| Starts on subscribe | Yes | No |
| Shared execution | No | Yes |
| Late subscribers | Restart sequence | Miss previous values |
| Examples | HTTP, timer | Subject, DOM events |
Cold observables create a new producer per subscriber, hot observables share a producer.
Interview-friendly examples
Cold (new producer per subscriber):
this.http.get(...)(each subscribe makes a request unless shared)defer(() => fetch(...))
Hot (shared producer):
fromEvent(button, 'click')Subject/BehaviorSubject
Signals vs Observables (how they fit together)
Observables are best for async streams and cancellation-heavy flows:
- HTTP, websockets, user events
- composition operators (
switchMap,mergeMap, retries, buffering)
Signals are best for local in-memory UI state and derived values:
- synchronous state (
signal) - memoized derivations (
computed)
Bridge patterns:
import { toSignal, takeUntilDestroyed } from '@angular/core/rxjs-interop';
items = toSignal(this.items$); // Observable -> Signal
this.items$.pipe(takeUntilDestroyed()).subscribe(); // manual subscription, safe teardown
Unsubscribing
export class Component implements OnDestroy {
private destroy$ = new Subject<void>();
ngOnInit() {
// Method 1: takeUntil
this.dataService.getData().pipe(
takeUntil(this.destroy$)
).subscribe(data => console.log(data));
// Method 2: Store subscription
this.subscription = this.dataService.getData()
.subscribe(data => console.log(data));
}
ngOnDestroy() {
// Method 1
this.destroy$.next();
this.destroy$.complete();
// Method 2
this.subscription?.unsubscribe();
}
}
Async Pipe
@Component({
selector: 'app-users',
template: `
<div *ngIf="users$ | async as users">
<div *ngFor="let user of users">{{user.name}}</div>
</div>
`
})
export class UsersComponent {
users$: Observable<User[]>;
constructor(private http: HttpClient) {
this.users$ = this.http.get<User[]>('/api/users');
}
// No need to unsubscribe - async pipe handles it
}
Operator Cheat Sheet
map: transform values.filter: keep values that match a predicate.switchMap: cancel previous inner observable when a new value arrives.mergeMap: run inner observables concurrently.concatMap: queue inner observables in order.combineLatest: emit when any source emits after all have emitted once.tap: side effects such as logging.catchError: recover from errors with a fallback observable.debounceTime: wait for a pause before emitting.
Combination Operators
import { combineLatest, forkJoin, merge, zip } from 'rxjs';
// combineLatest - emits when any observable emits
combineLatest([obs1$, obs2$]).subscribe(([val1, val2]) => {
console.log(val1, val2);
});
// forkJoin - waits for all to complete (like Promise.all)
forkJoin([
this.http.get('/api/users'),
this.http.get('/api/posts')
]).subscribe(([users, posts]) => {
console.log(users, posts);
});
// merge - emits from all observables
merge(clicks$, hovers$).subscribe(event => {
console.log(event);
});
// zip - pairs values by index
zip(numbers$, letters$).subscribe(([num, letter]) => {
console.log(num, letter);
});