サーチ…


備考

最新バージョンのAMQPStormpypiで入手可能ですまたは、 pipを使用してインストールすることもできます

pip install amqpstorm

RabbitMQからメッセージを消費する方法

まず、ライブラリのインポートを開始します。

from amqpstorm import Connection

メッセージを消費するときは、最初に受信メッセージを処理する関数を定義する必要があります。これは任意の呼び出し可能な関数とすることができ、メッセージオブジェクト、またはメッセージタプル( to_tuple定義されたto_tupleパラメータに応じて)をstart_consumingん。

受信メッセージからデータを処理する他に、メッセージを肯定応答または拒否する必要があります。 RabbitMQにメッセージを正しく受信し処理したことを知らせる必要があるので、これは重要です。

def on_message(message):
    """This function is called on message received.

    :param message: Delivered message.
    :return:
    """
    print("Message:", message.body)

    # Acknowledge that we handled the message without any issues.
    message.ack()

    # Reject the message.
    # message.reject()

    # Reject the message, and put it back in the queue.
    # message.reject(requeue=True)

次に、RabbitMQサーバーへの接続を設定する必要があります。

connection = Connection('127.0.0.1', 'guest', 'guest')

その後、チャンネルを設定する必要があります。各接続は複数のチャネルを持つことができます。一般に、マルチスレッドのタスクを実行する場合は、スレッドごとに1つのスレッドを持つことをお勧めします(必須ではありません)。

channel = connection.channel()

チャンネルを設定したら、RabbitMQにメッセージの消費を開始させておく必要があります。この場合は、以前定義したon_message関数を使用して、すべての消費メッセージを処理します。

RabbitMQサーバーでsimple_queueているsimple_queueになります。また、RabbitMQでは、受信したすべてのメッセージが完了したらそれを確認するように指示しています。

channel.basic.consume(callback=on_message, queue='simple_queue', no_ack=False)

最後に、RabbitMQサーバによって配信されたメッセージの処理を開始するためにIOループを開始する必要があります。

channel.start_consuming(to_tuple=False)

RabbitMQにメッセージを公開する方法

まず、ライブラリのインポートを開始します。

from amqpstorm import Connection
from amqpstorm import Message

次に、RabbitMQサーバーへの接続を開く必要があります。

connection = Connection('127.0.0.1', 'guest', 'guest')

その後、チャンネルを設定する必要があります。各接続は複数のチャネルを持つことができます。一般に、マルチスレッドのタスクを実行する場合は、スレッドごとに1つのスレッドを持つことをお勧めします(必須ではありません)。

channel = connection.channel()

私たちのチャンネルを設定したら、メッセージを準備することができます。

# Message Properties.
properties = {
    'content_type': 'text/plain',
    'headers': {'key': 'value'}
}

# Create the message.
message = Message.create(channel=channel, body='Hello World!', properties=properties)

これで、単にpublishを呼び出してrouting_key publishだけでメッセージを公開できます。この場合、メッセージをsimple_queueというキューに送信します。

message.publish(routing_key='simple_queue')

RabbitMQで遅延キューを作成する方法

まず、メイン・キュー用と遅延キュー用の2つの基本チャネルを設定する必要があります。最後の例では、不要なフラグをいくつか追加していますが、コードの信頼性は向上しています。以下のようなconfirm deliverydelivery_modedurable 。これらの詳細は、RabbitMQ マニュアルを参照してください。

チャンネルを設定したら、遅延チャンネルからメインキューにメッセージを送信するために使用できるメインチャンネルにバインディングを追加します。

channel.queue.bind(exchange='amq.direct', routing_key='hello', queue='hello')

次に、期限切れのメッセージをメインキューに転送するように遅延チャネルを設定する必要があります。

delay_channel.queue.declare(queue='hello_delay', durable=True, arguments={
    'x-message-ttl': 5000,
    'x-dead-letter-exchange': 'amq.direct',
    'x-dead-letter-routing-key': 'hello'
})
  • x-message-ttl (メッセージ - Time To Live)

    これは、通常、特定の時間の経過後にキュー内の古いメッセージを自動的に削除するために使用されますが、2つのオプションの引数を追加することで、この動作を変更し、遅延キューのメッセージの長さをミリ秒単位で判断できます。

  • x-dead-letter-routing-key

    この変数を使用すると、メッセージを完全に削除するデフォルトの動作ではなく、期限切れになったメッセージを別のキューに転送できます。

  • xデッドレター交換

    この変数は、メッセージをhello_delayからhelloキューに転送するために使用されたExchangeを決定します。

遅延キューに公開する

すべての基本的なPikaパラメータを設定したら、基本的なパブリッシュを使って遅延キューにメッセージを送信するだけです。

delay_channel.basic.publish(exchange='',
                            routing_key='hello_delay',
                            body='test',
                            properties={'delivery_mod': 2})

スクリプトを実行すると、RabbitMQ管理モジュールで次のキューが作成されます。 ここに画像の説明を入力

例。

from amqpstorm import Connection

connection = Connection('127.0.0.1', 'guest', 'guest')

# Create normal 'Hello World' type channel.
channel = connection.channel()
channel.confirm_deliveries()
channel.queue.declare(queue='hello', durable=True)

# We need to bind this channel to an exchange, that will be used to transfer
# messages from our delay queue.
channel.queue.bind(exchange='amq.direct', routing_key='hello', queue='hello')

# Create our delay channel.
delay_channel = connection.channel()
delay_channel.confirm_deliveries()

# This is where we declare the delay, and routing for our delay channel.
delay_channel.queue.declare(queue='hello_delay', durable=True, arguments={
    'x-message-ttl': 5000, # Delay until the message is transferred in milliseconds.
    'x-dead-letter-exchange': 'amq.direct', # Exchange used to transfer the message from A to B.
    'x-dead-letter-routing-key': 'hello' # Name of the queue we want the message transferred to.
})

delay_channel.basic.publish(exchange='',
                            routing_key='hello_delay',
                            body='test',
                            properties={'delivery_mode': 2})

print("[x] Sent")


Modified text is an extract of the original Stack Overflow Documentation
ライセンスを受けた CC BY-SA 3.0
所属していない Stack Overflow