kafka重复消费的问题

2024-05-07 13:55

1. kafka重复消费的问题

Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
  
 造成的问题:假如consumer.properties配置中max.poll.records=40  (一次最多拉取40条数据)  session.timeout.ms=30000    (会话时间)
  
 假设kafka此时一次拉取了40条数据,但在处理第31条的时候抛出了如上的异常,就会导致,本次offset不会提交,完了这40条消息都会在接下来的某刻被再次消费,这其中就包含了其实已经消费了的30条数据
  
 原因:the poll loop is spending too much time message processing, the time between subsequent calls to poll() was longer than the configured session.timeout.ms,好吧其实是一个意思!
  
 意思就是说poll下来数据后,处理这些数据的时间比 session.timeout.ms配置的时间要长,从而导致the group has already rebalanced
  
 解决办法是最后一句话:You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
  
 即要不增大 session.timeout.ms,要不减小max.poll.records ,至于具体配置为多少,得看你处理一条消息花费多长时间 x,需要满足 x乘以max.poll.records < session.timeout.ms
  
 另一种解决思路:
  
 解决此类重复消费的方式:将能够唯一标识消息的信息存储在其他系统,比如redis,什么能够唯一标识消息呢?就是consumergroup+topic+partition+offset,更准确的应该是consumergroup+" "+topic+" "+partition+"_"+offset组成的key,value可以是处理时间存放在redis中,每次处理kafka消息时先从redis中根据key获取value,如果value为空,则表明该消息是第一次被消费,不为空则表示时已经被消费过的消息;
  
 参考: https://www.cnblogs.com/chinano1/p/9357725.html

kafka重复消费的问题

2. kafka防止消息重复消费

kafka重复消费的根本原因就是“数据消费了,但是offset没更新”!而我们要探究一般什么情况下会导致offset没更新?
   
  
  max.poll.interval.ms 
  
 两次poll操作允许的最大时间间隔。单位毫秒。默认值300000(5分钟)。
  
 两次poll超过此时间间隔,Kafka服务端会进行rebalance操作,导致客户端连接失效,无法提交offset信息,从而引发重复消费。
  
 拿到消息就提交offset
  
 
  
  
  1、丢包问题 :消息推送服务,每天早上,手机上各终端都会给用户推送消息,这时候流量剧增,可能会出现kafka发送数据过快,导致服务器网卡爆满,或者磁盘处于繁忙状态,可能会出现丢包现象。
  
 解决方案:首先对kafka进行限速, 其次启用重试机制,重试间隔时间设置长一些,最后Kafka设置acks=all,即需要相应的所有处于ISR的分区都确认收到该消息后,才算发送成功。 
  
 检测方法:使用重放机制,查看问题所在。
  
 2. 重复消费最常见的原因 :re-balance问题,通常会遇到消费的数据,处理很耗时,导致超过了Kafka的session timeout时间(0.10.x版本默认是30秒),那么就会re-balance重平衡,此时有一定几率offset没提交,会导致重平衡后重复消费。 
  
  消息重复消费和消息丢包的解决办法 
  
 保证不丢失消息:生产者(ack=all 代表至少成功发送一次)     重试机制
  
 消费者 (offset手动提交,业务逻辑成功处理后,提交offset) 
  
 保证不重复消费:落表(主键或者唯一索引的方式,避免重复数据) 
  
 业务逻辑处理(选择唯一主键存储到Redis或者mongdb中,先查询是否存在,若存在则不处理;若不存在,先插入Redis或Mongdb,再进行业务逻辑处理)

3. Kafka的重复、丢数据及顺序消费等问题

