수색…


비고

최신 버전의 AMQPStormpypi 에서 구할 수 있으며 pip를 사용하여 설치할 수도 있습니다.

pip install amqpstorm

RabbitMQ에서 메시지를 소비하는 방법

라이브러리 가져 오기부터 시작하십시오.

from amqpstorm import Connection

메시지를 소비 할 때 먼저 수신 메시지를 처리하는 함수를 정의해야합니다. 이는 호출 할 수있는 모든 함수가 될 수 있으며 메시지 객체 또는 메시지 튜플을 취해야합니다 ( start_consuming 정의 된 to_tuple 매개 변수에 따라 다름).

들어오는 메시지에서 데이터를 처리하는 것 외에도 메시지를 확인 또는 거부해야합니다. RabbitMQ가 메시지를 제대로 수신하고 처리했음을 RabbitMQ에 알려야하기 때문에 중요합니다.

def on_message(message):
    """This function is called on message received.

    :param message: Delivered message.
    :return:
    """
    print("Message:", message.body)

    # Acknowledge that we handled the message without any issues.
    message.ack()

    # Reject the message.
    # message.reject()

    # Reject the message, and put it back in the queue.
    # message.reject(requeue=True)

다음으로 RabbitMQ 서버에 대한 연결을 설정해야합니다.

connection = Connection('127.0.0.1', 'guest', 'guest')

그 다음에는 채널을 설정해야합니다. 각 연결은 다중 채널을 가질 수 있으며 일반적으로 멀티 스레드 작업을 수행 할 때 스레드 당 하나씩 있어야하는 것이 좋습니다 (필수는 아님).

channel = connection.channel()

채널을 설정하고 나면 RabbitMQ가 메시지를 소비하기 시작했음을 알려야합니다. 이 경우 이전에 정의 된 on_message 함수를 사용하여 소비 된 모든 메시지를 처리합니다.

RabbitMQ 서버에서 수신 대기열은 simple_queue 가 될 simple_queue , RabbitMQ는 수신 메시지가 모두 처리되면이를 수신 확인 함을 알리고 있습니다.

channel.basic.consume(callback=on_message, queue='simple_queue', no_ack=False)

마지막으로 RabbitMQ 서버가 전달한 메시지 처리를 시작하려면 IO 루프를 시작해야합니다.

channel.start_consuming(to_tuple=False)

RabbitMQ에 메시지를 게시하는 방법

라이브러리 가져 오기부터 시작하십시오.

from amqpstorm import Connection
from amqpstorm import Message

다음으로 RabbitMQ 서버에 대한 연결을 열어야합니다.

connection = Connection('127.0.0.1', 'guest', 'guest')

그 다음에는 채널을 설정해야합니다. 각 연결은 다중 채널을 가질 수 있으며 일반적으로 멀티 스레드 작업을 수행 할 때 스레드 당 하나씩 있어야하는 것이 좋습니다 (필수는 아님).

channel = connection.channel()

채널을 설정하고 나면 메시지를 준비 할 수 있습니다.

# Message Properties.
properties = {
    'content_type': 'text/plain',
    'headers': {'key': 'value'}
}

# Create the message.
message = Message.create(channel=channel, body='Hello World!', properties=properties)

이제 우리는 단순히 호출하여 메시지를 게시 할 수 publish 및 제공 routing_key . 이 경우 메시지를 simple_queue 라는 대기열로 simple_queue .

message.publish(routing_key='simple_queue')

RabbitMQ에서 지연된 큐를 생성하는 방법

먼저 메인 큐를위한 것과 지연 큐를위한 두 개의 기본 채널을 설정해야합니다. 마지막 예제에서는 필자는 필요없는 플래그를 두 개 추가했지만 코드를보다 안정적으로 만듭니다. confirm delivery , delivery_modedurable . RabbitMQ 설명서 에서 이에 대한 자세한 정보를 찾을 수 있습니다.

채널을 설정 한 후에는 지연 채널에서 메인 대기열로 메시지를 보내는 데 사용할 수있는 기본 채널에 바인딩을 추가합니다.

channel.queue.bind(exchange='amq.direct', routing_key='hello', queue='hello')

다음으로 만료 된 메시지를 기본 대기열로 전달하도록 지연 채널을 구성해야합니다.

delay_channel.queue.declare(queue='hello_delay', durable=True, arguments={
    'x-message-ttl': 5000,
    'x-dead-letter-exchange': 'amq.direct',
    'x-dead-letter-routing-key': 'hello'
})
  • x-message-ttl (메시지 - Time To Live)

    이것은 일반적으로 특정 기간 후에 대기열에있는 이전 메시지를 자동으로 제거하는 데 사용되지만 두 개의 선택적 인수를 추가하여이 동작을 변경할 수 있으며이 매개 변수를 사용하여 메시지가 지연 대기열에 머무르는 시간 (밀리 초)을 결정할 수 있습니다.

  • x-dead-letter-routing-key

    이 변수를 사용하면 메시지를 완전히 제거하는 기본 동작 대신 만료 된 메시지를 다른 대기열로 전송할 수 있습니다.

  • 엑스 데드 레터 교환

    이 변수는 hello_delay에서 hello 대기열로 메시지를 전송하는 데 사용 된 Exchange를 결정합니다.

지연 대기열에 게시

기본 Pika 매개 변수를 모두 설정했으면 기본 게시를 사용하여 지연 대기열로 메시지를 보냅니다.

delay_channel.basic.publish(exchange='',
                            routing_key='hello_delay',
                            body='test',
                            properties={'delivery_mod': 2})

스크립트를 실행하면 RabbitMQ 관리 모듈에 다음 대기열이 만들어집니다. 여기에 이미지 설명을 입력하십시오.

예.

from amqpstorm import Connection

connection = Connection('127.0.0.1', 'guest', 'guest')

# Create normal 'Hello World' type channel.
channel = connection.channel()
channel.confirm_deliveries()
channel.queue.declare(queue='hello', durable=True)

# We need to bind this channel to an exchange, that will be used to transfer
# messages from our delay queue.
channel.queue.bind(exchange='amq.direct', routing_key='hello', queue='hello')

# Create our delay channel.
delay_channel = connection.channel()
delay_channel.confirm_deliveries()

# This is where we declare the delay, and routing for our delay channel.
delay_channel.queue.declare(queue='hello_delay', durable=True, arguments={
    'x-message-ttl': 5000, # Delay until the message is transferred in milliseconds.
    'x-dead-letter-exchange': 'amq.direct', # Exchange used to transfer the message from A to B.
    'x-dead-letter-routing-key': 'hello' # Name of the queue we want the message transferred to.
})

delay_channel.basic.publish(exchange='',
                            routing_key='hello_delay',
                            body='test',
                            properties={'delivery_mode': 2})

print("[x] Sent")


Modified text is an extract of the original Stack Overflow Documentation
아래 라이선스 CC BY-SA 3.0
와 제휴하지 않음 Stack Overflow