Szukaj…


Uwagi

Najnowsza wersja AMQPStorm jest dostępna w pypi lub można ją zainstalować za pomocą pip

pip install amqpstorm

Jak korzystać z wiadomości z RabbitMQ

Zacznij od importowania biblioteki.

from amqpstorm import Connection

Podczas korzystania z wiadomości musimy najpierw zdefiniować funkcję do obsługi wiadomości przychodzących. Może to być dowolna funkcja wywoływana i musi ona pobierać obiekt komunikatu lub krotkę komunikatu (w zależności od parametru to_tuple zdefiniowanego w to_tuple start_consuming ).

Oprócz przetwarzania danych z przychodzącej wiadomości będziemy również musieli potwierdzić lub odrzucić wiadomość. Jest to ważne, ponieważ musimy poinformować RabbitMQ, że poprawnie otrzymaliśmy i przetworzyliśmy wiadomość.

def on_message(message):
    """This function is called on message received.

    :param message: Delivered message.
    :return:
    """
    print("Message:", message.body)

    # Acknowledge that we handled the message without any issues.
    message.ack()

    # Reject the message.
    # message.reject()

    # Reject the message, and put it back in the queue.
    # message.reject(requeue=True)

Następnie musimy skonfigurować połączenie z serwerem RabbitMQ.

connection = Connection('127.0.0.1', 'guest', 'guest')

Następnie musimy skonfigurować kanał. Każde połączenie może mieć wiele kanałów i ogólnie przy wykonywaniu zadań wielowątkowych zaleca się (ale nie jest to wymagane), aby mieć jeden na wątek.

channel = connection.channel()

Po skonfigurowaniu naszego kanału musimy poinformować RabbitMQ, że chcemy zacząć odbierać wiadomości. W takim przypadku użyjemy naszej wcześniej zdefiniowanej funkcji on_message do obsługi wszystkich zużytych wiadomości.

Kolejka, której będziemy słuchać na serwerze simple_queue , będzie simple_queue , a my mówimy również simple_queue , że będziemy potwierdzać wszystkie przychodzące wiadomości, gdy skończymy z nimi.

channel.basic.consume(callback=on_message, queue='simple_queue', no_ack=False)

Wreszcie musimy uruchomić pętlę we / wy, aby rozpocząć przetwarzanie komunikatów dostarczanych przez serwer RabbitMQ.

channel.start_consuming(to_tuple=False)

Jak publikować wiadomości w RabbitMQ

Zacznij od importowania biblioteki.

from amqpstorm import Connection
from amqpstorm import Message

Następnie musimy otworzyć połączenie z serwerem RabbitMQ.

connection = Connection('127.0.0.1', 'guest', 'guest')

Następnie musimy skonfigurować kanał. Każde połączenie może mieć wiele kanałów i ogólnie przy wykonywaniu zadań wielowątkowych zaleca się (ale nie jest to wymagane), aby mieć jeden na wątek.

channel = connection.channel()

Po skonfigurowaniu naszego kanału możemy zacząć przygotowywać naszą wiadomość.

# Message Properties.
properties = {
    'content_type': 'text/plain',
    'headers': {'key': 'value'}
}

# Create the message.
message = Message.create(channel=channel, body='Hello World!', properties=properties)

Teraz możemy opublikować wiadomość, po prostu wywołując publish i podając routing_key . W takim przypadku wyślemy wiadomość do kolejki o nazwie simple_queue .

message.publish(routing_key='simple_queue')

Jak utworzyć opóźnioną kolejkę w RabbitMQ

Najpierw musimy skonfigurować dwa podstawowe kanały, jeden dla kolejki głównej i jeden dla kolejki opóźniającej. W moim przykładzie na końcu dołączam kilka dodatkowych flag, które nie są wymagane, ale zwiększają niezawodność kodu; takie jak confirm delivery , delivery_mode i durable . Więcej informacji na ten temat można znaleźć w instrukcji RabbitMQ.

Po skonfigurowaniu kanałów dodajemy do głównego kanału powiązanie, którego możemy używać do wysyłania wiadomości z kanału opóźniającego do naszej głównej kolejki.

channel.queue.bind(exchange='amq.direct', routing_key='hello', queue='hello')

Następnie musimy skonfigurować nasz kanał opóźnień, aby przekazywał komunikaty do głównej kolejki po ich wygaśnięciu.

delay_channel.queue.declare(queue='hello_delay', durable=True, arguments={
    'x-message-ttl': 5000,
    'x-dead-letter-exchange': 'amq.direct',
    'x-dead-letter-routing-key': 'hello'
})
  • x-message-ttl (Wiadomość - Czas życia)

    Zwykle jest to używane do automatycznego usuwania starych wiadomości w kolejce po określonym czasie, ale dodając dwa opcjonalne argumenty, możemy zmienić to zachowanie i zamiast tego ten parametr określa w milisekundach, jak długo wiadomości pozostaną w kolejce opóźniającej.

  • x-dead-letter-routing-key

    Ta zmienna pozwala nam przenieść wiadomość do innej kolejki po jej wygaśnięciu, zamiast domyślnego zachowania całkowitego jej usunięcia.

  • x-dead-letter-exchange

    Ta zmienna określa, której Exchange użył do przesłania wiadomości z hello_delay do hello queue.

Publikowanie w kolejce opóźniającej

Kiedy skończymy konfigurować wszystkie podstawowe parametry Pika, po prostu wyślesz wiadomość do kolejki opóźnień za pomocą podstawowej publikacji.

delay_channel.basic.publish(exchange='',
                            routing_key='hello_delay',
                            body='test',
                            properties={'delivery_mod': 2})

Po wykonaniu skryptu powinieneś zobaczyć następujące kolejki utworzone w module zarządzania RabbitMQ. wprowadź opis zdjęcia tutaj

Przykład.

from amqpstorm import Connection

connection = Connection('127.0.0.1', 'guest', 'guest')

# Create normal 'Hello World' type channel.
channel = connection.channel()
channel.confirm_deliveries()
channel.queue.declare(queue='hello', durable=True)

# We need to bind this channel to an exchange, that will be used to transfer
# messages from our delay queue.
channel.queue.bind(exchange='amq.direct', routing_key='hello', queue='hello')

# Create our delay channel.
delay_channel = connection.channel()
delay_channel.confirm_deliveries()

# This is where we declare the delay, and routing for our delay channel.
delay_channel.queue.declare(queue='hello_delay', durable=True, arguments={
    'x-message-ttl': 5000, # Delay until the message is transferred in milliseconds.
    'x-dead-letter-exchange': 'amq.direct', # Exchange used to transfer the message from A to B.
    'x-dead-letter-routing-key': 'hello' # Name of the queue we want the message transferred to.
})

delay_channel.basic.publish(exchange='',
                            routing_key='hello_delay',
                            body='test',
                            properties={'delivery_mode': 2})

print("[x] Sent")


Modified text is an extract of the original Stack Overflow Documentation
Licencjonowany na podstawie CC BY-SA 3.0
Nie związany z Stack Overflow