武汉首支数字产业基金成立,总规模10亿元
06-18
使用 kafka 可以实现系统解耦、削减流量峰值、缓冲、实现系统间异步通信等。Kafka 非常适合在活动跟踪、消息传递、指标、日志记录等场景中使用。
和流媒体。本文主要介绍kafka中的基本概念。
1.kafka的整体结构下图展示了kafka的很多细节。暂时先不关注:图中展示了kafka的一些重要组件。
让我们一一介绍一下。 1.1 Broker服务代理节点。
其实就是一个kafka实例或者服务节点,多个broker构成了kafka集群。 1.2 生产者生产者。
也就是说,写入消息的一方将消息写入代理。 1.3 消费者。
也就是说,读取消息的一方从代理处读取消息。 1.4 消费者组消费者组。
一个或多个消费者组成一个消费者组,不同的消费者组可以订阅同一主题的消息,而不会互相影响。 1.5 ZooKeeperkafka 使用zookeeper管理集群元数据、控制器选举等操作。
1.6 主题。每条消息都属于某个主题,Kafka按照主题来划分消息,这是一种逻辑分类。
1.7 分区分区。同一主题下的消息可以进一步分为多个分区,一个分区只属于一个主题。
1.8 复制品。一个分区可以有多个副本以提高灾难恢复能力。
1.9 如果Leader和Follower分区有多个副本,则需要同步方法。 Kafka采用一主多从的方式进行消息同步。
主副本提供读写能力,从副本不提供读写能力,仅作为主副本的备份。 1.10 偏移偏移。
分区中的每条消息都有其所在分区的偏移量。该偏移量唯一标识了消息在当前分区中的位置,并保证了该分区内的顺序,但不保证跨分区的顺序。
。简单来说,Kafka作为一个消息系统,本质上是一个数据系统。
既然是数据系统,我们需要解决两个根本问题:当我们把数据交给kafka时,kafka如何存储;当我们要求kafka取回数据时,kafka是如何返回的。 2.如何存储消息(逻辑层面) 目前,大多数数据系统都以append log和B+树的格式将数据存储在磁盘上。
Kafka使用附加日志格式将数据存储在磁盘上。整体结构如下图: 追加日志格式可以提高写入性能(毕竟只需要追加到日志文件的后面即可),但同时也需要支持,不是很友好。
为了提高读取性能,Kafka需要额外的操作。 Kafka数据如何存储是一个比较大的问题。
让我们从逻辑层面开始。 2.1 Topic+Partition的两层结构 Kafka对消息进行两层分类,即Topic和Partition。
将一个主题划分为多个分区的好处是显而易见的。多个分区可以为Kafka提供可扩展性和水平扩展能力,分区的冗余也可以提高数据的可靠性。
不同的分区也可以部署在不同的broker上,冗余副本提高可靠性。 2.2 Offset 对于追加日志格式,新数据只需要追加到文件末尾即可。
对于具有多个分区的主题,每条消息都有一个需要追加的对应分区(分区器)。该消息在其所在的分区中有一个唯一的标识符,即偏移量offset: 这样的结构有以下特点: 分区提高了写入性能和数据可靠性;保证消息在分区内是连续的,但不能跨分区。
了解了 Kafka 在逻辑层面如何存储消息后,我们看看如何以用户的身份写入和读取数据。 3.如何写入数据。
接下来我们从用户的角度来看一下如何将数据写入Kafka。 3.1 总体流程 生产者向Kafka写入消息的总体流程如下所示: 生产端有两个主线程:main和sender,两者通过共享内存RecordAccumulator进行通信。
步骤如下:KafkaProducer创建消息;生产者拦截器在发送消息之前做一些准备工作,比如过滤不符合要求的消息、修改消息内容等;序列化器将消息转换为字节数组; Partitioner计算消息的目标分区,然后将数据存储到RecordAccumulator中;发送线程获取发送数据;创建特定请求;如果请求过多,则会缓存部分请求;将发送准备好的请求;发送到kafka集群;接收回复;干净的数据。缓存是在消息累加器RecordAccumulator中执行的。
缓存大小通过参数buffer.memory配置,默认为32MB。累加器按照分区来管理每条消息,消息被组织成ProducerBatch的形式(大小由batch.size控制,默认1MB)。
为了提高吞吐量并减少网络请求数量,ProducerBatch 可能包含一条或多条消息。当消息不多时,一个Batch可能不会被填满,但也不会等待太久。
可以通过linger.ms控制等待时间,默认为0。增大该值可以提高吞吐量,但会增加延迟。
当产生消息的速度太快且缓存已满时,继续发送消息可能会出现阻塞或异常。这是由参数 max.block.ms 控制的。
默认值为 60 秒。数据到达并由发送线程创建请求后,需要根据需要发送到的 Broker 节点进行重新组装和分组。
每个节点都是一个连接。每个连接可以缓存的请求数量由 max.in.flight.requests.per.connection 控制,默认 5。
每个请求的大小由 max.reqeust.size 控制,默认 1MB。 3.2 发送方式 发送消息的方式有3种: 即发即忘:只发送,不考虑结果,性能最高,可靠性最差;同步(sync):等待集群确认消息写入成功然后返回,这是可靠的,性能最高,但性能差很多;异步(async):指定一个回调,在kafka返回响应后调用,以确认异步发送。
前两个是同步发送,后一个是异步发送。不过这里的异步发送并没有提供回调能力。
那么生产者发送消息后,Kafka如何确认消息呢?这就涉及到acks参数:acks=1,默认值为1,表示只要该分区的leader副本写入成功就成功; acks=0,生产者不需要等待任何服务器响应,数据可能会丢失; acks=-1或acks=all要求所有处于synchronized状态的副本都确认写入成功,可靠性最高,但性能较差。 3.3 生产者的重要参数 4. 如何读取消息 4.1 消费消息 4.1.1 消费模式 一般来说,消息消费有两种模式:推送模式和拉取模式,kafka 中的消费是基于拉取模式。
消费者通过不断调用poll来获取消息进行消费。基本模式如下(伪代码): 代码语言: go copy while(true) {records := Consumer.Pull() for record := rangerecords { // do Something with record }}4.1.2 偏移量提交中的消息kafka 有一个偏移唯一标识符。
对于消费者来说,每次消费消息时都需要通知kafka,这样下次拉取消息时就不会再拉取已消费的消息了。数据(不考虑重复消费)。
该消费者消费过的消息的位置就是消费位移。例如:假设当前拉取了该消息的最大偏移量并已被消费,则该消费者的消费位移为 ,待提交的消费位移为 ,表示下一条消息需要被消费的消息的位置拉。
消费者可能会一次拉取多条消息,这样提交方式就会出现问题。 Kafka默认采用自动提交,即每5秒自动拉取每个分区内的最大消息位移(相关参数为enable.auto.commit和auto.commit.interval.ms)。
然而,这可能会导致重复消费和数据丢失。先看重复消费:上次提交的消费位移为 ,说明之前的消息全部被消费;本次拉取的消息为,因此,本次消费成功后,唯一需要提交的是;消费者当前正在处理消息。
如果此时消费者挂掉了,而此时还没有提交的话,中间的消息就会被分配给下一个消费者,导致消息的重复处理。我们来看看消息丢失。
还是上图,如果消费者刚刚取到这三条消息,正好自动提交,而消费者此时挂掉了,那么就会在处理之前就提交了,导致这三条消息丢失。 4.2 分区分配策略 Kafka中的消息存储在多个分区中,因此消费者消息分区中的消息也有分区分配策略。
以初始图为例,下面是消费者组部分:一共有三个分区。消费者组1有4个消费者组,因此其中1个是空闲的;消费者组 2 有两个消费者组,因此有 1 个消费者组。
需要处理两个分区。 kafka消费者的分区分配策略是通过参数partition.asigment.strategy配置的。
有以下几种: Range:按照消费者总数和分区总数进行分布,但它是基于主题粒度的,所以可能会不均匀。 。
例如: RoundRobin:将消费者组中所有消费者的分区以及消费者订阅的所有主题按字典顺序排序,然后通过轮询的方式一次性将分区分配给每个消费者。例如: Sticky:这个策略比较复杂。
目的是尽可能均匀地分布分区,并尽可能与上次的分布保持一致。 4.3 再平衡 消费者之间的协调是通过消费者协调器(ConsumerCoordinator)和组协调器(GroupCoordinator)完成的。
其中之一是消费者再平衡。以下情况会导致消费者重新平衡的发生:新消费者加入;消费者下线;消费者主动退出;消费者组对应的组协调器节点发生变化;订阅的主题或分区的数量发生变化。
重新平衡会经过以下步骤: FindCoordinator:消费者找到组协调器所在的机器,然后建立连接; JoinGroup:消费者向群组协调者发起加入群组的请求; SyncGroup:组协调器将分区分配计划同步给所有消费者;心跳:消费者进入正常状态,开始心跳。 5、如何存储消息(物理层面) 我们已经介绍了Kafka如何在逻辑层面存储数据,接下来继续物理层面。
还是这个图: 5.1 日志文件 Kafka使用日志append来存储数据。新数据只需附加到日志文件的末尾。
这种方法提高了写入性能。但文件不能一直附加。
因此,Kafka中的日志文件对应多个日志段LogSegment。使用分段以方便清洁。
Kafka有两种日志清理策略: 日志删除(Log Retention):按照一定策略直接删除日志段;日志压缩(Log Compaction):整合每条消息的key,只保留同一key下的最新值。 。
5.1.1 日志删除 日志删除策略有过期时间和日志大小。默认保留时间为 7 天,默认大小为 1GB。
尽管默认保留时间为 7 天,但也可以保留更长的时间。因为当前活动日志段不会被删除,所以如果数据量很小且当前活动日志段无法拆分,则不会被删除。
Kafka会定期执行一个任务,删除符合删除条件的日志。 5.1.2 日志压缩 日志压缩针对key,仅保留多个具有相同key的值中最新的一个。
同时,日志压缩会产生小文件。为了避免小文件过多,Kafka在清理时会合并它们: 5.2 Log Index 日志追加提高了写入性能,但对读取不太友好。
为了提高读性能,需要降低写性能,在读写之间取得平衡。即在写入的同时维护一个索引。
Kafka维护两种类型的索引:偏移量索引和时间戳索引。 5.2.1 偏移索引 为了快速定位给定消息在日志文件中的位置,一个简单的方法是维护一个映射。
键是消息的偏移量,值是日志文件中的偏移量。这样只需要读取一个文件就可以找到对应的消息。
然而,当消息量巨大时,这个映射也会变得非常大。 Kafka维护着一个稀疏索引(sparse index),即并不是所有的消息都有对应的位置。
对于没有位置映射的消息,可以通过二分查找来解决。下图是偏移量索引的原理:比如要查找偏移量为37的消息位置,首先检查索引中没有对应的记录,然后找到不大于37的最大偏移量,即31,然后在日志中从头开始排序找到37条消息。
5.2.2 时间戳索引 时间戳索引可以根据时间戳找到对应的偏移量。时间戳索引是二级索引。
现在就可以根据时间戳找到offset,然后就可以利用offset索引找到对应的消息位置了。原理如下: 5.3 零拷贝Kafka将数据存储在磁盘上,并使用日志追加来提高性能。
为了进一步提高性能,Kafka采用了零拷贝技术。简单来说,零拷贝就是在内核态下直接将文件内容拷贝到网卡设备上,减少了内核态和用户态之间的切换。
非零拷贝: 零拷贝: 6.kafka的可靠性 Kafka通过多副本实现水平扩展,提高容灾能力和可靠性。下面看一下kafka的多副本机制。
6.1 一些概念 下图展示了副本同步的一些重要概念(单分区视角): 6.1.1 AR:分配的副本 所有副本统称为 AR。 6.1.2 ISR:In-Sync ReplicasISR 是 AR 的子集,即与主副本同步的所有副本的集合。
6.1.3 OSR:不同步副本OSR也是AR的一个子集,即与主副本不同步的所有副本。一组一致的副本。
所以AR=ISR+OSR。 Kafka 使用一些算法来确定从属副本是否保持同步。
失败的副本还可以通过赶上主副本来重新进入 ISR。6.1.4 LEO:日志结束偏移LEO 是写入下一条消息的偏移量。
LEO之前的消息已写入日志。每个副本都有自己的 LEO。
6.1.5 HW:在与主副本同步的所有 High Watermark 副本中,最小的 LEO 是 HW。这个偏移量意味着在此之前的消息已经被所有ISR写入日志,消费者可以拉取它们。
,此时即使主副本出现故障,其中一个ISR副本成为主副本,消息也不会丢失。 6.2 主副本HW和LEO的更新 LEO和HW都是消息偏移量,其中HW是所有ISR中最小的LEO。
下图展示了生产者同步消息到主副本,再同步到从副本的过程:生产者将消息发送给领导者;领导者将消息追加到日志中并更新自己的偏移信息。同时,leader还维护follower的信息(如LEO等); follower向leader请求同步,并携带自己的LEO等信息; Leader读取日志并拉取每个follower保存的信息(LEO);领导者将数据连同自己的硬件返回给跟随者; follower拿到数据后,追加到自己的日志中,同时根据返回的HW更新自己的HW。
方法是取自身LEO和HW的最小值。从上面的过程可以看出,一次同步过程后,Leader的HW并没有增加。
只有在再次同步后,follower将最后更新的LEO携带给leader,leader才能更新HW。此时,村里可以确认该消息确实已全部ISR副本写入成功。
Leader的HW非常重要,因为这个值直接决定了消费者可以消费的数据。 6.3 Leader Epoch 考虑以下场景。
最初,领导者保存了两条消息。此时LEO=2,HW=1:正在上传图片……sync 1时,follower拉取数据,append后需要再次请求leader。
Leader 和 Follower 的 HW 只能更新一次(sync 2)。这样,更新HW就会出现间隙。
当sync 1成功并且follower在sync 2之前挂掉时,那么重启后HW仍然是1,follower就会截断日志导致m2丢失。如果此时leader也挂掉了,就会出现这种情况。
follower将成为leader,m2将彻底丢失(即使重新启动原来的leader也无法改变)。为了解决这个问题,Kafka引入了leader epoch的概念。
其实这是一个版本号。在follower同步请求中,不仅传递自己的LEO,还带上当前的LE。
当领导者更换时,这个值就会增加。 1、由于有LE信息,follower在崩溃重启后不会轻易截断日志,而是会请求最新的信息,避免了上述情况下数据丢失的问题。
本文通过简单的语言和简单的图表简单介绍了Kafka中的一些重要概念。其实kafka是一个复杂的系统,需要更多的学习才能深入理解kafka。
版权声明:本文内容由互联网用户自发贡献,本站不拥有所有权,不承担相关法律责任。如果发现本站有涉嫌抄袭的内容,欢迎发送邮件 举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。
标签:
相关文章
06-18
06-17
06-21
06-18
06-17
06-06
06-18
最新文章
【玩转GPU】ControlNet初学者生存指南
【实战】获取小程序中用户的城市信息(附源码)
包雪雪简单介绍Vue.js:开学
Go进阶:使用Gin框架简单实现服务端渲染
线程池介绍及实际案例分享
JMeter 注释 18 - JMeter 常用配置组件介绍
基于Sentry的大数据权限解决方案
【云+社区年度征文集】GPE监控介绍及使用