kafka消费者
@TOC
1、消费者与消费组
消费组是kafka提供的可扩展且具有容错性的消费者机制,同一个消费组的消费者共有一个groupId,一个分区只能被同一个消费组中的一个消费者消费。
消费组解决了什么问题:
- 消费模式灵活,假设三个消费者订阅了一个主题,当使用三个消费组时,每个消费组有一个消费者,利用分区在不同消费组间共享的特性,就实现了发布订阅(广播)模式;当把这三个消费者放在一个消费组中时利用分区在消费组成员间互斥的特性实现了队列(单播)模式;
- 故障容灾,引进了消费组的概念,当消费组中的某个消费者出现故障时消费组会执行在平衡操作。
2、push or pull
kafka的消费者采用的是pull模式主动从broker拉取数据,这种主动拉的模式可以让消费者根据自己的消费速率拉消息。如果采用push模式的话,不同消费者消费速率可能不同,无法设置一个使用所有消费者的push消息大小,push模式也可能会导致消费者消息来不及消费而积压。
但pull模式也有缺点,当broker中没有消息时,消费者频繁的pull是没有任何意义的,但kafka在消费者每次拉的时候带有一个参数,当消费者拉到的数据为空时,会等待这个时间之后再去pull。
消费者在拉消息的时候和生产者生产消息的batch机制有相似的设置,可以通过设置下面两个参数减少消费者与broker之间的频繁交互
fetch.min.byte:消费者从broker获取数据的最小字节数,如果消息大小小于此值,则broker会等待有足够的数据时把它返回给消费者
fetch.max.wait.ms:默认500ms,如果消费者获取最小数据量的要求得不到满足,就会在等待最多该属性所设置的时间后获取到数据。
3、offset提交
消费者位移存储在kafka的内部的主题__consumer_offsets中,之前版本的位移存储在zookeeper中,老版本的Kafka会把位移信息保存在zk中,当Consumer重启后,自动从zk中读取位移信息。这种设计使Kafka Broker不需要保存位移数据,可减少Broker端需要持有的状态空间,有利于实现高伸缩性,但zk不适用于高频的写操作,这令zk集群性能严重下降,在新版本中将消费者的位移数据作为一条条普通的Kafka消息,提交至内部主题__consumer_offsets中保存。实现高持久性和高频写操作。
Consumer 需要为分配给它的每个分区提交各自的位移数据,提交位移一是表征消费者的消费进度,二是可以在当前消费者发生故障重启后能从之前提交的位移处接着消费,避免再从头开始消费。
4、自动提交与手动提交
从消费者的角度来说位移提交分为自动提交和手动提交。
4.1 自动提交
自动提交设置enable.auto.commit的值为true,默认是5s自动提交一次。
在自动提交时,先提交上一批次拉取消息的位移,再处理下一批次,因此自动提交能保证消息不丢失,但是自动提交存在重复消费的可能。默认情况下5s提交一次,假如在上次为已提交后的第3s发生了rebalance,则消费者会从上一次提交位移的地方开始消费,则rebalance前的3s数据就会被再次消费。可以通过 auto.commit.interval.ms设置自动提交的间隔时间,但是减少时间只能是减少重复消费的时间窗口,而不能完全消除。
4.2 手动异步提交
手动提交并不能完去代替自动提交,手动提交是异步的,不存在重试机制,因为手动提交是异步的,重试时提交的位移可能已不是最新的位移了,因此重试没有意义。
对于常规性阶段性的手动提交可以使用异步和非异步的方式组合的方式,将两者结合,使用异步提交的方式避免程序阻塞,当consumer结束消费时同步提交。
备注:如果不发生重平衡使用自动提交的方式。
5、重平衡
5.1 重平衡的时机
订阅的主题数发生变化,这种情况只有在业务调整时才会发生,要么不发生,要么不可避免;
- 主题分区发生变化,在部署之前就要考虑集群的容量,以便确定好分区数,因此调整的次数也是有限的,在低峰时进行调整;
- 消费组成员的变化,消费组成员发生变化的情况有下面两种:
- 消费者处理消息超时,即如果消费者处理消费的消息的时间超过了Kafka集群配置的 max.poll.interval.ms 的值,那么该消费者将会自动离组
- 心跳超时,如果消费者在指定的session.timeout.ms时间内没有汇报心跳,那么Kafka就会认为该消费已经dead了
因此消费端的消费者组成员变化一般都是由于异常引起的,所以其产生的 Rebalance 也是最难控制的。
重平衡过程是靠消费者端的心跳线程通知到其他消费者实例的,每当消费者向其 coordinator 汇报心跳的时候,如果这个时候 coordinator 决定开启 Rebalance ,那么 coordinator 会将REBALANCE_IN_PROGRESS封装到心跳的响应中,当消费者接收到这个REBALANCE_IN_PROGRESS,他就知道需要开启新的一轮 Rebalance 了,所以heartbeat.interval.ms除了是设置心跳的间隔时间,其实也意味着 Rebalance 感知速度,心跳越快,Rebalance 就能更快的被各个消费者感知。
5.2 重平衡流程
消费者端重平衡流程:
加入组,组内成员加入组时会向协调者发送JoinGroup请求,将自己订阅的主题上报,协调者会从这些组员中选出一个leader consumer,协调者会把消费者组订阅信息封装进JoinGroup请求的响应体中,然后发给领导者,由领导者统一做出分配方案后。然后进入到下一步:发送SyncGroup请求;
Leader Consumer分配方案,这一步的主要目的是让协调者接收分配方案,然后统一以 SyncGroup 响应的方式分发给所有成员,这样组内所有成员就都知道自己该消费哪些分区了
broker端的重平衡流程:新成员加入组,当有新成员发送JoinGroup请求时,协调者会以心跳的方式通知之前的成员开启新一轮的重平衡
组成员主动离组
组成员崩溃离组
组成员提交位移
5.3 分区分配原则
Range,这种分配是基于每个主题的分区分配,如果主题的分区不能平均分配给组内每个消费者,那么对该主题,某些消费者会被分配到额外的分区,这种分配方式明显的一个问题是随着消费者订阅的Topic的数量的增加,不均衡的问题会越来越严重,可能会出现有的消费者分配很多分区,有的消费者分配较少的分区。
RoundRobin,RoundRobin是基于全部主题的分区来进行分配的,同时这种分配也是kafka默认的rebalance分区策略,
发现C2承担了4个分区的消费而C1订阅了T1,是不是把T1P1交给C1消费能更加的均衡呢Sticky,目的是在执行一次新的分配时,能在上一次分配的结果的基础上,尽量少的调整分区分配的变动,节省因分区分配变化带来的开销。每一次分配变更相对上一次分配做最少的变动。其目标有两点:
- 分区的分配尽量的均衡。
- 每一次重分配的结果尽量与上一次分配结果保持一致。
当这两个目标发生冲突时,优先保证第一个目标。第一个目标是每个分配算法都尽量尝试去完成的,而第二个目标才真正体现出Sticky特性的。