kafka学习笔记一:整体了解kafka

kafka

消息队列

因为mq很常见嘛所以就不多介绍啦,看一下消息队列的两种模式

点对点模式

消息生产者生产消息发送到Queue中,然后消息消费者从Queue中取出并且消费消息。消息被消费后,queue中不再有存储,所以消费者不可能消费到已经被消费的消息。Queue支持存在多个消费者,但是对于一个消息而言,只有一个消费者可以消费。(其实kafka的每个consumer group内可以理解为是点对点模式)

消息/订阅模式

消息生产者(发布)将消息发布到topic中,同时有多个消费者(订阅)消费该消息。和点对点方式不同,发布到topic的消息会被所有订阅者消费。也就是kafka使用的模式

基础架构

image-20221008131302682

这张图感觉看过好多好多遍。。。

  • Producer:消息生产者(发布)将消息发布到topic中,同时有多个消费者(订阅)消费该消息。和点对点方式不同,发布到topic的消息会被所有订阅者消费。
  • Consumer:消息消费者,向kafka broker取消息的客户端
  • Consumer Group:消费者组,由多个consumer组成。消费者组每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者属于某个消费者组,即消费者组是逻辑上的一个订阅者
  • Broker:一台Kafka服务器就是一个broker。一个集群由多个broker组成,一个broker可以容纳多个topic
  • Topic:可以理解为一个队列,生产者和消费者面向的都是一个topic
  • Partition:为了实现扩展性,一个非常大的topic分为多个partition,partition可以理解为物理分区,是最小的存储单元,掌握着一个 Topic 的部分数据。
  • Replication:副本,为保证集群中的某个节点发生故障时,该节点上的 partition 数据不丢失,且 kafka 仍然能够继续工作,kafka 提供了副本机制,一个 topic 的每个分区都有若干个副本,一个 leader 和若干个 follower

Partition

作为kafka的存储核心,还是单独捞出来写一下吧

Partition 中的每条记录都会被分配一个唯一的序号,称为 Offset(偏移量),Offset 是一个递增的、不可变的数字,由 Kafka 自动维护,当一条记录写入 Partition 的时候,它就被追加到 log 文件的末尾,并被分配一个序号,作为 Offset。

消息的顺序性:一个 Topic 如果有多个 Partition 的话,那么从 Topic 这个层面来看,消息是无序的。但单独看 Partition 的话,Partition 内部消息是有序的。所以,一个 Partition 内部消息有序,一个 Topic 跨 Partition 是无序的。如果强制要求 Topic 整体有序,就只能让 Topic 只有一个 Partition。

一个 Kafka 集群由多个 Broker(就是 Server) 构成,每个 Broker 中含有集群的部分数据,Kafka 把 Topic 的多个 Partition 分布在多个 Broker 中。

好处:

  • 增加吞吐量,把partition放在不同的broker里,那么整个topic的吞吐就不会受限于一台机器的i/o性能
  • 增加consumer消费能力,把不同的consumer分配去消费不同的partition,Consumer 的多个实例就可以连接不同的 Broker,大大提升了消息处理能力。
  • 增加数据备份,一个partition在不同的broker分为leader和follower去备份数据

存储

看一下kafka的存储机制:

image-20221008142532471

其实就是分片和索引的机制,每个index文件里面都会以当前segment的第一条消息的offset命名,并且顺序向下记录,本质上就是实现了一套稀疏索引。

要查找一个offset的数据的时候,先去遍历每个segment,定位到该offset在哪个segment里面,再在segment里面查找到对应offset的数据

Producer

生产者负责的是发送消息到对应的partition,所以需要确认的有以下件事

一:发送数据到哪个partition

二:partition对应的broker是哪个

三:如何保证数据已经发送成功,需不需要重试

分区器

