java-ee
Служба обмена сообщениями Java (JMS)
Поиск…
Вступление
Java Message Service - это Java API, который позволяет приложениям создавать, отправлять, получать и читать сообщения. API JMS определяет общий набор интерфейсов и связанных семантик, которые позволяют программам, написанным на языке программирования Java, взаимодействовать с другими реализациями обмена сообщениями. JMS обеспечивает связь, которая не только слабо связана, но также асинхронна и надежна.
замечания
Служба сообщений Java (JMS) - это стандартный Java API, который позволяет приложениям создавать, отправлять, получать и читать сообщения асинхронно.
JMS определяет общий набор интерфейсов и классов, которые позволяют приложениям взаимодействовать с другими поставщиками сообщений.
JMS похожа на JDBC: JDBC подключается к различным базам данных (Derby, MySQL, Oracle, DB2 и т. Д.), А JMS подключается к различным провайдерам (OpenMQ, MQSeries, SonicMQ и т. Д.).
Реализация JMS-ссылки - это Open Message Queue (OpenMQ). Это проект с открытым исходным кодом и может использоваться в автономных приложениях или может быть создан на сервере приложений. Это поставщик JMS по умолчанию, интегрированный в GlassFish.
Создание ConnectionFactory
Заводы подключений - это управляемые объекты, которые позволяют приложению подключаться к провайдеру, создавая объект Connection
. javax.jms.ConnectionFactory
- это интерфейс, который инкапсулирует параметры конфигурации, определенные администратором.
Для использования ConnectionFactory
клиент должен выполнить поиск JNDI (или использовать инъекцию). Следующий код получает объект JNDI InitialContext
и использует его для поиска объекта ConnectionFactory
под именем JNDI:
Context ctx = new InitialContext();
ConnectionFactory connectionFactory =
(ConnectionFactory) ctx.lookup("jms/javaee7/ConnectionFactory");
Методами, доступными в этом интерфейсе, являются createConnection()
которые возвращают объект Connection
и новые методы JMS 2.0 createContext()
которые возвращают JMSContext
.
Можно создать Connection
или JMSContext
либо с идентификатором пользователя по умолчанию, либо указав имя пользователя и пароль:
public interface ConnectionFactory {
Connection createConnection() throws JMSException;
Connection createConnection(String userName, String password) throws JMSException;
JMSContext createContext();
JMSContext createContext(String userName, String password);
JMSContext createContext(String userName, String password, int sessionMode);
JMSContext createContext(int sessionMode);
}
Использование библиотеки ActiveMQ для обмена сообщениями (реализаций, связанных с конкретным провайдером activemq jms)
Настройка ActiveMQ
- Загрузите дистрибутив ActiveMQ с сайта activemq.apache.org и распакуйте его где-нибудь
- Вы можете сразу запустить сервер, запуская необеспеченный на localhost, используя скрипт bin / activemq
- Когда он запущен, вы можете получить доступ к консоли своего локального сервера по адресу http: // localhost: 8161 / admin /
- Настройте его, изменив conf / activemq.xml
- Поскольку в заголовке предлагаются следующие примеры конкретных реализаций провайдера userimq jms, и поэтому activemq-all.jar необходимо добавить в путь к классам.
Отправка сообщения через автономный клиент
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
public class JmsClientMessageSender {
public static void main(String[] args) {
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); // ActiveMQ-specific
Connection con = null;
try {
con = factory.createConnection();
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); // non-transacted session
Queue queue = session.createQueue("test.queue"); // only specifies queue name
MessageProducer producer = session.createProducer(queue);
Message msg = session.createTextMessage("hello queue"); // text message
producer.send(msg);
} catch (JMSException e) {
e.printStackTrace();
} finally {
if (con != null) {
try {
con.close(); // free all resources
} catch (JMSException e) { /* Ignore */ }
}
}
}
}
Опрос сообщений
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class JmsClientMessagePoller {
public static void main(String[] args) {
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); // ActiveMQ-specific
Connection con = null;
try {
con = factory.createConnection();
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); // non-transacted session
Queue queue = session.createQueue("test.queue"); // only specifies queue name
MessageConsumer consumer = session.createConsumer(queue);
con.start(); // start the connection
while (true) { // run forever
Message msg = consumer.receive(); // blocking!
if (!(msg instanceof TextMessage))
throw new RuntimeException("Expected a TextMessage");
TextMessage tm = (TextMessage) msg;
System.out.println(tm.getText()); // print message content
}
} catch (JMSException e) {
e.printStackTrace();
} finally {
try {
con.close();
} catch (JMSException e) {/* Ignore */ }
}
}
}
Использование MessageListener
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class JmsClientMessageListener {
public static void main(String[] args) {
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); // ActiveMQ-specific
Connection con = null;
try {
con = factory.createConnection();
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); // non-transacted session
Queue queue = session.createQueue("test.queue"); // only specifies queue name
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message msg) {
try {
if (!(msg instanceof TextMessage))
throw new RuntimeException("no text message");
TextMessage tm = (TextMessage) msg;
System.out.println(tm.getText()); // print message
} catch (JMSException e) {
System.err.println("Error reading message");
}
}
});
con.start(); // start the connection
Thread.sleep(60 * 1000); // receive messages for 60s
} catch (JMSException e1) {
e1.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
try {
con.close(); // free all resources
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
Использование поиска на основе jndi для обмена сообщениями (пример, не относящийся к реализации)
Этот метод позволяет писать и развертывать неспецифический код на нескольких платформах jms. Ниже базового примера подключается к серверу activemq jms и отправляется сообщение.
import java.util.Properties;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
public class JmsClientJndi {
public static void main(String[] args) {
Properties jndiProps = new Properties();
// Following two could be set via a system property for flexibility in the code.
jndiProps.setProperty(Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
jndiProps.setProperty(Context.PROVIDER_URL, "tcp://localhost:61616");
QueueConnection conn = null;
QueueSession session = null;
QueueSender sender = null;
InitialContext jndi = null;
try {
jndi = new InitialContext(jndiProps);
QueueConnectionFactory factory = (QueueConnectionFactory) jndi.lookup("ConnectionFactory");
conn = factory.createQueueConnection();
conn.start();
session = conn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = (Queue) jndi.lookup("dynamicQueues/test.queue");
sender = session.createSender(queue);
TextMessage msg = session.createTextMessage();
msg.setText("Hello worlds !!!!! ");
sender.send(msg);
} catch (NamingException e) {
e.printStackTrace();
} catch (JMSException e) {
e.printStackTrace();
} finally {
try {
if (sender != null)
sender.close();
if (session != null)
session.close();
if (conn != null)
conn.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}