Recherche…


Remarques

La dernière version d' AMQPStorm est disponible sur pypi ou vous pouvez l'installer en utilisant pip

pip install amqpstorm

Comment consommer des messages de RabbitMQ

Commencez par importer la bibliothèque.

from amqpstorm import Connection

Lors de la consommation de messages, nous devons d'abord définir une fonction pour gérer les messages entrants. Cela peut être n'importe quelle fonction appelable et doit prendre un objet de message ou un tuple de message (en fonction du paramètre to_tuple défini dans start_consuming ).

Outre le traitement des données du message entrant, nous devrons également accuser réception ou rejeter le message. Ceci est important, car nous devons informer RabbitMQ que nous avons correctement reçu et traité le message.

def on_message(message):
    """This function is called on message received.

    :param message: Delivered message.
    :return:
    """
    print("Message:", message.body)

    # Acknowledge that we handled the message without any issues.
    message.ack()

    # Reject the message.
    # message.reject()

    # Reject the message, and put it back in the queue.
    # message.reject(requeue=True)

Ensuite, nous devons configurer la connexion au serveur RabbitMQ.

connection = Connection('127.0.0.1', 'guest', 'guest')

Après cela, nous devons configurer un canal. Chaque connexion peut avoir plusieurs canaux et, en général, lors de l'exécution de tâches multithread, il est recommandé (mais pas obligatoire) d'en avoir une par thread.

channel = connection.channel()

Une fois notre chaîne configurée, nous devons informer RabbitMQ que nous souhaitons commencer à consommer des messages. Dans ce cas, nous utiliserons notre fonction on_message précédemment définie pour gérer tous nos messages consommés.

La file d'attente que nous allons écouter sur le serveur RabbitMQ va être simple_queue , et nous disons également à RabbitMQ que nous simple_queue réception de tous les messages entrants une fois que nous en aurons fini.

channel.basic.consume(callback=on_message, queue='simple_queue', no_ack=False)

Enfin, nous devons démarrer la boucle IO pour commencer le traitement des messages fournis par le serveur RabbitMQ.

channel.start_consuming(to_tuple=False)

Comment publier des messages sur RabbitMQ

Commencez par importer la bibliothèque.

from amqpstorm import Connection
from amqpstorm import Message

Ensuite, nous devons ouvrir une connexion au serveur RabbitMQ.

connection = Connection('127.0.0.1', 'guest', 'guest')

Après cela, nous devons configurer un canal. Chaque connexion peut avoir plusieurs canaux et, en général, lors de l'exécution de tâches multithread, il est recommandé (mais pas obligatoire) d'en avoir une par thread.

channel = connection.channel()

Une fois notre chaîne configurée, nous pouvons commencer à préparer notre message.

# Message Properties.
properties = {
    'content_type': 'text/plain',
    'headers': {'key': 'value'}
}

# Create the message.
message = Message.create(channel=channel, body='Hello World!', properties=properties)

Maintenant, nous pouvons publier le message en appelant simplement publish et en fournissant une routing_key . Dans ce cas, nous allons envoyer le message à une file d'attente appelée simple_queue .

message.publish(routing_key='simple_queue')

Comment créer une file d'attente différée dans RabbitMQ

Nous devons d'abord configurer deux canaux de base, un pour la file d'attente principale et un pour la file d'attente des délais. Dans mon exemple à la fin, j'inclus quelques indicateurs supplémentaires qui ne sont pas requis, mais rend le code plus fiable; tels que confirm delivery , delivery_mode et durable . Vous pouvez trouver plus d'informations à ce sujet dans le manuel RabbitMQ.

Après avoir configuré les canaux, nous ajoutons une liaison au canal principal que nous pouvons utiliser pour envoyer des messages du canal de délai à notre file d'attente principale.

channel.queue.bind(exchange='amq.direct', routing_key='hello', queue='hello')

Ensuite, nous devons configurer notre canal de délai pour transmettre les messages à la file d'attente principale une fois qu'ils ont expiré.

delay_channel.queue.declare(queue='hello_delay', durable=True, arguments={
    'x-message-ttl': 5000,
    'x-dead-letter-exchange': 'amq.direct',
    'x-dead-letter-routing-key': 'hello'
})
  • x-message-ttl (Message - Temps de vie)

    Ceci est normalement utilisé pour supprimer automatiquement les anciens messages dans la file d'attente après une durée spécifique, mais en ajoutant deux arguments facultatifs, nous pouvons changer ce comportement et déterminer ce paramètre en millisecondes en combien de temps les messages resteront dans la file d'attente.

  • x-dead-letter-routing-key

    Cette variable nous permet de transférer le message vers une autre file d'attente une fois qu'ils ont expiré, au lieu du comportement par défaut de la supprimer complètement.

  • échange de lettres mortes

    Cette variable détermine quel Exchange est utilisé pour transférer le message de hello_delay vers la file d'attente hello.

Publication dans la file d'attente des délais

Lorsque vous avez terminé de configurer tous les paramètres de base de Pika, vous envoyez simplement un message à la file d'attente de délai en utilisant la publication de base.

delay_channel.basic.publish(exchange='',
                            routing_key='hello_delay',
                            body='test',
                            properties={'delivery_mod': 2})

Une fois que vous avez exécuté le script, vous devriez voir les files d'attente suivantes créées dans votre module de gestion RabbitMQ. entrer la description de l'image ici

Exemple.

from amqpstorm import Connection

connection = Connection('127.0.0.1', 'guest', 'guest')

# Create normal 'Hello World' type channel.
channel = connection.channel()
channel.confirm_deliveries()
channel.queue.declare(queue='hello', durable=True)

# We need to bind this channel to an exchange, that will be used to transfer
# messages from our delay queue.
channel.queue.bind(exchange='amq.direct', routing_key='hello', queue='hello')

# Create our delay channel.
delay_channel = connection.channel()
delay_channel.confirm_deliveries()

# This is where we declare the delay, and routing for our delay channel.
delay_channel.queue.declare(queue='hello_delay', durable=True, arguments={
    'x-message-ttl': 5000, # Delay until the message is transferred in milliseconds.
    'x-dead-letter-exchange': 'amq.direct', # Exchange used to transfer the message from A to B.
    'x-dead-letter-routing-key': 'hello' # Name of the queue we want the message transferred to.
})

delay_channel.basic.publish(exchange='',
                            routing_key='hello_delay',
                            body='test',
                            properties={'delivery_mode': 2})

print("[x] Sent")


Modified text is an extract of the original Stack Overflow Documentation
Sous licence CC BY-SA 3.0
Non affilié à Stack Overflow