自学内容网 自学内容网

Kafka【十三】消费者消费消息的偏移量

偏移量offset是消费者消费数据的一个非常重要的属性。默认情况下,消费者如果不指定消费主题数据的偏移量,那么消费者启动消费时,无论当前主题之前存储了多少历史数据,消费者只能从连接成功后当前主题最新的数据偏移位置读取,而无法读取之前的任何数据。如果想要获取之前的数据,就需要设定配置参数或指定数据偏移量。

【1】起始偏移量

在消费者的配置中,我们可以增加偏移量相关参数auto.offset.reset,用于从最开始获取主题数据。

在这里插入图片描述
参数取值有3个:latest、earliest、none。

① earliest

earliest:对于同一个消费者组,从头开始消费。就是说如果这个topic有历史消息存在,现在新启动了一个消费者组,且auto.offset.reset=earliest,那将会从头开始消费(未提交偏移量的场合)。
在这里插入图片描述

② latest

latest:对于同一个消费者组,消费者只能消费到连接topic后,新产生的数据(未提交偏移量的场合)。

在这里插入图片描述

none:生产环境不使用。

【2】 指定偏移量消费

除了从最开始的偏移量或最后的偏移量读取数据以外,Kafka还支持从指定的偏移量的位置开始消费数据。
在这里插入图片描述

【3】偏移量提交

生产环境中,消费者可能因为某些原因或故障重新启动消费,那么如果不知道之前消费数据的位置,重启后再消费,就可能重复消费(earliest)或漏消费(latest)。所以Kafka提供了保存消费者偏移量的功能,而这个功能需要由消费者进行提交操作。这样消费者重启后就可以根据之前提交的偏移量进行消费了。

注意,一旦消费者提交了偏移量,那么kafka会优先使用提交的偏移量进行消费。此时,auto.offset.reset参数是不起作用的。

① 自动提交

所谓的自动提交就是消费者消费完数据后,无需告知kafka当前消费数据的偏移量,而是由消费者客户端API周期性地将消费的偏移量提交到Kafka中。这个周期默认为5000ms,可以通过配置进行修改。

在这里插入图片描述

② 手动提交

基于时间周期的偏移量提交是我们无法控制的,一旦参数设置的不合理或单位时间内数据量消费的很多,却没有来及的自动提交,那么数据就会重复消费。所以Kafka也支持消费偏移量的手动提交,也就是说当消费者消费完数据后,自行通过API进行提交。

不过为了考虑效率和安全,kafka同时提供了异步提交和同步提交两种方式供我们选择。注意:需要禁用自动提交auto.offset.reset=false,才能开启手动提交

异步提交

向Kafka发送偏移量offset提交请求后,就可以直接消费下一批数据,因为无需等待kafka的提交确认,所以无法知道当前的偏移量一定提交成功,所以安全性比较低,但相对消费性能会提高。

在这里插入图片描述

同步提交

必须等待Kafka完成offset提交请求的响应后,才可以消费下一批数据。一旦提交失败,会进行重试处理,尽可能保证偏移量提交成功(但是依然可能因为意外情况导致提交请求失败)。此种方式消费效率比较低,但是安全性高。
在这里插入图片描述

【4】偏移量保存

由于消费者在消费消息的时候可能会由于各种原因而断开消费,当重新启动消费者时我们需要让它接着上次消费的位置offset继续消费,因此消费者需要实时的记录自己以及消费的位置。

0.90版本之前,这个信息是记录在zookeeper内的,在0.90之后的版本,offset保存在__consumer_offsets这个topic内。

每个consumer会定期将自己消费分区的offset提交给kafka内部topic:__consumer_offsets,提交过去的时候,key是consumerGroupId+topic+分区号

在这里插入图片描述

value就是当前offset的值,kafka会定期清理topic里的消息,最后就保留最新的那条数据。

在这里插入图片描述
因为__consumer_offsets可能会接收高并发的请求,kafka默认给其分配50个分区(可以通过offsets.topic.num.partitions设置),均匀分配到Kafka集群的多个Broker中。Kafka采用hash(consumerGroupId) % __consumer_offsets主题的分区数来计算我们的偏移量提交到哪一个分区。

因为偏移量也是保存到主题中的,所以保存的过程和生产者生产数据的过程基本相同。

【5】消费者事务

无论偏移量使用自动提交还是,手动提交,特殊场景中数据都有可能会出现重复消费。

在这里插入图片描述
如果提前提交偏移量,再处理业务,又可能出现数据丢失的情况。在这里插入图片描述

对于单独的Consumer来讲,事务保证会比较弱,尤其是无法保证提交的信息被精确消费。主要原因就是消费者可以通过偏移量访问信息,而不同的数据文件生命周期不同,同一事务的信息可能会因为重启导致被删除的情况。

所以一般情况下,想要完成kafka消费者端的事务处理,需要将数据消费过程和偏移量提交过程进行原子性绑定。也就是说数据处理完了,必须要保证偏移量正确提交,才可以做下一步的操作。如果偏移量提交失败,那么数据就恢复成处理之前的效果。

对于生产者事务而言,消费者消费的数据也会受到限制。默认情况下,消费者只能消费到生产者提交的数据,也就是未提交完成的数据,消费者是看不到的。


原文地址:https://blog.csdn.net/J080624/article/details/141955013

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!