(3条消息) RabbitMQ与SpringMVC集成并实现发送消息和接收消息(持久化)方案二
RabbitMQ的大约的介绍,上一篇已经有介绍了,这篇不介绍,直接描述RabbitMQ与SpringMVC集成并实现发送消息和接收消息(持久化)。使用了Spring-rabbit 发送消息和接收消息,我们使用的Maven来管理Jar包,在Maven的pom.xml文件中引入jar包[java] view plain copy
<span style="font-size:18px;"> <dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit</artifactId><version>1.3.6.RELEASE</version></dependency></span>1.实现生产者 第一步:是要设置调用安装RabbitMQ的IP、端口等配置一个global.properties文件
第二步:通过SpringMVC把global.properties文件读进来[java] view plain copy
<span style="font-size:18px;"><!-- 注入属性文件 --><bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"><property name="locations"><list><value>classpath:global.properties</value></list></property></bean> </span>第三步:配置 RabbitMQ服务器连接、创建rabbitTemplate 消息模板类等,在SpringMVC的配置文件加入下面这些<bean id="rmqProducer2" class="cn.test.spring.rabbitmq.RmqProducer"></bean>[java] view plain copy
<span style="font-size:18px;"> <!-- 创建连接类 --><bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory"><constructor-arg value="localhost" /><property name="username" value="${rmq.manager.user}" /><property name="password" value="${rmq.manager.password}" /><property name="host" value="${rmq.ip}" /><property name="port" value="${rmq.port}" /></bean><bean id="rabbitAdmin" class="org.springframework.amqp.rabbit.core.RabbitAdmin"><constructor-arg ref="connectionFactory" /></bean><!-- 创建rabbitTemplate 消息模板类 --><bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate"><constructor-arg ref="connectionFactory"></constructor-arg></bean> </span>第四步:实现消息类实体和发送消息 类实体[java] view plain copy
<span style="font-size:18px;">/*** 消息**/public class RabbitMessage implements Serializable{private static final long serialVersionUID = -6487839157908352120L;private Class<?>[] paramTypes;//参数类型private String exchange;//交换器private Object[] params;private String routeKey;//路由keypublic RabbitMessage(){}public RabbitMessage(String exchange,String routeKey,Object...params){this.params=params;this.exchange=exchange;this.routeKey=routeKey;}@SuppressWarnings("rawtypes")public RabbitMessage(String exchange,String routeKey,String methodName,Object...params){this.params=params;this.exchange=exchange;this.routeKey=routeKey;int len=params.length;Class[] clazzArray=new Class[len];for(int i=0;i<len;i++)clazzArray[i]=params[i].getClass();this.paramTypes=clazzArray;}public byte[] getSerialBytes(){byte[] res=new byte[0];ByteArrayOutputStream baos=new ByteArrayOutputStream();ObjectOutputStream oos;try {oos = new ObjectOutputStream(baos);oos.writeObject(this);oos.close();res=baos.toByteArray();} catch (IOException e) {e.printStackTrace();}return res;}public String getRouteKey() {return routeKey;}public String getExchange() {return exchange;}public void setExchange(String exchange) {this.exchange = exchange;}public void setRouteKey(String routeKey) {this.routeKey = routeKey;}public Class<?>[] getParamTypes() {return paramTypes;}public Object[] getParams() {return params;}}</span>发送消息[java] view plain copy
<span style="font-size:18px;">/*** 生产着**/public class RmqProducer{@Resourceprivate RabbitTemplate rabbitTemplate;/*** 发送信息* @param msg*/public void sendMessage(RabbitMessage msg){try {System.out.println(rabbitTemplate.getConnectionFactory().getHost());System.out.println(rabbitTemplate.getConnectionFactory().getPort());//发送信息rabbitTemplate.convertAndSend(msg.getExchange(), msg.getRouteKey(), msg);} catch (Exception e) {}}}</span>说明: 1. rabbitTemplate.convertAndSend(msg.getExchange(), msg.getRouteKey(), msg);源代码中的send调用的方法,一些发送消息帮我们实现好了。
2.上面的代码实现没申明交换器和队列,RabbitMQ不知交换器和队列他们的绑定关系,如果RabbitMQ管理器上没有对应的交换器和队列是不会新建的和关联的,需要手动关联。
我们也可以用代码申明:rabbitAdmin要申明:eclareExchange方法 参数是交换器BindingBuilder.bind(queue).to(directExchange).with(queueName);//将queue绑定到exchangerabbitAdmin.declareBinding(binding);//声明绑定关系源代码有这些方法:
这样就可以实现交换器和队列的绑定关系交换器我们可以申明为持久化,还有使用完不会自动删除TopicExchange 参数的说明:name是交换器名称,durable:true 是持久化 autoDelete:false使用完不删除源代码:
队列也可以申明为持久化
第五步:实现测试类[java] view plain copy
<span style="font-size:18px;">@Resourceprivate RmqProducer rmqProducer2;@Testpublic void test() throws IOException{String exchange="testExchange";交换器String routeKey="testQueue";//队列String methodName="test";//调用的方法//参数Map<String,Object> param=new HashMap<String, Object>();param.put("data","hello");RabbitMessage msg=new RabbitMessage(exchange,routeKey, methodName, param);//发送消息rmqProducer2.sendMessage(msg);}</span>结果:RabbitMQ有一条消息
2.消费者第一步:RabbitMQ服务器连接这些在生产者那边已经介绍了,这边就不介绍了,我们要配置 RabbitMQ服务器连接、创建rabbitTemplate 消息模板类、消息转换器、消息转换器监听器等,在SpringMVC的配置文件加入下面这些[java] view plain copy
<span style="font-size:18px;"> <!-- 创建连接类 --><bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory"><constructor-arg value="localhost" /><property name="username" value="${rmq.manager.user}" /><property name="password" value="${rmq.manager.password}" /><property name="host" value="${rmq.ip}" /><property name="port" value="${rmq.port}" /></bean><bean id="rabbitAdmin" class="org.springframework.amqp.rabbit.core.RabbitAdmin"><constructor-arg ref="connectionFactory" /></bean><!-- 创建rabbitTemplate 消息模板类 --><bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate"><constructor-arg ref="connectionFactory"></constructor-arg></bean><!-- 创建消息转换器为SimpleMessageConverter --><bean id="serializerMessageConverter" class="org.springframework.amqp.support.converter.SimpleMessageConverter"></bean><!-- 设置持久化的队列 --><bean id="queue" class="org.springframework.amqp.core.Queue"><constructor-arg index="0" value="testQueue"></constructor-arg><constructor-arg index="1" value="true"></constructor-arg><constructor-arg index="2" value="false"></constructor-arg><constructor-arg index="3" value="false"></constructor-arg></bean><!--创建交换器的类型 并持久化--><bean id="topicExchange" class="org.springframework.amqp.core.TopicExchange"><constructor-arg index="0" value="testExchange"></constructor-arg><constructor-arg index="1" value="true"></constructor-arg><constructor-arg index="2" value="false"></constructor-arg></bean><util:map id="arguments"></util:map><!-- 绑定交换器、队列 --><bean id="binding" class="org.springframework.amqp.core.Binding"><constructor-arg index="0" value="testQueue"></constructor-arg><constructor-arg index="1" value="QUEUE"></constructor-arg><constructor-arg index="2" value="testExchange"></constructor-arg><constructor-arg index="3" value="testQueue"></constructor-arg><constructor-arg index="4" value="#{arguments}"></constructor-arg></bean><!-- 用于接收消息的处理类 --><bean id="rmqConsumer" class="cn.test.spring.rabbitmq.RmqConsumer"></bean><bean id="messageListenerAdapter" class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter"><constructor-arg ref="rmqConsumer" /><property name="defaultListenerMethod" value="rmqProducerMessage"></property><property name="messageConverter" ref="serializerMessageConverter"></property></bean><!-- 用于消息的监听的容器类SimpleMessageListenerContainer,监听队列 queues可以传多个--><bean id="listenerContainer" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer"><property name="queues" ref="queue"></property><property name="connectionFactory" ref="connectionFactory"></property><property name="messageListener" ref="messageListenerAdapter"></property></bean></span>说明:1.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer中的queues可以传入多个队列
2.org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter有哪个消费者适配器来处理 ,参数defaultListenerMethod是默认调用方法来处理消息。3.交换器和队列的持久化在生产者有介绍过了。4.org.springframework.amqp.core.Binding这个类的绑定,在SpringMVC配置文件中配置时,DestinationType这个参数要注意点源代码:
第二步:处理消息[java] view plain copy
<span style="font-size:18px;">/*** 消费者**/public class RmqConsumer{public void rmqProducerMessage(Object object){RabbitMessage rabbitMessage=(RabbitMessage) object;System.out.println(rabbitMessage.getExchange());System.out.println(rabbitMessage.getRouteKey());System.out.println(rabbitMessage.getParams().toString());}}</span>在启动过程中会报这样的错误,可能是你的交换器和队列没配置好