Python Language
Introduction à RabbitMQ en utilisant AMQPStorm
Recherche…
Remarques
La dernière version d' AMQPStorm est disponible sur pypi ou vous pouvez l'installer en utilisant pip
pip install amqpstorm
Comment consommer des messages de RabbitMQ
Commencez par importer la bibliothèque.
from amqpstorm import Connection
Lors de la consommation de messages, nous devons d'abord définir une fonction pour gérer les messages entrants. Cela peut être n'importe quelle fonction appelable et doit prendre un objet de message ou un tuple de message (en fonction du paramètre to_tuple
défini dans start_consuming
).
Outre le traitement des données du message entrant, nous devrons également accuser réception ou rejeter le message. Ceci est important, car nous devons informer RabbitMQ que nous avons correctement reçu et traité le message.
def on_message(message):
"""This function is called on message received.
:param message: Delivered message.
:return:
"""
print("Message:", message.body)
# Acknowledge that we handled the message without any issues.
message.ack()
# Reject the message.
# message.reject()
# Reject the message, and put it back in the queue.
# message.reject(requeue=True)
Ensuite, nous devons configurer la connexion au serveur RabbitMQ.
connection = Connection('127.0.0.1', 'guest', 'guest')
Après cela, nous devons configurer un canal. Chaque connexion peut avoir plusieurs canaux et, en général, lors de l'exécution de tâches multithread, il est recommandé (mais pas obligatoire) d'en avoir une par thread.
channel = connection.channel()
Une fois notre chaîne configurée, nous devons informer RabbitMQ que nous souhaitons commencer à consommer des messages. Dans ce cas, nous utiliserons notre fonction on_message
précédemment définie pour gérer tous nos messages consommés.
La file d'attente que nous allons écouter sur le serveur RabbitMQ va être simple_queue
, et nous disons également à RabbitMQ que nous simple_queue
réception de tous les messages entrants une fois que nous en aurons fini.
channel.basic.consume(callback=on_message, queue='simple_queue', no_ack=False)
Enfin, nous devons démarrer la boucle IO pour commencer le traitement des messages fournis par le serveur RabbitMQ.
channel.start_consuming(to_tuple=False)
Comment publier des messages sur RabbitMQ
Commencez par importer la bibliothèque.
from amqpstorm import Connection
from amqpstorm import Message
Ensuite, nous devons ouvrir une connexion au serveur RabbitMQ.
connection = Connection('127.0.0.1', 'guest', 'guest')
Après cela, nous devons configurer un canal. Chaque connexion peut avoir plusieurs canaux et, en général, lors de l'exécution de tâches multithread, il est recommandé (mais pas obligatoire) d'en avoir une par thread.
channel = connection.channel()
Une fois notre chaîne configurée, nous pouvons commencer à préparer notre message.
# Message Properties.
properties = {
'content_type': 'text/plain',
'headers': {'key': 'value'}
}
# Create the message.
message = Message.create(channel=channel, body='Hello World!', properties=properties)
Maintenant, nous pouvons publier le message en appelant simplement publish
et en fournissant une routing_key
. Dans ce cas, nous allons envoyer le message à une file d'attente appelée simple_queue
.
message.publish(routing_key='simple_queue')
Comment créer une file d'attente différée dans RabbitMQ
Nous devons d'abord configurer deux canaux de base, un pour la file d'attente principale et un pour la file d'attente des délais. Dans mon exemple à la fin, j'inclus quelques indicateurs supplémentaires qui ne sont pas requis, mais rend le code plus fiable; tels que confirm delivery
, delivery_mode
et durable
. Vous pouvez trouver plus d'informations à ce sujet dans le manuel RabbitMQ.
Après avoir configuré les canaux, nous ajoutons une liaison au canal principal que nous pouvons utiliser pour envoyer des messages du canal de délai à notre file d'attente principale.
channel.queue.bind(exchange='amq.direct', routing_key='hello', queue='hello')
Ensuite, nous devons configurer notre canal de délai pour transmettre les messages à la file d'attente principale une fois qu'ils ont expiré.
delay_channel.queue.declare(queue='hello_delay', durable=True, arguments={
'x-message-ttl': 5000,
'x-dead-letter-exchange': 'amq.direct',
'x-dead-letter-routing-key': 'hello'
})
x-message-ttl (Message - Temps de vie)
Ceci est normalement utilisé pour supprimer automatiquement les anciens messages dans la file d'attente après une durée spécifique, mais en ajoutant deux arguments facultatifs, nous pouvons changer ce comportement et déterminer ce paramètre en millisecondes en combien de temps les messages resteront dans la file d'attente.
Cette variable nous permet de transférer le message vers une autre file d'attente une fois qu'ils ont expiré, au lieu du comportement par défaut de la supprimer complètement.
Cette variable détermine quel Exchange est utilisé pour transférer le message de hello_delay vers la file d'attente hello.
Publication dans la file d'attente des délais
Lorsque vous avez terminé de configurer tous les paramètres de base de Pika, vous envoyez simplement un message à la file d'attente de délai en utilisant la publication de base.
delay_channel.basic.publish(exchange='',
routing_key='hello_delay',
body='test',
properties={'delivery_mod': 2})
Une fois que vous avez exécuté le script, vous devriez voir les files d'attente suivantes créées dans votre module de gestion RabbitMQ.
Exemple.
from amqpstorm import Connection
connection = Connection('127.0.0.1', 'guest', 'guest')
# Create normal 'Hello World' type channel.
channel = connection.channel()
channel.confirm_deliveries()
channel.queue.declare(queue='hello', durable=True)
# We need to bind this channel to an exchange, that will be used to transfer
# messages from our delay queue.
channel.queue.bind(exchange='amq.direct', routing_key='hello', queue='hello')
# Create our delay channel.
delay_channel = connection.channel()
delay_channel.confirm_deliveries()
# This is where we declare the delay, and routing for our delay channel.
delay_channel.queue.declare(queue='hello_delay', durable=True, arguments={
'x-message-ttl': 5000, # Delay until the message is transferred in milliseconds.
'x-dead-letter-exchange': 'amq.direct', # Exchange used to transfer the message from A to B.
'x-dead-letter-routing-key': 'hello' # Name of the queue we want the message transferred to.
})
delay_channel.basic.publish(exchange='',
routing_key='hello_delay',
body='test',
properties={'delivery_mode': 2})
print("[x] Sent")