博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
【一头扎进JMS】(3)----ActiviteMQ点对点消息实现+消息监听
阅读量:4946 次
发布时间:2019-06-11

本文共 4395 字,大约阅读时间需要 14 分钟。

上篇博客介绍了点对点消息的实现,这次将在此基础上,添加消息监听.监听的好处:自动监听消息,在监听到消息生产者消息后立即回传接到命令,并开始进行处理,进行异步处理,避免消费者等待.

消息生产者:

package com.java1234.activemq;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.MessageProducer;import javax.jms.Session;import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;/** * 消息生产者 * @author Administrator * */public class JMSProducer {	private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名	private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码	private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址	private static final int SENDNUM=10; // 发送的消息数量		public static void main(String[] args) {				ConnectionFactory connectionFactory; // 连接工厂		Connection connection = null; // 连接		Session session; // 会话 接受或者发送消息的线程		Destination destination; // 消息的目的地		MessageProducer messageProducer; // 消息生产者				// 实例化连接工厂		connectionFactory=new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL);				try {			connection=connectionFactory.createConnection(); // 通过连接工厂获取连接			connection.start(); // 启动连接			session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 创建Session			destination=session.createQueue("FirstQueue1"); // 创建消息队列			messageProducer=session.createProducer(destination); // 创建消息生产者			sendMessage(session, messageProducer); // 发送消息			session.commit();		} catch (Exception e) {			// TODO Auto-generated catch block			e.printStackTrace();		} finally{			if(connection!=null){				try {					connection.close();				} catch (JMSException e) {					// TODO Auto-generated catch block					e.printStackTrace();				}			}		}	}		/**	 * 发送消息	 * @param session	 * @param messageProducer	 * @throws Exception	 */	public static void sendMessage(Session session,MessageProducer messageProducer)throws Exception{		for(int i=0;i

监听器:

package com.java1234.activemq;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageListener;import javax.jms.TextMessage;/** * 消息监听 * @author Administrator * */public class Listener implements MessageListener{	@Override	public void onMessage(Message message) {		// TODO Auto-generated method stub		try {			System.out.println("收到的消息:"+((TextMessage)message).getText());		} catch (JMSException e) {			// TODO Auto-generated catch block			e.printStackTrace();		}	}}

消息消费者:

package com.java1234.activemq;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.MessageConsumer;import javax.jms.Session;import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;/** * 消息消费者 * @author Administrator * */public class JMSConsumer2 {	private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名	private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码	private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址		public static void main(String[] args) {		ConnectionFactory connectionFactory; // 连接工厂		Connection connection = null; // 连接		Session session; // 会话 接受或者发送消息的线程		Destination destination; // 消息的目的地		MessageConsumer messageConsumer; // 消息的消费者				// 实例化连接工厂		connectionFactory=new ActiveMQConnectionFactory(JMSConsumer2.USERNAME, JMSConsumer2.PASSWORD, JMSConsumer2.BROKEURL);						try {			connection=connectionFactory.createConnection();  // 通过连接工厂获取连接			connection.start(); // 启动连接			session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建Session			destination=session.createQueue("FirstQueue1");  // 创建连接的消息队列			//发布者和消费者模式的就是这个不同-start			//destination=session.createTopic("FirstQueue1");  // 创建订阅者连接的消息队列			//发布者和消费者模式的就是这个不同-end			messageConsumer=session.createConsumer(destination); // 创建消息消费者			//添加监听-start			messageConsumer.setMessageListener(new Listener()); // 注册消息监听			//监听的好处:自动监听消息			//生产者生产出来以后,主动通知它,消费者激发事件,直接去接受就可以了.			//添加监听-end		} catch (JMSException e) {			// TODO Auto-generated catch block			e.printStackTrace();		} 	}}
在消息消费者中,添加监听后,只是在代码中多配置了消息监听.

执行结果:

添加了监听器后,先执行消息消费者,然后再执行消息生产者,这样,如果消息生产者有消息产生的时候,消息监听就会监听到.

转载于:https://www.cnblogs.com/chenxiaochan/p/7253389.html

你可能感兴趣的文章
类型重命名 typedef
查看>>
leecode第七十二题(编辑距离)
查看>>
git 常用命令
查看>>
java休眠
查看>>
Android之SQLite
查看>>
kafka基本原理
查看>>
Silverlight实用窍门系列:16.以某点为圆心绘制多条线,线与线之间角度相同以组成圆【附带源码实例】...
查看>>
MFC 类库
查看>>
精读《syntax-parser 源码》
查看>>
审核流(3)低调奢华,简单不凡,实例演示-SNF.WorkFlow--SNF快速开发平台3.1
查看>>
Ionic1.x项目中的Installing npm packages问题
查看>>
第三周作业(更新)
查看>>
CSS3 transition属性配合Js实现超链接“背景”过渡渐变出现效果
查看>>
(巧用)事件代理
查看>>
Jmeter非命令行执行脚本
查看>>
Python学习笔记-05
查看>>
人情与面子
查看>>
JS加载获取父窗体传递的参数
查看>>
NumPy 学习笔记(三)
查看>>
选择排序详解
查看>>