远方蔚蓝
一刹那情真,相逢不如不见

文章数量 126

访问次数 199887

运行天数 1437

最近活跃 2024-10-04 23:36:48

进入后台管理系统

Spring整合activeMQ


application配置
    <!-- 集成ActiveMQ -->
    <!-- 账号密码 暂时为 admin 未作更改 -->
    <amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://127.0.0.1:61616" userName="admin" password="admin" />
    
    <!-- SingleConnectionFactory:对于建立JMS服务器链接的请求会一直返回同一个链接,并且会忽略Connection的close方法调用。 -->
	<!-- CachingConnectionFactory:继承了SingleConnectionFactory,所以它拥有SingleConnectionFactory的所有功能, -->
	<!-- 同时它还新增了缓存功能,它可以缓存Session、MessageProducer和MessageConsumer。 -->
                           
    <!-- 配置JMS连接工厂-->
    <bean id="connectionFactory"
          class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="amqConnectionFactory"/>
        <!-- <constructor-arg ref="amqConnectionFactory" /> --><!-- 相当于上面一行 -->
        <property name="sessionCacheSize" value="100" /><!-- session缓存数量 -->
    </bean>
    <!-- 定义消息队列目的地 -->
    <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
        <!-- 设置消息队列的名字 -->
        <constructor-arg value="Movingupon"/>
    </bean>
    <!-- 配置JMS模板,Spring提供的JMS工具类。 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="defaultDestination" ref="destination" />
        <property name="receiveTimeout" value="10000" /><!-- 接收超时时间 -->
        <!-- true是topic(订阅),false是queue(队列),默认是false,此处显示写出false -->
        <property name="pubSubDomain" value="false" />
    </bean>
    
    <!-- 使用注解 -->
    <!-- 监听器注解支持 -->  
    <!-- <jms:annotation-driven /> -->
	<!-- 监听器相当于消费者,会立即消费监听到的消息 -->
    <!-- 配置消息队列监听者 -->
    <bean id="queueMessageListener" class="com.mov.mwweb.service.activeMQ.listener.QueueMessageListener" />
	<!-- 使用bean注入 -->
    <!-- 消息监听容器,配置连接工厂,监听的目标是destination,监听器是上面定义的监听器 -->
      
    <bean id="messageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="destination" />
        <property name="messageListener" ref="queueMessageListener" />
    </bean>
     
    <bean id="jmsListenerContainerFactory" class="org.springframework.jms.config.DefaultJmsListenerContainerFactory">
        <property name="connectionFactory" ref="connectionFactory" />
    </bean>
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import com.mov.mw.service.third.jpush.IJPushService;
import com.mov.mw.service.third.jpush.imp.JPushServiceImpl;
/**
 * 队列监听器 监听器相当已一个消费者,会立即消费监听到的消息
 * @author wst 2018-2-28 上午10:23:45
 *
 */
//@Component 
//@EnableJms 
public class QueueMessageListener implements MessageListener {
	
	//@JmsListener(containerFactory="jmsListenerContainerFactory", destination = "Movingupon")  
    public void onMessage(Message message) {
		processMessage(message);
    }
    
    /**
     * 消息处理流程
     * @author wst 2018-3-5 下午2:01:14
     * @param message
     */
    public void processMessage(Message message){
    	try {
    	//map信息体-使用这种
    	MapMessage msg = (MapMessage) message;
    	String[] aliases = {msg.getString("target")};
    	IJPushService iJPushService = new JPushServiceImpl();
    	String isCustom = msg.getString("isCustom");
    	// 自定义消息是没有通知的(透传)
    	if(isCustom.equals("Y")){
    		iJPushService.sendToAliasesMessage(aliases, msg.getString("title"), msg.getString("alert"), msg.getString("extrasparam"), msg.getString("msgType"));
    	}else{
    		iJPushService.sendToAliases(
        			aliases,
        			msg.getString("alert"),
    				msg.getString("title"),
    				msg.getString("extrasparam"),
    				msg.getString("msgType"));
    		}
		} catch (Exception e) {
			e.printStackTrace();
		}
    }
}
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.TextMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;
/**
 * 消费者
 * @author wst 2018-2-28 上午10:24:03
 *
 */
@Service(value = "consumer")
public class ConsumerService {
	@Autowired
	private JmsTemplate jmsTemplate;
	/**
	 * 接收队列消息
	 * 
	 * @author wst 2018-3-2 下午5:56:52
	 * @param destination
	 * @return
	 */
	public TextMessage receive(Destination destination) {
		// String queueName = destination.toString();
		TextMessage textMessage = (TextMessage) jmsTemplate.receive(destination);
		try {
			System.out.println(Thread.currentThread().getName() + "消费了 " + textMessage.getText());
		} catch (JMSException e) {
			e.printStackTrace();
		}
		return textMessage;
	}
}
import java.util.Date;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.Session;
import org.json.simple.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;
import com.mov.mw.basic.ServerResponse;
import com.mov.mw.db.entity.MsgMessage;
import com.mov.mw.enums.ResponseMessageEnum;
import com.mov.mw.exception.MwException;
import com.mov.mwweb.service.ent.IEntMsgService;
/**
 * 生产者
 * @author wst 2018-2-28 下午12:48:24
 *
 */