其实分区器前面还有拦截器和序列化器,分别做了一些数据处理和数据序列化的逻辑,之后分区器用来确定需要发送的分区,kafka提供了三种分区方式:

  1. 指定partition的情况下,直接将指定的值作为partition值
  2. 没有指定partition值但有key的情况下,将key的hash 值与 topic 的 partition 数进行取余得到 partition 值
  3. 既没有 partition 值又没有 key 值的情况下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与 topic 可用的 partition 总数取余得到 partition 值,也就是常说的 round-robin 算法。

broker信息

这个后面会提到,kafka的集群是借助zk管理的,也就是说每个brocker都会向zk注册。而kafka集群中会选举一个broker作为controller,controller会负责整个集群的主题管理(包括主题创建,删除,分区分配,增加和重分配)等操作。

Kafka使用了一种称为”Metadata Request”的机制来获取Broker的元数据信息。当Producer需要发送消息到指定Partition时,它会向Kafka集群发送一个Metadata Request请求,请求包含了目标Topic和Partition的信息。

Kafka集群中的一个Broker(通常是Controller)会接收到这个请求,并返回包含了目标Partition所在的Broker的元数据信息的响应。这个元数据信息包括了Broker的地址(IP和端口),Producer可以根据这些信息来建立与目标Broker的连接。

producer本地有一个属性会保存集群中broker的列表,但是Kafka的元数据信息是动态变化的,因为集群中的Broker和Partition可能会发生变化。因此,Producer通常会定期或在需要时发送Metadata Request来获取最新的元数据信息,以确保消息发送到正确的Broker和Partition。

数据可靠性

为保证producer发送的数据,即可靠的发送到指定的topic,topic的每个partition收到producer发送的数据后,都需要向producer发送ack(acknowledgement 确认收到),如果producer收到ack,就会进行下一轮发送,否则会重新发送数据。

那么什么时候发送ack呢?

首先producer发送的一定是leader节点,leader节点收到数据之后会有两个操作,一个是落盘,一个是同步数据到follower,所以这里kafka提供了三种不同的ack返回方案配置:

  • 0:producer不等待broker的ack,这一操作提供了一个最低的延迟,broker一接受到还没有写入磁盘就已经返回,当broker由故障的时候有可能丢失数据
  • 1:producer等待broker的ack,partition的leader落盘成功后返回ack,如果follower同步成功之前leader故障,那么将丢失数据
  • -1(all):producer等待broker的ack,partition的leader和follower全部落盘成功后才返回ack。但是如果follower同步成功之后,broker发送ack之前,leader发生故障,那么会操成数据重复

0和-1,一个是at most once,最多只发一次,数据不重复,但是会丢失,一个是at least once,数据保证不会丢失但是会重复,所以用容易丢失的方案则要考虑数据是否重要,用at least once的方案就需要考虑数据幂等性

至于all的方案,zk我们知道,是只要保证有一半以上的follower同步成功就ok,但是kafka选择的则是全部follower都同步成功才可以,因为如果用zk的方案,2n+1,那么就会有n台机器是增加出来的应付故障的冗余数据,zk的znode节点数据相当小,但是kafka这种侧重数据存储的逻辑会造成n台机器的数据冗余,成本过高

ISR

那么如果其中有一台机器出问题了同步不了怎么办?ack就一直卡在那里么?

Leader 维护了一个动态的 in-sync replica set (ISR),意为和 leader 保持同步的 follower 集合。当 ISR 中的 follower 完成数据的同步之后,leader 就会给 follower 发送 ack。如果 follower长时间未向leader 同步数据 , 则该 follower将 被 踢 出 ISR,该时间阈值由replica.lag.time.max.ms 参数设定。Leader 发生故障之后,就会从 ISR 中选举新的 leader。

