Kafka数据消费

如题所述

第1个回答  2022-08-01

消费者负责从订阅的主题上拉取消息,消费组是逻辑概念。一个消费者只属于一个消费组,一个消费组包一个或多个消费者。当消息发布到主题后,会被投递到每个消费组,但每个消费组中只有一个消费者能消费给消息。

消费者如何知道该消费哪个分区?当消费组内消费者个数发生变化时,分区分配是如何变化的呢?

按照消费者总数和分区总数进行整除运算来获得一个跨度,然后将分区按照跨度进行平均分配, 以保证分区尽可能均匀地分配给所有的消费者。对于 每一个主题 该策略会将消费组内所有的消费者按照名称的字典序排序然后为每个消费者划分固定的分区范围,如果不够平均分配,那么字典序靠前的消费者会被多分配一个分区。

假设n=分区数/消费者数量,m=分区数%消费者数量,那么前m个消费者每个分配n+1分区,后面的每个消费者分配n个分区。

如图所示主题中共有7个分区,此时消费组内只有一个消费者C0,C0订阅7个分区。

随着消费组内消费者不断加入,分区逐渐从C0分配到C1~C6,当最后一个消费者C7加入后,此时总共有8个消费者但是只有7个分区,因此C7由于分配不到分区而无法消费任何消息。

消费者并非越多越好,消费者数量应小于等于分区数量,否则会造成资源的浪费

缺点:

当一个消费组订阅两个分别包含四个分区的主题时,分区分配结果如下,比较均匀。

但当两个主题各有3个分区时,则会出现如下分区不均的问题。类似情况扩大的话,可能出现消费者过载问题。

将消费组内所有消费者及消费者订阅的所有主题的分区按照字典序排序,然后通过轮询方式将分区依次分配给每个消费者。如果消费组内消费者的订阅信息都是相同的,那么分区分配会比较均匀。如一个消费组两个消费者,分别订阅两个都有3的分区的主题,如图。

但是当消费组内消费者的订阅信息不同时,则会出现分配不均问题。如图,假设消费组内有三个消费者,主题1/2/3分别有1/2/3个分区,C0订阅主题1,C1订阅主题1和2,C2订阅主题1/2/3,分区结果将会如下图所示。

后来引入的策略,主要目的:

假设三个消费者,订阅了4个主题,每个主题有两个分区,那么初始分区分配结果如下:

乍一看,跟RoundRobin分配策略结果相同,但此时如果C1下线,那么消费组会执行再均衡操作,重新分配消息分区。如果是RoundRobin策略,分配结果如下:

而如果是Sticky分配策略,则结果如下:

StickyAssignor保留了上一次对C0和C2的分配结果,将C1的分区分配给C0和C2使其均衡。

如果发生分区重分配,那么对于同一个分区而 ,有可能之前的消费者和新指派的消费者不是同一个,之前消费者进行到一半的处理还要在新指派的消费者中再次复现一遍,造成重复消费。StickyAssignor分配策略如同其名称中的"sticky"一 样,让分配策略具备的“黏性”,尽可能地让前后两次分配相同,进而减少系统资源的损耗及其他异常情况的发生。

再来看下,消费者订阅信息不相同的情况,拿RoundRobinAssignor中的实例来说。

假设消费组内有三个消费者,主题1/2/3分别有1/2/3个分区,C0订阅主题1,C1订阅主题1和2,C2订阅主题1/2/3,RoundRobinAssignor分区结果将会如下图所示。

而采用StickyAssignor时,分区分配结果如下:

若此时C0下线,RoundRobinAssignor重分配的结果如下:

而StickyAssignor重分配结果如下:

综上:

StickyAssignor分配策略的优点就是可以使分区重分配具备 “黏性”,减少不必要的分区移动(一个分区剥离之前的消费者 ,转而分配给另一个新的消费者)。

Kafka中的消息消费是基于拉模式。

Kafka每次拉取一组消息,每条消息的格式如下:

在每次拉取方法时,它返回的是还没有被消费过的消息集。要实现这个功能,就需要知道上次消费时的消费位移,消费者在消费完消息后要进行消费位移提交动作,且消费位移要进行持久化,消费位移保存在__consumer_offsets主题中。

当前拉取消息的最大offset为x,消费者消费完成提交位移的是offset其实为x+1,表示下次拉取消息的起始位置。

自动提交

默认采用自动提交,默认每隔5s会将拉取到的每个分区的最大的消息位移进行提交。真正的提交动作是在拉取消息的逻辑完成,每次拉取消息前会判断是否可以进行位移提交,如果可以则提交上一次的位移。这里会有两个问题,如下图所示。

