Python Language
RabQMQ का परिचय AMQPStorm का उपयोग करके
खोज…
टिप्पणियों
के नवीनतम संस्करण AMQPStorm पर उपलब्ध है pypi या आप उपयोग कर स्थापित कर सकते हैं पिप
pip install amqpstorm
RabbitMQ के संदेशों का उपभोग कैसे करें
पुस्तकालय आयात करने के साथ शुरू करो।
from amqpstorm import Connection
संदेशों का उपभोग करते समय, हमें पहले आने वाले संदेशों को संभालने के लिए एक फ़ंक्शन को परिभाषित करना होगा। यह किसी भी प्रतिदेय समारोह हो, और संदेश वस्तु, या संदेश टपल (के आधार पर लेने के लिए है सकते हैं to_tuple
पैरामीटर में परिभाषित start_consuming
)।
आने वाले संदेश से डेटा को संसाधित करने के अलावा, हमें संदेश को स्वीकार या अस्वीकार भी करना होगा। यह महत्वपूर्ण है, क्योंकि हमें यह बताने की आवश्यकता है कि RabbitMQ ने हमें संदेश को ठीक से प्राप्त और संसाधित किया है।
def on_message(message):
"""This function is called on message received.
:param message: Delivered message.
:return:
"""
print("Message:", message.body)
# Acknowledge that we handled the message without any issues.
message.ack()
# Reject the message.
# message.reject()
# Reject the message, and put it back in the queue.
# message.reject(requeue=True)
अगला हमें RabbitMQ सर्वर से कनेक्शन स्थापित करने की आवश्यकता है।
connection = Connection('127.0.0.1', 'guest', 'guest')
उसके बाद हमें एक चैनल स्थापित करने की आवश्यकता है। प्रत्येक कनेक्शन में कई चैनल हो सकते हैं, और सामान्य तौर पर बहु-थ्रेडेड कार्य करते समय, इसकी सिफारिश की जाती है (लेकिन आवश्यक नहीं है) एक थ्रेड प्रति एक है।
channel = connection.channel()
एक बार जब हम अपना चैनल सेट कर लेते हैं, तो हमें RabbitMQ को यह बताना होगा कि हम संदेशों का उपभोग करना शुरू करना चाहते हैं। इस मामले में हम अपने पहले से इस्तेमाल किए गए संदेशों को संभालने के लिए अपने पहले से परिभाषित on_message
फ़ंक्शन का उपयोग करेंगे।
RabbitMQ सर्वर पर हम जिस कतार के बारे में सुन रहे हैं, वह simple_queue
होने जा रहा है, और हम RabbitMQ को यह भी बता रहे हैं कि हम उनके साथ काम करते ही आने वाले सभी संदेशों को स्वीकार कर लेंगे।
channel.basic.consume(callback=on_message, queue='simple_queue', no_ack=False)
अंत में हमें RabbitMQ सर्वर द्वारा दिए गए प्रोसेसिंग संदेशों को शुरू करने के लिए IO लूप शुरू करना होगा।
channel.start_consuming(to_tuple=False)
RabbitMQ को संदेश कैसे प्रकाशित करें
पुस्तकालय आयात करने के साथ शुरू करो।
from amqpstorm import Connection
from amqpstorm import Message
अगला हमें RabbitMQ सर्वर से कनेक्शन खोलने की आवश्यकता है।
connection = Connection('127.0.0.1', 'guest', 'guest')
उसके बाद हमें एक चैनल स्थापित करने की आवश्यकता है। प्रत्येक कनेक्शन में कई चैनल हो सकते हैं, और सामान्य तौर पर बहु-थ्रेडेड कार्य करते समय, इसकी सिफारिश की जाती है (लेकिन आवश्यक नहीं है) एक थ्रेड प्रति एक है।
channel = connection.channel()
एक बार जब हम अपना चैनल सेट कर लेते हैं, तो हम अपना संदेश तैयार करना शुरू कर सकते हैं।
# Message Properties.
properties = {
'content_type': 'text/plain',
'headers': {'key': 'value'}
}
# Create the message.
message = Message.create(channel=channel, body='Hello World!', properties=properties)
अब हम बस को फोन करके संदेश प्रकाशित कर सकते हैं publish
और एक प्रदान routing_key
। इस मामले में हम मैसेज को एक कतार में भेजने जा रहे हैं जिसे simple_queue
कहा जाता है।
message.publish(routing_key='simple_queue')
RabbitMQ में विलंबित कतार कैसे बनाएं
पहले हमें दो बुनियादी चैनल स्थापित करने की आवश्यकता है, एक मुख्य कतार के लिए, और एक विलंब कतार के लिए। अंत में मेरे उदाहरण में, मुझे कुछ अतिरिक्त झंडे शामिल हैं जिनकी आवश्यकता नहीं है, लेकिन कोड को अधिक विश्वसनीय बनाता है; जैसे confirm delivery
, delivery_mode
और durable
confirm delivery
। आप RabbitMQ मैनुअल में इन पर अधिक जानकारी पा सकते हैं।
हमने जिन चैनलों को सेट किया है, उसके बाद हम मुख्य चैनल के लिए एक बंधन जोड़ते हैं जिसका उपयोग हम देरी चैनल से संदेश भेजने के लिए अपनी मुख्य कतार में कर सकते हैं।
channel.queue.bind(exchange='amq.direct', routing_key='hello', queue='hello')
अगला हमें अपने विलंबित चैनल को मुख्य कतार में संदेशों को अग्रेषित करने के लिए कॉन्फ़िगर करना होगा, जब वे समाप्त हो जाएंगे।
delay_channel.queue.declare(queue='hello_delay', durable=True, arguments={
'x-message-ttl': 5000,
'x-dead-letter-exchange': 'amq.direct',
'x-dead-letter-routing-key': 'hello'
})
x-message-ttl (संदेश - लाइव टू टाइम)
यह आमतौर पर एक विशिष्ट अवधि के बाद कतार में पुराने संदेशों को स्वचालित रूप से हटाने के लिए उपयोग किया जाता है, लेकिन दो वैकल्पिक तर्कों को जोड़कर हम इस व्यवहार को बदल सकते हैं, और इसके बजाय मिलीसेकंड में इस पैरामीटर को निर्धारित करते हैं कि विलंब कतार में कितने समय तक संदेश रहेंगे।
यह चर हमें संदेश को पूरी तरह से हटाने के डिफ़ॉल्ट व्यवहार के बजाय, समाप्त होने पर एक अलग कतार में स्थानांतरित करने की अनुमति देता है।
यह वैरिएबल निर्धारित करता है कि एक्सचेंज ने किस संदेश को hello_delay से हैलो कतार में स्थानांतरित किया है।
देरी कतार में प्रकाशन
जब हम सभी बुनियादी पिका पैरामीटर सेट कर रहे होते हैं तो आप मूल प्रकाशन का उपयोग करके देरी कतार में एक संदेश भेजते हैं।
delay_channel.basic.publish(exchange='',
routing_key='hello_delay',
body='test',
properties={'delivery_mod': 2})
एक बार जब आप स्क्रिप्ट निष्पादित कर लेते हैं, तो आपको अपने RabbitMQ प्रबंधन मॉड्यूल में बनाई गई निम्न कतारों को देखना चाहिए।
उदाहरण।
from amqpstorm import Connection
connection = Connection('127.0.0.1', 'guest', 'guest')
# Create normal 'Hello World' type channel.
channel = connection.channel()
channel.confirm_deliveries()
channel.queue.declare(queue='hello', durable=True)
# We need to bind this channel to an exchange, that will be used to transfer
# messages from our delay queue.
channel.queue.bind(exchange='amq.direct', routing_key='hello', queue='hello')
# Create our delay channel.
delay_channel = connection.channel()
delay_channel.confirm_deliveries()
# This is where we declare the delay, and routing for our delay channel.
delay_channel.queue.declare(queue='hello_delay', durable=True, arguments={
'x-message-ttl': 5000, # Delay until the message is transferred in milliseconds.
'x-dead-letter-exchange': 'amq.direct', # Exchange used to transfer the message from A to B.
'x-dead-letter-routing-key': 'hello' # Name of the queue we want the message transferred to.
})
delay_channel.basic.publish(exchange='',
routing_key='hello_delay',
body='test',
properties={'delivery_mode': 2})
print("[x] Sent")