Kafka组消费之Rebalance机制
基础概述
Kafka是插件的消息队列, 本文涉及的是Kafka的 消费者组
, 消费者组是Kafka的一个重要特性, 它允许一个或多个消费者订阅同一个主题, 并且共同消费主题中的消息。在此之前, 先简单介绍生产者与消费者的基础概念.
Topic
: 消息主题, 生产者将消息发送到特定的主题, 消费者从特定的主题订阅消息, 每个topic可能包含N个数据分区, 数据生产时, 会通过不同的数据哈希策略, 将消息写入对应的分区中。consumer
: 消费者, 消费者从特定的主题订阅消息, 并消费消息, 消费者属于某一个消费者组, 不同的消费者组之间, 消费状态互相隔离。消费吞吐量
: 同一个消费者组中, 一个数据分区至多被一个消费者消费, 但是一个消费者可能同时消费多个数据分区.因此, 一个消费者组的消费吞吐量, 受限于消费者组中消费者数量。 但是, 一旦消费者数量 等于 数据分区数量时, 此时消费能力以达到最大吞吐量
, 若依旧存在消费性能问题, 增加消费者是无效行为.
消费者与数据分区关系
消费者数量少于数据分区
消费者数量等于数据分区
消费者数量大于数据分区
常见问题场景
- 分区数量不足: 在消息量很大, 但是数据分区数量不足的情况下, 消费者组无法在短时间内消费完消息, 导致消息积压, 此时的操作一般是
数据分区扩容
, 扩容之后紧接着的动作便是启动足够的消费者。 - 消费者数量不足: 数据费去足够,但是消费者数量不足,如 6 个数据分区, 但是只有 2 个消费者, 平均每个消费者消费3个数据分区的数据(理论情况,实际比较复杂), 此时的操作是在启动 4 个消费者, 使消费者数量等于数据分区数量。
- 业务代码层面: 消费者消费消息时, 消费逻辑处理异常, 导致消费者处于离线状态(相当于消费者数量减少), 也可能触发rebalance。
以上两种情况都会触发 消费者Reblance行为
, 以上两种场景可以抽象的解释为: 消费者数量不变, 数据分区扩容, 触发reblance; 数据分区不变, 消费者数量变化(前提是为达到最大吞吐量),会触发reblance
关键概念
协调者(Coordinator)
在解释Rebalance机制之前,必须了解一个关键概念 Coordinator(协调者)
,它是Rebalance机制中非常重要的一个角色。每个消费组都会有一个coordinator,Coordinator负责 处理管理组内的消费者和位移管理
,Coordinator并 不负责消费组内的partition分配
。消费者通过 心跳的方式
告知Coordinator自己仍然处于存活状态,Coordinator以session. timeout. ms参数的频率检测消费组group内消费者存活情况,该参数的默认值是10s,如果该值太大,那么coordinator需要非常长时间才能检测到消费者宕机, 如果该值太小, 那么可能会误判消费者宕机。具体多大合适,需要在实际应用环境中尝试判断。
协调者(Coordinator) 状态机
Coordinator(协调者) 拥有自己的生命周期, 在生命周期中可能出现如下几个状态:
状态 | 执行能力 |
---|---|
Down | 不会维护任何消费组状态 |
Initialize | 处于初始化状态,从Zookeeper中读取相关的消费组数据,这个时候 接受到消费者心跳或者加入组的请求都会返回错误 |
Stable | 处理消费者心跳请求,但是还未开始初始化 generation(代际, 对于reblance场景下的一种保护机制) ,Coordinator正在等待消费者加入组的请求 |
Joining | 正在处理组内成员加入组的请求 |
AwaitingSync | 等待leader consumer分配分区,并将分区分配结果发送给各个Consumers |
上述的五个状态,具有如下状态机流转:
Generation(代际)
消费者消费消息超时之后,如果再次尝试提交offset,就会出现如下的异常
注意
Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
出现该异常的原因是Coordinator消费组的 保护机制
。如果消费者消费超时,就会 被Coordinator从消费组中剔除,Coordinator就会进行Rebalance
,将当前消费者负责的partition重新分配给其它的消费者,如果已经是超时的消费者完成了消息的消费,假设超时的消费者成功提交partition的offset,那么就会出现混乱,因为超时的消费者负责的partition已经被分配给了其它的消费者。Generation(代际)机制就是上述的保护机制
。
Coordinator每进行一次Rebalance,就会为当前的Rebalance 设置一个Generation标记
,比如说第一次Rebalance标记是1,如果再次Rebalance,该标记就会成为2,消费者在提交offset的时候会将generation一同提交,Coordinator在发现超时的消费者的标记已经超时的情况下会拒绝消费者提交generation标记。
警告
Generation的机制可能会导致上一代际消费者和当前代际消费者消费相同的消息,所以消费者在消费消息的时候需要实现消息消费的幂等性,关于幂等性消费的问题笔者将会写一瓶文章详细介绍。
rebalance流程
Coordinator发生Rebalance的时候,Coordinator并 不会主动通知
组内的所有Consumer重新加入组,而是当Consumer向Coordinator发送心跳的时候,Coordinator将Rebalance的状况通过心跳响应告知Consumer。Rebalance机制整体可以分为两个步骤,一个是 Joining the Group ,另外一个是分配 Synchronizing Group State
Joining the Group
在当前这个步骤中,所有的消费者会和Coordinator交互,请求Coordinator加入当前消费组。Coordinator会从所有的消费者中选择一个消费者作为leader consumer, 选择的算法是 随机选择
ynchronizing Group State
leader Consumer从Coordinator获取所有的消费者的信息,并将消费组订阅的 partition分配结果封装为SyncGroup请求
,需要注意的是leader Consumer不会直接与组内其它的消费者交互,leader Consumer会将SyncGroup发送给Coordinator,Coordinator再将分配结果发送给各个Consumer。分配partition有如下3种策略
- RangeAssignor
- RoundRobinAssignor
- StickyAssignor
如果leader consumer因为一些特殊原因导致分配分区失败(Coordinator通过超时的方式检测
),那么Coordinator会重新要求所有的Consumer重新进行步骤Joining the Group状态