RxJS 입문 — Observable과 반응형 프로그래밍

RxJS가 필요한 이유, Observable 패러다임과 Promise의 차이, 구독과 해지, 생성·변환·고차·결합 연산자, switchMap·mergeMap·concatMap 차이, Subject, Angular/React에서의 활용 패턴, 마블 다이어그램 읽는 법까지 RxJS 핵심을 완전 정리합니다.

· 13 min read · PALDYN Team

지난 글에서 MobX의 투명한 반응형 상태 관리를 살펴봤습니다. 이번에는 한 단계 더 깊이 들어가 **RxJS(Reactive Extensions for JavaScript)**를 다룹니다. RxJS는 상태 관리 라이브러리가 아니라 비동기 이벤트 스트림을 다루는 라이브러리입니다. 클릭, HTTP 요청, WebSocket 메시지, 타이머 등 시간축 위에 펼쳐지는 모든 것을 하나의 모델로 다룰 수 있습니다.


RxJS가 필요한 이유

JavaScript에서 비동기를 처리하는 방법은 콜백 → Promise → async/await 순으로 발전해왔습니다. 단일 비동기 작업은 Promise와 async/await으로 충분합니다. 하지만 다음 시나리오를 생각해보세요:

  • 검색창 입력마다 API를 호출하되, 이전 요청은 취소해야 한다
  • 여러 소스의 이벤트를 결합해 하나의 상태를 만들어야 한다
  • WebSocket 메시지에 필터링·변환·디바운싱을 적용해야 한다
  • 실패 시 3번까지 재시도하고, 그래도 실패하면 폴백을 보여줘야 한다

이런 비동기 이벤트의 조합과 변환이 많아질수록 Promise 기반 코드는 복잡해집니다. RxJS는 이 복잡함을 파이프라인으로 표현합니다.

// Promise 방식 — 취소·조합이 어렵다
let currentRequest = null

async function search(query) {
  currentRequest?.abort()
  const controller = new AbortController()
  currentRequest = controller
  const data = await fetch(`/api/search?q=${query}`, {
    signal: controller.signal
  }).then(r => r.json())
  return data
}

// RxJS 방식 — 선언적으로 표현
fromEvent(input, 'input').pipe(
  map(e => e.target.value),
  debounceTime(300),
  distinctUntilChanged(),
  switchMap(query => ajax.getJSON(`/api/search?q=${query}`))
).subscribe(results => setResults(results))

Observable 패러다임 vs Promise

Observable과 Promise의 차이를 이해하는 것이 RxJS 학습의 출발점입니다.

특성PromiseObservable
값의 수단일 값 (1개)0개 ~ 무한
실행 시점즉시 실행 (eager)구독 시 실행 (lazy)
취소불가 (기본)unsubscribe()로 취소 가능
연산자.then(), .catch()70개 이상의 연산자
멀티캐스트가능기본은 유니캐스트 (Subject로 멀티캐스트)

Observable은 lazy합니다. subscribe()를 호출하기 전까지 아무 일도 일어나지 않습니다.

import { Observable } from 'rxjs'

const observable = new Observable(subscriber => {
  console.log('구독됨!')
  subscriber.next(1)
  subscriber.next(2)
  subscriber.complete()
})

// 이 시점까지 아무것도 실행되지 않음
console.log('구독 전')

observable.subscribe({
  next: value => console.log('값:', value),
  error: err => console.error('에러:', err),
  complete: () => console.log('완료')
})

// 출력 순서:
// 구독 전
// 구독됨!
// 값: 1
// 값: 2
// 완료

RxJS Observable 스트림


구독(subscribe)과 해지(unsubscribe), 메모리 누수 주의

Observable을 구독하면 Subscription 객체가 반환됩니다. 더 이상 필요 없을 때 반드시 해지해야 합니다.

import { interval } from 'rxjs'

const subscription = interval(1000).subscribe(n => console.log(n))