数据幂等

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
> Kafka 通过 Producer Id(生产者标识符)和 Sequence Number(序列号)来保证消息不会被重复发送。以下是 Kafka 如何实现这一点的工作原理:
>
> Producer Id(PID): 每个 Kafka 生产者在初始化时都会获得一个唯一的 Producer Id。Producer Id 是一个持久性标识符,通常与生产者的客户端实例相关联。Kafka 会跟踪每个生产者的 Producer Id。
>
> Sequence Number(序列号): 每个请求(消息)都包含一个单调递增的序列号。序列号从1开始,每次递增1。序列号用于标识请求的顺序。
>
> Producer Id 和 Sequence Number 的存储: Kafka 将 Producer Id 和 Sequence Number 存储在每个分区的分区日志中。这个存储允许 Kafka 跟踪每个分区的最新序列号。
>
> 消息发送过程: 当生产者发送一条消息到 Kafka 时,消息会携带生产者的 Producer Id 和序列号。Kafka 会根据 Producer Id 和序列号来验证消息的幂等性和顺序性。
>
> Kafka 服务器的处理: Kafka 服务器会在接收到消息后,首先检查 Producer Id 和序列号是否已经在该分区的分区日志中出现过。如果这个 Producer Id 和序列号已经存在,说明消息已经被处理,Kafka 将丢弃重复的消息。如果 Producer Id 和序列号是新的,Kafka 会接受消息并更新分区的最新序列号。
>
> 重试机制: 如果生产者因某种原因未收到来自 Kafka 的确认(ACK)并怀疑消息未被成功发送,它会重新发送相同的消息。由于消息中包含了 Producer Id 和序列号,Kafka 可以识别重复的消息并再次将其过滤掉。
>
> 通过这个机制,Kafka 确保了消息不会被重复发送。每条消息都有唯一的 Producer Id 和序列号,这使得 Kafka 能够在接收消息时检查重复,并且保持消息的幂等性和顺序性。这对于确保数据的可靠性和一致性非常重要,尤其在分布式系统中。
>

Consumer

消费模式

pull模式:消费者从broker中主动拉取数据

好处:

  1. 消费者可以根据自己的消费速率来控制拉取速度
  2. 消费者可以自由决定拉取的数据量,比如我们之前一批一百个拉数据处理,可以避免高频少量数据写入ck
  3. 可以通过增加消费者的数量来增加吞吐量(我理解要结合partition数量来看)

上面讲到Kafka采用pull模式获取数据, 在没有数据时会进行空转, Kafka针对这一点采用在消费者消费数据时传入一个时长参数 timeout, 如果没消息是, 消费者等待该 time

分区分配策略

我们已知一个consumer group内采用的是点对点的策略,也就是说每个consumer消费的内容是不一样的,kafka采用的是把partition分配给consumer的逻辑,那么把哪个分区分配到哪个consumer呢?

Range

对于每个topic,Range是对每个Topic而言的(即一个Topic一个Topic的分),首先对同一个Topic里面的分区按照序号进行排序,并对消费者按字母顺序进行排序,然后用partitions分区的个数除以消费者线程的总数来决定每个消费者线程消费几个分区,如果除不尽,那么前几个消费者线程将会多消费一个分区。

这样对于单个topic来说,分配是尽可能平衡的,但是如果这个consumer消费了多个topic呢?

假如我们有两个topic T1,T2,分别有10个分区,最后的分配结果将会是这样

1
2
3
C1-0:T1(0,1,2,3) T2(0,1,2,3)
C2-0:T1(4,5,6) T2(4,5,6)
C2-1:T1(7,8,9) T2(7,8,9)

如上,只针对一个topic而言,C1-0消费者多消费1个分区影响不是很大。如果有 N 多个 topic,那么针对每个 topic,消费者 C1-0 都将多消费 1 个分区,topic越多,C1-0 消费的分区会比其他消费者明显多消费 N 个分区。这就是 Range 范围分区的一个很明显的弊端了

RoundRobin

总觉得很耳熟不知道为啥

RoundRobinAssignor策略的原理是将消费者组内所有消费者以及消费者所订阅的所有topic的partition按照字典序排序,然后通过轮询的方式逐个将分区以此分配给每个消费者。

使用RoundRobinAssignor策略有两个前提条件必须满足:

  1. 同一个消费者组里面的所有消费者的num.streams(消费者线程数)必须相等
  2. 每个消费者订阅的主题必须相同
    1. 如果消费者订阅的主题不一样的话,如果某个消费者没有订阅某个topic,那么分配的时候会跳过这个消费者,会导致分配的不均匀

