RxJS Observables

TL;DR

RxJS Observables model values that arrive over time (HTTP responses, user input, timers) and let you compose them with operators (map, filter, debounceTime, switchMap, catchError) instead of nested callbacks. Mastery is mostly knowing which operator solves which problem and remembering to unsubscribe from long-lived streams so subscriptions don't leak.

How it works

Observable Basics

import { Observable } from 'rxjs';

// Create observable
const observable = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});

// Subscribe
observable.subscribe({
  next: value => console.log(value),
  error: err => console.error(err),
  complete: () => console.log('Done')
});
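
The function passed to new Observable can also return a teardown function, which is what makes unsubscribing meaningful. The sketch below is a minimal illustration of that lifecycle; the lifecycle array and the source$ name are hypothetical and exist only to make the order of events visible.

```typescript
import { Observable } from 'rxjs';

// Hypothetical log used only to record the order of events.
const lifecycle: string[] = [];

const source$ = new Observable<number>(subscriber => {
  lifecycle.push('producer started');
  subscriber.next(1);

  // The returned function is the teardown logic. RxJS runs it when the
  // subscription ends: on unsubscribe(), on error, or on complete.
  return () => {
    lifecycle.push('producer stopped');
  };
});

// The producer function does not run until subscribe() is called.
const subscription = source$.subscribe(value => {
  lifecycle.push(`value ${value}`);
});
subscription.unsubscribe();

console.log(lifecycle);
// ['producer started', 'value 1', 'producer stopped']
```

In a real producer the teardown is where you would clear a timer, close a socket, or remove an event listener.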

Common RxJS Operators

import { of, from, interval } from 'rxjs';
import { map, filter, take, debounceTime, switchMap, catchError } from 'rxjs/operators';

// map - transform values
of(1, 2, 3).pipe(
  map(x => x * 2)
).subscribe(console.log); // 2, 4, 6

// filter - filter values
of(1, 2, 3, 4).pipe(
  filter(x => x % 2 === 0)
).subscribe(console.log); // 2, 4

// take - take first n values
interval(1000).pipe(
  take(3)
).subscribe(console.log); // 0, 1, 2 then complete

// debounceTime - emit the latest value only after 300 ms without a new one
searchInput$.pipe(
  debounceTime(300)
).subscribe(query => search(query));

// switchMap - map each value to an inner observable, cancelling the previous one
searchInput$.pipe(
  debounceTime(300),
  switchMap(query => this.http.get(`/api/search?q=${encodeURIComponent(query)}`))
).subscribe(results => console.log(results));

// catchError - handle errors
this.http.get('/api/data').pipe(
  catchError(error => {
    console.error(error);
    return of([]);
  })
).subscribe(data => console.log(data));
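
One detail worth seeing in action: once catchError replaces a failed stream with a fallback, that stream is finished. In a search pipeline this usually means recovering inside the switchMap callback, so a single failed request does not end the outer stream. The sketch below assumes a hypothetical fakeSearch function standing in for this.http.get, and a Subject standing in for searchInput$.

```typescript
import { Observable, Subject, of } from 'rxjs';
import { catchError, switchMap } from 'rxjs/operators';

// Hypothetical stand-in for this.http.get(): fails for the query 'bad'.
function fakeSearch(query: string): Observable<string[]> {
  return new Observable<string[]>(subscriber => {
    if (query === 'bad') {
      subscriber.error(new Error('search failed'));
    } else {
      subscriber.next([`result for ${query}`]);
      subscriber.complete();
    }
  });
}

const query$ = new Subject<string>();
const results: string[][] = [];

query$.pipe(
  switchMap(query =>
    fakeSearch(query).pipe(
      // Recover inside the inner pipe so only this request is replaced;
      // the outer query$ subscription stays alive for later queries.
      catchError(() => of([] as string[]))
    )
  )
).subscribe(result => results.push(result));

query$.next('rx');
query$.next('bad');
query$.next('js');

console.log(results);
// [['result for rx'], [], ['result for js']]
```

If catchError were placed after switchMap instead, the 'bad' query would emit the fallback and complete the whole pipeline, so 'js' would never be searched.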

Subject Types

import { Subject, BehaviorSubject, ReplaySubject, AsyncSubject } from 'rxjs';

// Subject - no initial value
const subject = new Subject<number>();
subject.next(1);
subject.subscribe(console.log); // Only receives future values
subject.next(2); // Logs: 2

// BehaviorSubject - requires an initial value, emits the current value to new subscribers
const behaviorSubject = new BehaviorSubject<number>(0);
behaviorSubject.subscribe(console.log); // Logs: 0
behaviorSubject.next(1); // Logs: 1

// ReplaySubject - replays the last n values to new subscribers
const replaySubject = new ReplaySubject<number>(2);
replaySubject.next(1);
replaySubject.next(2);
replaySubject.next(3);
replaySubject.subscribe(console.log); // Logs: 2, 3

// AsyncSubject - only emits last value on complete
const asyncSubject = new AsyncSubject<number>();
asyncSubject.next(1);
asyncSubject.next(2);
asyncSubject.subscribe(console.log); // Nothing yet
asyncSubject.complete(); // Logs: 2

Hot vs Cold Observables

Feature             | Cold             | Hot
Starts on subscribe | Yes              | No
Shared execution    | No               | Yes
Late subscribers    | Restart sequence | Miss previous values
Examples            | HTTP, timer      | Subject, DOM events

Cold observables create a new producer for each subscriber; hot observables share one producer among all subscribers.

Interview-friendly examples

Cold (new producer per subscriber):

  • this.http.get(...) (each subscribe makes a request unless shared)
  • defer(() => fetch(...))

Hot (shared producer):

  • fromEvent(button, 'click')
  • Subject / BehaviorSubject
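
The difference can be made concrete with a counter. The sketch below is illustrative only: coldRuns, sharedRuns, and the other names are hypothetical, and share() is used here as one common way to get the "unless shared" behaviour mentioned above.

```typescript
import { Observable, Subject } from 'rxjs';
import { share } from 'rxjs/operators';

// Cold: the producer function runs once per subscriber.
let coldRuns = 0;
const cold$ = new Observable<number>(subscriber => {
  coldRuns += 1;
  subscriber.next(coldRuns);
  subscriber.complete();
});
const coldValues: number[] = [];
cold$.subscribe(v => coldValues.push(v));
cold$.subscribe(v => coldValues.push(v));
// coldRuns is 2 and coldValues is [1, 2]

// Hot: one Subject is the shared producer; late subscribers miss earlier values.
const hot$ = new Subject<string>();
const early: string[] = [];
const late: string[] = [];
hot$.subscribe(v => early.push(v));
hot$.next('first');
hot$.subscribe(v => late.push(v));
hot$.next('second');
// early is ['first', 'second'] and late is ['second']

// share() multicasts a cold source while it has active subscribers.
let sharedRuns = 0;
const shared$ = new Observable<number>(subscriber => {
  sharedRuns += 1;
  subscriber.next(sharedRuns); // never completes, so the execution stays open
}).pipe(share());
shared$.subscribe();
shared$.subscribe();
// sharedRuns is 1: the second subscriber joined the existing execution
```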

Signals vs Observables (how they fit together)

Observables are best for async streams and cancellation-heavy flows:

  • HTTP, websockets, user events
  • composition operators (switchMap, mergeMap, retries, buffering)

Signals are best for local in-memory UI state and derived values:

  • synchronous state (signal)
  • memoized derivations (computed)

Bridge patterns:

import { toSignal, takeUntilDestroyed } from '@angular/core/rxjs-interop';

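// Both helpers need an injection context (field initializer or constructor)
items = toSignal(this.items$); // Observable -> Signal; undefined until the first emission

this.items$.pipe(takeUntilDestroyed()).subscribe(); // manual subscription, safe teardown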

Unsubscribing

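import { OnDestroy, OnInit } from '@angular/core';
import { Subject, Subscription } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

// @Component metadata omitted; DataService is the app's own service (not shown)
export class ExampleComponent implements OnInit, OnDestroy {
  private destroy$ = new Subject<void>();
  private subscription?: Subscription;

  constructor(private dataService: DataService) {}

  ngOnInit() {
    // Method 1: takeUntil completes the stream when destroy$ emits
    this.dataService.getData().pipe(
      takeUntil(this.destroy$)
    ).subscribe(data => console.log(data));

    // Method 2: keep the Subscription and unsubscribe manually
    this.subscription = this.dataService.getData()
      .subscribe(data => console.log(data));
  }

  ngOnDestroy() {
    // Method 1
    this.destroy$.next();
    this.destroy$.complete();

    // Method 2
    this.subscription?.unsubscribe();
  }
}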
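
The takeUntil pattern can be checked outside Angular. The following is a minimal sketch in which two hypothetical Subjects, source$ and destroy$, stand in for getData() and the component's destroy notifier.

```typescript
import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

// Hypothetical stand-ins for the service stream and the destroy notifier.
const source$ = new Subject<number>();
const destroy$ = new Subject<void>();

const received: number[] = [];
let completed = false;

source$.pipe(
  takeUntil(destroy$)
).subscribe({
  next: value => received.push(value),
  complete: () => { completed = true; }
});

source$.next(1);

// Equivalent of ngOnDestroy: the notifier emits, takeUntil completes the
// stream and unsubscribes from source$.
destroy$.next();
destroy$.complete();

source$.next(2); // ignored: the subscription has already ended

console.log(received, completed);
// [1] true
```

A common guideline is to place takeUntil last in the pipe, so that inner subscriptions created by earlier operators such as switchMap are torn down as well.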

Async Pipe

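import { Component } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { Observable } from 'rxjs';

// User is the app's own model type (not shown)
@Component({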
  selector: 'app-users',
  template: `
    <div *ngIf="users$ | async as users">
      <div *ngFor="let user of users">{{user.name}}</div>
    </div>
  `
})
export class UsersComponent {
  users$: Observable<User[]>;

  constructor(private http: HttpClient) {
    this.users$ = this.http.get<User[]>('/api/users');
  }
  // No need to unsubscribe - async pipe handles it
}

Operator Cheat Sheet

  • map: transform values.
  • filter: keep values that match a predicate.
  • switchMap: cancel previous inner observable when a new value arrives.
  • mergeMap: run inner observables concurrently.
  • concatMap: queue inner observables in order.
  • combineLatest: emit when any source emits after all have emitted once.
  • tap: side effects such as logging.
  • catchError: recover from errors with a fallback observable.
  • debounceTime: wait for a pause before emitting.
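
The switchMap and mergeMap entries are the ones most often confused, so a side-by-side run may help. This sketch assumes hypothetical Subjects (trigger$ and the two inner streams) so that every emission can be driven by hand; real code would typically map to HTTP calls instead.

```typescript
import { Subject } from 'rxjs';
import { mergeMap, switchMap } from 'rxjs/operators';

type Key = 'a' | 'b';

// Hypothetical hand-driven streams: trigger$ picks which inner stream to use.
const trigger$ = new Subject<Key>();
const inner: Record<Key, Subject<string>> = {
  a: new Subject<string>(),
  b: new Subject<string>()
};

const switched: string[] = [];
const merged: string[] = [];

trigger$.pipe(switchMap(key => inner[key])).subscribe(v => switched.push(v));
trigger$.pipe(mergeMap(key => inner[key])).subscribe(v => merged.push(v));

trigger$.next('a');
inner.a.next('a1'); // both pipelines are listening to inner.a

trigger$.next('b'); // switchMap drops inner.a; mergeMap keeps it and adds inner.b
inner.a.next('a2'); // only mergeMap still hears this
inner.b.next('b1');

console.log(switched); // ['a1', 'b1']
console.log(merged);   // ['a1', 'a2', 'b1']
```

As a rule of thumb, switchMap suits "only the latest matters" flows such as typeahead search, while mergeMap suits independent work where every result should arrive.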

Combination Operators

import { combineLatest, forkJoin, merge, zip } from 'rxjs';

// combineLatest - once every source has emitted, emits the latest values whenever any source emits
combineLatest([obs1$, obs2$]).subscribe(([val1, val2]) => {
  console.log(val1, val2);
});

// forkJoin - waits for all to complete, then emits each source's last value (like Promise.all)
forkJoin([
  this.http.get('/api/users'),
  this.http.get('/api/posts')
]).subscribe(([users, posts]) => {
  console.log(users, posts);
});

// merge - emits from all observables
merge(clicks$, hovers$).subscribe(event => {
  console.log(event);
});

// zip - pairs values by index
zip(numbers$, letters$).subscribe(([num, letter]) => {
  console.log(num, letter);
});
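
To compare the timing of these operators, it can help to feed the same two sources into each of them. The sketch below uses two hypothetical Subjects, num$ and str$, and records what each combination emits as values arrive.

```typescript
import { Subject, combineLatest, forkJoin, zip } from 'rxjs';

// Hypothetical hand-driven sources shared by all three combinations.
const num$ = new Subject<number>();
const str$ = new Subject<string>();

const combined: string[] = [];
const zipped: string[] = [];
const joined: string[] = [];

combineLatest([num$, str$]).subscribe(([n, s]) => combined.push(`${n}${s}`));
zip(num$, str$).subscribe(([n, s]) => zipped.push(`${n}${s}`));
forkJoin([num$, str$]).subscribe(([n, s]) => joined.push(`${n}${s}`));

num$.next(1);   // nothing yet: str$ has not emitted
str$.next('a'); // combineLatest and zip both emit 1a
num$.next(2);   // combineLatest emits 2a; zip waits for a second string
str$.next('b'); // combineLatest emits 2b; zip pairs 2 with b

num$.complete();
str$.complete(); // forkJoin emits the last value of each source: 2b

console.log(combined); // ['1a', '2a', '2b']
console.log(zipped);   // ['1a', '2b']
console.log(joined);   // ['2b']
```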