// 3초 후 해지
setTimeout(() => {
  subscription.unsubscribe()
  console.log('구독 해지 완료')
}, 3000)

React에서는 useEffect의 cleanup 함수에서 해지합니다.

useEffect(() => {
  const sub = someObservable$.subscribe(handler)
  return () => sub.unsubscribe()  // 컴포넌트 언마운트 시 자동 해지
}, [])

takeUntil, take, first 같은 완료 연산자를 사용하면 Observable이 자동으로 완료되어 구독도 자동 해지됩니다.

import { interval, Subject } from 'rxjs'
import { takeUntil } from 'rxjs/operators'

const destroy$ = new Subject()

interval(1000).pipe(
  takeUntil(destroy$)  // destroy$가 emit하면 자동 완료
).subscribe(n => console.log(n))

// 컴포넌트 소멸 시
destroy$.next()
destroy$.complete()

생성 연산자

RxJS는 다양한 소스로부터 Observable을 만드는 생성 연산자를 제공합니다.

import { of, from, interval, fromEvent, timer } from 'rxjs'
import { ajax } from 'rxjs/ajax'

// of: 주어진 값들을 순서대로 emit하고 완료
of(1, 2, 3).subscribe(console.log)  // 1, 2, 3

// from: Promise, 배열, 이터러블을 Observable로 변환
from([10, 20, 30]).subscribe(console.log)
from(fetch('/api/data')).subscribe(res => console.log(res.status))

// interval: 지정한 ms마다 0부터 증가하는 숫자 emit
interval(1000).pipe(take(5)).subscribe(console.log)  // 0,1,2,3,4

// fromEvent: DOM 이벤트를 Observable로
fromEvent(document, 'click').subscribe(e => console.log(e.clientX))

// timer: 지연 후 한 번, 또는 주기적으로 emit
timer(2000, 1000).subscribe(console.log)  // 2초 후 시작, 1초마다

// ajax: HTTP 요청
ajax.getJSON('/api/users').subscribe(users => console.log(users))

변환 연산자: map, filter, tap

파이프라인에서 스트림의 값을 변환합니다.

import { fromEvent } from 'rxjs'
import { map, filter, tap } from 'rxjs/operators'

fromEvent(document, 'keyup').pipe(
  map(e => e.key),                        // KeyboardEvent → key 문자열
  filter(key => key.length === 1),        // 단일 문자만 통과
  tap(key => console.log('입력:', key)),  // 부수 효과 (디버깅), 스트림 변경 없음
  map(key => key.toUpperCase())           // 대문자 변환
).subscribe(key => console.log('최종:', key))

tap은 스트림의 값을 변경하지 않고 부수 효과만 실행합니다. 디버깅이나 로깅에 유용합니다.


고차 연산자: switchMap, mergeMap, concatMap, exhaustMap

고차(higher-order) 연산자는 각 값을 새로운 Observable로 변환합니다. “Observable을 emit하는 Observable”을 평탄화(flatten)하는 방식이 각각 다릅니다.

switchMap vs mergeMap vs concatMap

switchMap — 이전 inner 취소

새 값이 오면 이전 inner Observable을 취소하고 새 것을 시작합니다. 검색 자동완성처럼 “최신 요청만 유효”한 경우에 적합합니다.

import { fromEvent } from 'rxjs'
import { map, debounceTime, distinctUntilChanged, switchMap } from 'rxjs/operators'
import { ajax } from 'rxjs/ajax'

fromEvent(searchInput, 'input').pipe(
  map(e => e.target.value),
  debounceTime(300),
  distinctUntilChanged(),
  switchMap(query =>
    ajax.getJSON(`/api/search?q=${encodeURIComponent(query)}`)
  )
).subscribe(results => renderResults(results))

mergeMap — 병렬 실행

모든 inner Observable을 동시에 실행하고 도착하는 순서대로 emit합니다. 독립적인 HTTP 요청을 병렬로 실행할 때 적합합니다.

