Kafka: GroupCoordinator

发布日期:2019-07-25
1、Kafka GroupCoordinator的核心功能及其由来2、Group协调2.1 Rebalance2.2 Group内member的Leader选举3、Consumer commit offset的存储4、如何选择GroupCoordinator5、Configuration

 

1、Kafka GroupCoordinator的核心功能及其由来

Kafka Consumer消费topic消息时,分配策略是将topic中的partition分配给Consumer,其中Consumer又是以组的方式进行管理的。一个Consumer Group订阅一个topic,最终是Group中的每一个Consumer去订阅部分的partition,Group中整个Group订阅所有的partition。

       在Kafka里,用于管理Group与成员关系、协调重分配的过程,是一个被称为Coordinator的组件来完成的。并且在0.9版本之前,该协调是由ZooKeeper来完成的,从0.9开始,这个事情由Kafka Broker来完成了。

       此外,每一个分区,其实是4个offset:

 

Leo:代表了producer提交的message set的最后一个offset

HW:代表了最大可以供Consumer消费的message set的offset

Current Position:代表了某个Consumer Group消费到的位置

Last Committed Offset:代表了某个Consumer Group Commit的offset。

其中leo,hw都不必说了,他们是存在于recovery-point-offset-checkpoint replication-checkpoint 两个文件中的它两个都是针对于分区来说的。

Current position 是有Consumer自身知道的。

Last Committed Offset则是记录了Partition在某个Consumer Group的消费情况。

 

       在0.9之前,这两个offset也是记录在zookeeper,则改为记录到内置topic __consumer_offset中了 ,这样的改变的一个重要原因是每一次消费consumer都要往zk上写一次commited offset,而kafka也要向zookeeper记录一次current position。这样给zk带来了沉重的负担,降低了消费性能。

 

也就说,在0.9之后版本中,Kafka Broker又承担了两个角色:1)管理consumer group offset commit, 2)对Group进行协调。

 

这两个新的特性,都是由Kafka GroupCoordinator来完成的。对于第2)个特性,Group协调工作,并不是只针对与Consumer Group,而是可以对于任何组。在Kafka 中,目前已经有两种组了: ConsumerGroup、WorkerGroup。

2、Group协调

具体来讲,是针对具备订阅关系的组进行协调。例如Consumer Group要订阅topic,那就将topic 中的partition按照某种策略分配给Group内的各个Consumer;Worker Group要订阅各个connector,那就把connector内的各个task分配给Group内的各个Worker。

 

2.1 Rebalance

GroupCoordinator提供了4个API来进行组协调工作:

join group sync group,leave group heartbeat。

也就是说GroupCoordinator是维持了动态的Group,然而因为这个Group是为了订阅到某种资源,所以,一旦成员数量发生了变更、或者资源数目发生了变更,需要进行资源重新分配(reblance)。此外为了跟踪各个member的状态,需要各个成员与Group Coordinator保持心跳,一旦心跳超时,也会认为该member离开了Group。

 

为了更好的协调Group,对Group划分了状态:

Dead:死了,该组将不再服务

AwaitingSync:等待Member Leader进行分配状态

Stable:稳定状态

PreparingRebalance:等待重分配状态

 

1)  一旦有member join group、leave group、 heartbeat timeout时,group变为PreparingRebalance状态

2)  在PreparingRebalance状态下,GroupCoordinator会通知所有的Member进行rebalance。此时该group变为AwaitingSync状态。

3)  在Member接收到需要rebalance时,一旦发现自己是group内所有member的leader,就开始调度assign过程。

4)  所有的member在执行完rebalance请求后,发起sync group的请求给Group Coordinator。非leader的member发的数据是空,而leader member发的是分配的结果。(例如ConsumerCoordinator 回发的是每一个consumer分配到了哪些partition,WorkerCoordinator回发的是每一个work分配到了哪些task。)

