Kafka (4): The Consumer
I. Fundamentals
1. Consumers and Consumer Groups
- Every consumer belongs to a consumer group; different consumer groups do not affect one another.
- Within a consumer group, a partition's messages are consumed by only one consumer, although a rebalance may reassign the partition to a different consumer.
- The partitions of a topic are distributed among consumers according to the assignment strategy (consumer client parameter partition.assignment.strategy).
2. Kafka's Messaging Models
- If all consumers belong to the same consumer group, messages are distributed evenly across the consumers, i.e., each message is processed by exactly one consumer. This is equivalent to the point-to-point model.
- If every consumer belongs to a different consumer group, every message is broadcast to all of the consumers, i.e., each message is processed by every consumer. This is equivalent to the publish/subscribe model.
II. Client Development
1. The consumption logic involves the following steps, as the example below shows:
- Configure the consumer parameters and create a consumer instance
- Subscribe to topics
- Poll messages and consume them
- Commit the consumer offsets
- Close the consumer instance
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;

public class Consumer {
    private static final String BROKER_LIST = "localhost:9092";
    private static final String TOPIC = "TOPIC-A";
    private static final String GROUP_ID = "GROUP-A";
    private static final AtomicBoolean IS_RUNNING = new AtomicBoolean(true);

    public static Properties initConfig() {
        Properties properties = new Properties();
        // The following three settings are mandatory
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Client ID
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "eris-kafka-consumer");
        // Consumer group ID
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        // Automatic offset commit, defaults to true
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        return properties;
    }

    public static void main(String[] args) {
        Properties properties = initConfig();
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        kafkaConsumer.subscribe(Arrays.asList(TOPIC));
        try {
            while (IS_RUNNING.get()) {
                // poll() internally handles offset commits, the consumer and group coordinators,
                // consumer leader election, partition assignment and rebalancing, heartbeats, etc.
                // The Duration controls how long to block when no data is available in the
                // consumer's buffer; 0 means return immediately without waiting.
                ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> r : records) {
                    System.out.println("topic:" + r.topic() + ", partition:" + r.partition() + ", offset:" + r.offset());
                    System.out.println("key:" + r.key() + ", value:" + r.value());
                }
            }
        } catch (WakeupException e) {
            // wakeup() is the only KafkaConsumer method that can safely be called from another
            // thread. Calling it makes poll() throw a WakeupException, which needs no handling:
            // it is simply a way to break out of the loop.
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // maybe commit offsets here
            kafkaConsumer.close();
        }
    }
}
2. subscribe Has Four Overloads
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener)
public void subscribe(Collection<String> topics)
// If new topics are created later that match the regular expression, the consumer will consume them as well
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener)
public void subscribe(Pattern pattern)
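For example, a minimal sketch of regex-based subscription (the topic prefix is illustrative; Pattern is java.util.regex.Pattern):

// Subscribes to every topic whose name starts with "topic-"; topics created later
// that match the pattern are picked up automatically.
consumer.subscribe(Pattern.compile("topic-.*"));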
3. assign: Subscribing to Specific Partitions
consumer.assign(Arrays.asList(new TopicPartition("topic-demo", 0)));
4. Unsubscribing (the three calls below are equivalent: subscribing to an empty topic list or assigning an empty partition list also cancels the subscription)
consumer.unsubscribe();
consumer.subscribe(new ArrayList<String>());
consumer.assign(new ArrayList<TopicPartition>());
5. Consuming Messages
public class ConsumerRecord<K, V> {
    private final String topic;
    private final int partition;
    private final long offset;
    private final long timestamp;
    private final TimestampType timestampType;
    private final int serializedKeySize;
    private final int serializedValueSize;
    private final Headers headers;
    private final K key;
    private final V value;
    private volatile Long checksum;
    // ... methods omitted
}
① Group the current batch of records by partition: public List<ConsumerRecord<K, V>> records(TopicPartition partition)
for (TopicPartition tp : records.partitions()) {
    for (ConsumerRecord<String, String> record : records.records(tp)) {
        System.out.println(record.partition() + ":" + record.value());
    }
}
② Group the current batch of records by topic: public Iterable<ConsumerRecord<K, V>> records(String topic)
// ConsumerRecords provides no topics() method analogous to partitions() for listing the
// topics contained in the batch, so we iterate over the topics we subscribed to.
for (String topic : Arrays.asList(TOPIC)) {
    for (ConsumerRecord<String, String> record : records.records(topic)) {
        System.out.println(record.topic() + ":" + record.value());
    }
}
6. Deserialization
Mirroring the producer's serializers, the consumer must configure key.deserializer and value.deserializer. Besides the built-in implementations such as StringDeserializer, you can supply your own, as sketched below.
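A minimal sketch of a custom deserializer (the class name and upper-casing behavior are illustrative, not from the original; the point is the Deserializer contract):

import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;

// Turns the raw bytes into an upper-cased String, just to show the contract.
public class UpperCaseDeserializer implements Deserializer<String> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {}

    @Override
    public String deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        return new String(data, StandardCharsets.UTF_8).toUpperCase();
    }

    @Override
    public void close() {}
}

It is wired in via properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, UpperCaseDeserializer.class.getName()).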
III. Offset Commits
1. Committing Consumer Offsets
The consumer client persists consumer offsets to the internal Kafka topic __consumer_offsets.
2. The Relationship Among Three Offsets
- lastConsumedOffset: the position consumed so far, i.e., the offset of the last record that poll returned for the partition
- committed offset: the consumer offset that has been committed
- position: the position of the next fetch
TopicPartition tp = new TopicPartition("topic", 0);
kafkaConsumer.assign(Arrays.asList(tp));
long lastConsumedOffset = 0;
while (true) {
    ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
    List<ConsumerRecord<String, String>> partitionRecords = consumerRecords.records(tp);
    if (partitionRecords.isEmpty()) {
        continue; // nothing fetched this round
    }
    lastConsumedOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
    // Synchronously commit the consumed offset
    kafkaConsumer.commitSync();
    System.out.println("consumed offset is " + lastConsumedOffset);
    OffsetAndMetadata offsetAndMetadata = kafkaConsumer.committed(tp);
    System.out.println("committed offset is " + offsetAndMetadata.offset());
    long position = kafkaConsumer.position(tp);
    System.out.println("the offset of the next record is " + position);
}
Sample output:
consumed offset is 377
committed offset is 378
the offset of the next record is 378
3. Message Loss and Duplicate Consumption
- If the offset is committed immediately after poll and the business logic then fails, the next poll starts from the new offset, so the unprocessed records are lost.
- If the records are processed first and processing fails halfway through, or the final offset commit fails, the next poll starts from the previous offset, so some records are consumed again.
4. How Automatic Offset Commit Works
With enable.auto.commit=true (the default), the client periodically commits the latest offsets returned by poll, at an interval of auto.commit.interval.ms (5 seconds by default); the commit happens inside the poll call, before new data is fetched.
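For reference, the two consumer configs involved (the values shown are the defaults):

properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);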
5. Committing Offsets Manually (requires enable.auto.commit=false)
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
    // do some logical processing
    kafkaConsumer.commitSync();
}
1) commitSync commits based on the latest offset returned by poll (note that the committed value corresponds to position in Figure 3-6, i.e., lastConsumedOffset + 1).
2) An overload allows committing specific offsets: commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets)
3) There is no need to commit once per message; commits can be batched, as sketched below.
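A minimal sketch of a per-partition batched commit (the +1 reflects that the committed value is the next position to fetch; Collections is java.util.Collections):

ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1000));
for (TopicPartition tp : records.partitions()) {
    List<ConsumerRecord<String, String>> partitionRecords = records.records(tp);
    for (ConsumerRecord<String, String> record : partitionRecords) {
        // do some logical processing
    }
    long lastConsumedOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
    // One commit per partition batch; commit the next position, hence the +1.
    kafkaConsumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(lastConsumedOffset + 1)));
}

For asynchronous commits, commitAsync has three overloads: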
public void commitAsync()
public void commitAsync(OffsetCommitCallback callback)
public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)
1) An asynchronous commit does not block the consumer thread; a new poll may start before the result of the previous offset commit has come back.
2) A commit can also fail, so a retry mechanism may be introduced. Retrying is risky, though: if a first commitAsync fails and is retried, a second commit succeeds, and then the delayed retry of the first also succeeds, the committed offset is rolled back to the older value. One solution is to attach a monotonically increasing sequence number to each commit and skip the retry when the sequence number is stale, as sketched below.
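A minimal sketch of the sequence-number guard, assuming a single consumer thread (the counter and callback wiring are illustrative, not from the original; requires java.util.concurrent.atomic.AtomicLong):

// Monotonically increasing sequence number, bumped before each async commit.
final AtomicLong sequence = new AtomicLong(0);
while (IS_RUNNING.get()) {
    // poll records and do some logical processing ...
    final long seqBeforeCommit = sequence.incrementAndGet();
    kafkaConsumer.commitAsync((offsets, exception) -> {
        if (exception == null) {
            return; // commit succeeded, nothing to do
        }
        // Retry only if no newer commit has been issued in the meantime; a stale
        // retry could otherwise roll the committed offset backwards.
        if (seqBeforeCommit == sequence.get()) {
            kafkaConsumer.commitSync(offsets);
        }
    });
}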
A typical shutdown pattern combines asynchronous commits in the poll loop with a final synchronous commit:
try {
    while (IS_RUNNING.get()) {
        // poll records and do some logical processing ...
        kafkaConsumer.commitAsync();
    }
} finally {
    try {
        // A last synchronous commit makes sure the final offsets are persisted.
        kafkaConsumer.commitSync();
    } finally {
        kafkaConsumer.close();
    }
}
IV. Pausing and Resuming Consumption
public void pause(Collection<TopicPartition> partitions)
public void resume(Collection<TopicPartition> partitions)
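A brief usage sketch, pausing one partition while the rest keep flowing (the topic and partition are illustrative):

// Stop fetching from partition 0 of "topic-demo"; poll() keeps the group membership
// alive but returns no records for paused partitions.
TopicPartition tp = new TopicPartition("topic-demo", 0);
kafkaConsumer.pause(Arrays.asList(tp));
// ... later, when ready to consume it again:
kafkaConsumer.resume(Arrays.asList(tp));
// paused() reports which partitions are currently paused
Set<TopicPartition> paused = kafkaConsumer.paused();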
V. Consuming From a Specified Offset
Where consumption starts is decided as follows:
- when no committed offset record can be found, or
- when the offset is out of range (e.g., after a seek),
the auto.offset.reset parameter applies:
- auto.offset.reset=latest (the default): start from the end of the partition;
- auto.offset.reset=earliest: start from the beginning of the partition.
seek lets the consumer pick the position itself:
// partition: the target partition; offset: the position from which to start consuming
public void seek(TopicPartition partition, long offset)
Set<TopicPartition> assignment = new HashSet<>();
while (assignment.size() == 0) {
    // Once assignment is non-empty, partitions have been successfully assigned
    kafkaConsumer.poll(Duration.ofMillis(1000));
    assignment = kafkaConsumer.assignment();
}
for (TopicPartition tp : assignment) {
    kafkaConsumer.seek(tp, 10);
}
while (true) {
    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1000));
    // consume the records
}
To seek by timestamp instead of a concrete offset, offsetsForTimes converts timestamps to offsets:
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch)
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch, Duration timeout)
Given the partitions to query and target timestamps, offsetsForTimes returns, for each partition, the offset and timestamp of the first message whose timestamp is greater than or equal to the target, exposed as the offset and timestamp fields of OffsetAndTimestamp.
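A minimal usage sketch, assuming partitions have already been assigned (see the waiting loop above) and we want to rewind every assigned partition to the first message of the last 24 hours:

Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
for (TopicPartition tp : kafkaConsumer.assignment()) {
    timestampsToSearch.put(tp, System.currentTimeMillis() - 24 * 3600 * 1000L);
}
Map<TopicPartition, OffsetAndTimestamp> offsets = kafkaConsumer.offsetsForTimes(timestampsToSearch);
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsets.entrySet()) {
    OffsetAndTimestamp offsetAndTimestamp = entry.getValue();
    // The value is null when the partition has no message at or after the timestamp
    if (offsetAndTimestamp != null) {
        kafkaConsumer.seek(entry.getKey(), offsetAndTimestamp.offset());
    }
}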
VI. Consumer Rebalancing
Registration:
subscribe(Collection<String> topics, ConsumerRebalanceListener listener) and subscribe(Pattern pattern, ConsumerRebalanceListener listener)
ConsumerRebalanceListener is an interface with two methods.
(1) void onPartitionsRevoked(Collection<TopicPartition> partitions)
Called after the consumer has stopped reading messages and before the rebalance starts.
(2) void onPartitionsAssigned(Collection<TopicPartition> partitions)
Called after the partitions have been reassigned and before the consumer starts fetching again.
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
kafkaConsumer.subscribe(Arrays.asList("topic"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Commit what has been consumed so far before the partitions are taken away,
        // so the next owner does not re-consume these records.
        kafkaConsumer.commitSync(currentOffsets);
        currentOffsets.clear();
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // do nothing
    }
});

ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
    // process the record
    currentOffsets.put(
        new TopicPartition(record.topic(), record.partition()),
        new OffsetAndMetadata(record.offset() + 1));
}
kafkaConsumer.commitAsync(currentOffsets, null);
Offsets can also be kept in an external store such as a database (getOffsetFromDB is a placeholder):
kafkaConsumer.subscribe(Arrays.asList("topic"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // store offsets in the DB
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        for (TopicPartition tp : partitions) {
            // Read the consumer offset back from the DB
            kafkaConsumer.seek(tp, getOffsetFromDB(tp));
        }
    }
});
VII. Consumer Interceptors
Implemented via the org.apache.kafka.clients.consumer.ConsumerInterceptor interface, which defines three methods:
// Called before poll returns; can modify the returned records, filter them by some rule, etc.
public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
// Called after the consumer offsets have been committed; useful for tracking committed offsets.
// For example, with the no-arg commitSync we do not otherwise know which offsets were
// committed, but onCommit can tell us.
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
public void close();
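As an illustration, a minimal sketch of an interceptor that drops records older than a TTL (the class name and the 10-second threshold are assumptions, not from the original):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Filters out records whose timestamp is more than EXPIRE_INTERVAL ms in the past.
public class ConsumerTtlInterceptor implements ConsumerInterceptor<String, String> {
    private static final long EXPIRE_INTERVAL = 10 * 1000;

    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        long now = System.currentTimeMillis();
        Map<TopicPartition, List<ConsumerRecord<String, String>>> newRecords = new HashMap<>();
        for (TopicPartition tp : records.partitions()) {
            List<ConsumerRecord<String, String>> fresh = new ArrayList<>();
            for (ConsumerRecord<String, String> record : records.records(tp)) {
                if (now - record.timestamp() < EXPIRE_INTERVAL) {
                    fresh.add(record);
                }
            }
            if (!fresh.isEmpty()) {
                newRecords.put(tp, fresh);
            }
        }
        return new ConsumerRecords<>(newRecords);
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        offsets.forEach((tp, offset) -> System.out.println(tp + ":" + offset.offset()));
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}

It must be registered on the consumer: properties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ConsumerTtlInterceptor.class.getName());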
VIII. Multi-threaded Consumer Models
KafkaConsumer is not thread-safe: every public method checks the calling thread and throws a ConcurrentModificationException on concurrent access (wakeup being the one exception). Two models are common: give each thread its own KafkaConsumer instance, or let a single poll thread hand records off to a pool of worker threads.
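A minimal sketch of the first model, reusing the Consumer.initConfig() and topic from the example at the top (the thread count is illustrative and should not exceed the partition count):

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class FirstMultiConsumerThreadDemo {
    public static void main(String[] args) {
        int consumerThreadNum = 4;
        for (int i = 0; i < consumerThreadNum; i++) {
            new KafkaConsumerThread(Consumer.initConfig(), "TOPIC-A").start();
        }
    }

    // Each thread owns a private KafkaConsumer instance, so no synchronization is
    // needed; the group coordinator splits the topic's partitions among the instances.
    public static class KafkaConsumerThread extends Thread {
        private final KafkaConsumer<String, String> kafkaConsumer;

        public KafkaConsumerThread(Properties properties, String topic) {
            this.kafkaConsumer = new KafkaConsumer<>(properties);
            this.kafkaConsumer.subscribe(Arrays.asList(topic));
        }

        @Override
        public void run() {
            try {
                while (true) {
                    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
                    for (ConsumerRecord<String, String> record : records) {
                        // process the record
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                kafkaConsumer.close();
            }
        }
    }
}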
Original post: https://blog.csdn.net/jiezheee/article/details/140443007