Spring RabbitMQ — AMQP 메시지 발행·소비·오류 처리

spring-amqp와 spring-rabbit을 사용해 RabbitMQ 메시지를 발행하고 소비하는 방법을 설명합니다. Exchange·Queue·Binding 선언, RabbitTemplate 발행, @RabbitListener 소비, Dead Letter Queue, Retry 전략까지 실전 예제로 정리합니다.

· 6 min read · PALDYN Team

지난 글에서는 Apache Kafka와 spring-kafka를 통해 높은 처리량의 이벤트 스트리밍을 다뤘습니다. 이번에는 유연한 라우팅신뢰성 있는 메시지 전달에 강점이 있는 RabbitMQ와 AMQP 프로토콜을 Spring에서 사용하는 방법을 살펴보겠습니다.

AMQP vs Kafka — 언제 RabbitMQ를 선택하나?

Kafka는 순서 보장과 리플레이가 필요한 이벤트 스트리밍에 강합니다. 반면 RabbitMQ(AMQP)는 메시지를 즉시 라우팅·소비하고 ACK 후 삭제하는 패턴에 적합합니다. 마이크로서비스 간 RPC·Command·Notification 패턴, TTL이 있는 메시지, 복잡한 라우팅 규칙이 필요한 경우 RabbitMQ가 더 나은 선택입니다.

의존성 추가

<!-- pom.xml -->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

spring-boot-starter-amqpspring-amqp, spring-rabbit, 자동 구성을 모두 포함합니다.

RabbitMQ 핵심 개념

RabbitMQ 메시지 흐름 아키텍처

RabbitMQ의 메시지 흐름은 Producer → Exchange → Queue → Consumer 4단계입니다.

개념역할
Exchange메시지를 받아 Binding 규칙으로 Queue에 라우팅
BindingExchange와 Queue를 연결하는 규칙 (routingKey 포함)
Queue메시지를 소비될 때까지 보관
routingKeyExchange가 Queue를 선택할 때 사용하는 문자열