①、kafka的顺序消息仅仅是通过partitionKey,将某类消息写入同一个partition,一个partition只能对应一个消费线程,以保证数据有序。
  
 ②、除了发送消息需要指定partitionKey外,producer和consumer实例化无区别。
  
 ③、kafka broker宕机,kafka会有自选择,所以宕机不会减少partition数量,也就不会影响partitionKey的sharding。
  
 acks设置为0:broker接收消息立即返回,还没写入磁盘,容易丢失数据
  
 acks设置为1:等待broker的ack,如果leader落盘了就返回ack,如果follower同步完成前leader挂了就会丢失未同步的数据(follower选举)
  
 acks设置为-1:等待所有leader和follower都落盘后返回ack,如果follower已同步,但是broker返回ack前leader挂了,则会重复发送消息。
  
 consumer自动提交offset,但其实未处理好消息,容易丢数据。可以选择手动提交,处理完后再提交offset
  
 0.9版本的kafka改进了coordinator的设计,提出了group coordinator——每个consumer group都会被分配一个这样的coordinator用于组管理和位移管理。这个group coordinator比原来承担了更多的责任,比如组成员管理、位移提交保护机制等。当新版本consumer group的第一个consumer启动的时候,它会去和kafka server确定谁是它们组的coordinator。之后该group内的所有成员都会和该coordinator进行协调通信。显而易见,这种coordinator设计不再需要zookeeper了,性能上可以得到很大的提升。
  
 每个  Group  都会选择一个  Coordinator  来完成自己组内各  Partition  的  Offset  信息,选择的规则如下: 1. 计算  Group  对应在  __consumer_offsets  上的  Partition  2. 根据对应的Partition寻找该Partition的leader所对应的Broker,该Broker上的Group Coordinator即就是该Group的Coordinator
  
 numPartitionsPerConsumer=counsumer/partitions——》5/3=1,每个消费者至少被分配一个partition
  
 consumersWithExtraPartition=counsumer%partitions——》5%3=2
  
 i=0,start=0,length=2;
  
 i=1,start=2,length=2;
  
 i=2,start=4,length=1;
  
 如果是4个partitions和3个consumer
  
 i=0,start=0,length=2;
  
 i=1,start=2,length=1;
  
 i=2,start=3,length=1;
  
 for(每一个TopicPartition)
  
 ​   以RoundRobin的方式选择一个订阅了这个Topic的Consumer,将这个TopicPartition分派给这个Consumer      end
  
 “sticky”这个单词可以翻译为“粘性的”,Kafka从0.11.x版本开始引入这种分配策略,它主要有两个目的:
                                                                                  
 为什么会重复消费:第一种可能是生产者重复发送消息。第二种可能是消费者手动提交时挂掉了,导致消费了数据但是没有提交offset。
  
 为什么会丢失数据:第一种可能是ack非-1的情况下,follower未同步完全,leader挂了。第二种可能是消费者自动提交,但其实还没完成消费。
  
 怎么保证生产者消息不重复,0.11后,生产者会生成pid,和一个sequence number,通过pid sequence number brokerid作为key,如果在partition中已经存在,则只持久化一条。且Producer重启可以通过TransactionID拿到原来的pid,所以可以跨会话的保持一致
  
 保证顺序消费:需要保证顺序的消息发到同一个partition中,consumer会自己根据顺序消费
  
 0.9.0.0 版本之前判断副本之间是否同步,主要是靠参数 replica.lag.max.messages 决定的,即允许 follower 副本落后 leader 副本的消息数量,超过这个数量后,follower 会被踢出 ISR。
  
 replica.lag.max.messages 也很难在生产上给出一个合理值,如果给的小,会导致 follower 频繁被踢出 ISR,如果给的大,broker 发生宕机导致 leader 变更时,肯能会发生日志截断,导致消息严重丢失的问题。
  
 在 0.9.0.0 版本之后,Kafka 给出了一个更好的解决方案,去除了 replica.lag.max.messages,,用 replica.lag.time.max.ms 参数来代替,该参数的意思指的是允许 follower 副本不同步消息的最大时间值,即只要在 replica.lag.time.max.ms 时间内 follower 有同步消息,即认为该 follower 处于 ISR 中,这就很好地避免了在某个瞬间生产者一下子发送大量消息到 leader 副本导致该分区 ISR 频繁收缩与扩张的问题了。
  
 Kafka集群中多个broker,有一个会被选举为controller leader,负责管理整个集群中分区和副本的状态,比如partition的leader 副本故障,由controller 负责为该partition重新选举新的leader 副本;当检测到ISR列表发生变化,有controller通知集群中所有broker更新其MetadataCache信息;或者增加某个topic分区的时候也会由controller管理分区的重新分配工作
  
 实际上,Broker 在启动时,会尝试去 ZooKeeper 中创建 /controller 节点。Kafka 当前选举控制器的规则是:第一个成功创建 /controller 节点的 Broker 会被指定为控制器。
  
 故障转移
  
 ​       当 Broker 0 宕机后,ZooKeeper 通过 Watch 机制感知到并删除了 /controller 临时节点。之后,所有存活的 Broker 开始竞选新的控制器身份。Broker 3 最终赢得了选举,成功地在 ZooKeeper 上重建了 /controller 节点。之后,Broker 3 会从 ZooKeeper 中读取集群元数据信息,并初始化到自己的缓存中。

Kafka的重复、丢数据及顺序消费等问题

