Python Language
AMQPStorm을 사용한 RabbitMQ 소개
RabbitMQ에서 메시지를 소비하는 방법
라이브러리 가져 오기부터 시작하십시오.
from amqpstorm import Connection
메시지를 소비 할 때 먼저 수신 메시지를 처리하는 함수를 정의해야합니다. 이는 호출 할 수있는 모든 함수가 될 수 있으며 메시지 객체 또는 메시지 튜플을 취해야합니다 ( start_consuming
정의 된 to_tuple
매개 변수에 따라 다름).
들어오는 메시지에서 데이터를 처리하는 것 외에도 메시지를 확인 또는 거부해야합니다. RabbitMQ가 메시지를 제대로 수신하고 처리했음을 RabbitMQ에 알려야하기 때문에 중요합니다.
def on_message(message):
"""This function is called on message received.
:param message: Delivered message.
print("Message:", message.body)
# Acknowledge that we handled the message without any issues.
# Reject the message.
# message.reject()
# Reject the message, and put it back in the queue.
# message.reject(requeue=True)
다음으로 RabbitMQ 서버에 대한 연결을 설정해야합니다.
connection = Connection('', 'guest', 'guest')
그 다음에는 채널을 설정해야합니다. 각 연결은 다중 채널을 가질 수 있으며 일반적으로 멀티 스레드 작업을 수행 할 때 스레드 당 하나씩 있어야하는 것이 좋습니다 (필수는 아님).
channel =
채널을 설정하고 나면 RabbitMQ가 메시지를 소비하기 시작했음을 알려야합니다. 이 경우 이전에 정의 된 on_message
함수를 사용하여 소비 된 모든 메시지를 처리합니다.
RabbitMQ 서버에서 수신 대기열은 simple_queue
가 될 simple_queue
, RabbitMQ는 수신 메시지가 모두 처리되면이를 수신 확인 함을 알리고 있습니다.
channel.basic.consume(callback=on_message, queue='simple_queue', no_ack=False)
마지막으로 RabbitMQ 서버가 전달한 메시지 처리를 시작하려면 IO 루프를 시작해야합니다.
RabbitMQ에 메시지를 게시하는 방법
라이브러리 가져 오기부터 시작하십시오.
from amqpstorm import Connection
from amqpstorm import Message
다음으로 RabbitMQ 서버에 대한 연결을 열어야합니다.
connection = Connection('', 'guest', 'guest')
그 다음에는 채널을 설정해야합니다. 각 연결은 다중 채널을 가질 수 있으며 일반적으로 멀티 스레드 작업을 수행 할 때 스레드 당 하나씩 있어야하는 것이 좋습니다 (필수는 아님).
channel =
채널을 설정하고 나면 메시지를 준비 할 수 있습니다.
# Message Properties.
properties = {
'content_type': 'text/plain',
'headers': {'key': 'value'}
# Create the message.
message = Message.create(channel=channel, body='Hello World!', properties=properties)
이제 우리는 단순히 호출하여 메시지를 게시 할 수 publish
및 제공 routing_key
. 이 경우 메시지를 simple_queue
라는 대기열로 simple_queue
RabbitMQ에서 지연된 큐를 생성하는 방법
먼저 메인 큐를위한 것과 지연 큐를위한 두 개의 기본 채널을 설정해야합니다. 마지막 예제에서는 필자는 필요없는 플래그를 두 개 추가했지만 코드를보다 안정적으로 만듭니다. confirm delivery
, delivery_mode
및 durable
. RabbitMQ 설명서 에서 이에 대한 자세한 정보를 찾을 수 있습니다.
채널을 설정 한 후에는 지연 채널에서 메인 대기열로 메시지를 보내는 데 사용할 수있는 기본 채널에 바인딩을 추가합니다.
channel.queue.bind(exchange='', routing_key='hello', queue='hello')
다음으로 만료 된 메시지를 기본 대기열로 전달하도록 지연 채널을 구성해야합니다.
delay_channel.queue.declare(queue='hello_delay', durable=True, arguments={
'x-message-ttl': 5000,
'x-dead-letter-exchange': '',
'x-dead-letter-routing-key': 'hello'
x-message-ttl (메시지 - Time To Live)
이것은 일반적으로 특정 기간 후에 대기열에있는 이전 메시지를 자동으로 제거하는 데 사용되지만 두 개의 선택적 인수를 추가하여이 동작을 변경할 수 있으며이 매개 변수를 사용하여 메시지가 지연 대기열에 머무르는 시간 (밀리 초)을 결정할 수 있습니다.
이 변수를 사용하면 메시지를 완전히 제거하는 기본 동작 대신 만료 된 메시지를 다른 대기열로 전송할 수 있습니다.
이 변수는 hello_delay에서 hello 대기열로 메시지를 전송하는 데 사용 된 Exchange를 결정합니다.
지연 대기열에 게시
기본 Pika 매개 변수를 모두 설정했으면 기본 게시를 사용하여 지연 대기열로 메시지를 보냅니다.
properties={'delivery_mod': 2})
스크립트를 실행하면 RabbitMQ 관리 모듈에 다음 대기열이 만들어집니다.
from amqpstorm import Connection
connection = Connection('', 'guest', 'guest')
# Create normal 'Hello World' type channel.
channel =
channel.queue.declare(queue='hello', durable=True)
# We need to bind this channel to an exchange, that will be used to transfer
# messages from our delay queue.
channel.queue.bind(exchange='', routing_key='hello', queue='hello')
# Create our delay channel.
delay_channel =
# This is where we declare the delay, and routing for our delay channel.
delay_channel.queue.declare(queue='hello_delay', durable=True, arguments={
'x-message-ttl': 5000, # Delay until the message is transferred in milliseconds.
'x-dead-letter-exchange': '', # Exchange used to transfer the message from A to B.
'x-dead-letter-routing-key': 'hello' # Name of the queue we want the message transferred to.
properties={'delivery_mode': 2})
print("[x] Sent")