StickyAssignor

代码逻辑本身很复杂,没仔细看,比起前面的分配原则可以看一下区别

sticky这个词的意思是黏性的,其实也就是说重分配的时候,尽量把原来就在这个consumer下的分区保留,这样可以减少销毁和重新建立连接的开销

Offset

消费者offset

每个消费者在消费每个分区的时候都会记录自己当前消费的offset,那么如果consumer出现故障,或者是服务重新发布之后,需要从之前的offset开始消费,所以这个offset不能被consumer自己保存,自己保存的话服务重建就不见了

kafka本身保存了消费者offset,可以想象成一个kv

key是由consumer group id + topic + partition 共同构成的复合key,value是offset

这个offset在kafka 0.9版本的时候是保存在zk的,但是zk这玩意儿频繁写不是什么好事,所以新版把这个值写入了kafka的一个内部topic,即__consumer_offsets topic,并且默认提供了kafka_consumer_groups.sh脚本供用户查看consumer信息。

partition水位

partition水位也就是partition当前的最大可读offset的值,这里引入两个概念HW(high watermark)和LEO(log end offset)

hw就是当前partition最大可读的offset

image-20221010155119686

当follower挂了,follower发生故障后会被临时提出ISR,待该follower恢复后,follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始向leader进行同步,等该follower的LEO大于等于该Partition的HW,即follower追上leader之后,就可以重新加入ISR了。

如果是leader挂了呢?leader发生故障后,会从ISR种选出一个新的leader,之后,为保证多个副本之间数据一致性,其余的follower会先将各自log文件高于HW的部分截掉,然后从新的leader同步数据。

虽然我不大确定leader的选举机制,但是i guess是leo最大的

那么hw是怎么同步的?

  1. 首先,生产者不停地向Leader写入数据,这时候Leader的LEO可能已经达到了10,但是HW依然是0,两个Follower向Leader请求同步数据,他们的值都是0。

  2. 然后,消息还在继续写入,Leader的LEO值又发生了变化,两个Follower也各自拉取到了自己的消息,于是更新自己的LEO值,但是这时候Leader的HW依然没有改变。

  3. 此时,Follower再次向Leader拉取数据,这时候Leader会更新自己的HW值,取Follower中的最小的LEO值来更新。
  4. 之后,Leader响应自己的HW给Follower,Follower更新自己的HW值,因为又拉取到了消息,所以再次更新LEO,流程以此类推。

读写

缓冲和缓存

缓存区是内存和cpu之间的存储区,用来存储cpu多次从内存读取到的数据,多次读取同一变量,就会将该变量放到缓存区中,当再次调用时直接从缓存区取,不从内存中取,当内存中该变量改变时,这种情况会导致内存不可见问题;这样做的目地是比访问内存更快。具体可以自己看高速缓存

缓冲区是内存中的存储区, 缓冲区分为内核缓冲区和用户缓冲区;功能不同。

内核缓冲区是网络传输,文件io,控制台输入输入等操作的“中转站”,为不可见内存,即用户不能直接使用。

内核缓冲区是为了暂时存储磁盘数据,因为每次从磁盘读数据开销非常大,所以比方说文件读取,最大的性能瓶颈就卡在了读磁盘上

  1. 数据预读
    1. 数据预读指的是,当程序发起 read() 系统调用时,内核会比请求更多地读取磁盘上的数据,保存在缓冲区,以备程序后续使用。这种数据的预取策略其实就是基于局部性原理
    2. 因此当我们向内核请求读取数据时,内核会先到内核缓冲区中去寻找,如果命中数据,则不需要进行真正的磁盘 I/O,直接从缓冲区中返回数据就行了;如果缓存未命中,则内核会从磁盘中读取请求的 page,并同时读取紧随其后的几个 page(比如三个),如果文件是顺序访问的,那么下一个读取请求就会命中之前预读的缓存(当然了,预读算法非常复杂,这里只是一个简化的逻辑)。
  2. 延时回写
    1. 延时回写指的是,当程序发起 write() 系统调用时,内核并不会直接把数据写入到磁盘文件中,而仅仅是写入到缓冲区中,几秒后(或者说等数据堆积了一些后)才会真正将数据刷新到磁盘中。对于系统调用来说,数据写入缓冲区后,就返回了。
    2. 延迟往磁盘写入数据的最大一个好处就是,可以合并更多的数据一次性写入磁盘,把小块的 I/O 变成大块 I/O,减少磁盘处理命令次数,从而提高提盘性能。
    3. 另一个好处是,当其它进程紧接着访问该文件时,内核可以从直接从缓冲区中提供更新的文件数据(这里又是充当 Cache 了)。