Exchange 타입

  • direct — routingKey가 정확히 일치하는 Queue로 전달
  • topic — 와일드카드(*, #) 패턴으로 여러 Queue에 라우팅
  • fanout — routingKey 무시, 바인딩된 모든 Queue에 브로드캐스트
  • headers — 메시지 헤더 기반 라우팅

application.yml 설정

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual   # ACK 수동 처리
        retry:
          enabled: true
          max-attempts: 3
          initial-interval: 1000ms
          multiplier: 2.0

Queue · Exchange · Binding Bean 선언

@Configuration
public class RabbitConfig {

    public static final String QUEUE    = "order.queue";
    public static final String EXCHANGE = "orders";
    public static final String KEY      = "order.#";

    @Bean
    Queue orderQueue() {
        return QueueBuilder.durable(QUEUE)
            .withArgument("x-dead-letter-exchange", "orders.dlx")
            .build();
    }

    @Bean
    TopicExchange ordersExchange() {
        return new TopicExchange(EXCHANGE);
    }

    @Bean
    Binding binding(Queue orderQueue, TopicExchange ordersExchange) {
        return BindingBuilder
            .bind(orderQueue)
            .to(ordersExchange)
            .with(KEY);
    }

    // Dead Letter 설정
    @Bean Queue dlq()      { return QueueBuilder.durable("order.dlq").build(); }
    @Bean DirectExchange dlx() { return new DirectExchange("orders.dlx"); }
    @Bean Binding dlqBinding() {
        return BindingBuilder.bind(dlq()).to(dlx()).with(QUEUE);
    }
}

x-dead-letter-exchange 인수를 Queue에 설정해 두면, nack되거나 TTL이 만료된 메시지가 자동으로 DLX로 이동합니다.

메시지 발행 — RabbitTemplate

spring-amqp 핵심 코드 패턴

@Service
@RequiredArgsConstructor
public class OrderEventProducer {

    private final RabbitTemplate rabbitTemplate;

    public void publishOrderCreated(OrderEvent event) {
        rabbitTemplate.convertAndSend(
            RabbitConfig.EXCHANGE,
            "order.created",   // routingKey
            event
        );
    }

    public void publishWithDelay(OrderEvent event, long delayMs) {
        rabbitTemplate.convertAndSend(
            RabbitConfig.EXCHANGE, "order.created", event,
            msg -> {
                msg.getMessageProperties().setDelay((int) delayMs);
                return msg;
            }
        );
    }
}

MessagePostProcessor를 통해 메시지별 헤더 조작이 가능합니다. 지연 발행은 rabbitmq-delayed-message-exchange 플러그인이 필요합니다.

메시지 소비 — @RabbitListener

@Component
@RequiredArgsConstructor
public class OrderConsumer {

    private final OrderService orderService;

    @RabbitListener(queues = RabbitConfig.QUEUE)
    public void handle(
            OrderEvent event,
            Channel channel,
            @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        try {
            orderService.processOrder(event);
            channel.basicAck(tag, false);          // 처리 완료
        } catch (RecoverableException e) {
            channel.basicNack(tag, false, true);   // 재큐 (requeue=true)
        } catch (Exception e) {
            channel.basicNack(tag, false, false);  // DLQ로 이동 (requeue=false)
        }
    }
}

acknowledge-mode: manual로 설정하면 channel.basicAck() 또는 basicNack()을 직접 호출해야 합니다. 처리에 성공하면 basicAck, 재시도 가능한 오류는 basicNack(requeue=true), 영구 오류는 basicNack(requeue=false)로 DLQ에 보냅니다.

@RabbitListener 고급 옵션

// 여러 Queue 동시 리스닝
@RabbitListener(queues = {"queue.a", "queue.b"})

// concurrency (스레드 수)
@RabbitListener(queues = "order.queue", concurrency = "3-10")

// 선언과 동시에 Binding
@RabbitListener(bindings = @QueueBinding(
    value    = @Queue("order.queue"),
    exchange = @Exchange(value="orders", type="topic"),
    key      = "order.#"
))

Dead Letter Queue 처리

@RabbitListener(queues = "order.dlq")
public void handleDlq(
        OrderEvent event,
        @Header(AmqpHeaders.DELIVERY_TAG) long tag,
        Channel channel) throws IOException {

    log.error("DLQ 수신: {}", event);
    // 슬랙 알림, 별도 DB 기록 등
    channel.basicAck(tag, false);   // DLQ에서도 ACK 필수
}

DLQ 소비자는 실패한 메시지를 로깅·알림 처리하거나, 일정 시간 후 원래 Queue에 재발행하는 리드라이브(re-drive) 전략을 사용합니다.

Jackson 메시지 변환기 설정

@Bean
Jackson2JsonMessageConverter messageConverter() {
    return new Jackson2JsonMessageConverter();
}

@Bean
RabbitTemplate rabbitTemplate(ConnectionFactory cf,
                               Jackson2JsonMessageConverter conv) {
    RabbitTemplate tpl = new RabbitTemplate(cf);
    tpl.setMessageConverter(conv);
    return tpl;
}

기본 변환기는 Java 직렬화를 사용합니다. Jackson2JsonMessageConverter로 교체하면 JSON으로 주고받을 수 있어 다른 언어의 소비자와도 호환됩니다.

테스트 — Testcontainers

@SpringBootTest
@Testcontainers
class OrderConsumerTest {

    @Container
    static RabbitMQContainer rabbit =
        new RabbitMQContainer("rabbitmq:3.12-management");

    @DynamicPropertySource
    static void props(DynamicPropertyRegistry r) {
        r.add("spring.rabbitmq.host", rabbit::getHost);
        r.add("spring.rabbitmq.port", rabbit::getAmqpPort);
    }

    @Autowired RabbitTemplate template;

    @Test
    void orderCreatedEventIsProcessed() throws Exception {
        template.convertAndSend(EXCHANGE, "order.created",
                                new OrderEvent(1L, "CREATED"));
        await().atMost(5, SECONDS)
               .untilAsserted(() -> verify(orderService).processOrder(any()));
    }
}

주요 모니터링 포인트

지표의미기준값
Queue depth처리되지 않은 메시지 수지속 증가 → 소비자 스케일아웃
Consumer count큐당 활성 소비자 수0이면 알림
nack rate처리 실패 비율급증 → DLQ 확인
Publish rate초당 발행 수소비 속도와 비교

RabbitMQ Management UI(http://localhost:15672)에서 실시간 확인이 가능하며, Spring Actuator에 micrometer-registry-prometheus를 추가하면 rabbitmq.* 메트릭을 Prometheus/Grafana로 수집할 수 있습니다.


지난 글: Spring Kafka 기초 — KafkaTemplate 발행과 @KafkaListener 소비

다음 글: 이벤트 기반 아키텍처 — Spring으로 구현하는 EDA 패턴


읽어주셔서 감사합니다. 😊