import { from } from 'rxjs'
import { mergeMap } from 'rxjs/operators'
import { ajax } from 'rxjs/ajax'

const userIds = [1, 2, 3, 4, 5]

from(userIds).pipe(
  mergeMap(id => ajax.getJSON(`/api/users/${id}`))
  // 5개 요청이 동시에 실행, 응답 순서는 비보장
).subscribe(user => console.log(user))

concatMap — 순서 보장

이전 inner Observable이 완료된 후에만 다음 것을 시작합니다. 순서가 중요한 파일 업로드, 순차 API 호출에 적합합니다.

import { from } from 'rxjs'
import { concatMap } from 'rxjs/operators'

const filesToUpload = [file1, file2, file3]

from(filesToUpload).pipe(
  concatMap(file => uploadFile(file))  // file1 완료 후 file2, file2 완료 후 file3
).subscribe(result => console.log('업로드 완료:', result.name))

exhaustMap — 진행 중이면 무시

inner Observable이 실행 중일 때 새 값이 오면 무시합니다. 로그인 버튼처럼 중복 클릭을 방지할 때 적합합니다.

import { fromEvent } from 'rxjs'
import { exhaustMap } from 'rxjs/operators'

fromEvent(loginButton, 'click').pipe(
  exhaustMap(() => loginRequest$)  // 로그인 진행 중 추가 클릭 무시
).subscribe(user => navigateHome(user))

결합 연산자

여러 Observable을 결합할 때 사용합니다.

import { combineLatest, forkJoin, merge } from 'rxjs'

// combineLatest: 모든 소스의 최신 값을 결합, 하나라도 바뀌면 emit
const form$ = combineLatest([
  nameInput$,
  emailInput$,
  passwordInput$
]).pipe(
  map(([name, email, password]) => ({ name, email, password }))
)

// forkJoin: 모든 소스가 완료되면 마지막 값을 배열로 emit (Promise.all과 유사)
forkJoin([
  ajax.getJSON('/api/user'),
  ajax.getJSON('/api/posts'),
  ajax.getJSON('/api/comments')
]).subscribe(([user, posts, comments]) => {
  console.log(user, posts, comments)
})

// merge: 여러 소스를 하나의 스트림으로 합침
merge(
  fromEvent(document, 'click'),
  fromEvent(document, 'touchstart')
).subscribe(e => console.log('입력 감지:', e.type))

Subject: Observable이자 Observer

Subject는 Observable과 Observer 역할을 동시에 합니다. 외부에서 직접 값을 push할 수 있어 이벤트 버스, 컴포넌트 간 통신에 활용됩니다.

import { Subject, BehaviorSubject, ReplaySubject } from 'rxjs'

// Subject: 구독 후 emit된 값만 받음
const subject = new Subject()
subject.subscribe(v => console.log('A:', v))
subject.next(1)  // A: 1
subject.subscribe(v => console.log('B:', v))
subject.next(2)  // A: 2, B: 2

// BehaviorSubject: 초기값을 가지며 최신 값을 새 구독자에게 즉시 전달
const count$ = new BehaviorSubject(0)
count$.subscribe(v => console.log('count:', v))  // 즉시 0 출력
count$.next(1)   // count: 1
count$.next(2)   // count: 2
// 나중에 구독해도 현재값(2)을 즉시 받음
count$.subscribe(v => console.log('late:', v))   // late: 2

// ReplaySubject: 지정한 개수만큼 이전 값을 새 구독자에게 재생
const replay$ = new ReplaySubject(3)  // 마지막 3개 값 보관
replay$.next(10)
replay$.next(20)
replay$.next(30)
replay$.next(40)
replay$.subscribe(v => console.log(v))  // 20, 30, 40 즉시 출력

Angular/React에서의 RxJS 활용 패턴

Angular: 기본 내장