用户缓冲区则是可见内存,对用户来说可以直接使用。

因为内核缓冲区是不可见的,所以如果用户需要读内核缓冲区的数据,则需要切换到内核态,把数据读出来再切换到用户态放进堆里,涉及到上下文切换和堆栈变化。所以后来就设计了用户缓冲区,和内核缓冲区的操作是一样一样的,就是作用不一样,一个是为了减少磁盘io,一个是为了减少上下文切换

零拷贝

那么说完上面的先看一下正常来说查看一个磁盘文件的流程:

Step1: 从磁盘中读取数据到内核缓冲区

Step2: cpu从缓存中拷贝到用户缓冲区

image.png

写同理,用户缓冲区->socket缓冲区->网卡

对于kafka来说整个IO的过程需要进行两次DMA拷贝,两次CPU拷贝,四次上下文切换。总共四次拷贝,四次切换。这个代价确实有些大。

那么零拷贝技术倒也不是说真的就不拷贝了,而是尽量根据不同场景简化文件读写的拷贝次数,从而尽量减少上下文切换,提高数据传输效率。

歪个楼,虽然不是kafka的零拷贝,但是也是一种零拷贝技术:mmap+write,本质上就是利用mmap将内核缓冲区的的地址直接映射到用户缓冲区,具体可以看一下这个博客:https://baijiahao.baidu.com/s?id=1769186849807925293&wfr=spider&for=pc

img

而kafka用到的就是文中的sendfile()方法,基于java.nio包下的FileChannel.transferTo()实现零拷贝。简单解释就是不需要走一遍用户进程和用户缓冲区的处理,直接把数据copy到socket缓冲区

img

这种方式下用户程序不能对数据进行修改,而只是单纯地完成了一次数据传输过程。整个拷贝过程会发生 2 次上下文切换,1 次 CPU 拷贝和 2 次 DMA 拷贝。

page cache

Kafka 对消息的存储和缓存严重依赖于文件系统。人们对于“磁盘速度慢”具有普遍印象,事实上,磁盘的速度比人们预期的要慢的多,也快得多,这取决于人们使用磁盘的方式。

Kafka重度依赖底层OS提供的page cache功能。当上层有写操作时,OS只是将数据写入到page cache,同时标记page属性为dirty。当读操作发生时,先从page cache中查找,如果发生缺页才进行磁盘调度,最终返回需要的数据。

个人理解page cache就是上面提到的内核缓冲区

zookeeper

在这里插入图片描述

Controller

这就又要提到前面写过的controller了

