Python Language
AMQPStorm을 사용한 RabbitMQ 소개
수색…
비고
RabbitMQ에서 메시지를 소비하는 방법
라이브러리 가져 오기부터 시작하십시오.
from amqpstorm import Connection
메시지를 소비 할 때 먼저 수신 메시지를 처리하는 함수를 정의해야합니다. 이는 호출 할 수있는 모든 함수가 될 수 있으며 메시지 객체 또는 메시지 튜플을 취해야합니다 ( start_consuming
정의 된 to_tuple
매개 변수에 따라 다름).
들어오는 메시지에서 데이터를 처리하는 것 외에도 메시지를 확인 또는 거부해야합니다. RabbitMQ가 메시지를 제대로 수신하고 처리했음을 RabbitMQ에 알려야하기 때문에 중요합니다.
def on_message(message):
"""This function is called on message received.
:param message: Delivered message.
:return:
"""
print("Message:", message.body)
# Acknowledge that we handled the message without any issues.
message.ack()
# Reject the message.
# message.reject()
# Reject the message, and put it back in the queue.
# message.reject(requeue=True)
다음으로 RabbitMQ 서버에 대한 연결을 설정해야합니다.
connection = Connection('127.0.0.1', 'guest', 'guest')
그 다음에는 채널을 설정해야합니다. 각 연결은 다중 채널을 가질 수 있으며 일반적으로 멀티 스레드 작업을 수행 할 때 스레드 당 하나씩 있어야하는 것이 좋습니다 (필수는 아님).
channel = connection.channel()
채널을 설정하고 나면 RabbitMQ가 메시지를 소비하기 시작했음을 알려야합니다. 이 경우 이전에 정의 된 on_message
함수를 사용하여 소비 된 모든 메시지를 처리합니다.
RabbitMQ 서버에서 수신 대기열은 simple_queue
가 될 simple_queue
, RabbitMQ는 수신 메시지가 모두 처리되면이를 수신 확인 함을 알리고 있습니다.
channel.basic.consume(callback=on_message, queue='simple_queue', no_ack=False)
마지막으로 RabbitMQ 서버가 전달한 메시지 처리를 시작하려면 IO 루프를 시작해야합니다.
channel.start_consuming(to_tuple=False)
RabbitMQ에 메시지를 게시하는 방법
라이브러리 가져 오기부터 시작하십시오.
from amqpstorm import Connection
from amqpstorm import Message
다음으로 RabbitMQ 서버에 대한 연결을 열어야합니다.
connection = Connection('127.0.0.1', 'guest', 'guest')
그 다음에는 채널을 설정해야합니다. 각 연결은 다중 채널을 가질 수 있으며 일반적으로 멀티 스레드 작업을 수행 할 때 스레드 당 하나씩 있어야하는 것이 좋습니다 (필수는 아님).
channel = connection.channel()
채널을 설정하고 나면 메시지를 준비 할 수 있습니다.
# Message Properties.
properties = {
'content_type': 'text/plain',
'headers': {'key': 'value'}
}
# Create the message.
message = Message.create(channel=channel, body='Hello World!', properties=properties)
이제 우리는 단순히 호출하여 메시지를 게시 할 수 publish
및 제공 routing_key
. 이 경우 메시지를 simple_queue
라는 대기열로 simple_queue
.
message.publish(routing_key='simple_queue')
RabbitMQ에서 지연된 큐를 생성하는 방법
먼저 메인 큐를위한 것과 지연 큐를위한 두 개의 기본 채널을 설정해야합니다. 마지막 예제에서는 필자는 필요없는 플래그를 두 개 추가했지만 코드를보다 안정적으로 만듭니다. confirm delivery
, delivery_mode
및 durable
. RabbitMQ 설명서 에서 이에 대한 자세한 정보를 찾을 수 있습니다.
채널을 설정 한 후에는 지연 채널에서 메인 대기열로 메시지를 보내는 데 사용할 수있는 기본 채널에 바인딩을 추가합니다.
channel.queue.bind(exchange='amq.direct', routing_key='hello', queue='hello')
다음으로 만료 된 메시지를 기본 대기열로 전달하도록 지연 채널을 구성해야합니다.
delay_channel.queue.declare(queue='hello_delay', durable=True, arguments={
'x-message-ttl': 5000,
'x-dead-letter-exchange': 'amq.direct',
'x-dead-letter-routing-key': 'hello'
})
x-message-ttl (메시지 - Time To Live)
이것은 일반적으로 특정 기간 후에 대기열에있는 이전 메시지를 자동으로 제거하는 데 사용되지만 두 개의 선택적 인수를 추가하여이 동작을 변경할 수 있으며이 매개 변수를 사용하여 메시지가 지연 대기열에 머무르는 시간 (밀리 초)을 결정할 수 있습니다.
이 변수를 사용하면 메시지를 완전히 제거하는 기본 동작 대신 만료 된 메시지를 다른 대기열로 전송할 수 있습니다.
이 변수는 hello_delay에서 hello 대기열로 메시지를 전송하는 데 사용 된 Exchange를 결정합니다.
지연 대기열에 게시
기본 Pika 매개 변수를 모두 설정했으면 기본 게시를 사용하여 지연 대기열로 메시지를 보냅니다.
delay_channel.basic.publish(exchange='',
routing_key='hello_delay',
body='test',
properties={'delivery_mod': 2})
스크립트를 실행하면 RabbitMQ 관리 모듈에 다음 대기열이 만들어집니다.
예.
from amqpstorm import Connection
connection = Connection('127.0.0.1', 'guest', 'guest')
# Create normal 'Hello World' type channel.
channel = connection.channel()
channel.confirm_deliveries()
channel.queue.declare(queue='hello', durable=True)
# We need to bind this channel to an exchange, that will be used to transfer
# messages from our delay queue.
channel.queue.bind(exchange='amq.direct', routing_key='hello', queue='hello')
# Create our delay channel.
delay_channel = connection.channel()
delay_channel.confirm_deliveries()
# This is where we declare the delay, and routing for our delay channel.
delay_channel.queue.declare(queue='hello_delay', durable=True, arguments={
'x-message-ttl': 5000, # Delay until the message is transferred in milliseconds.
'x-dead-letter-exchange': 'amq.direct', # Exchange used to transfer the message from A to B.
'x-dead-letter-routing-key': 'hello' # Name of the queue we want the message transferred to.
})
delay_channel.basic.publish(exchange='',
routing_key='hello_delay',
body='test',
properties={'delivery_mode': 2})
print("[x] Sent")