kafka 超详细的消息订阅与消息消费几种方式
kafka 消息订阅与消息消费几种方式
本文主要内容
-
消费者订阅几种方式
-
订阅多个主题
-
按正则表达式订阅
-
-
消息消费几种方式
-
按分区消费
-
按主题消费
-
不区分
-
“笔者建议一开始学习Kafka最好不要用SpringBoot 集成方式,因为SpringBoot推崇用注解方式,比如
@KafkaListener
等,就可以直接消费,这样不能直接接触kafka-client一些api, 且SpringBoot 给我们提供了很多默认配置,我们几乎零配置
也可以使用,实际上kafka很多配置很重要的,不容忽视。
消费者订阅几种方式
KafkaConsumer
给我们提供了几种订阅消息方式,我们可以订阅多个消息。示例代码如下
kafkaConsumer.subscribe(Arrays.asList("topicA","topicB"));
kafkaConsumer.subscribe(Pattern.compile("topic-*"));
kafkaConsumer.assign(Arrays.asList(new TopicPartition("topicA",0)));
订阅多个主题
void subscribe(Collection<String> topics)
对应上面第一行代码,这是最常见的订阅方式
按正则表达式订阅
void subscribe(Pattern pattern)
符合正则的主题都会被消费
“有人创建了新的主题,并且与正则匹配,消费者也可以消费到
这种方式需要能对多种消息处理,对于一些能通用处理,不感知具体业务数据的场景比较合适。比如B系统需要同步A系统数据,我们按正则订阅,当A系统有新的数据需要同步,这是只需要A发满足条件正则的消息,B系统无需任何改动。
订阅指定分区
void assign(Collection<TopicPartition> partitions);
正常业务不会使用,如果订阅的分区不存在,会报错。一些特殊场景,比如需要精确控制消费者消费消息,自定义分区分配策略时 可能会用到assign 方法
消息消费
“kafka 采用客户端
拉取模式
进行消息消费
poll() 返回所订阅的主题上一组消息ConsumerRecords
,我们可以对消息进行按主题、按分区进行处理,当然可以统一处理,不分主题和分区
ConsumerRecords<String,String> records = kafkaConsumer.poll(Duration.ofMillis(1000));
不区分主题、分区
for(ConsumerRecord<String,String> record : records){
// 处理消息
}
按partition 处理
Set<TopicPartition> topicPartitions = records.partitions();
for(TopicPartition topicPartition : topicPartitions){
List<ConsumerRecord<String, String>> tpRecords = records.records(topicPartition);
}
按主题
Iterable<ConsumerRecord<String,String>> iter = records.records(topic);
思考
kafka 给我们提供了灵活的消息订阅以及消息消费方式,我们需要根据实际业务场景选择。无论哪种场景都离不开 主题
、分区
,最主要的是分区
,当我们选择了某种订阅方式如果主题
、分区
发生了变化 ,消息还能正常消费吗
“选择了按正则订阅消息方式, 后面创建了新的主题,该消息能被正常消费吗
“选择了指定分区订阅, 如果后面扩容了新的分区,新分区消息能消费吗?
List<PartitionInfo> partitionsFor(String topic)
能获取分区情况,如果需要按分区订阅,该方法一定用的上
“按分区维度消费消息,对于手动提交消息位移场景非常有用
“按主题分类处理消息也很常见,因为不同主题消息格式可能是不一样的,根据主题区分,很容易将不同的消息分类处理。
原文地址:https://blog.csdn.net/happycao123/article/details/142367624
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!