Suche…


Bemerkungen

Die neueste Version von AMQPStorm ist verfügbar unter pypi oder Sie können es installieren mit pip

pip install amqpstorm

So verwenden Sie Nachrichten von RabbitMQ

Beginnen Sie mit dem Importieren der Bibliothek.

from amqpstorm import Connection

Wenn Sie Nachrichten verwenden, müssen Sie zunächst eine Funktion definieren, um die eingehenden Nachrichten zu verarbeiten. Dies kann eine beliebige aufrufbare Funktion sein und muss ein Nachrichtenobjekt oder ein Nachrichtentupel (abhängig vom in to_tuple definierten start_consuming ) start_consuming .

Neben der Verarbeitung der Daten aus der eingehenden Nachricht müssen wir die Nachricht auch bestätigen oder ablehnen. Dies ist wichtig, da wir RabbitMQ mitteilen müssen, dass wir die Nachricht ordnungsgemäß empfangen und verarbeitet haben.

def on_message(message):
    """This function is called on message received.

    :param message: Delivered message.
    :return:
    """
    print("Message:", message.body)

    # Acknowledge that we handled the message without any issues.
    message.ack()

    # Reject the message.
    # message.reject()

    # Reject the message, and put it back in the queue.
    # message.reject(requeue=True)

Als Nächstes müssen wir die Verbindung zum RabbitMQ-Server einrichten.

connection = Connection('127.0.0.1', 'guest', 'guest')

Danach müssen wir einen Kanal einrichten. Jede Verbindung kann mehrere Kanäle haben. Im Allgemeinen wird bei Multithread-Tasks empfohlen, einen pro Thread zu haben (dies ist jedoch nicht erforderlich).

channel = connection.channel()

Sobald wir unseren Kanal eingerichtet haben, müssen wir RabbitMQ mitteilen, dass wir damit beginnen möchten, Nachrichten zu verbrauchen. In diesem Fall verwenden wir unsere zuvor definierte Funktion on_message , um alle unsere verbrauchten Nachrichten zu verarbeiten.

Die Warteschlange, die wir auf dem RabbitMQ-Server simple_queue , wird simple_queue , und wir teilen RabbitMQ mit, dass wir alle eingehenden Nachrichten bestätigen werden, sobald wir damit fertig sind.

channel.basic.consume(callback=on_message, queue='simple_queue', no_ack=False)

Schließlich müssen wir die E / A-Schleife starten, um die Verarbeitung der vom RabbitMQ-Server gelieferten Nachrichten zu starten.

channel.start_consuming(to_tuple=False)

So veröffentlichen Sie Nachrichten an RabbitMQ

Beginnen Sie mit dem Importieren der Bibliothek.

from amqpstorm import Connection
from amqpstorm import Message

Als nächstes müssen wir eine Verbindung zum RabbitMQ-Server herstellen.

connection = Connection('127.0.0.1', 'guest', 'guest')

Danach müssen wir einen Kanal einrichten. Jede Verbindung kann mehrere Kanäle haben. Im Allgemeinen wird bei Multithread-Tasks empfohlen, einen pro Thread zu haben (dies ist jedoch nicht erforderlich).

channel = connection.channel()

Sobald wir unseren Kanal eingerichtet haben, können wir mit der Vorbereitung unserer Nachricht beginnen.

# Message Properties.
properties = {
    'content_type': 'text/plain',
    'headers': {'key': 'value'}
}

# Create the message.
message = Message.create(channel=channel, body='Hello World!', properties=properties)

Jetzt können wir die Nachricht publish indem wir einfach publish aufrufen und einen routing_key . In diesem Fall senden wir die Nachricht an eine Warteschlange mit dem Namen simple_queue .

message.publish(routing_key='simple_queue')

So erstellen Sie eine verzögerte Warteschlange in RabbitMQ

Zuerst müssen wir zwei Basiskanäle einrichten, einen für die Hauptwarteschlange und einen für die Verzögerungswarteschlange. In meinem Beispiel am Ende füge ich ein paar zusätzliche Flags hinzu, die nicht erforderlich sind, aber den Code zuverlässiger machen. wie confirm delivery , delivery_mode und durable . Weitere Informationen hierzu finden Sie im RabbitMQ- Handbuch .

Nachdem wir die Kanäle eingerichtet haben, fügen wir dem Hauptkanal eine Bindung hinzu, mit der wir Nachrichten vom Verzögerungskanal an unsere Hauptwarteschlange senden können.

channel.queue.bind(exchange='amq.direct', routing_key='hello', queue='hello')

Als Nächstes müssen wir unseren Verzögerungskanal so konfigurieren, dass Nachrichten nach Ablauf an die Hauptwarteschlange weitergeleitet werden.

delay_channel.queue.declare(queue='hello_delay', durable=True, arguments={
    'x-message-ttl': 5000,
    'x-dead-letter-exchange': 'amq.direct',
    'x-dead-letter-routing-key': 'hello'
})
  • x-message-ttl (Nachricht - Zeit zum Leben)

    Normalerweise wird dies zum automatischen Entfernen alter Nachrichten in der Warteschlange nach einer bestimmten Dauer verwendet. Durch Hinzufügen zweier optionaler Argumente können Sie dieses Verhalten ändern. Stattdessen lässt sich dieser Parameter in Millisekunden festlegen, wie lange Nachrichten in der Verzögerungswarteschlange verbleiben.

  • x-dead-letter-routing-key

    Diese Variable ermöglicht es uns, die Nachricht nach ihrem Ablauf in eine andere Warteschlange zu übertragen, anstatt sie standardmäßig zu entfernen.

  • x-dead-letter-exchange

    Diese Variable bestimmt, mit welcher Exchange die Nachricht von hello_delay in die Hello-Warteschlange übertragen wurde.

Veröffentlichung in der Verzögerungswarteschlange

Wenn Sie alle grundlegenden Pika-Parameter eingerichtet haben, senden Sie einfach eine Nachricht in die Verzögerungswarteschlange.

delay_channel.basic.publish(exchange='',
                            routing_key='hello_delay',
                            body='test',
                            properties={'delivery_mod': 2})

Nachdem Sie das Skript ausgeführt haben, sollten die folgenden Warteschlangen in Ihrem RabbitMQ-Verwaltungsmodul erstellt werden. Geben Sie hier die Bildbeschreibung ein

Beispiel.

from amqpstorm import Connection

connection = Connection('127.0.0.1', 'guest', 'guest')

# Create normal 'Hello World' type channel.
channel = connection.channel()
channel.confirm_deliveries()
channel.queue.declare(queue='hello', durable=True)

# We need to bind this channel to an exchange, that will be used to transfer
# messages from our delay queue.
channel.queue.bind(exchange='amq.direct', routing_key='hello', queue='hello')

# Create our delay channel.
delay_channel = connection.channel()
delay_channel.confirm_deliveries()

# This is where we declare the delay, and routing for our delay channel.
delay_channel.queue.declare(queue='hello_delay', durable=True, arguments={
    'x-message-ttl': 5000, # Delay until the message is transferred in milliseconds.
    'x-dead-letter-exchange': 'amq.direct', # Exchange used to transfer the message from A to B.
    'x-dead-letter-routing-key': 'hello' # Name of the queue we want the message transferred to.
})

delay_channel.basic.publish(exchange='',
                            routing_key='hello_delay',
                            body='test',
                            properties={'delivery_mode': 2})

print("[x] Sent")


Modified text is an extract of the original Stack Overflow Documentation
Lizenziert unter CC BY-SA 3.0
Nicht angeschlossen an Stack Overflow