Python Language
AMQPStormを使用したRabbitMQの紹介
サーチ…
備考
RabbitMQからメッセージを消費する方法
まず、ライブラリのインポートを開始します。
from amqpstorm import Connection
メッセージを消費するときは、最初に受信メッセージを処理する関数を定義する必要があります。これは任意の呼び出し可能な関数とすることができ、メッセージオブジェクト、またはメッセージタプル( to_tuple
定義されたto_tuple
パラメータに応じて)をstart_consuming
ん。
受信メッセージからデータを処理する他に、メッセージを肯定応答または拒否する必要があります。 RabbitMQにメッセージを正しく受信し処理したことを知らせる必要があるので、これは重要です。
def on_message(message):
"""This function is called on message received.
:param message: Delivered message.
:return:
"""
print("Message:", message.body)
# Acknowledge that we handled the message without any issues.
message.ack()
# Reject the message.
# message.reject()
# Reject the message, and put it back in the queue.
# message.reject(requeue=True)
次に、RabbitMQサーバーへの接続を設定する必要があります。
connection = Connection('127.0.0.1', 'guest', 'guest')
その後、チャンネルを設定する必要があります。各接続は複数のチャネルを持つことができます。一般に、マルチスレッドのタスクを実行する場合は、スレッドごとに1つのスレッドを持つことをお勧めします(必須ではありません)。
channel = connection.channel()
チャンネルを設定したら、RabbitMQにメッセージの消費を開始させておく必要があります。この場合は、以前定義したon_message
関数を使用して、すべての消費メッセージを処理します。
RabbitMQサーバーでsimple_queue
ているsimple_queue
になります。また、RabbitMQでは、受信したすべてのメッセージが完了したらそれを確認するように指示しています。
channel.basic.consume(callback=on_message, queue='simple_queue', no_ack=False)
最後に、RabbitMQサーバによって配信されたメッセージの処理を開始するためにIOループを開始する必要があります。
channel.start_consuming(to_tuple=False)
RabbitMQにメッセージを公開する方法
まず、ライブラリのインポートを開始します。
from amqpstorm import Connection
from amqpstorm import Message
次に、RabbitMQサーバーへの接続を開く必要があります。
connection = Connection('127.0.0.1', 'guest', 'guest')
その後、チャンネルを設定する必要があります。各接続は複数のチャネルを持つことができます。一般に、マルチスレッドのタスクを実行する場合は、スレッドごとに1つのスレッドを持つことをお勧めします(必須ではありません)。
channel = connection.channel()
私たちのチャンネルを設定したら、メッセージを準備することができます。
# Message Properties.
properties = {
'content_type': 'text/plain',
'headers': {'key': 'value'}
}
# Create the message.
message = Message.create(channel=channel, body='Hello World!', properties=properties)
これで、単にpublish
を呼び出してrouting_key
publish
だけでメッセージを公開できます。この場合、メッセージをsimple_queue
というキューに送信します。
message.publish(routing_key='simple_queue')
RabbitMQで遅延キューを作成する方法
まず、メイン・キュー用と遅延キュー用の2つの基本チャネルを設定する必要があります。最後の例では、不要なフラグをいくつか追加していますが、コードの信頼性は向上しています。以下のようなconfirm delivery
、 delivery_mode
とdurable
。これらの詳細は、RabbitMQ マニュアルを参照してください。
チャンネルを設定したら、遅延チャンネルからメインキューにメッセージを送信するために使用できるメインチャンネルにバインディングを追加します。
channel.queue.bind(exchange='amq.direct', routing_key='hello', queue='hello')
次に、期限切れのメッセージをメインキューに転送するように遅延チャネルを設定する必要があります。
delay_channel.queue.declare(queue='hello_delay', durable=True, arguments={
'x-message-ttl': 5000,
'x-dead-letter-exchange': 'amq.direct',
'x-dead-letter-routing-key': 'hello'
})
x-message-ttl (メッセージ - Time To Live)
これは、通常、特定の時間の経過後にキュー内の古いメッセージを自動的に削除するために使用されますが、2つのオプションの引数を追加することで、この動作を変更し、遅延キューのメッセージの長さをミリ秒単位で判断できます。
この変数を使用すると、メッセージを完全に削除するデフォルトの動作ではなく、期限切れになったメッセージを別のキューに転送できます。
この変数は、メッセージをhello_delayからhelloキューに転送するために使用されたExchangeを決定します。
遅延キューに公開する
すべての基本的なPikaパラメータを設定したら、基本的なパブリッシュを使って遅延キューにメッセージを送信するだけです。
delay_channel.basic.publish(exchange='',
routing_key='hello_delay',
body='test',
properties={'delivery_mod': 2})
スクリプトを実行すると、RabbitMQ管理モジュールで次のキューが作成されます。
例。
from amqpstorm import Connection
connection = Connection('127.0.0.1', 'guest', 'guest')
# Create normal 'Hello World' type channel.
channel = connection.channel()
channel.confirm_deliveries()
channel.queue.declare(queue='hello', durable=True)
# We need to bind this channel to an exchange, that will be used to transfer
# messages from our delay queue.
channel.queue.bind(exchange='amq.direct', routing_key='hello', queue='hello')
# Create our delay channel.
delay_channel = connection.channel()
delay_channel.confirm_deliveries()
# This is where we declare the delay, and routing for our delay channel.
delay_channel.queue.declare(queue='hello_delay', durable=True, arguments={
'x-message-ttl': 5000, # Delay until the message is transferred in milliseconds.
'x-dead-letter-exchange': 'amq.direct', # Exchange used to transfer the message from A to B.
'x-dead-letter-routing-key': 'hello' # Name of the queue we want the message transferred to.
})
delay_channel.basic.publish(exchange='',
routing_key='hello_delay',
body='test',
properties={'delivery_mode': 2})
print("[x] Sent")