Angular는 RxJS를 기본으로 사용합니다. HttpClient는 Observable을 반환하고, async 파이프로 템플릿에서 직접 구독할 수 있습니다.

// Angular 서비스
@Injectable({ providedIn: 'root' })
export class UserService {
  users$ = this.http.get<User[]>('/api/users').pipe(
    shareReplay(1)  // 여러 컴포넌트가 구독해도 요청은 한 번
  )

  constructor(private http: HttpClient) {}
}

// Angular 템플릿 — async 파이프가 구독/해지 자동 처리
@Component({
  template: `
    <div *ngFor="let user of userService.users$ | async">
      {{ user.name }}
    </div>
  `
})
export class UserListComponent {
  constructor(public userService: UserService) {}
}

React: 커스텀 훅 패턴

import { useState, useEffect } from 'react'
import { BehaviorSubject } from 'rxjs'
import { distinctUntilChanged } from 'rxjs/operators'

// 전역 상태를 BehaviorSubject로 관리
const theme$ = new BehaviorSubject('light')

// RxJS Observable을 React 상태로 연결하는 커스텀 훅
function useObservable(observable$, initialValue) {
  const [value, setValue] = useState(initialValue)

  useEffect(() => {
    const sub = observable$.subscribe(v => setValue(v))
    return () => sub.unsubscribe()
  }, [observable$])

  return value
}

// 사용
function ThemeToggle() {
  const theme = useObservable(theme$, 'light')

  return (
    <button onClick={() => theme$.next(theme === 'light' ? 'dark' : 'light')}>
      현재 테마: {theme}
    </button>
  )
}

마블 다이어그램 읽는 법

RxJS 공식 문서는 **마블 다이어그램(marble diagram)**으로 연산자 동작을 시각화합니다.

시간 →
─────●─────●─────●──|─→   source$  (●: 값 emit, |: 완료, ✗: 에러)
     1     2     3

map(x => x * 2)

─────●─────●─────●──|─→   result$
     2     4     6

읽는 규칙:

  • 가로축: 시간의 흐름 (왼쪽 → 오른쪽)
  • 원(●): 값이 emit되는 시점과 값
  • 세로 선(|): Observable 완료
  • X: 에러 발생
  • 아래 화살표: 연산자 적용
  • 결과 라인: 연산자 출력

switchMap의 마블 다이어그램을 읽으면:

source$: ─────A──────B──────C──|─→
              │      │      │
              ∨      │      │
         ─a──a──✗    │      │      (A의 inner — B 도착 시 취소)
                ─b───b──✗   │      (B의 inner — C 도착 시 취소)
                        ─c──c──|─→ (C의 inner — 완료까지 실행)

result$:                ─c──c──|─→

공식 사이트 rxmarbles.com에서 인터랙티브하게 마블 다이어그램을 실험해볼 수 있습니다.


정리

RxJS는 처음 접하면 연산자 이름과 개수에 압도될 수 있습니다. 하지만 핵심 개념은 단순합니다:

  • Observable — 시간축 위의 값의 흐름, 구독 전까지 실행 안 됨 (lazy)
  • subscribe — 구독 시작, 반드시 unsubscribe로 해지해야 메모리 누수 없음
  • pipe + operators — 생성 → 변환 → 필터링 → 결합 의 선언적 파이프라인
  • 고차 연산자 4종: switchMap(취소) / mergeMap(병렬) / concatMap(순서) / exhaustMap(무시)
  • Subject — Observer + Observable, 외부에서 직접 push 가능

처음에는 fromEvent, map, filter, debounceTime, switchMap 다섯 가지만 익혀도 실용적인 사용 사례 대부분을 커버할 수 있습니다. 나머지 연산자는 필요할 때 공식 문서에서 마블 다이어그램을 보며 찾으면 됩니다.

다음 글에서는 TanStack Query — 서버 상태 관리의 표준에서 서버 데이터를 위한 특화된 상태 관리를 살펴봅니다.