Python Language
Wprowadzenie do RabbitMQ za pomocą AMQPStorm
Szukaj…
Uwagi
Najnowsza wersja AMQPStorm jest dostępna w pypi lub można ją zainstalować za pomocą pip
pip install amqpstorm
Jak korzystać z wiadomości z RabbitMQ
Zacznij od importowania biblioteki.
from amqpstorm import Connection
Podczas korzystania z wiadomości musimy najpierw zdefiniować funkcję do obsługi wiadomości przychodzących. Może to być dowolna funkcja wywoływana i musi ona pobierać obiekt komunikatu lub krotkę komunikatu (w zależności od parametru to_tuple
zdefiniowanego w to_tuple
start_consuming
).
Oprócz przetwarzania danych z przychodzącej wiadomości będziemy również musieli potwierdzić lub odrzucić wiadomość. Jest to ważne, ponieważ musimy poinformować RabbitMQ, że poprawnie otrzymaliśmy i przetworzyliśmy wiadomość.
def on_message(message):
"""This function is called on message received.
:param message: Delivered message.
:return:
"""
print("Message:", message.body)
# Acknowledge that we handled the message without any issues.
message.ack()
# Reject the message.
# message.reject()
# Reject the message, and put it back in the queue.
# message.reject(requeue=True)
Następnie musimy skonfigurować połączenie z serwerem RabbitMQ.
connection = Connection('127.0.0.1', 'guest', 'guest')
Następnie musimy skonfigurować kanał. Każde połączenie może mieć wiele kanałów i ogólnie przy wykonywaniu zadań wielowątkowych zaleca się (ale nie jest to wymagane), aby mieć jeden na wątek.
channel = connection.channel()
Po skonfigurowaniu naszego kanału musimy poinformować RabbitMQ, że chcemy zacząć odbierać wiadomości. W takim przypadku użyjemy naszej wcześniej zdefiniowanej funkcji on_message
do obsługi wszystkich zużytych wiadomości.
Kolejka, której będziemy słuchać na serwerze simple_queue
, będzie simple_queue
, a my mówimy również simple_queue
, że będziemy potwierdzać wszystkie przychodzące wiadomości, gdy skończymy z nimi.
channel.basic.consume(callback=on_message, queue='simple_queue', no_ack=False)
Wreszcie musimy uruchomić pętlę we / wy, aby rozpocząć przetwarzanie komunikatów dostarczanych przez serwer RabbitMQ.
channel.start_consuming(to_tuple=False)
Jak publikować wiadomości w RabbitMQ
Zacznij od importowania biblioteki.
from amqpstorm import Connection
from amqpstorm import Message
Następnie musimy otworzyć połączenie z serwerem RabbitMQ.
connection = Connection('127.0.0.1', 'guest', 'guest')
Następnie musimy skonfigurować kanał. Każde połączenie może mieć wiele kanałów i ogólnie przy wykonywaniu zadań wielowątkowych zaleca się (ale nie jest to wymagane), aby mieć jeden na wątek.
channel = connection.channel()
Po skonfigurowaniu naszego kanału możemy zacząć przygotowywać naszą wiadomość.
# Message Properties.
properties = {
'content_type': 'text/plain',
'headers': {'key': 'value'}
}
# Create the message.
message = Message.create(channel=channel, body='Hello World!', properties=properties)
Teraz możemy opublikować wiadomość, po prostu wywołując publish
i podając routing_key
. W takim przypadku wyślemy wiadomość do kolejki o nazwie simple_queue
.
message.publish(routing_key='simple_queue')
Jak utworzyć opóźnioną kolejkę w RabbitMQ
Najpierw musimy skonfigurować dwa podstawowe kanały, jeden dla kolejki głównej i jeden dla kolejki opóźniającej. W moim przykładzie na końcu dołączam kilka dodatkowych flag, które nie są wymagane, ale zwiększają niezawodność kodu; takie jak confirm delivery
, delivery_mode
i durable
. Więcej informacji na ten temat można znaleźć w instrukcji RabbitMQ.
Po skonfigurowaniu kanałów dodajemy do głównego kanału powiązanie, którego możemy używać do wysyłania wiadomości z kanału opóźniającego do naszej głównej kolejki.
channel.queue.bind(exchange='amq.direct', routing_key='hello', queue='hello')
Następnie musimy skonfigurować nasz kanał opóźnień, aby przekazywał komunikaty do głównej kolejki po ich wygaśnięciu.
delay_channel.queue.declare(queue='hello_delay', durable=True, arguments={
'x-message-ttl': 5000,
'x-dead-letter-exchange': 'amq.direct',
'x-dead-letter-routing-key': 'hello'
})
x-message-ttl (Wiadomość - Czas życia)
Zwykle jest to używane do automatycznego usuwania starych wiadomości w kolejce po określonym czasie, ale dodając dwa opcjonalne argumenty, możemy zmienić to zachowanie i zamiast tego ten parametr określa w milisekundach, jak długo wiadomości pozostaną w kolejce opóźniającej.
Ta zmienna pozwala nam przenieść wiadomość do innej kolejki po jej wygaśnięciu, zamiast domyślnego zachowania całkowitego jej usunięcia.
Ta zmienna określa, której Exchange użył do przesłania wiadomości z hello_delay do hello queue.
Publikowanie w kolejce opóźniającej
Kiedy skończymy konfigurować wszystkie podstawowe parametry Pika, po prostu wyślesz wiadomość do kolejki opóźnień za pomocą podstawowej publikacji.
delay_channel.basic.publish(exchange='',
routing_key='hello_delay',
body='test',
properties={'delivery_mod': 2})
Po wykonaniu skryptu powinieneś zobaczyć następujące kolejki utworzone w module zarządzania RabbitMQ.
Przykład.
from amqpstorm import Connection
connection = Connection('127.0.0.1', 'guest', 'guest')
# Create normal 'Hello World' type channel.
channel = connection.channel()
channel.confirm_deliveries()
channel.queue.declare(queue='hello', durable=True)
# We need to bind this channel to an exchange, that will be used to transfer
# messages from our delay queue.
channel.queue.bind(exchange='amq.direct', routing_key='hello', queue='hello')
# Create our delay channel.
delay_channel = connection.channel()
delay_channel.confirm_deliveries()
# This is where we declare the delay, and routing for our delay channel.
delay_channel.queue.declare(queue='hello_delay', durable=True, arguments={
'x-message-ttl': 5000, # Delay until the message is transferred in milliseconds.
'x-dead-letter-exchange': 'amq.direct', # Exchange used to transfer the message from A to B.
'x-dead-letter-routing-key': 'hello' # Name of the queue we want the message transferred to.
})
delay_channel.basic.publish(exchange='',
routing_key='hello_delay',
body='test',
properties={'delivery_mode': 2})
print("[x] Sent")