(15条消息) Kafka原理篇
1 Kafka简介
Kafka是分布式发布订阅消息系统。他最初由Linkedin公司开发,之后成为Apache项目的一部分。Kafka是一个分布式的可划分的冗余备份的持久性的日志服务。它主要用于处理活跃的流式数据。
在大数据系统中,常常会碰到一个问题,整个大数据是由各个子系统组成,数据需要在各个子系统中高性能,低延迟地不停流转。传统的企业消息系统并不是非常适合大规模的数据处理。为了已在同时搞定在线应用(消息)和离线应用(数据文件和日志),Kafka就出现了。Kafka可以起到两个作用:
① 降低系统组网复杂度;
② 降低编程复杂度,各个子系统不在是相互协商接口,各个子系统类似插口在插座上,
Kafka承担高速数据总线的作用。
Kafka主要的特点:
① 消息系统的特点:生产者消费者模型,FIFO。
分析:消息系统基本的特点是保证了有基本的生产者消费者模型,partition内部是FIFO的,partition之间不是FIFO的,当然我们可以把topic设为一个partition,这样就是严格的FIFO。
② 高性能:单节点支持上千个客户端,百MB/S吞吐。
分析:接近网卡。
③ 持久性:将消息持久化到普通磁盘,因此可用于批量消费,例如ETL,以及实时应用程序。通过将数据持久化到硬盘以及replication(备份)防止数据丢失。
分析:直接append到磁盘里面去,这样做的好处:第一个是直接持久化,数据不会丢;第二个是顺序写,然后消费数据也是顺序地读,所以持久化的同时保证顺序,因为磁盘顺序读比较快。
④ 分布式:数据副本冗余、流量负载均衡和可扩展。
分析:分布式,数据冗余就是同一份数据可以分到不同的broker上面去,也就是当一份数据的磁盘坏掉的时候,数据不会丢失,比如3个副本,就是在3个机器磁盘都坏掉的情况下数据才会丢。当然分配副本的时候是按照负载均衡来的。可扩展可以理解为在线扩展,不需要停掉服务。
⑤ 很灵活:消息被处理的状态是在consumer端维护,而不是由server端维护。当失败时能自动平衡。
分析:消费方式非常灵活,第一是消息持久化时间跨度比较长,一天或者一星期等;第二是consumer自己维护消费状态、Queue模型、发布订阅(广播)的模型和回滚的模型。
⑥ 支持online和offline。
2 Kafka工作原理
2.1 Kafka架构
Kafka的整体架构非常简单,是显示分布式架构,producer、broker和consumer都可以是多个。producer和consumer实现Kafka注册的接口,数据从producer发送到broker,broker承担一个中间缓存和分发的作用。broker分发注册到系统中的consumer。客户端和服务器端的通信,是基于简单、高性能且与编程无关的TCP协议。几个概念如下:
(1)Broker:消息中间件处理结点,一个Kafka结点就是一个broker,多个broker可以
组成一个Kafka集群。
(2)Topic:一类消息,例如page view日志、click日志等都可以以topic的形式存在,Kafka集群能够同时负责多个topic的分发。
(3)Partition:topic物理上的分组,一个topic可以分为多个partition,每个partition是一个强有序的队列。一个partition只对应一个broker,一个broker可以管多个partition。
(4)Segment:partition物理上由多个segment组成。
(5)offset:每个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中。partition中的每个消息都有一个连续的序列号叫作offset,用于partition唯一标识一条消息。
(6)Producer:消息和数据生产者,向Kafka的一个topic发布一些消息。producer自己决定往哪个partition写消息,可以是轮询的负载均衡,或者是基于hash的partition策略。producer不丢数据:“至少一次,严格一次”。
(7)Consumer:消息和数据消费者,订阅topics并处理其发布的消息的过程叫做consumer。
每个consumer都有对应的group。
group内是queue消费模型:① 各个consumer消费不同的partition;② 一个消息在group内只消费一次。
group间是public-subscribe消费模型:① 各个group各自独立消费,互不影响;② 一个消息被每个group消费一次。
注:消息传递中“至多一次”、“至少一次”和“正好一次”三种情况:① 至多一次:服务器的消息最多传递一次,如果再传递一次,就会造成负面影响。至多一次不会重试。至多一次在实际中案例是信用卡扣款,最多只能发出一次扣款小四,如果不成功,可以让客户端重新发出扣款请求,但是我们不能在分布式系统内部做出一次以上的动作,会带来负面影响,客户钞票多扣了。② 至少一次:一个消息至少传递一次以上,当然会造成消息内容重复冗余,但是可靠性提高了。至少一次是系统会在故障时重试请求,直至调用成功。③ 正好一次:正好一次传递是不可能的,也就是通过网络两个服务器之间的调用恰好一次就完成正确的通讯是不可能的。正好一次是通过消息接收方发送确认收到的方式试图保障每次消息传递都能可靠性传递完成,这是不可能的,因为这个发送,收到和确认的过程中一旦出现问题,就无法保证传递完成。
2.2 partition
2.2.1 partition中文件存储方式
(1)每个partition(目录)相当于一个巨型文件被平均分配到多个大小相等segment(段)数据文件中。但每个段segment file消息数量不一定相等,这种特性方便old segment file快速被删除。
(2)每个partition只需要支持顺序读写文件就行了,segment文件生命周期由服务器端配置参数决定。
这样做的好处就是能快速删除无用文件,有效提高磁盘利用率。
2.2.2 partiton中segment文件存储结构
(1)segment file组成:由2大部分组成,分别为index file和data file,这2个文件一一对应,成对出现,后缀“.index”和“.log”分别表示为segment索引文件、数据文件。
(2)segment文件命名规则:partition全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值。数值最大为64位long大小,19位数字字符长度,没有数字用0填充。
上图为一个topic中的一个partition中的segment文件列表。
上图中索引文件存储大量元数据,数据文件存储大量消息,索引文件中元数据指向对应数据文件中message的物理偏移地址。其中以索引文件中元数据3,497为例,依次在数据文件中表示第3个message(在全局partition表示第368772个message)、以及该消息的物理偏移地址为497。
Segment data file由许多message组成,下面详细说明message物理结构如下:
2.2.3 在partition中如何通过offset查找message
(1)第一步查找segment file,其中00000000000000000000.index表示最开始的文件,起始偏移量(offset)为0.第二个文件00000000000000368769.index的消息量起始偏移量为368770 = 368769 + 1.同样,第三个文件00000000000000737337.index的起始偏移量为737338=737337 + 1,其他后续文件依次类推,以起始偏移量命名并排序这些文件,只要根据offset 二分查找文件列表,就可以快速定位到具体文件。 当offset=368776时定位到00000000000000368769.index|log。
(2)第二步通过segment file查找message 通过第一步定位到segment file,当offset=368776时,依次定位到00000000000000368769.index的元数据物理位置和00000000000000368769.log的物理偏移地址,然后再通过00000000000000368769.log顺序查找直到offset=368776为止。
注:这种方式极其向MySQL索引的查找方式,大家学习完B+树之后,不妨看一下MySQL innodb下的索引实现原理。
3 Kafka的设计
3.1 吞吐量
高吞吐量是Kafka需要实现的核心目标之一,为此Kafka做了一下一些设计:
(1)数据磁盘持久化:消息不在内存中cache,直接写入到磁盘,充分利用磁盘的顺序读写性能。
(2)zero-copy(零拷贝):减少IO操作步骤。
(3)数据批量发送。
(4)数据压缩。
(5)topic划分为多个partition,提高并发性。
3.2 负载均衡
(1)producer根据用户指定的算法,将消息发送到指定的partition。
(2)存在多个partition,每个partition有自己的副本,每个副本分布在不同的broker结点上。一般来说下一个副本会在当前broker的下一个broker中。
(3)多个partition需要选取出leader partition,leader partition负责读写,并由zookeeper负责故障转移。
(4)通过zookeeper管理broker与consumer的动态加入与离开。
3.3 拉取系统
由于kafka broker会持久化数据,broker没有内存压力,因此,consumer非常适合采取pull的方式消费数据,具有以下几点好处:
(1)简化Kafka设计。
(2)consumer根据消费能力自主控制消息拉取速度。
(3)consumer根据自身情况自主选择消费模式,例如批量,重复消费,从尾端开始消费等。
3.4 可扩展性
当需要增加broker结点时,新增的broker会向zookeeper注册,而producer及consumer会根据注册在zookeeper上的watcher感知这些变化,并及时做出调整。
(1)消息队列
比起大多数的消息系统来说,Kafka有更好的吞吐量,内置的分区,冗余及容错性,这让Kafka成为了一个很好的大规模消息处理应用的解决方案。消息系统一般吞吐量相对较低,但是需要更小的端到端延时,并尝尝依赖于Kafka提供的强大的持久性保障。
(2)行为跟踪
Kafka的另一个应用场景是跟踪用户浏览页面、搜索及其他行为,以发布-订阅的模式实时记录到对应的topic里。那么这些结果被订阅者拿到后,就可以作进一步的实时处理,或实时监控,或放到Hadoop/离线仓库里处理。
(3)元数据监控
作为操作记录的监控模块来使用,即汇集记录一些操作信息,可以理解为运维性质的数据监控吧。
(4)日志收集
日志收集方面,其实开源产品很多,包括scribe、Apache flume。很多人使用Kafka代替日志聚合(log aggregation)。日志聚合一般来说是从服务器上收集日志文件,然后放到一个集中的位置(文件服务器或HDFS)进行处理。然而Kafka忽略掉文件的细节,将其更清晰地抽象成一个个日志或事件的消息流。这就让Kafka处理过程延迟更低,更容易支持多数据源和分布式数据处理。比起以日志为中心的系统比如scribe或者flume来说,Kafka提供同样高效的性能和因为复制导致的更高的耐用性保证,以及更低的端到端延迟。
(5)流处理
这个场景可能比较多,也和好理解。保存收集流数据,以提供之后对接的Storm或其他流式计算框架进行处理。很多用户会将那些从原始topic来的数据进行阶段性处理、汇总、扩充或者以其他的方式转换到新的topic下再继续后面的处理。例如一个文章推荐的处理流程,可能是先从RSS(Really Simple Syndication是一种描述和同步网站内容的格式)数据源中抓取文章的内容,然后将其丢入一个叫做“文章”的topic中;后续操作可能是需要对这个内容进行清理,比如回复正常数据或者删除重复数据,最后再将内容匹配的结果返还给用户。这就在一个独立的topic之外,产生了一系列的实时数据处理的流程。
(6)事件源
事件源是一种应用程序设计的方式,该方式的状态转移被记录为按时顺序排序的记录序列。Kafka可以存储大量的日志数据,这使得它成为一个对这种方式的应用来说绝佳的后台。比如动态汇总(News feed)。
(7)持久性日志(commit log)
Kafka可以成为一种外部的持久性日志的分布式系统提供服务。这种日志可以在节点间备份数据,并为故障点数据回复提供一种重新同步的机制。Kafka中日志压缩功能为这种用法提供了条件。在这种用法中,Kafka类似于Apache BookKeeper项目。
3.5 零拷贝
零拷贝是指计算机操作的过程中,CPU不需要为数据在内存之间的拷贝消耗资源。而它通常是指计算机在网络上发送文件时,不需要将文件内容拷贝到用户空间(user space)而直接在内核空间(kernel space)中传输到网络的方式。
Non-Zero Copy方式:
Zero Copy方式:
从上图中可以清楚地看到,Zero Copy的模式中,避免了数据在用户空间和内存空间之间的拷贝,从而提高了系统的整体性能。零拷贝过程:
① 发出sendfile系统调用,导致用户空间到内核空间的上下文切换(第一次上下文切换)。通过DMA将磁盘文件中的内容拷贝到内核空间缓冲区中(第一次拷贝: hard driver ——> kernel buffer)。
② 然后再将数据从内核空间缓冲区拷贝到内核中与socket相关的缓冲区中(第二次拷贝: kernel buffer ——> socket buffer)。
③ sendfile系统调用返回,导致内核空间到用户空间的上下文切换(第二次上下文切换)。通过DMA引擎将内核空间socket缓冲区中的数据传递到协议引擎(第三次拷贝: socket buffer ——> protocol engine)。
4 Kafka与其他消息队列对比
(1)RabbitMQ:分布式,支持多种MQ协议,重量级。
RabbitMQ也是常见的消息队列,它支持多种MQ的协议,jms等多种协议等等,它的缺点比较重,erlang。
(2)ActiveMQ:与RabbitMQ类似。
ActiveMQ和RabbitMQ类似,支持的协议比较多。
(3)ZeroMQ:以库的形式提供,使用复杂,无持久化。
ZeroMQ是一个socket的通信库,它是以库的形式提供的,所以说你需要写程序来实现消息系统,它只管内存和通信那一块,持久化也得自己写。storm0.9之前,那些spout和bolt,bolt和bolt之间那些底层的通信就是由ZeroMQ来通信的,它并不是一个消息队列,就是一个通信库,在0.9之后呢,因为license的原因,ZeroMQ就由Netty取代了,Netty本身就是一个网络通信库嘛,所以说更合适是在通信库这一层,不应该是MessageQueue这一层。
(4)redis:单机、纯内存性好,持久化较差。
Redis本身是一个内存的KV系统,但是它也有队列的一些数据结构,能够实现一些消息队列的功能,当然它在单机纯内存的情况下,性能会比较好,持久化做的稍差,当持久化的时候性能下降地会比较厉害。
(5)Kestrel:单机,持久化。
Kestrel,这个也是在storm里面可以配合来使用的消息队列,是twitter开源的消息队列,也是单机,它提供的持久化是以日志的情况来写的,如果用的时候,你需要自己来搭建几台Kestrel的服务,自己处理负载均衡,自己去做容错。
(6)Kafka:分布式,较长时间持久化,高性能,轻量灵活。
Kafka分布式的,不需要你在上层做分布式的工作,另外有较长时间持久化,前面基本就被消费者就消费掉了,另外在长时间持久化下性能还比较高,顺序读和顺序写,另外还通过sendFile这样零拷贝的技术直接从内核控件缓存区拷贝到socket相关的缓存区,减少了内存的拷贝次数,还有批量读批量写来提高网络读取文件的性能,最后一点就是比较轻量和灵活。
参考文章:
[1] https://tech.meituan.com/2015/01/13/kafka-fs-design-theory.html
[2] https://www.biaodianfu.com/kafka.html
[3] http://kafka.apache.org/intro
[4] https://blog.csdn.net/u013096088/article/details/79122671
[5] https://blog.csdn.net/u013018618/article/details/80146617