Rxjs Observables
4 min readRapid overview
Rxjs Observables
TL;DR
RxJS Observables model values that arrive over time — HTTP responses, user input, timers — and let you compose them with operators (map, filter, debounceTime, switchMap, catchError) instead of nested callbacks. Mastery is mostly knowing which operator solves which problem and remembering to unsubscribe so cold streams don't leak.
How it works
Observable Basics
import { Observable } from 'rxjs';
// Create observable
const observable = new Observable(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
});
// Subscribe
observable.subscribe({
next: value => console.log(value),
error: err => console.error(err),
complete: () => console.log('Done')
});
Common RxJS Operators
import { of, from, interval } from 'rxjs';
import { map, filter, take, debounceTime, switchMap, catchError } from 'rxjs/operators';
// map - transform values
of(1, 2, 3).pipe(
map(x => x * 2)
).subscribe(console.log); // 2, 4, 6
// filter - filter values
of(1, 2, 3, 4).pipe(
filter(x => x % 2 === 0)
).subscribe(console.log); // 2, 4
// take - take first n values
interval(1000).pipe(
take(3)
).subscribe(console.log); // 0, 1, 2 then complete
// debounceTime - delay emissions
searchInput$.pipe(
debounceTime(300)
).subscribe(query => search(query));
// switchMap - switch to new observable
searchInput$.pipe(
debounceTime(300),
switchMap(query => this.http.get(`/api/search?q=${query}`))
).subscribe(results => console.log(results));
// catchError - handle errors
this.http.get('/api/data').pipe(
catchError(error => {
console.error(error);
return of([]);
})
).subscribe(data => console.log(data));
Subject Types
import { Subject, BehaviorSubject, ReplaySubject, AsyncSubject } from 'rxjs';
// Subject - no initial value
const subject = new Subject<number>();
subject.next(1);
subject.subscribe(console.log); // Only receives future values
subject.next(2); // Logs: 2
// BehaviorSubject - requires initial value, emits latest
const behaviorSubject = new BehaviorSubject<number>(0);
behaviorSubject.subscribe(console.log); // Logs: 0
behaviorSubject.next(1); // Logs: 1
// ReplaySubject - replays n values to new subscribers
const replaySubject = new ReplaySubject<number>(2);
replaySubject.next(1);
replaySubject.next(2);
replaySubject.next(3);
replaySubject.subscribe(console.log); // Logs: 2, 3
// AsyncSubject - only emits last value on complete
const asyncSubject = new AsyncSubject<number>();
asyncSubject.next(1);
asyncSubject.next(2);
asyncSubject.subscribe(console.log); // Nothing yet
asyncSubject.complete(); // Logs: 2
Hot vs Cold Observables
| Feature | Cold | Hot |
|---|---|---|
| Starts on subscribe | Yes | No |
| Shared execution | No | Yes |
| Late subscribers | Restart sequence | Miss previous values |
| Examples | HTTP, timer | Subject, DOM events |
Cold observables create a new producer per subscriber, hot observables share a producer.
Interview-friendly examples
Cold (new producer per subscriber):
this.http.get(...)(each subscribe makes a request unless shared)defer(() => fetch(...))
Hot (shared producer):
fromEvent(button, 'click')Subject/BehaviorSubject
Signals vs Observables (how they fit together)
Observables are best for async streams and cancellation-heavy flows:
- HTTP, websockets, user events
- composition operators (
switchMap,mergeMap, retries, buffering)
Signals are best for local in-memory UI state and derived values:
- synchronous state (
signal) - memoized derivations (
computed)
Bridge patterns:
import { toSignal, takeUntilDestroyed } from '@angular/core/rxjs-interop';
items = toSignal(this.items$); // Observable -> Signal
this.items$.pipe(takeUntilDestroyed()).subscribe(); // manual subscription, safe teardown
Unsubscribing
export class Component implements OnDestroy {
private destroy$ = new Subject<void>();
ngOnInit() {
// Method 1: takeUntil
this.dataService.getData().pipe(
takeUntil(this.destroy$)
).subscribe(data => console.log(data));
// Method 2: Store subscription
this.subscription = this.dataService.getData()
.subscribe(data => console.log(data));
}
ngOnDestroy() {
// Method 1
this.destroy$.next();
this.destroy$.complete();
// Method 2
this.subscription?.unsubscribe();
}
}
Async Pipe
@Component({
selector: 'app-users',
template: `
<div *ngIf="users$ | async as users">
<div *ngFor="let user of users">{{user.name}}</div>
</div>
`
})
export class UsersComponent {
users$: Observable<User[]>;
constructor(private http: HttpClient) {
this.users$ = this.http.get<User[]>('/api/users');
}
// No need to unsubscribe - async pipe handles it
}
Operator Cheat Sheet
map: transform values.filter: keep values that match a predicate.switchMap: cancel previous inner observable when a new value arrives.mergeMap: run inner observables concurrently.concatMap: queue inner observables in order.combineLatest: emit when any source emits after all have emitted once.tap: side effects such as logging.catchError: recover from errors with a fallback observable.debounceTime: wait for a pause before emitting.
Combination Operators
import { combineLatest, forkJoin, merge, zip } from 'rxjs';
// combineLatest - emits when any observable emits
combineLatest([obs1$, obs2$]).subscribe(([val1, val2]) => {
console.log(val1, val2);
});
// forkJoin - waits for all to complete (like Promise.all)
forkJoin([
this.http.get('/api/users'),
this.http.get('/api/posts')
]).subscribe(([users, posts]) => {
console.log(users, posts);
});
// merge - emits from all observables
merge(clicks$, hovers$).subscribe(event => {
console.log(event);
});
// zip - pairs values by index
zip(numbers$, letters$).subscribe(([num, letter]) => {
console.log(num, letter);
});