RxJS Observables

RxJS & Observables in Angular

Observable Basics

import { Observable } from 'rxjs';

// Create observable
const observable = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});

// Subscribe
observable.subscribe({
  next: value => console.log(value),
  error: err => console.error(err),
  complete: () => console.log('Done')
});

Common RxJS Operators

import { of, from, interval } from 'rxjs';
import { map, filter, take, debounceTime, switchMap, catchError } from 'rxjs/operators';

// map - transform values
of(1, 2, 3).pipe(
  map(x => x * 2)
).subscribe(console.log); // 2, 4, 6

// filter - filter values
of(1, 2, 3, 4).pipe(
  filter(x => x % 2 === 0)
).subscribe(console.log); // 2, 4

// take - take first n values
interval(1000).pipe(
  take(3)
).subscribe(console.log); // 0, 1, 2 then complete

// debounceTime - emit the latest value only after 300 ms without a new one
searchInput$.pipe(
  debounceTime(300)
).subscribe(query => search(query));

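// switchMap - map each value to an inner observable and cancel the previous one
searchInput$.pipe(
  debounceTime(300),
  // passing the query via params lets HttpClient URL-encode it
  switchMap(query => this.http.get('/api/search', { params: { q: query } }))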
).subscribe(results => console.log(results));

// catchError - handle errors
this.http.get('/api/data').pipe(
  catchError(error => {
    console.error(error);
    return of([]);
  })
).subscribe(data => console.log(data));

Subject Types

import { Subject, BehaviorSubject, ReplaySubject, AsyncSubject } from 'rxjs';

// Subject - no initial value
const subject = new Subject<number>();
subject.next(1); // No subscribers yet, so this value is dropped
subject.subscribe(console.log); // Only receives future values
subject.next(2); // Logs: 2

// BehaviorSubject - requires an initial value; new subscribers immediately get the current value
const behaviorSubject = new BehaviorSubject<number>(0);
behaviorSubject.subscribe(console.log); // Logs: 0
behaviorSubject.next(1); // Logs: 1

// ReplaySubject - replays the last n values to new subscribers
const replaySubject = new ReplaySubject<number>(2);
replaySubject.next(1);
replaySubject.next(2);
replaySubject.next(3);
replaySubject.subscribe(console.log); // Logs: 2, 3

// AsyncSubject - only emits last value on complete
const asyncSubject = new AsyncSubject<number>();
asyncSubject.next(1);
asyncSubject.next(2);
asyncSubject.subscribe(console.log); // Nothing yet
asyncSubject.complete(); // Logs: 2

Hot vs Cold Observables

Feature             | Cold             | Hot
Starts on subscribe | Yes              | No
Shared execution    | No               | Yes
Late subscribers    | Restart sequence | Miss previous values
Examples            | HTTP, timer      | Subject, DOM events

Cold observables create a new producer for each subscriber; hot observables share one producer among all subscribers.
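The difference can be observed directly by counting how often the producer runs. The following is a minimal sketch in plain RxJS; the names `producerRuns`, `cold$`, and `hot$` are illustrative and not part of any API.

```typescript
import { Observable, Subject } from 'rxjs';

// Cold: the producer function runs once for every subscriber.
let producerRuns = 0;
const cold$ = new Observable<number>(subscriber => {
  producerRuns++;
  subscriber.next(1);
  subscriber.next(2);
  subscriber.complete();
});

const coldA: number[] = [];
const coldB: number[] = [];
cold$.subscribe(v => coldA.push(v));
cold$.subscribe(v => coldB.push(v));
// producerRuns is 2; coldA and coldB each hold [1, 2]

// Hot: a single Subject pushes to whoever is subscribed at that moment.
const hot$ = new Subject<number>();
const hotEarly: number[] = [];
const hotLate: number[] = [];

hot$.subscribe(v => hotEarly.push(v));
hot$.next(1);
hot$.subscribe(v => hotLate.push(v)); // late subscriber misses 1
hot$.next(2);
// hotEarly holds [1, 2]; hotLate holds [2]
```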

Interview-friendly examples

Cold (new producer per subscriber):

  • this.http.get(...) (each subscribe makes a request unless shared)
  • defer(() => fetch(...))

Hot (shared producer):

  • fromEvent(button, 'click')
  • Subject / BehaviorSubject
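The "unless shared" caveat for HTTP calls can be illustrated with shareReplay. This is a sketch only: `fakeRequest$` is a hypothetical stand-in for this.http.get(...) that emits one response and completes, and `requests` simply counts how often it is executed.

```typescript
import { Observable } from 'rxjs';
import { shareReplay } from 'rxjs/operators';

let requests = 0;

// Hypothetical stand-in for an HTTP call: one value, then complete.
const fakeRequest$ = new Observable<string>(subscriber => {
  requests++;
  subscriber.next('response');
  subscriber.complete();
});

// shareReplay(1) caches the last value and replays it to later subscribers.
const shared$ = fakeRequest$.pipe(shareReplay(1));

const first: string[] = [];
const second: string[] = [];
shared$.subscribe(v => first.push(v));
shared$.subscribe(v => second.push(v));
// requests is 1; both subscribers received 'response'
```

Without the shareReplay(1) step, the same two subscribe calls would run the producer twice.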

Signals vs Observables (how they fit together)

Observables are best for async streams and cancellation-heavy flows:

  • HTTP, websockets, user events
  • composition operators (switchMap, mergeMap, retries, buffering)

Signals are best for local in-memory UI state and derived values:

  • synchronous state (signal)
  • memoized derivations (computed)

Bridge patterns:

import { toSignal, takeUntilDestroyed } from '@angular/core/rxjs-interop';

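// Observable -> Signal; without an initialValue option the signal starts as undefined
items = toSignal(this.items$);

// Manual subscription with automatic teardown; call in an injection context
// (constructor or field initializer) or pass a DestroyRef
this.items$.pipe(takeUntilDestroyed()).subscribe();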

Unsubscribing

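import { OnDestroy, OnInit } from '@angular/core';
import { Subject, Subscription } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

// DataService is the app's own injectable; @Component metadata omitted for brevity
export class DataComponent implements OnInit, OnDestroy {
  private destroy$ = new Subject<void>();
  private subscription?: Subscription;

  constructor(private dataService: DataService) {}

  ngOnInit() {
    // Method 1: takeUntil
    this.dataService.getData().pipe(
      takeUntil(this.destroy$)
    ).subscribe(data => console.log(data));

    // Method 2: Store subscription
    this.subscription = this.dataService.getData()
      .subscribe(data => console.log(data));
  }

  ngOnDestroy() {
    // Method 1: signal all takeUntil pipelines to complete
    this.destroy$.next();
    this.destroy$.complete();

    // Method 2: unsubscribe explicitly
    this.subscription?.unsubscribe();
  }
}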
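The takeUntil pattern can be checked outside Angular. The sketch below uses plain Subjects; `source$` stands in for the data stream and the two destroy$ calls mimic what ngOnDestroy does above. The variable names are illustrative.

```typescript
import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

const destroy$ = new Subject<void>();
const source$ = new Subject<number>();

const received: number[] = [];
let completed = false;

source$.pipe(takeUntil(destroy$)).subscribe({
  next: v => received.push(v),
  complete: () => { completed = true; }
});

source$.next(1);
source$.next(2);

// Equivalent of ngOnDestroy: the notifier emits, so the subscription completes.
destroy$.next();
destroy$.complete();

source$.next(3); // ignored, the subscriber is already gone
// received holds [1, 2]; completed is true
```

A common convention is to keep takeUntil as the last operator in the pipe, so that inner subscriptions created by earlier operators are torn down as well.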

Async Pipe

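import { Component } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { Observable } from 'rxjs';

// User is an app-defined interface with at least a `name` field
@Component({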
  selector: 'app-users',
  template: `
    <div *ngIf="users$ | async as users">
      <div *ngFor="let user of users">{{user.name}}</div>
    </div>
  `
})
export class UsersComponent {
  users$: Observable<User[]>;

  constructor(private http: HttpClient) {
    this.users$ = this.http.get<User[]>('/api/users');
  }
  // No need to unsubscribe - async pipe handles it
}

Operator Cheat Sheet

  • map: transform values.
  • filter: keep values that match a predicate.
  • switchMap: cancel previous inner observable when a new value arrives.
  • mergeMap: run inner observables concurrently.
  • concatMap: queue inner observables in order.
  • combineLatest: emit when any source emits after all have emitted once.
  • tap: side effects such as logging.
  • catchError: recover from errors with a fallback observable.
  • debounceTime: wait for a pause before emitting.
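The difference between switchMap, mergeMap, and concatMap shows up when a second source value arrives while the first inner observable is still active. The sketch below replays the same sequence of events through each operator; `trace` and the `inner` Subjects are hypothetical helpers written for this illustration.

```typescript
import { Observable, OperatorFunction, Subject } from 'rxjs';
import { concatMap, mergeMap, switchMap } from 'rxjs/operators';

type Project = (key: string) => Observable<string>;
type Flatten = (project: Project) => OperatorFunction<string, string>;

// Runs one fixed event sequence through the given flattening operator.
function trace(flatten: Flatten): string[] {
  const inner: Record<string, Subject<string>> = {
    a: new Subject<string>(),
    b: new Subject<string>()
  };
  const source$ = new Subject<string>();
  const out: string[] = [];

  source$.pipe(flatten(key => inner[key])).subscribe(v => out.push(v));

  source$.next('a');    // inner "a" is subscribed
  inner.a.next('a1');
  source$.next('b');    // switchMap drops "a"; mergeMap adds "b"; concatMap queues "b"
  inner.a.next('a2');
  inner.b.next('b1');   // concatMap has not subscribed to "b" yet, so it misses b1
  inner.a.complete();   // concatMap now moves on to "b"
  inner.b.next('b2');
  inner.b.complete();
  return out;
}

const switched = trace(project => switchMap(project)); // ['a1', 'b1', 'b2']
const merged = trace(project => mergeMap(project));    // ['a1', 'a2', 'b1', 'b2']
const queued = trace(project => concatMap(project));   // ['a1', 'a2', 'b2']
```

The inner observables here are hot Subjects, which is why concatMap loses b1; with cold inner observables such as HTTP calls, concatMap would start each one only when its turn comes and lose nothing.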

Combination Operators

import { combineLatest, forkJoin, merge, zip } from 'rxjs';

// combineLatest - once every source has emitted, emits the latest values whenever any source emits
combineLatest([obs1$, obs2$]).subscribe(([val1, val2]) => {
  console.log(val1, val2);
});

// forkJoin - waits for all to complete, then emits each source's last value once (like Promise.all)
forkJoin([
  this.http.get('/api/users'),
  this.http.get('/api/posts')
]).subscribe(([users, posts]) => {
  console.log(users, posts);
});

// merge - emits from all observables
merge(clicks$, hovers$).subscribe(event => {
  console.log(event);
});

// zip - pairs values by index
zip(numbers$, letters$).subscribe(([num, letter]) => {
  console.log(num, letter);
});
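To compare the four operators side by side, the sketch below feeds the same two sources into each of them. `numbers$` and `letters$` are defined here as plain Subjects purely for illustration, so the emission order is fully controlled.

```typescript
import { Subject, combineLatest, forkJoin, merge, zip } from 'rxjs';

const numbers$ = new Subject<number>();
const letters$ = new Subject<string>();

const combined: [number, string][] = [];
const zipped: [number, string][] = [];
const merged: (number | string)[] = [];
const joined: [number, string][] = [];

combineLatest([numbers$, letters$]).subscribe(pair => combined.push(pair));
zip(numbers$, letters$).subscribe(pair => zipped.push(pair));
merge(numbers$, letters$).subscribe(value => merged.push(value));
forkJoin([numbers$, letters$]).subscribe(pair => joined.push(pair));

numbers$.next(1);
numbers$.next(2);
letters$.next('a');
letters$.next('b');
numbers$.complete();
letters$.complete();

// combined: [[2, 'a'], [2, 'b']]  (nothing until both have emitted; 1 is never paired)
// zipped:   [[1, 'a'], [2, 'b']]  (paired by index)
// merged:   [1, 2, 'a', 'b']      (interleaved in arrival order)
// joined:   [[2, 'b']]            (last values, emitted once on completion)
```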