Sök…


Anmärkningar

Den senaste versionen av AMQPStorm finns på pypi eller så kan du installera den med pip

pip install amqpstorm

Hur man konsumerar meddelanden från RabbitMQ

Börja med att importera biblioteket.

from amqpstorm import Connection

När vi konsumerar meddelanden måste vi först definiera en funktion för att hantera de inkommande meddelandena. Detta kan vara valfri funktion som måste kallas och måste ta ett meddelandeobjekt eller en meddelandetupel (beroende på parametern to_tuple definierad i start_consuming ).

Förutom att hantera data från det inkommande meddelandet måste vi också bekräfta eller avvisa meddelandet. Detta är viktigt, eftersom vi måste låta RabbitMQ veta att vi korrekt har tagit emot och behandlat meddelandet.

def on_message(message):
    """This function is called on message received.

    :param message: Delivered message.
    :return:
    """
    print("Message:", message.body)

    # Acknowledge that we handled the message without any issues.
    message.ack()

    # Reject the message.
    # message.reject()

    # Reject the message, and put it back in the queue.
    # message.reject(requeue=True)

Nästa måste vi konfigurera anslutningen till RabbitMQ-servern.

connection = Connection('127.0.0.1', 'guest', 'guest')

Efter det måste vi skapa en kanal. Varje anslutning kan ha flera kanaler, och i allmänhet när du utför flera trådade uppgifter rekommenderas det (men krävs inte) att ha en per tråd.

channel = connection.channel()

När vi har satt upp vår kanal måste vi låta RabbitMQ veta att vi vill börja konsumera meddelanden. I det här fallet kommer vi att använda vår tidigare definierade on_message funktion för att hantera alla våra förbrukade meddelanden.

Kön som vi kommer att lyssna på på RabbitMQ-servern kommer att bli simple_queue , och vi berättar också för RabbitMQ att vi kommer att erkänna alla inkommande meddelanden när vi är klar med dem.

channel.basic.consume(callback=on_message, queue='simple_queue', no_ack=False)

Slutligen måste vi starta IO-slingan för att börja behandla meddelanden som levereras av RabbitMQ-servern.

channel.start_consuming(to_tuple=False)

Hur man publicerar meddelanden till RabbitMQ

Börja med att importera biblioteket.

from amqpstorm import Connection
from amqpstorm import Message

Nästa måste vi öppna en anslutning till RabbitMQ-servern.

connection = Connection('127.0.0.1', 'guest', 'guest')

Efter det måste vi skapa en kanal. Varje anslutning kan ha flera kanaler, och i allmänhet när du utför flera trådade uppgifter rekommenderas det (men krävs inte) att ha en per tråd.

channel = connection.channel()

När vi har satt upp vår kanal kan vi börja förbereda vårt meddelande.

# Message Properties.
properties = {
    'content_type': 'text/plain',
    'headers': {'key': 'value'}
}

# Create the message.
message = Message.create(channel=channel, body='Hello World!', properties=properties)

Nu kan vi publicera meddelandet genom att helt enkelt ringa publish och tillhandahålla en routing_key . I det här fallet kommer vi att skicka meddelandet till en kö som heter simple_queue .

message.publish(routing_key='simple_queue')

Hur man skapar en försenad kö i RabbitMQ

Först måste vi ställa in två grundkanaler, en för huvudkön och en för fördröjningskön. I mitt exempel i slutet inkluderar jag ett par ytterligare flaggor som inte krävs, men gör koden mer tillförlitlig. såsom confirm delivery , delivery_mode och durable . Du kan hitta mer information om dessa i RabbitMQ- manualen .

Efter att vi har satt upp kanalerna lägger vi till en bindning till huvudkanalen som vi kan använda för att skicka meddelanden från fördröjningskanalen till vår huvudkö.

channel.queue.bind(exchange='amq.direct', routing_key='hello', queue='hello')

Nästa måste vi konfigurera vår fördröjningskanal för att vidarebefordra meddelanden till huvudkön när de har gått ut.

delay_channel.queue.declare(queue='hello_delay', durable=True, arguments={
    'x-message-ttl': 5000,
    'x-dead-letter-exchange': 'amq.direct',
    'x-dead-letter-routing-key': 'hello'
})
  • x-message-ttl (Meddelande - Time To Live)

    Detta används normalt för att automatiskt ta bort gamla meddelanden i kön efter en viss tid, men genom att lägga till två valfria argument kan vi ändra detta beteende och istället låta denna parameter bestämma i millisekunder hur länge meddelanden ska stanna i fördröjningskön.

  • X-dead-brev routing-nyckel

    Med denna variabel kan vi överföra meddelandet till en annan kö när de har gått ut, i stället för standardbeteendet för att ta bort det helt.

  • X-dead-letter-utbyte

    Denna variabel bestämmer vilken Exchange som används för att överföra meddelandet från hej_delay till hejkö.

Publicera till fördröjningskön

När vi är klar med att ställa in alla grundläggande Pika-parametrar skickar du helt enkelt ett meddelande till fördröjningskön med hjälp av basic public.

delay_channel.basic.publish(exchange='',
                            routing_key='hello_delay',
                            body='test',
                            properties={'delivery_mod': 2})

När du har kört skriptet ska du se följande köer skapade i din RabbitMQ-hanteringsmodul. ange bildbeskrivning här

Exempel.

from amqpstorm import Connection

connection = Connection('127.0.0.1', 'guest', 'guest')

# Create normal 'Hello World' type channel.
channel = connection.channel()
channel.confirm_deliveries()
channel.queue.declare(queue='hello', durable=True)

# We need to bind this channel to an exchange, that will be used to transfer
# messages from our delay queue.
channel.queue.bind(exchange='amq.direct', routing_key='hello', queue='hello')

# Create our delay channel.
delay_channel = connection.channel()
delay_channel.confirm_deliveries()

# This is where we declare the delay, and routing for our delay channel.
delay_channel.queue.declare(queue='hello_delay', durable=True, arguments={
    'x-message-ttl': 5000, # Delay until the message is transferred in milliseconds.
    'x-dead-letter-exchange': 'amq.direct', # Exchange used to transfer the message from A to B.
    'x-dead-letter-routing-key': 'hello' # Name of the queue we want the message transferred to.
})

delay_channel.basic.publish(exchange='',
                            routing_key='hello_delay',
                            body='test',
                            properties={'delivery_mode': 2})

print("[x] Sent")


Modified text is an extract of the original Stack Overflow Documentation
Licensierat under CC BY-SA 3.0
Inte anslutet till Stack Overflow