4. Kafka数据消费

 消费者负责从订阅的主题上拉取消息,消费组是逻辑概念。一个消费者只属于一个消费组,一个消费组包一个或多个消费者。当消息发布到主题后,会被投递到每个消费组,但每个消费组中只有一个消费者能消费给消息。
                                           消费者如何知道该消费哪个分区?当消费组内消费者个数发生变化时,分区分配是如何变化的呢?
   按照消费者总数和分区总数进行整除运算来获得一个跨度,然后将分区按照跨度进行平均分配, 以保证分区尽可能均匀地分配给所有的消费者。对于 每一个主题 该策略会将消费组内所有的消费者按照名称的字典序排序然后为每个消费者划分固定的分区范围,如果不够平均分配,那么字典序靠前的消费者会被多分配一个分区。
   假设n=分区数/消费者数量,m=分区数%消费者数量,那么前m个消费者每个分配n+1分区,后面的每个消费者分配n个分区。
   如图所示主题中共有7个分区,此时消费组内只有一个消费者C0,C0订阅7个分区。
                                                                                                                                                                   随着消费组内消费者不断加入,分区逐渐从C0分配到C1~C6,当最后一个消费者C7加入后,此时总共有8个消费者但是只有7个分区,因此C7由于分配不到分区而无法消费任何消息。
   消费者并非越多越好,消费者数量应小于等于分区数量,否则会造成资源的浪费
    缺点: 
   当一个消费组订阅两个分别包含四个分区的主题时,分区分配结果如下,比较均匀。
                                           但当两个主题各有3个分区时,则会出现如下分区不均的问题。类似情况扩大的话,可能出现消费者过载问题。
                                           将消费组内所有消费者及消费者订阅的所有主题的分区按照字典序排序,然后通过轮询方式将分区依次分配给每个消费者。如果消费组内消费者的订阅信息都是相同的,那么分区分配会比较均匀。如一个消费组两个消费者,分别订阅两个都有3的分区的主题,如图。
                                           但是当消费组内消费者的订阅信息不同时,则会出现分配不均问题。如图,假设消费组内有三个消费者,主题1/2/3分别有1/2/3个分区,C0订阅主题1,C1订阅主题1和2,C2订阅主题1/2/3,分区结果将会如下图所示。
                                           后来引入的策略,主要目的:
   假设三个消费者,订阅了4个主题,每个主题有两个分区,那么初始分区分配结果如下:
                                           乍一看,跟RoundRobin分配策略结果相同,但此时如果C1下线,那么消费组会执行再均衡操作,重新分配消息分区。如果是RoundRobin策略,分配结果如下:
                                           而如果是Sticky分配策略,则结果如下:
                                           StickyAssignor保留了上一次对C0和C2的分配结果,将C1的分区分配给C0和C2使其均衡。
   如果发生分区重分配,那么对于同一个分区而 ,有可能之前的消费者和新指派的消费者不是同一个,之前消费者进行到一半的处理还要在新指派的消费者中再次复现一遍,造成重复消费。StickyAssignor分配策略如同其名称中的"sticky"一 样,让分配策略具备的“黏性”,尽可能地让前后两次分配相同,进而减少系统资源的损耗及其他异常情况的发生。
   再来看下,消费者订阅信息不相同的情况,拿RoundRobinAssignor中的实例来说。
   假设消费组内有三个消费者,主题1/2/3分别有1/2/3个分区,C0订阅主题1,C1订阅主题1和2,C2订阅主题1/2/3,RoundRobinAssignor分区结果将会如下图所示。
                                           而采用StickyAssignor时,分区分配结果如下:
                                           若此时C0下线,RoundRobinAssignor重分配的结果如下:
                                           而StickyAssignor重分配结果如下:
                                           综上:
   StickyAssignor分配策略的优点就是可以使分区重分配具备 “黏性”,减少不必要的分区移动(一个分区剥离之前的消费者 ,转而分配给另一个新的消费者)。
   Kafka中的消息消费是基于拉模式。
   Kafka每次拉取一组消息,每条消息的格式如下:
   在每次拉取方法时,它返回的是还没有被消费过的消息集。要实现这个功能,就需要知道上次消费时的消费位移,消费者在消费完消息后要进行消费位移提交动作,且消费位移要进行持久化,消费位移保存在__consumer_offsets主题中。
                                           当前拉取消息的最大offset为x,消费者消费完成提交位移的是offset其实为x+1,表示下次拉取消息的起始位置。
    自动提交 
   默认采用自动提交,默认每隔5s会将拉取到的每个分区的最大的消息位移进行提交。真正的提交动作是在拉取消息的逻辑完成,每次拉取消息前会判断是否可以进行位移提交,如果可以则提交上一次的位移。这里会有两个问题,如下图所示。
                                           重复消费:当前拉取消息【x+2,x+7】,当前消费到X+5,在提交消费位移前,消费者宕机;新的消费者还是会从X+2开始拉取消息, 因此导致重复消费。
   消息丢失:当前拉取消息【x+2,x+7】,当前消费X+5,到下次拉取的时候,消费位移已经提交为X+8,若此时消费者宕机,新的消费者会从X+8处开始消费,导致X+5 ~ X+7的消息没有被消费,导致消息的丢失。
    手动提交 
   同步提交和异步提交。
   同步提交默认提交本次拉取分区消息的最大偏移量,如本次拉取【X+2,X+7】的消息,同步提交默认提交X+8的位置;当时同步提交也可指定提交的偏移量,比如消费一条提交1次,因为提交本身为同步操作,所以会耗费一定的性能。
   同步提交也会导致重复消费的问题,如消费完成后,提交前消费者宕机。
   异步提交消费者线程不会被阻塞,使性能得到增强,但异步提交失败重试可能会导致提交位移被覆盖的问题,如本次异步提交offset=X失败,下次异步提交offset=X+y成功;此时前一次提交重试再次提交offset=x,如果业务上没有重试校验,会导致offset被覆盖,最终导致重复消费。
   当新的消费组建立、消费者订阅新的主题或之前提交的位移信息因为过期被删除等,此时查不到纪录的消费位移。Kafka可配置从最新或从最早处开始消费。
   Kafka还支持从特定位移处开始消费,可以实现回溯消费,Kafka内部提供了Seek()方法,来重置消费位移。
   当需要回溯指定时间后的消息时,可先用offsetsForTimes方法查到指定时间后第一条消息的位移,然后再用seek重置位移。
   分区的所属权从一个消费者转移到另一消费者的行为,它为消费组具备高可用性和伸缩性提供保障,使我们可以既方便又安全地删除或添加消费者。
   Kfaka提供了组协调器(GroupCoordinator)和消费者协调器(ConsumerCoordinator),前者负责管理消费组,后者负责与前者交互,两者最重要的职责就是负责再均衡的操作。
   举例说明,当消费者加入消费组时,消费者、消费组和组协调器之间一般会经历以下几个阶段。
    第一阶段(FIND COORDINATOR) 
   消费者需要确定它所属的消费组对应的GroupCoordinator所在的broker并创建与该broker 相互通信的网络连接。
   消费者会向集群中的某个节点发送FindCoordinatorRequest请求来查找对应的组协调器。
                                           Kafka根据请求中的coordinator_key(也就是groupld )的哈希值计算__consumer_offsets中的分区编号,如下图所示。找到对应的分区之后,在寻找此分区leader副本所在的broker节点,该节点即为当前消费组所在的组协调器节点。
   消费组最终的分区分配方案及组内消费者所提交的消费位移信息都会发送给该broker节点。该broker节点既扮演GroupCoordinato的角色又扮演保存分区分配方案和组内消费者位移的角色,这样可以省去很多不必要的中间轮转所带来的开销。
    第二阶段(JOIN GROUP) 
   在成功找到消费组所对应的GroupCoordinator之后就进入加入消费组的阶段,在此阶段的 消费者会向GroupCoordinator发送JoinGroupRequest请求,并处理响应。
                                           组协调器内部主要做了以下几件事:
    选举消费组的****leader 
   如果当前组内没有leader,那么第一个加入消费组的则为leader。如果leader挂掉,组协调器会从内部维护的HashMap(消费者信息,key为member_id)中选择第一个key作为新的leader。
    选举分区分配策略 
   前面说的每个消费者可能会上报多个分区分配策略,选举过程如下:
    第三阶段(SYNC GROUP) 
   leader消费者根据在第二阶段中得到的分区分配策略来实施分区分配,然后将分配结果同步到组协调器。各个消费者会向组协调器发送SyncGroupRequest请求来同步分配方案。
                                           请求结构如图,leader发送的请求才会有group_assignment。
                                           其中包含了各个消费者对应的具体分配方案,member_id表示消费者的唯一标识,而 member_assignment是与消费者对应的分配方案,如图
                                           消费者收到具体的分区分配方案后,会开启心跳任务,定期向组协调器发送心跳请求确定彼此在线。
    第四阶段(HEARTBEAT) 
   在正式消费之前,消费者还需要确定拉取消息的起始位置。假设之前已经将最后的消费位移提交成功,那么消费者会请求获取上次提交的消费位移并从此处继续消费。
   心跳线程是一个独立的线程,可以在轮询消息的空档发送。如果消费者停发送心跳的时间足够长,组协调器会认为这个消费者已经死亡,则触发一次再均衡行为。