上海财经大学:消息中心点亮智慧校园
消息中心的实现依赖于高效可靠的消息队列中间件(简称消息中间件),它可以通过消息传递和消息排队模型,在分布式环境下提供应用解耦、弹性伸缩、冗余存储、流量削峰、异步通信、数据同步等功能。目前,应用比较广泛的消息中间件包括:RabbitMQ、ActiveMQ、Kafka、RocketMQ,其中RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议实现,该协议面向消息、队列和路由(包括点对点和发布/订阅),强调可靠性与安全性,主要应用于对数据一致性、稳定性和可靠性要求很高的场景,此外RabbitMQ还有高可用性、高易用性等优点。结合高校的统一消息服务特点(对数据一致性、稳定性和可靠性要求很高,并发量、吞吐量要求一般),考虑采用RabbitMQ来构建高校智慧校园消息中心。本文主要对基于RabbitMQ构建的智慧校园消息中心的设计方案与实现进行阐述。
RabbitMQ原理
RabbitMQ起源于金融系统,用于在分布式系统中存储转发消息,具有易用性、扩展性、高可用性等优势,其内部结构如图1所示。
图1 RabbitMQ 内部结构
1. Message:消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。
2.Publisher:消息的生产者,也是一个向交换器发布消息的客户端应用程序。
3.Exchange:交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
4.Binding:绑定,是消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
5.Queue:消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列,消息一直在队列里面,等待消费者连接到这个队列将其取走。
6.Connection:网络连接,比如一个TCP连接。
7.Channel:信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内陆虚拟连接,AMQP命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁TCP都是非常昂贵的开销,所以引入了信道的概念,以复用一条TCP连接。
8.Consumer:消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
9.Virtual Host:虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个vhost本质上就是一个mini版的 RabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制。vhost是AMQP 概念的基础,必须在连接时指定,RabbitMQ默认的vhost是“/”。
10.Broker:表示消息队列服务器实体。
系统架构
基于RabbitMQ的智慧校园消息中心包括:“消息汇聚层”和“消息下发层”,消息汇聚层完成业务消息的统一汇集与存储,消息下发层则以方便、有效的途径(服务门户、短信消息、微信消息等)将消息下发给师生用户。具体架构如图2所示。
图2 基于 RabbitMQ 的智慧校园消息中心架构
1.消息汇聚层
在消息汇聚层,对于RabbitMQ而言,生产者是消息接口API,业务系统通过调用消息接口API将消息数据放入消息队列;消费者的职责则由后台轮询程序完成。在该架构下,完整的消息集成流程如下:
(1)业务系统调用消息接口API;
(2)消息接口API被调用后,首先将消息数据落地到数据库表,消息记录的初始推送状态设置为“pushstatus=0”,然后以Confirm方式将消息发送给RabbitMQ;
(3)消息接口API在接收到RabbitMQ返回的Confirm消息确认成功后,更新消息记录的推送状态“pushstatus=1”。
(4)轮询程序从RabbitMQ队列读取消息,调用消息汇聚中心接口将消息写入消息汇聚中心数据库表。
其中,步骤三是RabbitMQ的发送确认过程。在此过程中,可能出现网络闪断、MQ Broker端异常等情况,导致回送消息失败或者异常,因此需要发送方(生产者)对消息进行可靠性投递,以保障消息不丢失。为此专门设计了轮询机制,设置定时任务,每5分钟读取一次中间状态的消息(消息可以设置一个超时时间,比如超时1分钟且“pushstatus=0 “,也就是1分钟的时间窗口内没有被确认的消息,才会被定时任务拉取出来),然后将中间状态的消息重新发送到MQ,称之为”Retry send机制“。轮询程序的另外一个功能是定时比较源头与消息汇聚中心的数据差异,将差异数据再次写入消息汇聚中心(相比重新投递的定时任务,此任务的时间窗口应设置的较大,如一天内未成功写入的消息;执行时间间隔也比较长,如一个小时),称之为”Rewrite机制“。因此本方案除了利用RabbitMQ自身的可靠性机制(包括队列持久化、发送确认)之外,”Retry send机制“与”Rewrite机制“作为额外的保障措施,提供了更高的可靠性。
2.消息下发层
在消息下发层,通过调用各类发送渠道,包括校园服务门户(PC与移动)、短信平台、微信平台、邮件平台等,将消息方便及时的推送给师生用户。
系统实现
下文主要针对消息汇聚层中的生产者(消息接口API)和消费者(轮询程序)的实现过程进行阐述。消息下发层以调用第三方程序接口为主,不是本方案的核心内容,故不再赘述。
1.生产者
通过分析实际的应用场景,定义了两种消息类型:提醒与待办,提醒是业务系统发送给用户的提示消息,具有”已读“、”未读“属性;待办则是需要用户办理的一类特殊提醒,具有”未办理“、”已办理“属性。基于消息类型的定义,对于提醒,API提供了”提醒生成“与”提醒已读“两个操作;对于待办,API提供了”待办生成“与”待办消除“两个操作。由于提醒在程序实现上与待办类似,所以下文仅描述待办API的实现过程。
(1)待办生成API
待办生成API程序在功能上主要实现了待办消息数据的落地以及将待办数据放入RabbitMQ消息队列并更新推送状态。主要程序实现(java代码)如下:
/*
*待办数据写入数据库表
*/
try
{
String sql_insert = "insert into " + schema + ".TMP_TODOSERVICE(SEQ_ID, APP_ID, REFNO, MESSAGE_TYPE_CODE, TARGET_TYPE, TARGET_IDS, CONTENT, URL, DO_STEP, CREATETIME, PUSHSTATUS, DOFLAG, PUSHSTATUS_2) " + "values(" + schema + ".SEQ_TMP.NEXTVAL, " + app_id + ", '" + refno + "', '" + message_type_code + "', '" + target_type + "', '" + target_ids + "', '" + content + "', '" + url + "', '" + do_step + "', sysdate, -1, 1, -1)";
st = conn.createStatement();
st.execute(sql_insert);
} catch (Exception e) {
e.printStackTrace();
int i = 2;
return i;
}
/*
*RabbitMQ生产者,将待办数据放入RabbitMQ队列并更新推送状态
*/
JSONObject joTodo = new JSONObject();
joTodo.put("datatype", "push");
joTodo.put("app_id", Long.valueOf(app_id));
joTodo.put("app_key", app_key);
joTodo.put("refno", refno);
joTodo.put("message_type_code", message_type_code);
joTodo.put("target_type", target_type);
joTodo.put("target_ids", target_ids);
joTodo.put("content", content);
joTodo.put("url", url);
joTodo.put("do_step", do_step);
Boolean result = AMQPClientUtil.NewTask("task_queue_todo", joTodo.toString());
(2)待办消除API
消除待办API程序在功能上主要实现了待办完成数据的落地(修改已写入数据库的待办的完成状态)、将待办数据放入RabbitMQ消息队列并更新推送状态。主要程序实现(java代码)如下:
/*
*待办完成数据写入数据库表
*/
try
{
String sql_update = "update " + schema + ".TMP_TODOSERVICE set DOFLAG=0, DONETIME=sysdate " + "where APP_ID=" + app_id + " and REFNO='" + refno + "'";
st = conn.createStatement();
st.execute(sql_update);
} catch (Exception e) {
e.printStackTrace();
int i = 2;
return i;
}
/*
*RabbitMQ生产者,将待办完成数据放入RabbitMQ队列并更新推送状态
*/
JSONObject joTodo_complete = new JSONObject();
joTodo_complete.put("datatype", "complete");
joTodo_complete.put("app_id", Long.valueOf(app_id));
joTodo_complete.put("app_key", app_key);
joTodo_complete.put("refno", refno);
Boolean result = AMQPClientUtil.NewTask("task_queue_todo", oTodo_complete.toString());
2.消费者
作为消费者的轮询程序实现的功能包括:(1)读取消息队列中的待办数据并调用消息汇聚中心接口,将数据写入消息汇聚中心数据库表;(2)“Retry Send”功能,定时拉取推送MQ失败的消息,重新发送给RabbitMQ;(3)“Rewrite”功能,定时比较源头与消息汇聚中心数据的差异(消息落地数据库表与消息汇聚中心数据库表),调用消息汇聚中心接口将差异数据重新写入。主要程序实现(java代码)如下:
/*
*RabbitMQ消费者,从RabbitMQ队列获取待办数据并写入消息汇聚中心
*/
try {
JSONObject jsonObj = JSONObject.fromObject(message);
long app_id = Long.parseLong(jsonObj.get("app_id").toString());
String app_key = jsonObj.get("app_key").toString();
String refno = jsonObj.get("refno").toString();
if (StringUtils.isEmpty(app_key)) {
app_key = (String)appKeyMap.get(Long.valueOf(app_id));
}
if((null != jsonObj.get("datatype")) && ("push".equals(jsonObj.get("datatype").toString()))) {
String message_type_code = jsonObj.get("message_type_code").toString();
String target_type = jsonObj.get("target_type").toString();
String target_ids = jsonObj.get("target_ids").toString();
String content = jsonObj.get("content").toString();
String url = jsonObj.get("url").toString();
String do_step = jsonObj.get("do_step").toString();
ret = TodoIServiceUtil.todoServicePush(portalServerUrl, app_id, app_key, refno, message_type_code, target_type, target_ids, content, url, do_step);//调用消息汇聚中心接口
} else if ((null != jsonObj.get("datatype")) && ("complete".equals(jsonObj.get("datatype").toString())))
{
ret = TodoIServiceUtil.todoServiceComplete(portalServerUrl, app_id, app_key, refno);//调用消息汇聚中心接口
}
}
/*
*定时Retry send
*/
try
{
retrySendTodoPushFailureFromDb();//重发推送mq失败的未完成待办
retrySendTodoStateFailureFromDb();//重发推送mq失败的已完成待办
}
/*
*定时Rewrite
*/
try
{
matchTodoPushFailureFromDb();//重写未写入消息汇聚中心的待办
matchTodoCompleteFailureFromDb();//重写完成状态不一致的待办
}
系统验证
在方案验证环节,对系统可靠性(消息接收成功率)与及时性(消息的平均延迟时间(毫秒))进行了测试与考察,定义如下:
同时将“Retry Send机制”的触发时间设定为5分钟,时间窗口设定为1分钟,“Rewrite机制”的触发时间设定为10分钟,时间窗口设定为5分钟。java程序循环调用待办推送接口API,分别发送待办2000条、5000条、10000条。在每条消息发出时,记录其发送时间,并和数据库记录生成时间做比较,得到每条消息的延迟时间。三次验证结果分别统计见表1。
验证过程并没有考虑程序本身执行时间以及网络延迟的影响,可见随着消息发送的增加,消息的平均延迟时间差别并不大;另外,在验证过程中,遇到了RabbitMQ因网络连接超时等情况而发送失败的情况,但方案中“Retry send机制”与“Rewrite机制”保证了消息仍然被准确接收,验证了方案的可靠性。
验证过程未包含RabbitMQ的吞吐量测试,有资料表明,RabbitMQ吞吐量可达到5.95w/s,在消息持久化场景下,吞吐量也能达到2.6w/s左右。这也说明AMQP协议为了保证消息的可靠性在吞吐量上做了一定程度的取舍。
应用成效
基于RabbitMQ的智慧校园消息中心方案已在上海财经大学的一站式校园服务门户中投入使用,经过三年多的运行,目前消息中心已实现了面向教职工的75种消息和面向学生的48种消息的汇聚与下发。同时支持用户个性化设置消息接收渠道(门户站内信、手机短信以及微信消息)和业务消息类型,方便师生用户及时、准确的接收个人所关注的业务状态变更提醒,大大提升了业务办理效率以及用户使用体验,取得了很好的应用效果。
本文描述了一种基于RabbitMQ的智慧校园消息中心设计方案以及主要的程序实现。方案在利用RabbitMQ自身可靠性机制的基础上,增加了“Retry Send机制”与“Rewrite机制”,提高了消息接收的整体可靠性。测试验证结果与实际应用成效表明,该方案可以很好的满足高校中的消息集成需求,为高校智慧校园建设提供大力支撑。
(作者:胡庆亮 王珊珊 高亮,单位为上海财经大学信息化办公室)