重复消费:当前拉取消息【x+2,x+7】,当前消费到X+5,在提交消费位移前,消费者宕机;新的消费者还是会从X+2开始拉取消息, 因此导致重复消费。

消息丢失:当前拉取消息【x+2,x+7】,当前消费X+5,到下次拉取的时候,消费位移已经提交为X+8,若此时消费者宕机,新的消费者会从X+8处开始消费,导致X+5 ~ X+7的消息没有被消费,导致消息的丢失。

手动提交

同步提交和异步提交。

同步提交默认提交本次拉取分区消息的最大偏移量,如本次拉取【X+2,X+7】的消息,同步提交默认提交X+8的位置;当时同步提交也可指定提交的偏移量,比如消费一条提交1次,因为提交本身为同步操作,所以会耗费一定的性能。

同步提交也会导致重复消费的问题,如消费完成后,提交前消费者宕机。

异步提交消费者线程不会被阻塞,使性能得到增强,但异步提交失败重试可能会导致提交位移被覆盖的问题,如本次异步提交offset=X失败,下次异步提交offset=X+y成功;此时前一次提交重试再次提交offset=x,如果业务上没有重试校验,会导致offset被覆盖,最终导致重复消费。

当新的消费组建立、消费者订阅新的主题或之前提交的位移信息因为过期被删除等,此时查不到纪录的消费位移。Kafka可配置从最新或从最早处开始消费。

Kafka还支持从特定位移处开始消费,可以实现回溯消费,Kafka内部提供了Seek()方法,来重置消费位移。

当需要回溯指定时间后的消息时,可先用offsetsForTimes方法查到指定时间后第一条消息的位移,然后再用seek重置位移。

分区的所属权从一个消费者转移到另一消费者的行为,它为消费组具备高可用性和伸缩性提供保障,使我们可以既方便又安全地删除或添加消费者。

Kfaka提供了组协调器(GroupCoordinator)和消费者协调器(ConsumerCoordinator),前者负责管理消费组,后者负责与前者交互,两者最重要的职责就是负责再均衡的操作。

举例说明,当消费者加入消费组时,消费者、消费组和组协调器之间一般会经历以下几个阶段。

第一阶段(FIND COORDINATOR)

消费者需要确定它所属的消费组对应的GroupCoordinator所在的broker并创建与该broker 相互通信的网络连接。

消费者会向集群中的某个节点发送FindCoordinatorRequest请求来查找对应的组协调器。

Kafka根据请求中的coordinator_key(也就是groupld )的哈希值计算__consumer_offsets中的分区编号,如下图所示。找到对应的分区之后,在寻找此分区leader副本所在的broker节点,该节点即为当前消费组所在的组协调器节点。

消费组最终的分区分配方案及组内消费者所提交的消费位移信息都会发送给该broker节点。该broker节点既扮演GroupCoordinato的角色又扮演保存分区分配方案和组内消费者位移的角色,这样可以省去很多不必要的中间轮转所带来的开销。

第二阶段(JOIN GROUP)

在成功找到消费组所对应的GroupCoordinator之后就进入加入消费组的阶段,在此阶段的 消费者会向GroupCoordinator发送JoinGroupRequest请求,并处理响应。

组协调器内部主要做了以下几件事:

选举消费组的****leader

如果当前组内没有leader,那么第一个加入消费组的则为leader。如果leader挂掉,组协调器会从内部维护的HashMap(消费者信息,key为member_id)中选择第一个key作为新的leader。

选举分区分配策略

前面说的每个消费者可能会上报多个分区分配策略,选举过程如下:

第三阶段(SYNC GROUP)

leader消费者根据在第二阶段中得到的分区分配策略来实施分区分配,然后将分配结果同步到组协调器。各个消费者会向组协调器发送SyncGroupRequest请求来同步分配方案。

请求结构如图,leader发送的请求才会有group_assignment。

其中包含了各个消费者对应的具体分配方案,member_id表示消费者的唯一标识,而 member_assignment是与消费者对应的分配方案,如图

消费者收到具体的分区分配方案后,会开启心跳任务,定期向组协调器发送心跳请求确定彼此在线。

第四阶段(HEARTBEAT)

在正式消费之前,消费者还需要确定拉取消息的起始位置。假设之前已经将最后的消费位移提交成功,那么消费者会请求获取上次提交的消费位移并从此处继续消费。

心跳线程是一个独立的线程,可以在轮询消息的空档发送。如果消费者停发送心跳的时间足够长,组协调器会认为这个消费者已经死亡,则触发一次再均衡行为。

相似回答