Python Language
Introduktion till RabbitMQ med AMQPStorm
Sök…
Anmärkningar
Den senaste versionen av AMQPStorm finns på pypi eller så kan du installera den med pip
pip install amqpstorm
Hur man konsumerar meddelanden från RabbitMQ
Börja med att importera biblioteket.
from amqpstorm import Connection
När vi konsumerar meddelanden måste vi först definiera en funktion för att hantera de inkommande meddelandena. Detta kan vara valfri funktion som måste kallas och måste ta ett meddelandeobjekt eller en meddelandetupel (beroende på parametern to_tuple
definierad i start_consuming
).
Förutom att hantera data från det inkommande meddelandet måste vi också bekräfta eller avvisa meddelandet. Detta är viktigt, eftersom vi måste låta RabbitMQ veta att vi korrekt har tagit emot och behandlat meddelandet.
def on_message(message):
"""This function is called on message received.
:param message: Delivered message.
:return:
"""
print("Message:", message.body)
# Acknowledge that we handled the message without any issues.
message.ack()
# Reject the message.
# message.reject()
# Reject the message, and put it back in the queue.
# message.reject(requeue=True)
Nästa måste vi konfigurera anslutningen till RabbitMQ-servern.
connection = Connection('127.0.0.1', 'guest', 'guest')
Efter det måste vi skapa en kanal. Varje anslutning kan ha flera kanaler, och i allmänhet när du utför flera trådade uppgifter rekommenderas det (men krävs inte) att ha en per tråd.
channel = connection.channel()
När vi har satt upp vår kanal måste vi låta RabbitMQ veta att vi vill börja konsumera meddelanden. I det här fallet kommer vi att använda vår tidigare definierade on_message
funktion för att hantera alla våra förbrukade meddelanden.
Kön som vi kommer att lyssna på på RabbitMQ-servern kommer att bli simple_queue
, och vi berättar också för RabbitMQ att vi kommer att erkänna alla inkommande meddelanden när vi är klar med dem.
channel.basic.consume(callback=on_message, queue='simple_queue', no_ack=False)
Slutligen måste vi starta IO-slingan för att börja behandla meddelanden som levereras av RabbitMQ-servern.
channel.start_consuming(to_tuple=False)
Hur man publicerar meddelanden till RabbitMQ
Börja med att importera biblioteket.
from amqpstorm import Connection
from amqpstorm import Message
Nästa måste vi öppna en anslutning till RabbitMQ-servern.
connection = Connection('127.0.0.1', 'guest', 'guest')
Efter det måste vi skapa en kanal. Varje anslutning kan ha flera kanaler, och i allmänhet när du utför flera trådade uppgifter rekommenderas det (men krävs inte) att ha en per tråd.
channel = connection.channel()
När vi har satt upp vår kanal kan vi börja förbereda vårt meddelande.
# Message Properties.
properties = {
'content_type': 'text/plain',
'headers': {'key': 'value'}
}
# Create the message.
message = Message.create(channel=channel, body='Hello World!', properties=properties)
Nu kan vi publicera meddelandet genom att helt enkelt ringa publish
och tillhandahålla en routing_key
. I det här fallet kommer vi att skicka meddelandet till en kö som heter simple_queue
.
message.publish(routing_key='simple_queue')
Hur man skapar en försenad kö i RabbitMQ
Först måste vi ställa in två grundkanaler, en för huvudkön och en för fördröjningskön. I mitt exempel i slutet inkluderar jag ett par ytterligare flaggor som inte krävs, men gör koden mer tillförlitlig. såsom confirm delivery
, delivery_mode
och durable
. Du kan hitta mer information om dessa i RabbitMQ- manualen .
Efter att vi har satt upp kanalerna lägger vi till en bindning till huvudkanalen som vi kan använda för att skicka meddelanden från fördröjningskanalen till vår huvudkö.
channel.queue.bind(exchange='amq.direct', routing_key='hello', queue='hello')
Nästa måste vi konfigurera vår fördröjningskanal för att vidarebefordra meddelanden till huvudkön när de har gått ut.
delay_channel.queue.declare(queue='hello_delay', durable=True, arguments={
'x-message-ttl': 5000,
'x-dead-letter-exchange': 'amq.direct',
'x-dead-letter-routing-key': 'hello'
})
x-message-ttl (Meddelande - Time To Live)
Detta används normalt för att automatiskt ta bort gamla meddelanden i kön efter en viss tid, men genom att lägga till två valfria argument kan vi ändra detta beteende och istället låta denna parameter bestämma i millisekunder hur länge meddelanden ska stanna i fördröjningskön.
Med denna variabel kan vi överföra meddelandet till en annan kö när de har gått ut, i stället för standardbeteendet för att ta bort det helt.
Denna variabel bestämmer vilken Exchange som används för att överföra meddelandet från hej_delay till hejkö.
Publicera till fördröjningskön
När vi är klar med att ställa in alla grundläggande Pika-parametrar skickar du helt enkelt ett meddelande till fördröjningskön med hjälp av basic public.
delay_channel.basic.publish(exchange='',
routing_key='hello_delay',
body='test',
properties={'delivery_mod': 2})
När du har kört skriptet ska du se följande köer skapade i din RabbitMQ-hanteringsmodul.
Exempel.
from amqpstorm import Connection
connection = Connection('127.0.0.1', 'guest', 'guest')
# Create normal 'Hello World' type channel.
channel = connection.channel()
channel.confirm_deliveries()
channel.queue.declare(queue='hello', durable=True)
# We need to bind this channel to an exchange, that will be used to transfer
# messages from our delay queue.
channel.queue.bind(exchange='amq.direct', routing_key='hello', queue='hello')
# Create our delay channel.
delay_channel = connection.channel()
delay_channel.confirm_deliveries()
# This is where we declare the delay, and routing for our delay channel.
delay_channel.queue.declare(queue='hello_delay', durable=True, arguments={
'x-message-ttl': 5000, # Delay until the message is transferred in milliseconds.
'x-dead-letter-exchange': 'amq.direct', # Exchange used to transfer the message from A to B.
'x-dead-letter-routing-key': 'hello' # Name of the queue we want the message transferred to.
})
delay_channel.basic.publish(exchange='',
routing_key='hello_delay',
body='test',
properties={'delivery_mode': 2})
print("[x] Sent")