Kafka在车好多的最佳实践
文章作者:葛凯文@车好多
内容来源:作者授权
01
消息队列
1. Kafka简介
Kafka 的官方的介绍:
Apache Kafka® 是一个开源的分布式的事件流平台,有成千上万的公司将 Kafka 作为高性能数据流通道、流式分析和数据整合工具的至关重要的应用来使用。
Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines,streaming analytics,data integration,and mission-critical applications.
2. 消息队列的选型
3. 集群管理工具
在公司早期,数据量比较小,仅用两台机器通过手工部署组成一个最小集群。随着公司发展和 Kafka 在公司推广,Kafka 逐渐成为瓶颈,需要做一些扩容等运维操作,手工运维集群的复杂程度上升明显,我们团队开始调研使用一些工具来处理这类操作。我们首先引入了 Cloudera Manager ( 简称CM ),它是 Cloudera 公司开发的一款大数据相关集群安装部署工具,这款工具包括自动化安装部署、集中管理、集群监控报警等功能,可以在短时间内快速安装集群,提高集群管理的效率。但是有一些自定义开发的插件无法通过 CM 直接部署,我们引入了 AWX。它是 Red Hat® 公司提供的一套开源工具,可以通过图形化界面来操作 Ansible,还提供了一些运行记录和定时运行任务等功能。
02
Kafka第一次大升级
使用 Kafka 的一段比较短的时间内,我们只有几台低配置的机器,接入的服务比较少,负载也很低。随着业务逐步增多加上业务旺季数据量爆发式地上涨,网卡流量每天的大部分时段在 70% 以上,磁盘 IO 也处于一个比较高的区间。
我们第一次遇到比较严重的问题,是由于某一个任务启动的时候,从数据库加载了一些信息到 Kafka 中,接受这些数据的 topic 的类型为 compact + delete,数据频繁更新的时候,会不断地刷写文件和删除原文件,导致了这个 topic 的 IO 持续处于高位,影响同磁盘下其他任务的同步,继而导致大量同步延迟,ISR 降低。
1. 原因分析
设备老旧,一台机器只有一块磁盘来抗住 IO 的压力,网卡也仅是一块千兆网卡,在大流量的任务运行时都可能被打满,导致同步延迟和读写数据速度慢
初期集群只有两台机器,因此默认配置为 2 副本,最小可用副本数 1 副本,如果有某个机器宕机,可能出现数据丢失的情况
数据量小但是重要的 topic 和数据量大相对不重要的 topic 在同一个集群上,流量激增的时候,可能导致前者的可用性下降
存在部分历史遗留问题,如:早期使用 Kafka 的时候,没有对 PHP 和 GO 语言的客户端进行验证,partition 的 leader 切换部分客户端无法自动处理;一部分用户使用通过 Zookeeper 连接的老版本的 Kafka 客户端,offset 仍然记录在 Zookeeper,consumer 实例可能出现脑裂且不方便统一做监控等等
我们开始准备升级工作,这时涉及到两种方案:
A 方案是重新搭建服务,由业务方进行迁移
B 方案是在原先的集群上先扩容再缩容,逐步替换掉老的机器,再滚动升级
2. 方案的对比 & 选择
可能风险:A 方案不存在数据同步导致网卡流量和磁盘 IO 激增的情况;B 方案因机器老旧,可能因为数据迁移导致网卡流量和磁盘 IO激增的情况 影响范围:A 方案由业务方逐个控制,遇到问题随时回滚,影响范围小;B 方案的操作同时影响所有的使用方,假如迁移后仅有个别使用方遇到问题需要全部回滚 集群拆分:A 方案可以在迁移时候主动选择使用在线/离线集群;B 方案需要全部指向离线集群,有高优先级需求的业务还需要单独再做迁移 集群管理:A 方案可以通过更成熟的工具,界面化地维护 Kafka 集群;B 方案很难直接接入管理工具
B 方案优势:
跨部门沟通:A 方案沟通复杂,需要建立各个业务方的信任;B 方案只需要通知在某个时间段操作即可
具体业务:A 方案需要协调各个业务方的时间,跨部门使用的 topic 迁移存在先后关系;B 方案可以忽略此问题
旧版客户端:A 方案需要逐个迁移,不再提供通过 Zookeeper 连接的方式,需要业务方在迁移前清理历史遗留问题;B 方案不需要推动业务方清理历史遗留问题
基于以上的方案对比,我们在迁移的前置工作中,除了提供稳定的集群外,还做了以下几件事情:
开发一个小工具,可以将 Zookeeper 上的 offset 信息同步到 broker 中,当上线新的连接方式的消费的时候,可以直接从对应 offset 开始消费,无需业务方开发额外代码,并维护了一份迁移的文档以帮助业务方理解和使用该工具。以此来解决老旧客户端的问题
设计先在新集群上线消费者,然后迁移生产者的方案并形成文档,在跨部门跨项目的场景中,建议使用此方案进行迁移,降低沟通成本
快速上线 topic 的元信息维护系统,记录 topic 的使用方信息,便于沟通
在业务方迁移时候,Kafka-admin 团队驻场,有任何问题可以第一时间解决
单机的服务的可用性,受到众多硬件环境及软件本身的因素影响,可用性远小于集群服务的可用性,但是将使两个可用性只有 99% 的集群并联起来,整体服务的可用性也可以达到 99.99%。因此我们规划了多个 Kafka 集群的方案,来提升整体 Kafka 服务的可用性,避免单个 Kafka 集群的可用性降低带来对整体的影响。
拆分集群:
业务专用集群一般为极重要 ( 可以特批机器 ) 的业务,只为某一个业务服务,用极低的资源冗余,换取高 SLA,所有数据会同步到 offline 集群
online 集群作为'重要' ( 目前定义重要是:直接影响用户 ) 业务使用的集群,用一部分资源冗余,换取相对高的 SLA,所有数据会同步到 offline 集群
offline 集群作为数据的全集,会从 online 集群和重要业务单独集群同步数据过来,其余与线上业务无关的数据同步任务,会使用这套集群,另外如果其他集群有不可短时间立即恢复的问题出现,可以立即切换域名指向 offline 集群,把对下游应用影响的时间压缩到最短
分析集群,从 offline 集群同步指定的 topic,为分析类任务的批量读取提供服务
04
AVRO
对 Kafka 的集群进行拆分之后,服务的稳定性得到了保障。我们团队的主要工作方向慢慢向帮助技术人员提效和降低大数据技术使用门槛发展。一方面,跨业务消息传递无强制格式要求,业务方通常使用 Json 类型字符串,虽然 Json 使用灵活,但是如果业务产生变更,业务上下游的通知不及时,很容易解析异常;偶尔也会有一些误发送其他 topic 的数据或不完整格式的脏数据被发到集群中,也会导致消息解析失败。另一方面,我们也开始规划以 Kafka 为核心链路,向其他存储如:ES / Hadoop ( Hive,HBase ) / KUDU / ClickHouse / NEO4J 等组件同步数据的方案。业务上只需要一次把数据写入 Kafka 中,即可在各个其他需要的组件中使用该数据,降低了其他组件的接入门槛,从传统的安装大数据环境 → 数据导入→ 清洗为列式存储再进行查询,到在业务代码发送数据到 Kafka 后直接通过查询引擎 ( 如 Presto ) 查询,便利地享受大数据带来的红利。结合了跨业务方对数据格式的需求和下游系统大部分需要依赖数据的结构同步,我们引入了 AVRO 的序列化方式。
1. Schema registry 和兼容性
2. AVRO 类型消息收发过程
3. SDK
1. Kafka connect
从 Kafka 到其他组件,简单来看就是消费消息到下游组件。我们选择了 Kafka connect 作为数据的输入输出的平台,通过开发和改造插件的形式,灵活控制增减任务。什么是 Kafka connect ? Kafka Connect是一种用于在Kafka和其他系统之间可扩展的、可靠的流式传输数据的工具。Kafka connect 以插件的形式,完成不同上下游组件的数据输入输出。我们除了可以使用一些开源的插件来完成这些工作,还可以自定义开发,完善符合业务要求的业务逻辑,使用起来非常灵活。
2. Source
我们早期使用的 canal 同步 binlog 到 Kafka 中,但是在我们选型的时候,canal 还是主从结构,嵌入到 Kafka connect 中不是很方便;现在来看 Debezium 也是个不错的选择,不过在选型的时候,它还没发布正式版本,当时也没有选择。最终我们选择了Maxwell,一个比较小巧的工具改造起来比较方便,也便于嵌入 Kafka connect。
3. Sink
写入数据到 ElasticSearch,为数据的搜索提供服务
写入到图数据库 Neo4J,为图谱做数据源
写入到 Redis,维护最新的数据,提升缓存命中率等等
① HDFS
我们经常遇到一些场景,需要将一些比较像日志类型的结构化的数据进行永久存储,存储到 MySQL 之类的关系型数据库的话,数据量比较大,优先级和重要程度又没有这么高,积累下来查询又比较慢,还可能影响重要业务的查询。我们为了解决这类问题,引入了同步数据到 HDFS ( Hive ),在发送的时候可以使用异步方式发送到 Kafka,不会 block 主要业务流程,查询的时候可以通过 Presto 等 OLAP 引擎查询,提升查询速度。
时效性与小文件:
Kafka 里边的数据本身是流式的,写入到 HDFS ( Hive ) 就需要考虑多久做一次文件的刷写,文件刷新频率过低,业务上查询数据的实时性就会变差;反之提升文件刷新频率,HDFS 对小文件又非常不友好,日积月累会导致 HDFS 集群的巨大压力。我们结合业务方对时效性的要求和公司 HDFS 的现状,制定了生成文件的规则:每 5 分钟进行一次文件的刷写。
小文件问题,需要合并:
如果只是限制了每 5 分钟刷写一次文件,每个 partition 每天仍然要产生将近 300 个文件,长期积累下去,小文件的个数仍然会对 HDFS 的服务造成影响,我们开发了一套小文件合并的程序,在晚上把每天每个 partition 的小文件合并掉成一个。简单介绍一下合并的过程:
通过 spark,将当天的分区的数据 select insert into 一个临时目录 校对新生成的数据和原始数据是否一致 将原分区目录中的文件移动到一个临时目录,如无需回滚 7 天后删除 将新生成的数据文件移动到原目录
通过以上的合并方案,能够有效控制小文件个数在一定范围,降低对 HDFS 的影响。
我们另外一类比较大的应用场景是将数据同步到 HBase 和 KUDU 中,为离线和在线的数仓提供服务。车好多集团和很多流量类型的互联网公司的一个比较大的区别是: 业务周期比较长,数据更新频繁。一辆车从开始入库到多项检测,再到销售过程,周期最长可能会持续好几个月。这样就会有一些订单的周期会横跨多个月,并且经常产生状态的更新。HBase 和 KUDU 可以帮助我们更新数据的变化,利用这一特性,将前文提到的 Kafka 中的 MySQL binlog 数据 sink 到 HBase 和 KUDU 中。如前文提到的 Avro Schema 兼容性问题,当数据库的数据格式在规范内进行变更,下游存储可以自动适应新的数据结构,将新的表结构同步到 KUDU 和 Hive on HBase 的表中。为数仓系统提供近实时的 ODS 层数据,支持离线和实时的报表统计需求,这是 CDC 产生的数据的一种主要应用形式。
数仓体系
4. 遇到过的问题
① 版本的问题:
我们开始使用的版本存在两个比较明显的问题:
每次添加任务会做一次全局的rebalance,这会使大多数不需要做 balance 的任务重启。等到全部任务再回到正常状态,整个集群至少会产生秒级别的数据延迟
任务重启的时候,有概率会导致原任务停止失败,又重新拉起了新的任务。因为绝大多数是 sink 类型的任务,受到 partition 个数的限制,初期体现并不明显,增加了 source 类型的任务,这个问题才逐渐凸显
② 升级:
计划升级之后,我们梳理了一下老的集群,当初为了避免全局 rebalance 带来的影响,于是建立了多套小集群,在升级之后我们希望能合并成尽量少的集群,方便管理。升级过程中仍然发现了许多问题:
一些与业务方约定的内容和公司规范有关的自定义开发内容代码提交混乱,升级时无法快速打入新版本 多个小集群有不同的 Kafka connect 版本和不同的组件的插件版本
日志和 JMX 端口配置混乱,不利于错误排查,上线部署流程不规范
借着升级的机会,我们:
重新梳理了自定义开发的代码
统一了多个版本的 Kafka connect 和相关插件
统一了监控配置和部署流程
时刻考虑写下的代码是否是可维护的,尤其是在一些比较成熟的项目中的二次开发,规范 review 机制,制定二次开发代码提交的规范 规范部署流程,通过 Ansible 的标准部署方式,配置文件通过 git 管理,做到配置文件的版本可追溯 升级过程中如果遇到问题,应该立即回滚,而不是在线上环境做更多的尝试,有可能导致二次事故
06
我们使用的组件大多是有一些开源的工具,提供给我们最通用的基础功能,但是仍然有一些公司内部深度定制的功能,已经权限相关的功能需要增加。
1. Kafka 平台
相比很多开源的 Kafka 提供的工具,我们做了一些建设,除了开源常用的功能外,补充了几个我们使用频率比较高的需求:
开源工具有很多可以对 topic 等内容操作的功能,我们调整了只开放读操作 增加通过 offset 或者时间点获取数据的方式 增加一些公司内部约定好的格式,如 avro 序列化,可以直接解析并展示 展示一些 topic 所属部门的相关信息,方便沟通 增加生产消费的监控展示和一些报警的订阅
2. schema 工具
Avro Schema registry 主要的作用是管理 schema 的历史版本,这里边涉及到兼容性的问题 ( 可以参考官网 )。我们选择的兼容性如前文提到的,只允许在原 schema 的基础上追加带有默认值的字段。我们维护了一套 schema 的管理工具,主要功能如下:
按照事先约定的规范,对提交的内容做格式检查 每个业务部门有自己的组,组内人员可以提交 schema,小组 leader 进行审核 自动化检查提交的 schema 是否符合规范和兼容性,符合则提交到 git 中,对修改内容的版本做记录 通过 git 的 CI/CD,将提交的 schema 进行编译,生成 go 代码和 jar 包提交到 git 和公司 Maven 私服
通过这套流程,让 schema 的修改标准化,并且自动化地做好编译打包工作,流程完毕之后可以直接引用。
07
总结与展望
今天的分享就到这里,谢谢大家。