Zoeken…


Opmerkingen

De nieuwste versie van AMQPStorm is beschikbaar op pypi of u kunt het installeren met behulp van pip

pip install amqpstorm

Hoe berichten van RabbitMQ te consumeren

Begin met het importeren van de bibliotheek.

from amqpstorm import Connection

Bij het consumeren van berichten moeten we eerst een functie definiëren om de inkomende berichten af te handelen. Dit kan elke opvraagbare functie zijn en moet een berichtobject of een berichttuple nemen (afhankelijk van de parameter to_tuple die is gedefinieerd in start_consuming ).

Naast het verwerken van de gegevens van het binnenkomende bericht, moeten we het bericht ook bevestigen of weigeren. Dit is belangrijk, omdat we RabbitMQ moeten laten weten dat we het bericht correct hebben ontvangen en verwerkt.

def on_message(message):
    """This function is called on message received.

    :param message: Delivered message.
    :return:
    """
    print("Message:", message.body)

    # Acknowledge that we handled the message without any issues.
    message.ack()

    # Reject the message.
    # message.reject()

    # Reject the message, and put it back in the queue.
    # message.reject(requeue=True)

Vervolgens moeten we de verbinding met de RabbitMQ-server instellen.

connection = Connection('127.0.0.1', 'guest', 'guest')

Daarna moeten we een kanaal opzetten. Elke verbinding kan meerdere kanalen hebben en in het algemeen wordt het aanbevolen (maar niet vereist) om per thread meerdere taken uit te voeren.

channel = connection.channel()

Nadat we ons kanaal hebben ingesteld, moeten we RabbitMQ laten weten dat we berichten willen gaan gebruiken. In dit geval zullen we onze eerder gedefinieerde functie on_message gebruiken om al onze verbruikte berichten af te handelen.

De wachtrij waar we naar zullen luisteren op de RabbitMQ-server wordt simple_queue en we vertellen RabbitMQ ook dat we alle inkomende berichten zullen bevestigen zodra we klaar zijn.

channel.basic.consume(callback=on_message, queue='simple_queue', no_ack=False)

Eindelijk moeten we de IO-lus starten om berichten te verwerken die worden afgeleverd door de RabbitMQ-server.

channel.start_consuming(to_tuple=False)

Hoe berichten te publiceren naar RabbitMQ

Begin met het importeren van de bibliotheek.

from amqpstorm import Connection
from amqpstorm import Message

Vervolgens moeten we een verbinding met de RabbitMQ-server openen.

connection = Connection('127.0.0.1', 'guest', 'guest')

Daarna moeten we een kanaal opzetten. Elke verbinding kan meerdere kanalen hebben en in het algemeen wordt het aanbevolen (maar niet vereist) om per thread meerdere taken uit te voeren.

channel = connection.channel()

Zodra we ons kanaal hebben ingesteld, kunnen we beginnen met het voorbereiden van onze boodschap.

# Message Properties.
properties = {
    'content_type': 'text/plain',
    'headers': {'key': 'value'}
}

# Create the message.
message = Message.create(channel=channel, body='Hello World!', properties=properties)

Nu kunnen we het bericht publiceren door eenvoudigweg publish aan te roepen en een routing_key . In dit geval gaan we het bericht verzenden naar een wachtrij met de naam simple_queue .

message.publish(routing_key='simple_queue')

Een vertraagde wachtrij maken in RabbitMQ

Eerst moeten we twee basiskanalen instellen, een voor de hoofdwachtrij en een voor de vertragingswachtrij. In mijn voorbeeld op het einde voeg ik een aantal extra vlaggen toe die niet vereist zijn, maar die de code betrouwbaarder maken; zoals confirm delivery , delivery_mode en durable . Meer informatie hierover vindt u in de RabbitMQ- handleiding .

Nadat we de kanalen hebben ingesteld, voegen we een binding toe aan het hoofdkanaal dat we kunnen gebruiken om berichten van het vertragingskanaal naar onze hoofdwachtrij te verzenden.

channel.queue.bind(exchange='amq.direct', routing_key='hello', queue='hello')

Vervolgens moeten we ons vertragingskanaal configureren om berichten door te sturen naar de hoofdwachtrij zodra ze zijn verlopen.

delay_channel.queue.declare(queue='hello_delay', durable=True, arguments={
    'x-message-ttl': 5000,
    'x-dead-letter-exchange': 'amq.direct',
    'x-dead-letter-routing-key': 'hello'
})
  • x-message-ttl (Message - Time To Live)

    Dit wordt normaal gebruikt om na een bepaalde duur automatisch oude berichten in de wachtrij te verwijderen, maar door twee optionele argumenten toe te voegen kunnen we dit gedrag wijzigen en in plaats daarvan laten we in milliseconden bepalen hoe lang berichten in de vertragingswachtrij blijven.

  • x-dead-letter-routing-key

    Met deze variabele kunnen we het bericht overbrengen naar een andere wachtrij zodra ze zijn verlopen, in plaats van het standaardgedrag om het volledig te verwijderen.

  • x-dead-letter-uitwisseling

    Deze variabele bepaalt welke Exchange het bericht van hello_delay naar hallo wachtrij heeft overgebracht.

Publiceren naar de vertragingswachtrij

Wanneer we klaar zijn met het instellen van alle basis Pika-parameters, stuurt u eenvoudig een bericht naar de vertragingswachtrij met behulp van basispublicatie.

delay_channel.basic.publish(exchange='',
                            routing_key='hello_delay',
                            body='test',
                            properties={'delivery_mod': 2})

Nadat u het script hebt uitgevoerd, ziet u de volgende wachtrijen die zijn gemaakt in uw RabbitMQ-beheermodule. voer hier de afbeeldingsbeschrijving in

Voorbeeld.

from amqpstorm import Connection

connection = Connection('127.0.0.1', 'guest', 'guest')

# Create normal 'Hello World' type channel.
channel = connection.channel()
channel.confirm_deliveries()
channel.queue.declare(queue='hello', durable=True)

# We need to bind this channel to an exchange, that will be used to transfer
# messages from our delay queue.
channel.queue.bind(exchange='amq.direct', routing_key='hello', queue='hello')

# Create our delay channel.
delay_channel = connection.channel()
delay_channel.confirm_deliveries()

# This is where we declare the delay, and routing for our delay channel.
delay_channel.queue.declare(queue='hello_delay', durable=True, arguments={
    'x-message-ttl': 5000, # Delay until the message is transferred in milliseconds.
    'x-dead-letter-exchange': 'amq.direct', # Exchange used to transfer the message from A to B.
    'x-dead-letter-routing-key': 'hello' # Name of the queue we want the message transferred to.
})

delay_channel.basic.publish(exchange='',
                            routing_key='hello_delay',
                            body='test',
                            properties={'delivery_mode': 2})

print("[x] Sent")


Modified text is an extract of the original Stack Overflow Documentation
Licentie onder CC BY-SA 3.0
Niet aangesloten bij Stack Overflow