खोज…


टिप्पणियों

के नवीनतम संस्करण AMQPStorm पर उपलब्ध है pypi या आप उपयोग कर स्थापित कर सकते हैं पिप

pip install amqpstorm

RabbitMQ के संदेशों का उपभोग कैसे करें

पुस्तकालय आयात करने के साथ शुरू करो।

from amqpstorm import Connection

संदेशों का उपभोग करते समय, हमें पहले आने वाले संदेशों को संभालने के लिए एक फ़ंक्शन को परिभाषित करना होगा। यह किसी भी प्रतिदेय समारोह हो, और संदेश वस्तु, या संदेश टपल (के आधार पर लेने के लिए है सकते हैं to_tuple पैरामीटर में परिभाषित start_consuming )।

आने वाले संदेश से डेटा को संसाधित करने के अलावा, हमें संदेश को स्वीकार या अस्वीकार भी करना होगा। यह महत्वपूर्ण है, क्योंकि हमें यह बताने की आवश्यकता है कि RabbitMQ ने हमें संदेश को ठीक से प्राप्त और संसाधित किया है।

def on_message(message):
    """This function is called on message received.

    :param message: Delivered message.
    :return:
    """
    print("Message:", message.body)

    # Acknowledge that we handled the message without any issues.
    message.ack()

    # Reject the message.
    # message.reject()

    # Reject the message, and put it back in the queue.
    # message.reject(requeue=True)

अगला हमें RabbitMQ सर्वर से कनेक्शन स्थापित करने की आवश्यकता है।

connection = Connection('127.0.0.1', 'guest', 'guest')

उसके बाद हमें एक चैनल स्थापित करने की आवश्यकता है। प्रत्येक कनेक्शन में कई चैनल हो सकते हैं, और सामान्य तौर पर बहु-थ्रेडेड कार्य करते समय, इसकी सिफारिश की जाती है (लेकिन आवश्यक नहीं है) एक थ्रेड प्रति एक है।

channel = connection.channel()

एक बार जब हम अपना चैनल सेट कर लेते हैं, तो हमें RabbitMQ को यह बताना होगा कि हम संदेशों का उपभोग करना शुरू करना चाहते हैं। इस मामले में हम अपने पहले से इस्तेमाल किए गए संदेशों को संभालने के लिए अपने पहले से परिभाषित on_message फ़ंक्शन का उपयोग करेंगे।

RabbitMQ सर्वर पर हम जिस कतार के बारे में सुन रहे हैं, वह simple_queue होने जा रहा है, और हम RabbitMQ को यह भी बता रहे हैं कि हम उनके साथ काम करते ही आने वाले सभी संदेशों को स्वीकार कर लेंगे।

channel.basic.consume(callback=on_message, queue='simple_queue', no_ack=False)

अंत में हमें RabbitMQ सर्वर द्वारा दिए गए प्रोसेसिंग संदेशों को शुरू करने के लिए IO लूप शुरू करना होगा।

channel.start_consuming(to_tuple=False)

RabbitMQ को संदेश कैसे प्रकाशित करें

पुस्तकालय आयात करने के साथ शुरू करो।

from amqpstorm import Connection
from amqpstorm import Message

अगला हमें RabbitMQ सर्वर से कनेक्शन खोलने की आवश्यकता है।

connection = Connection('127.0.0.1', 'guest', 'guest')

उसके बाद हमें एक चैनल स्थापित करने की आवश्यकता है। प्रत्येक कनेक्शन में कई चैनल हो सकते हैं, और सामान्य तौर पर बहु-थ्रेडेड कार्य करते समय, इसकी सिफारिश की जाती है (लेकिन आवश्यक नहीं है) एक थ्रेड प्रति एक है।

channel = connection.channel()

एक बार जब हम अपना चैनल सेट कर लेते हैं, तो हम अपना संदेश तैयार करना शुरू कर सकते हैं।

# Message Properties.
properties = {
    'content_type': 'text/plain',
    'headers': {'key': 'value'}
}

# Create the message.
message = Message.create(channel=channel, body='Hello World!', properties=properties)

अब हम बस को फोन करके संदेश प्रकाशित कर सकते हैं publish और एक प्रदान routing_key । इस मामले में हम मैसेज को एक कतार में भेजने जा रहे हैं जिसे simple_queue कहा जाता है।

message.publish(routing_key='simple_queue')

