Python Language
Introducción a RabbitMQ utilizando AMQPStorm
Buscar..
Observaciones
La última versión de AMQPStorm está disponible en pypi o puede instalarla usando pip
pip install amqpstorm
Cómo consumir mensajes de RabbitMQ
Comience con la importación de la biblioteca.
from amqpstorm import Connection
Al consumir mensajes, primero debemos definir una función para manejar los mensajes entrantes. Esta puede ser cualquier función que se pueda llamar y tiene que tomar un objeto de mensaje o una tupla de mensaje (según el parámetro to_tuple
definido en start_consuming
).
Además de procesar los datos del mensaje entrante, también tendremos que Reconocer o Rechazar el mensaje. Esto es importante, ya que necesitamos informar a RabbitMQ que recibimos y procesamos el mensaje correctamente.
def on_message(message):
"""This function is called on message received.
:param message: Delivered message.
:return:
"""
print("Message:", message.body)
# Acknowledge that we handled the message without any issues.
message.ack()
# Reject the message.
# message.reject()
# Reject the message, and put it back in the queue.
# message.reject(requeue=True)
A continuación, debemos configurar la conexión al servidor RabbitMQ.
connection = Connection('127.0.0.1', 'guest', 'guest')
Después de eso tenemos que configurar un canal. Cada conexión puede tener múltiples canales y, en general, al realizar tareas de subprocesos múltiples, se recomienda (pero no es obligatorio) tener uno por subproceso.
channel = connection.channel()
Una vez que tengamos configurado nuestro canal, debemos informarle a RabbitMQ que queremos comenzar a consumir mensajes. En este caso, usaremos nuestra función on_message
previamente definida para manejar todos nuestros mensajes consumidos.
La cola que escucharemos en el servidor RabbitMQ será simple_queue
, y también le estamos diciendo a RabbitMQ que reconoceremos todos los mensajes entrantes una vez que hayamos terminado con ellos.
channel.basic.consume(callback=on_message, queue='simple_queue', no_ack=False)
Finalmente, necesitamos iniciar el bucle IO para comenzar a procesar los mensajes entregados por el servidor RabbitMQ.
channel.start_consuming(to_tuple=False)
Cómo publicar mensajes a RabbitMQ
Comience con la importación de la biblioteca.
from amqpstorm import Connection
from amqpstorm import Message
Luego necesitamos abrir una conexión al servidor RabbitMQ.
connection = Connection('127.0.0.1', 'guest', 'guest')
Después de eso tenemos que configurar un canal. Cada conexión puede tener múltiples canales y, en general, al realizar tareas de subprocesos múltiples, se recomienda (pero no es obligatorio) tener uno por subproceso.
channel = connection.channel()
Una vez que tengamos configurado nuestro canal, podemos comenzar a preparar nuestro mensaje.
# Message Properties.
properties = {
'content_type': 'text/plain',
'headers': {'key': 'value'}
}
# Create the message.
message = Message.create(channel=channel, body='Hello World!', properties=properties)
Ahora podemos publicar el mensaje simplemente llamando a publish
y proporcionando una routing_key
. En este caso, vamos a enviar el mensaje a una cola llamada simple_queue
.
message.publish(routing_key='simple_queue')
Cómo crear una cola retrasada en RabbitMQ
Primero debemos configurar dos canales básicos, uno para la cola principal y otro para la cola de demora. En mi ejemplo al final, incluyo un par de marcas adicionales que no son necesarias, pero hacen que el código sea más confiable; tales como confirm delivery
, delivery_mode
y durable
. Puede encontrar más información sobre estos en el manual de RabbitMQ.
Después de configurar los canales, agregamos un enlace al canal principal que podemos utilizar para enviar mensajes desde el canal de retardo a nuestra cola principal.
channel.queue.bind(exchange='amq.direct', routing_key='hello', queue='hello')
A continuación, debemos configurar nuestro canal de retardo para reenviar los mensajes a la cola principal una vez que hayan caducado.
delay_channel.queue.declare(queue='hello_delay', durable=True, arguments={
'x-message-ttl': 5000,
'x-dead-letter-exchange': 'amq.direct',
'x-dead-letter-routing-key': 'hello'
})
x-message-ttl (Mensaje - Time To Live)
Normalmente se usa para eliminar automáticamente los mensajes antiguos en la cola después de una duración específica, pero al agregar dos argumentos opcionales podemos cambiar este comportamiento y, en cambio, hacer que este parámetro determine en milisegundos el tiempo que los mensajes permanecerán en la cola de demora.
Esta variable nos permite transferir el mensaje a una cola diferente una vez que han caducado, en lugar del comportamiento predeterminado de eliminarlo por completo.
Esta variable determina qué Exchange usó para transferir el mensaje de hello_delay a hello queue.
Publicación en la cola de retardo
Cuando hayamos terminado de configurar todos los parámetros básicos de Pika, simplemente envíe un mensaje a la cola de demora utilizando la publicación básica.
delay_channel.basic.publish(exchange='',
routing_key='hello_delay',
body='test',
properties={'delivery_mod': 2})
Una vez que haya ejecutado el script, debería ver las siguientes colas creadas en su módulo de administración RabbitMQ.
Ejemplo.
from amqpstorm import Connection
connection = Connection('127.0.0.1', 'guest', 'guest')
# Create normal 'Hello World' type channel.
channel = connection.channel()
channel.confirm_deliveries()
channel.queue.declare(queue='hello', durable=True)
# We need to bind this channel to an exchange, that will be used to transfer
# messages from our delay queue.
channel.queue.bind(exchange='amq.direct', routing_key='hello', queue='hello')
# Create our delay channel.
delay_channel = connection.channel()
delay_channel.confirm_deliveries()
# This is where we declare the delay, and routing for our delay channel.
delay_channel.queue.declare(queue='hello_delay', durable=True, arguments={
'x-message-ttl': 5000, # Delay until the message is transferred in milliseconds.
'x-dead-letter-exchange': 'amq.direct', # Exchange used to transfer the message from A to B.
'x-dead-letter-routing-key': 'hello' # Name of the queue we want the message transferred to.
})
delay_channel.basic.publish(exchange='',
routing_key='hello_delay',
body='test',
properties={'delivery_mode': 2})
print("[x] Sent")