怎么选出来的呢controller,就是和zk帮忙选master一样,所有broker都会去注册controller节点,最小的那个就是当前的controller

  1. 主题管理:这里的主题管理,就是指控制器帮助我们完成对 Kafka 主题的创建、删除以及分区增加的操作。换句话说,当我们执行kafka-topics 脚本时,大部分的后台工作都是控制器来完成的。
  2. 分区重分配:主要是指,kafka-reassign-partitions 脚本提供的对已有主题分区进行细粒度的分配功能。这部分功能也是控制器实现的。
  3. Preferred 领导者选举:主要是 Kafka 为了避免部分 Broker 负载过重而提供的一种换 Leader 的方案。后面会详细介绍
  4. 集群成员管理:这是控制器提供的第 4 类功能,包括自动检测新增 Broker、Broker 主动关闭及被动宕机。
    1. 这种自动检测是依赖于zk的Watch 功能和 ZooKeeper 临时节点组合实现的。比如,控制器组件会利用Watch 机制检查 ZooKeeper 的 /brokers/ids 节点下的子节点数量变更。目前,当有新 Broker 启动后,它会在 /brokers 下创建专属的 znode 节点。一旦创建完毕,ZooKeeper 会通过 Watch 机制将消息通知推送给控制器,这样,控制器就能自动地感知到这个变化,进而开启后续的新增 Broker 作业。
    2. 侦测 Broker 存活性则是依赖于zk的另一个机制:临时节点。每个 Broker 启动后,会在 /brokers/ids 下创建一个临时 znode。当 Broker 宕机或主动关闭后,该 Broker 与 ZooKeeper 的会话结束,这个 znode 会被自动删除。同理,ZooKeeper 的 Watch 机制将这一变更推送给控制器,这样控制器就能知道有 Broker 关闭或宕机了,从而进行“善后”。
  5. 控制器的最后一大类工作,就是向其他 Broker 提供数据服务。控制器上保存了最全的集群元数据信息,其他所有 Broker 会定期接收控制器发来的元数据更新请求,从而更新其内存中的缓存数据。

节点注册

看上面的图就能看出来,broker注册在broker/ids节点下,topic注册在broker/topics下

消费者组注册在consumers/{group_id}节点下,下面有三个子节点分别是:

ids:消费者

owners:该消费者组消费的topic

offsets:记录每个topic每个partition的offset

image-20221009184409066

分区重分配

当集群中增加或者broker宕机的时候,需要让分区副本再次进行合理的分配,也就是所谓的分区重分配。

  1. 生成一个分区分配方案
  2. 先通过控制器为每个分区添加新副本(增加副本因子) , 新的副本将从分区的 leader副本那里复制所有的数据。根据分区的大小不同, 复制过程可能需要花一些时间, 因为数据是通过网络复制到新副本上的。
  3. 在复制完成之后,控制器将旧副本从副本清单里移除(恢复为原先的副本因子数)。注意在重分配的过程中要确保有足够的空间。

Leader和Follower

先说一下replica的概念,就是对同一个分区,为了保证数据不丢失,会给数据搞多个副本用于存储和同步,也就是leader和follower,不过和zk不一样,kafka的follower只负责同步,所有的数据生产和消费都是走leader的

常见的有以下几种情况会触发Partition的Leader Replica选举:

  1. Leader Replica 失效:当 Leader Replica 出现故障或者失去连接时,Kafka 会触发 Leader Replica 选举。
  2. Broker 宕机:当 Leader Replica 所在的 Broker 节点发生故障或者宕机时,Kafka 也会触发 Leader Replica 选举。
  3. 新增 Broker:当集群中新增 Broker 节点时,Kafka 还会触发 Leader Replica 选举,以重新分配 Partition 的 Leader。
  4. 新建分区:当一个新的分区被创建时,需要选举一个 Leader Replica。
  5. ISR 列表数量减少:当 Partition 的 ISR 列表数量减少时,可能会触发 Leader Replica 选举。当 ISR 列表中副本数量小于 Replication Factor(副本因子)时,为了保证数据的安全性,就会触发 Leader Replica 选举。
  6. 手动触发:通过 Kafka 管理工具(kafka-preferred-replica-election.sh),可以手动触发选举,以平衡负载或实现集群维护

leader选举

用zk的master原则,brokers注册顺序最小的就会变成leader

leader负载均衡

理论上我们肯定希望leader分布在不同的broker上,这样所有的broker都可以承担起一部分i/o,不会导致某个broker负载过高

Preferred Replica是Kafka中的一个概念,用于指定每个分区的首选副本(Preferred Replica)。首选副本是指在进行Leader选举时,优先选择作为分区Leader的副本。