@Service(value = "producer")
public class ProducerService {
	@Autowired
	private JmsTemplate jmsTemplate;
	@Autowired
	IEntMsgService iEntMsgService;
	/**
	 * 发送队列信息
	 * 
	 * @author wst 2018-2-28 下午1:17:16
	 * @param msg 信息
	 */
	public void sendMessage(final String message) {
		// Destination destination = jmsTemplate.getDefaultDestination();//实际是Object,队列的名字
		jmsTemplate.send(new MessageCreator() {
			public Message createMessage(Session session) throws JMSException {
				return session.createTextMessage(message);
			}
		});
	}
	
	/**
	 * 创建json消息
	 * 
	 * @author wst 2018-3-5 下午4:06:44
	 * @param userId 用户id
	 * @param messageType 消息类型
	 * @param target 推送目标,例如registrationId、tag....
	 * @param message 消息
	 * @param langVer
	 */
	public void creareMsg(Integer userId, final String messageType, final String target,
			 final String message, final String langVer) {
		try {
			final String mt = iEntMsgService.getMessageTypeByCode(messageType, langVer);
			// 将消息保存导数据库表
			MsgMessage msg = new MsgMessage();
			msg.setUserId(userId);
			msg.setMessageType(messageType);
			msg.setTitle(mt);
			msg.setContent(message);
			// msg.setBizId(bizId);
			// msg.setCustomParam(customParam);
			// msg.setStatus(status);
			msg.setCreateTime(new Date());
			final ServerResponse<Integer> result = iEntMsgService.insertMessage(msg, langVer);
		
			// 消息加入队列 仅在保存成功之后才加入到队列
			if (result.getStatus() == 0) {
				// 使用json形式拼接消息并发送消息
				// JSONObject jsonMessage = new JSONObject();
				// jsonMessage.put("target", target);
				// jsonMessage.put("alert", message);
				// jsonMessage.put("title",
				// iEntMsgService.getMessageTypeByCode(messageType, langVer));
				// jsonMessage.put("extras", createJson(result.getData(), mt, message));
				// jsonMessage.put("customParam", "");
				// jsonMessage.put("status", "");
				// jsonMessage.put("bizId", "");
				// sendMessage(jsonMessage.toJSONString());
				
				// 使用jms模板拼接消息并发送消息
				jmsTemplate.send(new MessageCreator() {
					public Message createMessage(Session session) throws JMSException {
						MapMessage msg = session.createMapMessage();
						msg.setString("target", target);
						msg.setString("alert", message);
						msg.setString("isCustom", "N");
						msg.setString("title", iEntMsgService.getMessageTypeByCode(messageType, langVer));
						msg.setString("extrasparam", createJson(result.getData(), mt, message));
						return msg;
					}
				});
			}
		} catch (Exception e) {
			e.printStackTrace();
			throw new MwException(ResponseMessageEnum.SAVE_FAIL.getMessage(langVer));
		}
	}
	
	/**
	 * 创建json消息
	 * 
	 * @author wst 2018-3-5 下午4:06:44
	 * @param userId 用户id
	 * @param messageType 消息类型
	 * @param target 推送目标,例如registrationId、tag....
	 * @param isCustom 自定义消息  "Y" (透传) or "N"
	 * @param message 消息
	 * @param langVer
	 */
	public void creareisCustomMsg(Integer userId, final String messageType, final String target,
			 final String message, final String isCustom, final String langVer) {
			jmsTemplate.send(new MessageCreator() {
				public Message createMessage(Session session) throws JMSException {
					MapMessage msg = session.createMapMessage();
					msg.setString("target", target);
					msg.setString("isCustom", isCustom);
					msg.setString("alert", message);
					msg.setString("title", "Movingupon");
					msg.setString("msgType", messageType);
					msg.setString("extrasparam", "");
					return msg;
				}
			});
	}
	/**
	 * 创建消息notification
	 * 
	 * @author wst 2018-3-19 上午11:27:02
	 * @param msgId 消息id
	 * @param title 消息标题
	 * @param content 消息内容
	 * @return
	 */
	@SuppressWarnings("unchecked")
	public String createJson(Integer msgId, String title, String content) {
		JSONObject jmsg = new JSONObject();
		jmsg.put("msgId", msgId);
		jmsg.put("title", title);
		jmsg.put("content", content);
		return jmsg.toString();
	}
}