Ricerca…


Osservazioni

L'ultima versione di AMQPStorm è disponibile su pypi oppure è possibile installarla tramite pip

pip install amqpstorm

Come consumare messaggi da RabbitMQ

Inizia con l'importazione della libreria.

from amqpstorm import Connection

Quando si consumano messaggi, dobbiamo prima definire una funzione per gestire i messaggi in arrivo. Questa può essere una funzione chiamabile e deve prendere un oggetto messaggio o una tupla di messaggi (in base al parametro to_tuple definito in start_consuming ).

Oltre all'elaborazione dei dati dal messaggio in arrivo, dovremo anche riconoscere o rifiutare il messaggio. Questo è importante, poiché è necessario far sapere a RabbitMQ che abbiamo ricevuto e elaborato correttamente il messaggio.

def on_message(message):
    """This function is called on message received.

    :param message: Delivered message.
    :return:
    """
    print("Message:", message.body)

    # Acknowledge that we handled the message without any issues.
    message.ack()

    # Reject the message.
    # message.reject()

    # Reject the message, and put it back in the queue.
    # message.reject(requeue=True)

Quindi dobbiamo impostare la connessione al server RabbitMQ.

connection = Connection('127.0.0.1', 'guest', 'guest')

Dopodiché abbiamo bisogno di creare un canale. Ogni connessione può avere più canali e, in generale, quando si eseguono attività multi-thread, è consigliabile (ma non obbligatorio) avere uno per thread.

channel = connection.channel()

Una volta configurato il nostro canale, dobbiamo far sapere a RabbitMQ che vogliamo iniziare a consumare messaggi. In questo caso useremo la nostra funzione on_message precedentemente definita per gestire tutti i nostri messaggi consumati.

La coda che ascolteremo sul server RabbitMQ sarà simple_queue , e stiamo anche dicendo a RabbitMQ che avremo riconosciuto tutti i messaggi in arrivo una volta che avremo finito con loro.

channel.basic.consume(callback=on_message, queue='simple_queue', no_ack=False)

Infine, è necessario avviare il loop IO per avviare l'elaborazione dei messaggi consegnati dal server RabbitMQ.

channel.start_consuming(to_tuple=False)

Come pubblicare messaggi su RabbitMQ

Inizia con l'importazione della libreria.

from amqpstorm import Connection
from amqpstorm import Message

Quindi è necessario aprire una connessione al server RabbitMQ.

connection = Connection('127.0.0.1', 'guest', 'guest')

Dopodiché abbiamo bisogno di creare un canale. Ogni connessione può avere più canali e, in generale, quando si eseguono attività multi-thread, è consigliabile (ma non obbligatorio) avere uno per thread.

channel = connection.channel()

Una volta impostato il nostro canale, possiamo iniziare a preparare il nostro messaggio.

# Message Properties.
properties = {
    'content_type': 'text/plain',
    'headers': {'key': 'value'}
}

# Create the message.
message = Message.create(channel=channel, body='Hello World!', properties=properties)

Ora possiamo pubblicare il messaggio semplicemente chiamando la publish e fornendo un routing_key . In questo caso, invieremo il messaggio a una coda chiamata simple_queue .

message.publish(routing_key='simple_queue')

Come creare una coda in ritardo in RabbitMQ

Per prima cosa dobbiamo impostare due canali di base, uno per la coda principale e uno per la coda di ritardo. Nel mio esempio alla fine, includo un paio di flag aggiuntivi che non sono richiesti, ma rendono il codice più affidabile; come confirm delivery , delivery_mode e durable . Puoi trovare maggiori informazioni su questi nel manuale RabbitMQ.

Dopo aver impostato i canali, aggiungiamo un'associazione al canale principale che possiamo usare per inviare messaggi dal canale di ritardo alla nostra coda principale.

channel.queue.bind(exchange='amq.direct', routing_key='hello', queue='hello')

Successivamente dobbiamo configurare il nostro canale di ritardo per inoltrare i messaggi alla coda principale una volta scaduti.

delay_channel.queue.declare(queue='hello_delay', durable=True, arguments={
    'x-message-ttl': 5000,
    'x-dead-letter-exchange': 'amq.direct',
    'x-dead-letter-routing-key': 'hello'
})
  • x-message-ttl (Message - Time To Live)

    Questo è normalmente usato per rimuovere automaticamente i vecchi messaggi nella coda dopo una durata specifica, ma aggiungendo due argomenti opzionali possiamo cambiare questo comportamento, e invece questo parametro determina in millisecondi per quanto tempo i messaggi rimarranno nella coda di delay.

  • x--alternativo-chiave lettera morta

    Questa variabile ci consente di trasferire il messaggio in una coda diversa una volta scaduti, invece del comportamento predefinito di rimuoverlo completamente.

  • x-dead-letter-scambio

    Questa variabile determina quale Exchange ha usato per trasferire il messaggio da hello_delay a hello queue.

Pubblicazione sulla coda di ritardo

Quando abbiamo finito di impostare tutti i parametri di base di Pika, semplicemente invii un messaggio alla coda di ritardo usando la pubblicazione di base.

delay_channel.basic.publish(exchange='',
                            routing_key='hello_delay',
                            body='test',
                            properties={'delivery_mod': 2})

Una volta eseguito lo script, dovresti vedere le seguenti code create nel tuo modulo di gestione RabbitMQ. inserisci la descrizione dell'immagine qui

Esempio.

from amqpstorm import Connection

connection = Connection('127.0.0.1', 'guest', 'guest')

# Create normal 'Hello World' type channel.
channel = connection.channel()
channel.confirm_deliveries()
channel.queue.declare(queue='hello', durable=True)

# We need to bind this channel to an exchange, that will be used to transfer
# messages from our delay queue.
channel.queue.bind(exchange='amq.direct', routing_key='hello', queue='hello')

# Create our delay channel.
delay_channel = connection.channel()
delay_channel.confirm_deliveries()

# This is where we declare the delay, and routing for our delay channel.
delay_channel.queue.declare(queue='hello_delay', durable=True, arguments={
    'x-message-ttl': 5000, # Delay until the message is transferred in milliseconds.
    'x-dead-letter-exchange': 'amq.direct', # Exchange used to transfer the message from A to B.
    'x-dead-letter-routing-key': 'hello' # Name of the queue we want the message transferred to.
})

delay_channel.basic.publish(exchange='',
                            routing_key='hello_delay',
                            body='test',
                            properties={'delivery_mode': 2})

print("[x] Sent")


Modified text is an extract of the original Stack Overflow Documentation
Autorizzato sotto CC BY-SA 3.0
Non affiliato con Stack Overflow