RabbitMQ में विलंबित कतार कैसे बनाएं

पहले हमें दो बुनियादी चैनल स्थापित करने की आवश्यकता है, एक मुख्य कतार के लिए, और एक विलंब कतार के लिए। अंत में मेरे उदाहरण में, मुझे कुछ अतिरिक्त झंडे शामिल हैं जिनकी आवश्यकता नहीं है, लेकिन कोड को अधिक विश्वसनीय बनाता है; जैसे confirm delivery , delivery_mode और durable confirm delivery । आप RabbitMQ मैनुअल में इन पर अधिक जानकारी पा सकते हैं।

हमने जिन चैनलों को सेट किया है, उसके बाद हम मुख्य चैनल के लिए एक बंधन जोड़ते हैं जिसका उपयोग हम देरी चैनल से संदेश भेजने के लिए अपनी मुख्य कतार में कर सकते हैं।

channel.queue.bind(exchange='amq.direct', routing_key='hello', queue='hello')

अगला हमें अपने विलंबित चैनल को मुख्य कतार में संदेशों को अग्रेषित करने के लिए कॉन्फ़िगर करना होगा, जब वे समाप्त हो जाएंगे।

delay_channel.queue.declare(queue='hello_delay', durable=True, arguments={
    'x-message-ttl': 5000,
    'x-dead-letter-exchange': 'amq.direct',
    'x-dead-letter-routing-key': 'hello'
})
  • x-message-ttl (संदेश - लाइव टू टाइम)

    यह आमतौर पर एक विशिष्ट अवधि के बाद कतार में पुराने संदेशों को स्वचालित रूप से हटाने के लिए उपयोग किया जाता है, लेकिन दो वैकल्पिक तर्कों को जोड़कर हम इस व्यवहार को बदल सकते हैं, और इसके बजाय मिलीसेकंड में इस पैरामीटर को निर्धारित करते हैं कि विलंब कतार में कितने समय तक संदेश रहेंगे।

  • एक्स-मृत अक्षर-मार्ग कुंजी

    यह चर हमें संदेश को पूरी तरह से हटाने के डिफ़ॉल्ट व्यवहार के बजाय, समाप्त होने पर एक अलग कतार में स्थानांतरित करने की अनुमति देता है।

  • एक्स-मृत अक्षर-विनिमय

    यह वैरिएबल निर्धारित करता है कि एक्सचेंज ने किस संदेश को hello_delay से हैलो कतार में स्थानांतरित किया है।

देरी कतार में प्रकाशन

जब हम सभी बुनियादी पिका पैरामीटर सेट कर रहे होते हैं तो आप मूल प्रकाशन का उपयोग करके देरी कतार में एक संदेश भेजते हैं।

delay_channel.basic.publish(exchange='',
                            routing_key='hello_delay',
                            body='test',
                            properties={'delivery_mod': 2})

एक बार जब आप स्क्रिप्ट निष्पादित कर लेते हैं, तो आपको अपने RabbitMQ प्रबंधन मॉड्यूल में बनाई गई निम्न कतारों को देखना चाहिए। यहाँ छवि विवरण दर्ज करें

उदाहरण।

from amqpstorm import Connection

connection = Connection('127.0.0.1', 'guest', 'guest')

# Create normal 'Hello World' type channel.
channel = connection.channel()
channel.confirm_deliveries()
channel.queue.declare(queue='hello', durable=True)

# We need to bind this channel to an exchange, that will be used to transfer
# messages from our delay queue.
channel.queue.bind(exchange='amq.direct', routing_key='hello', queue='hello')

# Create our delay channel.
delay_channel = connection.channel()
delay_channel.confirm_deliveries()

# This is where we declare the delay, and routing for our delay channel.
delay_channel.queue.declare(queue='hello_delay', durable=True, arguments={
    'x-message-ttl': 5000, # Delay until the message is transferred in milliseconds.
    'x-dead-letter-exchange': 'amq.direct', # Exchange used to transfer the message from A to B.
    'x-dead-letter-routing-key': 'hello' # Name of the queue we want the message transferred to.
})

delay_channel.basic.publish(exchange='',
                            routing_key='hello_delay',
                            body='test',
                            properties={'delivery_mode': 2})

print("[x] Sent")


Modified text is an extract of the original Stack Overflow Documentation
के तहत लाइसेंस प्राप्त है CC BY-SA 3.0
से संबद्ध नहीं है Stack Overflow