application配置
<!-- ActiveMQ integration: connection factory pointing at the broker on the local machine. -->
<!-- The broker user name and password are still the default admin/admin and have not been changed yet. -->
<!-- NOTE(review): the broker URL and credentials are hard-coded here; consider externalising them and replacing the defaults before any non-local deployment. -->
<amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://127.0.0.1:61616" userName="admin" password="admin" />
<!-- SingleConnectionFactory: always hands back the same connection for every request to connect to the JMS server, and ignores calls to Connection.close(). -->
<!-- CachingConnectionFactory: extends SingleConnectionFactory, so it has all of SingleConnectionFactory's behaviour, -->
<!-- and additionally caches Session, MessageProducer and MessageConsumer instances. -->
<!-- JMS connection factory: wraps the ActiveMQ factory (amqConnectionFactory) in a caching factory. -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="amqConnectionFactory"/>
<!-- <constructor-arg ref="amqConnectionFactory" /> --><!-- equivalent to the property line above -->
<property name="sessionCacheSize" value="100" /><!-- number of sessions to cache -->
</bean>
<!-- Message queue destination shared by the JMS template and the listener container. -->
<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
<!-- Name of the queue -->
<constructor-arg value="Movingupon"/>
</bean>
<!-- JMS template: the JMS helper class provided by Spring. Sends go to "destination" unless another destination is given. -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory" />
<property name="defaultDestination" ref="destination" />
<property name="receiveTimeout" value="10000" /><!-- receive timeout (JmsTemplate interprets this value as milliseconds) -->
<!-- true means topic (publish/subscribe), false means queue; false is the default and is written out explicitly here -->
<property name="pubSubDomain" value="false" />
</bean>
<!-- Annotation-based setup -->
<!-- Listener annotation support (currently disabled) -->
<!-- <jms:annotation-driven /> -->
<!-- A listener acts as a consumer: it consumes each message as soon as it is delivered -->
<!-- Queue message listener bean -->
<bean id="queueMessageListener" class="com.mov.mwweb.service.activeMQ.listener.QueueMessageListener" />
<!-- Bean-based wiring -->
<!-- Message listener container: uses the connection factory, listens on "destination", and dispatches to the listener bean defined above -->
<bean id="messageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="destination" />
<property name="messageListener" ref="queueMessageListener" />
</bean>
<!-- Listener container factory built on the caching connection factory. -->
<!-- Its bean id is the containerFactory named by the commented-out @JmsListener annotation on QueueMessageListener.onMessage; -->
<!-- NOTE(review): with annotation support disabled it appears to be unused by the listener wiring in this file - confirm before removing. -->
<bean id="jmsListenerContainerFactory" class="org.springframework.jms.config.DefaultJmsListenerContainerFactory">
<property name="connectionFactory" ref="connectionFactory" />
</bean>
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import com.mov.mw.service.third.jpush.IJPushService;
import com.mov.mw.service.third.jpush.imp.JPushServiceImpl;
/**
* 队列监听器 监听器相当已一个消费者,会立即消费监听到的消息
* @author wst 2018-2-28 上午10:23:45
*
*/
//@Component
//@EnableJms
public class QueueMessageListener implements MessageListener {
//@JmsListener(containerFactory="jmsListenerContainerFactory", destination = "Movingupon")
public void onMessage(Message message) {
processMessage(message);
}
/**
* 消息处理流程
* @author wst 2018-3-5 下午2:01:14
* @param message
*/
public void processMessage(Message message){
try {
//map信息体-使用这种
MapMessage msg = (MapMessage) message;
String[] aliases = {msg.getString("target")};
IJPushService iJPushService = new JPushServiceImpl();
String isCustom = msg.getString("isCustom");
// 自定义消息是没有通知的(透传)
if(isCustom.equals("Y")){
iJPushService.sendToAliasesMessage(aliases, msg.getString("title"), msg.getString("alert"), msg.getString("extrasparam"), msg.getString("msgType"));
}else{
iJPushService.sendToAliases(
aliases,
msg.getString("alert"),
msg.getString("title"),
msg.getString("extrasparam"),
msg.getString("msgType"));
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.TextMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;
/**
 * Consumer: synchronously receives messages through the injected {@link JmsTemplate}.
 *
 * @author wst 2018-2-28 10:24:03 AM
 */
@Service(value = "consumer")
public class ConsumerService {

    @Autowired
    private JmsTemplate jmsTemplate;

    /**
     * Receives one message from the given queue and prints its text to stdout.
     *
     * @author wst 2018-3-2 5:56:52 PM
     * @param destination the destination to receive from
     * @return the received text message, or {@code null} when {@code jmsTemplate.receive}
     *         returned no message (for example because the receive timeout expired)
     * @throws ClassCastException if the received message is not a {@link TextMessage}
     */
    public TextMessage receive(Destination destination) {
        TextMessage textMessage = (TextMessage) jmsTemplate.receive(destination);
        // receive() yields null when nothing arrives in time; without this guard the
        // getText() call below would fail with a NullPointerException.
        if (textMessage == null) {
            return null;
        }
        try {
            System.out.println(Thread.currentThread().getName() + "消费了 " + textMessage.getText());
        } catch (JMSException e) {
            // Reading the text is only for the diagnostic line; the message is still returned.
            e.printStackTrace();
        }
        return textMessage;
    }
}
import java.util.Date;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.Session;
import org.json.simple.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;
import com.mov.mw.basic.ServerResponse;
import com.mov.mw.db.entity.MsgMessage;
import com.mov.mw.enums.ResponseMessageEnum;
import com.mov.mw.exception.MwException;
import com.mov.mwweb.service.ent.IEntMsgService;
/**
* 生产者
* @author wst 2018-2-28 下午12:48:24
*
*/
@Service(value = "producer")
public class ProducerService {
@Autowired
private JmsTemplate jmsTemplate;
@Autowired
IEntMsgService iEntMsgService;
/**
* 发送队列信息
*
* @author wst 2018-2-28 下午1:17:16
* @param msg 信息
*/
public void sendMessage(final String message) {
// Destination destination = jmsTemplate.getDefaultDestination();//实际是Object,队列的名字
jmsTemplate.send(new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(message);
}
});
}
/**
* 创建json消息
*
* @author wst 2018-3-5 下午4:06:44
* @param userId 用户id
* @param messageType 消息类型
* @param target 推送目标,例如registrationId、tag....
* @param message 消息
* @param langVer
*/
public void creareMsg(Integer userId, final String messageType, final String target,
final String message, final String langVer) {
try {
final String mt = iEntMsgService.getMessageTypeByCode(messageType, langVer);
// 将消息保存导数据库表
MsgMessage msg = new MsgMessage();
msg.setUserId(userId);
msg.setMessageType(messageType);
msg.setTitle(mt);
msg.setContent(message);
// msg.setBizId(bizId);
// msg.setCustomParam(customParam);
// msg.setStatus(status);
msg.setCreateTime(new Date());
final ServerResponse<Integer> result = iEntMsgService.insertMessage(msg, langVer);
// 消息加入队列 仅在保存成功之后才加入到队列
if (result.getStatus() == 0) {
// 使用json形式拼接消息并发送消息
// JSONObject jsonMessage = new JSONObject();
// jsonMessage.put("target", target);
// jsonMessage.put("alert", message);
// jsonMessage.put("title",
// iEntMsgService.getMessageTypeByCode(messageType, langVer));
// jsonMessage.put("extras", createJson(result.getData(), mt, message));
// jsonMessage.put("customParam", "");
// jsonMessage.put("status", "");
// jsonMessage.put("bizId", "");
// sendMessage(jsonMessage.toJSONString());
// 使用jms模板拼接消息并发送消息
jmsTemplate.send(new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
MapMessage msg = session.createMapMessage();
msg.setString("target", target);
msg.setString("alert", message);
msg.setString("isCustom", "N");
msg.setString("title", iEntMsgService.getMessageTypeByCode(messageType, langVer));
msg.setString("extrasparam", createJson(result.getData(), mt, message));
return msg;
}
});
}
} catch (Exception e) {
e.printStackTrace();
throw new MwException(ResponseMessageEnum.SAVE_FAIL.getMessage(langVer));
}
}
/**
* 创建json消息
*
* @author wst 2018-3-5 下午4:06:44
* @param userId 用户id
* @param messageType 消息类型
* @param target 推送目标,例如registrationId、tag....
* @param isCustom 自定义消息 "Y" (透传) or "N"
* @param message 消息
* @param langVer
*/
public void creareisCustomMsg(Integer userId, final String messageType, final String target,
final String message, final String isCustom, final String langVer) {
jmsTemplate.send(new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
MapMessage msg = session.createMapMessage();
msg.setString("target", target);
msg.setString("isCustom", isCustom);
msg.setString("alert", message);
msg.setString("title", "Movingupon");
msg.setString("msgType", messageType);
msg.setString("extrasparam", "");
return msg;
}
});
}
/**
* 创建消息notification
*
* @author wst 2018-3-19 上午11:27:02
* @param msgId 消息id
* @param title 消息标题
* @param content 消息内容
* @return
*/
@SuppressWarnings("unchecked")
public String createJson(Integer msgId, String title, String content) {
JSONObject jmsg = new JSONObject();
jmsg.put("msgId", msgId);
jmsg.put("title", title);
jmsg.put("content", content);
return jmsg.toString();
}
}