RabbitMQ 란?
RabbitMQ는 메시지 지향 미들웨어에 대한 개방형 표준인 Advanced Message Queuing Protocol (AMQP)을 구현한 오픈소스 메시징 브로커 중 하나이다.
간단하게 이야기하면, 우편물을 받아 수취인에게 전달해주는 우체국과 비슷하다고 생각하면 된다.
메시지 지향 미들웨어란
•
메시지 지향 미들웨어는 분산 시스템에서 서로 다른 어플리케이션들 간의 통신을 지원하는 소프트웨어 구조를 말한다.
•
이 구조에서는 데이터를 메시지 단위로 주고받으며, 메시지를 전달할 때는 중간 단계에서 메시지 큐를 통해 관리한다.
•
이렇게 하면 어플리케이션 간의 결합도를 낮출 수 있어 유연한 아키텍처를 구성할 수 있다.
사용 이유
RabbitMQ를 떠나, Kafka, ActiveMQ, AWS SQS 등 여러 메시지 시스템을 사용하는 이유를 간단하게 표현하자면, ‘대규모 Enterprise 환경에서 Real-Time 어플리케이션 구현을 위함’ 이라고 표현할 수 있을 것이다.
이를 위해서는 Message Orientied Middleware(MOM)의 이해가 필요합니다. 아래 글에서 MOM, MQ, AMQP에 대한 정보를 얻을 수 있습니다.
MOM은 설계도, Message System는 구현체라고 보면 쉬울 것
또한, 이러한 메시징 시스템은 장점도 가지지만, 단점도 함께 가지게 된다.
장점
•
서비스간의 결합도가 낮아지므로 비즈니스 로직에만 집중 가능
•
메시지 처리 방식은 Message Broker가 담당하고 각 서비스는 메시지만 보내면 됨
•
각 서비스가 비동기 방식으로 메시지를 보내기만 하면, Message Broker에서 순서 보장(Queue)과 메시지 전송을 보장
단점
•
Message broker 운영을 위한 자원 더 소모
•
시스템이 복잡하여 관리가 어려움
•
호출 구간이 늘어나므로 신뢰성이 떨어지고 네트워크 비용이 추가로 발생
컴포넌트
•
메시지 지향 미들웨어에 대한 개방형 표준인 Advanced Message Queuing Protocol (AMQP)을 구현한 것이기 때문에 AMQP의 컴포넌트들을 모두 가지고 있다.
컴포넌트 | 설명 |
Producer | 메시지를 보내는일만 하는 컴포넌트 |
Binding | 전달 받은 메시지를 원하는 Queue 로 전달하기 위해 Bindings 이라는 규칙을 정의 |
Exchange | Queue 또는 다른 Exchange로 분배하는 라우팅 기능 |
Queue | 메모리나 디스크에 메시지를 저장하고, 그것을 Consumer에게 전달하는 역할 |
Routing Key | 메시지 헤더에 포함되어 어떤 Queue로 메시지를 Routing 할지 체크할때 사용하는 키 |
Standard Exchange Type | 대부분 MQ(Message Queue)에서 가능한 여러가지 상황에 대해 AMQP에서 정의한 표준 라우팅 알고리즘 |
Virtual Host | 하나의 브로커에서 여러 메시징 도메인을 가능하게 하는 논리적 컨테이너이며 이들 간의 분리와 메시징 트래픽을 컨트롤을 관리 유지 |
Connection | 클라이언트가 RabbitMQ 브로커와 상호 작용하기 위해 취해야 하는 첫 번째 단계는 연결을 설정
즉, 발행자와 소비자, Broker 사이의 물리적인 연결 |
Channels | 발행자와 소비자, Broker 사이의 논리적인 연결, 하나의 Connectoin 내에 다수의 Channel 설정 가능
단일 연결 위에 여러 논리 흐름을 다중화 |
Consumer | Message 를 수신하는 주체 |
GUI 실습
사용환경
- Mac OS
RabbitMQ 설치
# rabbitmq mac 설치 - homebrew 이용
$ brew install rabbitmq
# rabbitmq 시작
$ brew services start rabbitmq
Shell
복사
•
위 명령어로 Rabbitmq를 시작하고 http://localhost:15672 를 통해 RabbitMQ 콘솔에 접근 가능
•
기본 계정
◦
Username : guest
◦
Password : guest
계정 생성 및 권한 부여
# 계정 생성
$ /opt/homebrew/sbin/rabbitmqctl add_user admin admin
# 권한 부여 (tag 설정)
$ /opt/homebrew/sbin/rabbitmqctl set_user_tags admin administrator
# 계정 보기
$ /usr/local/sbin/rabbitmqctl list_users
Shell
복사
/opt/homebrew/sbin/rabbitmqctl 를 찾을 수 없다면?
$ find / -name rabbitmqctl 을 통해 rabbitmqctl 위치를 찾은 후 진행
Virtual Host 생성 및 유저 할당
# vhost 생성
$ /opt/homebrew/sbin/rabbitmqctl add_vhost vhost-01
# 유저 vhost 권한 부여
예) rabbitmqctl list_permissions [-p <vhost>] <user> <conf> <write> <read>
$ /opt/homebrew/sbin/rabbitmqctl set_permissions -p vhost-01 admin ".*" ".*" ".*"
Shell
복사
•
vhost 생성
•
하나의 유저는 여러개의 Virtual Host를 가질 수 있다.
•
권한 부여 방식 : 정규표현식으로 권한을 한정
.* : 모든 권한 부여
^$ : 아무 권한도 주지 않음
•
vhost 클릭 시 권한 설정 가능
Exchange 생성
•
Virtual Host : 가상 호스트 설정
•
Name : Exchange의 이름
•
Type : Exchange Type으로 direct, fanout, headers, topic 이 존재
•
Durability : 브로커를 다시 실행해도 exchange를 유지할지에 관한 옵션
◦
durable : 재시작해도 유지
◦
transient : 재시작 시 삭제
•
autoDelete : 마지막 Queue 연결이 해제되면 삭제
•
internal : 생성된 exchange에 메시지를 직접적으로 보낼 수 없고 해당 exchange가 binding된 다른 exchange를 통해 전달하도록 설정하는 옵션
Queue 생성
•
Virtual Host : 가상 호스트 설정
•
Type : quorum, classic, Stream, Default of Virtual Host
◦
quorum : 내구성(durability)를 강화시키고 큐를 복제해 안정적인 메시지 전달 구현
◦
classic : 큐를 복제하지 않기 때문에, 데이터의 안정적인 전달 보다는 많은 데이터 전달이 필요할 때 사용
◦
Stream : 보통 많은 애플리케이션이 동일한 메시지를 읽어야 하는 경우 사용 - kafka와 비슷
•
Name : Queue의 이름
•
Durability : 브로커를 다시 실행해도 Queue를 유지할지에 관한 옵션
◦
durable : 재시작해도 유지
◦
transient : 재시작 시 삭제
•
Exclusive : 현재 연결에서만 액세스할 수 있으며 해당 연결이 닫히면 삭제
•
Arguments : Message 의 TTL, Max Length 같은 추가 기능을 명시
Quorum Queue
•
다중 노드 환경에서 메시지 손실 없이 안정적인 메시지 전달을 보장하는 큐입니다.
•
기존의 Mirrored Queues는 하나의 노드에 저장된 메시지를 다른 노드로 복제하는 방식이었기 때문에, 하나의 노드에서 장애가 발생하면 해당 노드에서 처리되는 모든 메시지가 유실될 수 있었습니다.
•
하지만 Quorum Queues는 여러 노드에 메시지를 복제해서 저장하기 때문에, 하나의 노드에 장애가 발생해도 다른 노드에 저장된 메시지를 사용할 수 있어 안정적인 메시지 전달을 보장합니다.
•
Quorum Queues는 다른 큐와 동일하게 사용 가능하며, 큐 생성 시 quorum 옵션을 사용하여 Quorum Queues로 생성할 수 있습니다.
•
Quorum Queues의 레플리카 기능과 달리 모든 노드에 대한 쓰기 권한이 있어 다중 노드 환경에서 안정적인 메시지 전달을 보장합니다.
•
Quorum Queues는 odd-numbered(홀수) cluster 노드 구성을 권장합니다.
◦
짝수 노드 구성에서는 "split-brain"과 같은 문제가 발생할 수 있습니다.
Stream Queue
•
Streams : RabbitMQ 3.9에 새로 추가된 데이터 구조
◦
기존 큐와 달리 메시지를 삭제하지 않는 Append-Only 모델
◦
데이터가 유지되고 복제됨
•
Usecase
◦
여러 어플리케이션이 동시에 같은 메시지를 받아야 할때
◦
대용량 백로그 저장
◦
Timestamp 기준으로 Replay 및 시간여행 가능
◦
기존 큐 대비 엄청 빠름
•
주요 기능
◦
최소 1회 전송 보장
◦
서버측 오프셋 트래킹 지원. 컨슈머가 원하는 부분부터 재시작 가능
◦
무한 확장 가능. 크기/기간 기준 저장정책을 통해 삭제 지원
◦
초고속의 전용 바이너리 프로토콜 및 AMQP 0.91 & 1.0 지원
◦
클라이언트-서버 TLS 지원
Spring Boot 실습
가정
•
RabbitMQ 설치가 되어 있음
•
RabbitMQ의 Vhost를 가지고 있는 username과 password가 존재
•
한 프로젝트 안에 Producer와 Consumer를 둔 뒤 메시지를 주고 받는 모습 확인
Build.gradle
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-amqp'
implementation 'org.springframework.boot:spring-boot-starter-web'
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.amqp:spring-rabbit-test'
}
Groovy
복사
application.yml
spring:
application:
name: rabbitmq
config:
import: classpath:/secretkey.properties
rabbitmq:
host: localhost
port: 5672
username: ${RABBITMQ_USERNAME}
password: ${RABBITMQ_PASSWORD}
virtual-host: vhost-01
rabbitmq:
exchange: vhost-01.direct
queue: q.direct_test
bindingKey: rabbitmq
routingKey: rabbitmq
YAML
복사
Config
// Direct Exchange 생성
@Bean
public DirectExchange createDirectExchange() {
return new DirectExchange(EXCHANGE_NAME_DIRECT, true, false, null);
}
// Queue 생성
// 큐 이름, durability, exclusive, audoDelete, args
// 큐 이름 : amq로 시작하는 큐 이름은 예약어이기 때문에 에러발생
// durability : 브로커를 재시작해도 유지됨
// exclusive : 단 하나의 연결에서만 사용되며, 연결이 끊기면 삭제
// audoDelete : 최소 한 명의 소비자가 있는 큪눈 마지막 소비자가 구독을 취소하면 삭제
@Bean
public Queue createQueue() {
return new Queue(QUEUE_NAME, true,false,false,null);
}
// Binding
@Bean
public Binding binding(DirectExchange exchange ) {
return BindingBuilder.bind(createQueue()).to(exchange).with(BINDING_KEY);
}
// JSON 형식의 메시지를 주고 받을 경우 설정
// 의존성 주입을 이용할 경우 RabbitTemplate 와 ConnectionFatcory는 자동 생성
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(messageConverter);
return rabbitTemplate;
}
// JSON 컨버터
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
Java
복사
ProducerService
@Service
@RequiredArgsConstructor
public class ProducerService {
private final RabbitTemplate rabbitTemplate;
@Value("${rabbitmq.exchange}")
private String exchange;
@Value("${rabbitmq.routingKey}")
private String routingKey;
/*
exchage와 매핑된 큐에 라우팅 키를 확인 후 매핑이 옳바르다면 큐에 저장
*/
public void addQueue(Message message){
rabbitTemplate.convertAndSend(exchange, routingKey, message);
}
}
Java
복사
ConsumerService
@Slf4j
@Service
public class ConsumerService {
private int cnt = 1;
/*
RabbitListener를 통해 큐에 저장된 메시지를 받아옴
*/
@RabbitListener(queues = "${rabbitmq.queue}")
public void getMessage(Message message){
log.info("[ "+ cnt++ +" ]" +" Consumer get Message from Queue: " + message);
}
}
Java
복사
ProducerController
@Slf4j
@RestController
@RequiredArgsConstructor
public class ProducerController {
private final ProducerService rabbitMqService;
private int cnt = 1;
private Message message = new Message("TEST", "TEST");
@RequestMapping("/send")
public Message send() {
log.info("[ "+ cnt++ +" ]" +" Producer Put Message to Queue : " + message);
rabbitMqService.addQueue(message);
return message;
}
}
Java
복사
심화
Exchange
// 아래 모든 생성자는 Builder를 통해 생성 가능
// Direct Exchange 생성
@Bean
public DirectExchange createDirectExchange() {
return new DirectExchange(EXCHANGE_NAME_DIRECT, true, false, null);
}
// Topic Exchange 생성
@Bean
public TopicExchange createDirectExchange() {
return new TopicExchange(EXCHANGE_NAME_TOPIC, true, false, null);
}
// Headers Exchange 생성
@Bean
public HeadersExchange createDirectExchange() {
return new HeadersExchange(EXCHANGE_NAME_HEADERS, true, false, null);
}
// Fanout Exchange 생성
@Bean
public FanoutExchange createDirectExchange() {
return new FanoutExchange(EXCHANGE_NAME_FANOUT, true, false, null);
}
// rabbitmq_delayed_message_exchange(지연큐를 위한 플로그인) 플러그인과 같은 플러그인을 사용할 경우 사용
@Bean
public CustomExchange exchange() {
Map<String,Object> headers = new HashMap<>();
headers.put("x-delayed-type","direct");
// 인자 -> exchange명, 타입, durable, autoDelete, args
return new CustomExchange(RabbitUtil.TOPIC_EXCHANGE_NAME,"x-delayed-message",true,false,headers);
}
Java
복사
Queue
// 아래 모든 생성자는 Builder를 통해 생성 가능
// Queue 생성
@Bean
public Queue createQueue() {
return new Queue(QUEUE_NAME, true);
}
// Queue 타입 지정해 생성
@Bean
public Queue createQueueByType() {
Map<String, Object> args = new HashMap<>();
args.put("x-queue-type", "quorum");
return new Queue(QUEUE_NAME, false, false, false, args);
}
Java
복사
Arguments | Description |
x-message-ttl | - 메시지가 큐에 대기하는 시간을 제한
- 밀리초 단위로 TTL(Time-To-Live)을 설정하여 메시지가 일정 시간 후에 큐에서 제거 |
x-expires | - 큐 자체의 수명을 설정
- 밀리초 단위로 지정된 시간이 경과하면 큐가 자동으로 삭제 |
x-max-length | - 큐에 저장될 수 있는 최대 메시지 개수를 제한하는데 사용
- 큐에 지정된 개수 이상의 메시지가 들어오면 가장 오래된 메시지가 삭제 |
x-max-length-bytes | - 큐에 저장될 수 있는 최대 바이트 수를 제한하는데 사용
- 큐의 용량을 바이트 단위로 제한하며, 이 값을 초과하는 경우 가장 오래된 메시지가 삭제 |
x-dead-letter-exchange
x-dead-letter-routing-key | - 대기열에서 만료된 메시지나 거부된 메시지를 전달할 교환 및 라우팅 키를 지정하는 데 사용
- 이를 통해 "Dead Letter Exchange" 패턴을 구현 |
x-overflow | - 큐가 가득 찼을 때 메시지 처리 방식을 지정할 수 있음
- "drop-head"로 설정하면 가장 오래된 메시지가 제거되고 새로운 메시지가 큐에 추가 |
x-queue-mode | 큐 모드를 "lazy"로 설정하여, 메모리 대신 디스크에 데이터를 유지 |
x-single-active-consumer | - 큐에 단일 소비자만 허용하도록 설정할 수 있음
- 이 소비자가 작업을 완료하기 전까지 다른 소비자가 큐에서 메시지를 가져갈 수 없음 |
x-queue-type | - 큐의 종류를 지정할 수 있음
- 기본적으로는 "classic"으로 설정되며, "quorum"으로 설정하면 큐가 quorum 모드로 동작하게 됨
- Quorum 모드는 데이터의 안정성과 가용성을 개선하기 위해 설계된 모드 |
x-max-priority | - 큐에 저장되는 메시지에 우선순위를 부여하고 싶을 때 사용
- 높은 우선순위의 메시지가 먼저 처리 |
x-keep-history | 큐가 삭제되었을 때 큐의 메타데이터와 메시지 히스토리를 보존할지 여부를 설정 |
RabbitListener
// 기본 사용
@RabbitListener(queues = "${rabbitmq.queue}")
public void getMessage(Message message){
log.info("[ "+ cnt++ +" ]" +" Consumer get Message from Queue: " + message);
}
// 바인딩을 리스너에서 바로 지정하는 방법
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "myQueue", durable = "true"),
exchange = @Exchange(value = "auto.exch", type = "fanout", delayed = "false"),
key = "orderRoutingKey")
)
Java
복사
Connection & Channel
// 새로운 커넥션 생성
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
// 새로운 채널 생성
Channel ch = connection.createChannel();
// 채널을 이용할 Exchange와 Queue 정의
//exchange -> direct
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//create Queue
String queueName = channel.queueDeclare().getQueue();
//binding
for(String severity:args)
channel.queueBind(queueName, EXCHANGE_NAME, severity);
Java
복사
재해복구
•
메시지 손실에 대한 문제
◦
Consumer가 메시지를 처리하다가 에러가 발생할 경우
→ Message. Acknoledgement를 이용해 Consumer가 메시지 처리 후 Ack을 보내도록 하여 만약 Ack을 보내지 않는다면, 다시 큐에 넣는다.
▪
SimpleMessageListenerContainer 를 빈에 등록
•
@RabbitListener 가 붙은 매서드는 삭제
•
AcknowledgeMode.MANUAL 설정
@Bean
public SimpleMessageListenerContainer listenerContainer(ConnectionFactory connectionFactory, MessageListener messageListener) {
SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
listenerContainer.setConnectionFactory(connectionFactory);
listenerContainer.setQueueNames(QUEUE_NAME);
listenerContainer.setMessageListener(messageListener);
listenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL); // Acknowledge 매뉴얼 설정
listenerContainer.setPrefetchCount(20);
return listenerContainer;
}
Java
복사
▪
MessageListener 생성
•
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); 을 통해 메시지 테그로 처리 완료된 메시지에 대한 ACK을 보냄
@Slf4j
@Component
public class MessageListener implements ChannelAwareMessageListener {
private static int cnt = 1;
@Override
public void onMessage(Message message, Channel channel) throws Exception {
log.info("[ "+ cnt++ +" ]" +" Consumer get Message from Queue: " + message);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
Java
복사
•
channel.basicAck 을 주석처리 했을 경우
•
channel.basicAck 을 주석처리 해제 했을 경우
간단하게 Consumer의 쓰레드를 올려주는 방법도 존재
•
SimpleMessageListenerContainer 클래스에 listenerContainer.setConcurrentConsumers(10); 로 쓰레드 늘려주기!
◦
Durability
→ 디스크에 메시지를 저장해 서버를 재시작하면 해당 메시지가 계속 유지되도록 함
▪
애초에 Queue와 Exchange를 생성할 때 Durability를 true로 설정
◦
Fair dispatch
→ 메시지가 도착했다고 바로 분배하지 말고, 최대 n개씩만 분배하도록 요청할 수 있다. 그래서 이미 소비자에게 n개가 부여되었다면 놀고 있는 다른 소비자에게 분배되도록
▪
SimpleMessageListenerContainer 를 빈에 등록
•
listenerContainer.setPrefetchCount(20); 을 통해 prefetch 설정
@Bean
public SimpleMessageListenerContainer listenerContainer(ConnectionFactory connectionFactory, MessageListener messageListener) {
SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
listenerContainer.setConnectionFactory(connectionFactory);
listenerContainer.setQueueNames(QUEUE_NAME);
listenerContainer.setMessageListener(messageListener);
listenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL); // Acknowledge 매뉴얼 설정
listenerContainer.setPrefetchCount(20);
return listenerContainer;
}
Java
복사