5)  GroupCoordinator只对leader member的响应做处理,具体说是将该分配请求的数据(bytes)发给每一个member。此时该group变为Stable状态。

6)各个memebr拿到新的分配情况后,应用该分配,并进行heartbeat。

 

2.2 Group 内member的leader选择

member的leader选择不会频繁的发生,只会出现在两个情况:1) group是empty的组时,会在第一个member加入组是将其选为leader。2)当leader失败或者离开组时,才会重新选择leader,选择的办法很简单,只将第一个member选择leader即可。

def add(memberId: String member: MemberMetadata) { assert(supportsProtocols(member.protocols)) if (leaderId == null) leaderId = memberId members.put(memberId member)}// members是一个hashmapdef remove(memberId: String) { members.remove(memberId) if (memberId == leaderId) { leaderId = if (members.isEmpty) { null } else { members.keys.head } }}

 

  

 

3、Consumer commit offset存储

__consumer_offsets 是 Kafka 内部使用的一个 topic,专门用来存储 group 消费的情况,默认情况下有50个 partition,每个 partition 三个副本。

具体 group 的消费情况要存储到哪一个 partition 上,是根据 Math.abs(groupId.hashCode())% partitionCount来计算(其中,parititionCount 是__consumer_offsets的 partition 数,默认是50个)的。

 

对于offset commit提供了 两个api:

1)  commit offset 发生在consumer消费完一次数据后进行commit时,将其存到__consumer_offsets中

2)  fetch offset 任何一个客户端都可以随时的调用该请求来获取到last committed offset

 

此外,每一个group的消费情况也不会是永久的存储到__consumer_offsets里了,也会被定期的清理掉的。

 

4、如何选择GroupCoordinator?

       在一个Kafka集群里,有那么多的Broker的,一旦Consumer 消费了一次数据,还得给GroupCoordinator发一个commit offset的请求,也就是说这个GroupCoordinator处理的最多的是1)接收commit offset请求,2)写offset到__consumer_offsets。所以选择哪个Broker作为Group Coordinator就比较重要了,主要依赖于第2)项了,毕竟对于1)来讲,哪一个Broker都是无可厚非的,但对于2)就不同了,如果作为GroupCoordinator的Broker与group所以对应的parition的leader不在一台机器上,又增加了各个Broker之间的网络负载。

 

所以GroupCoordinator是选择的group在__consumer_offsets所对应的partition的leader所在的Broker上。

 

5、Configuration

1)    offsets.commit.required.acks offsets.commit.timeout.ms

GroupCoordinator在处理commitoffset请求时,会将消费情况写到__consumer_offsets中,实际上调用的是replicaManager.appendMessage(timeout requiredAcksmessageSet)方法。

 

offsets.commit.required.acks这个配置项,其实就是这个requeiredAcks参数,也就是说,这个配置项和一个producer发送数据是设置的ack是同样的作用,要求在写数据前确认ISR中有足够的replica。它的默认值是-1。

 

offsets.commit.timeout.ms 就是这个timeout参数,默认值是 5000

 

2)offsets.load.buffer.size

读取offset segment时的batchsize

 

3)offsets在磁盘上保留多久:offsets.retention.check.interval.ms offset.retention.minutes

4)offsets.topic.num.parititonsoffsets.topic.replication.factor offsets.topic.segment.bytes,offsets.topic.compression.codec

__consumer_offset Topic的分区数,复制因子,段大小,压缩方式

 

5) Group成员到GroupCoordinator的session的时间配置:group.max.session.timeout.msgroup.min.session.timeout.ms

Broker中执行heartbeat检查是基于member 的sessiontimeout + member.lastheartbeat作为heartbeat的deadline的。也就是说完全由member自身来控制的。

 

那么这两个配置项有啥用呢?只是划定一个范围,用于规范member的sesssion timeout配置的,怎么理解呢?

在handleJoinGroup请求时,基于这两个配置,来验证member的session timeout是否在这两个值的范围内,没有其他的作用。