自学内容网 自学内容网

Kafka学习笔记

Kafka消息中间件

官网:https://kafka.apache.org/
docker安装kafka教程:https://bugstack.cn/md/road-map/kafka.html

Kafka的几个概念

生产者Producer

消费者Consumer

主题Topic

image.png

分区Partition

一个topic下可以有多个分区。当创建topic时,如果补置顶该topic的partition数量,那么默认就是1个partition。

偏移量Offset

标识每个分区中消息的唯一为止,从0开始。
image.png

SpringBoot集成Kafka开发

依赖配置:

<!--kafka依赖,不是starter依赖-->  
<dependency>  
    <groupId>org.springframework.kafka</groupId>  
    <artifactId>spring-kafka</artifactId>  
</dependency>

配文件:

  1. 服务器连接
spring:  
  application:  
    # 应用名称  
    name: spring-boot-01-kafka-base  
  
  # kafka连接地址 (ip + port)  
  kafka:  
    bootstrap-servers: 10.15.15.201:9092
  1. 生产者 (KafkaProperties)
    属性如下:
    image.png
  2. 消费者 (KafkaProperties)
    属性如下:

写代码:

  • 生产者(写入事件):
@Component  
public class EventProducer {  
  
    // 加入spring-kafka依赖 + .yml配置信息,Springboot自动配置好了kafka,自动装配好了KafkaTemplate这个Bean  
    @Resource  
    private KafkaTemplate<String, String> kafkaTemplate;  
  
    public void sendEvent() {  
        kafkaTemplate.send("hello-topic", "hello kafka");  
    }  
}
  • 消费者(读取事件):
    默认读的是最新的数据
@Component  
public class EventConsumer {  
  
    // 采用监听的方式接收事件 (消息、数据)  
    @KafkaListener(topics = "hello-topic", groupId = "hello-group")
    public void onEvent(String event) {  
        System.out.println("读取到的事件:" + event);  
    }  
}

去运行:

  • 生产者发送事件Event(消息、数据)
@SpringBootTest  
class SpringBoot01KafkaBaseApplicationTests {  
  
    @Resource  
    private EventProducer eventProducer;  
  
    @Test  
    void test01() {  
        eventProducer.sendEvent();  
    }  
  
}

image.png

  • 消费者接收事件Event(消息、数据)

读取最早的消息
默认情况下,当启动一个新的消费组时,它会从每个分区的最新偏移量(即该分区中最后一条消息的下一个位置)开始消费。如果希望从第一条消息开始消费,需要设置消费者的auto.offset.reset设置为earliest

spring:  
  kafka:  
    consumer:  
      auto-offset-reset: earliest

取值:earliestlatestnoneexception

  • earliest:自动将偏移量重置为最早的偏移量
  • latest:自动将偏移量重置为最新的偏移量
  • none:如果没有为消费者组找到以前的偏移量,则向消费者抛出异常
  • exception:向消费者抛出异常。(spring-kafka不支持)

image.png
注意:如果之前已经用相同的消费者组ID消费过该主题,并且kafka已经保存了该消费者组的偏移量,那么即使你设置了auto.offset.reset=earliest,该设置也不会生效,因为kafka只会在找不到偏移量时使用这个配置。在这种情况下,你需要手动重置偏移量使用一个新的消费者组ID

spring-kafka生产者发送消息

生产者客户端向kafka的主题topic中写入事件

image.png

  1. 发送Message对象
@Component  
public class EventProducer {  
  
    // 加入spring-kafka依赖 + .yml配置信息,Springboot自动配置好了kafka,自动装配好了KafkaTemplate这个Bean  
    @Resource  
    private KafkaTemplate<String, String> kafkaTemplate;  
  
    public void sendEvent() {  
        // 通过建造者模式创建Message对象  
        Message<String> message = MessageBuilder  
                .withPayload("hello kafka")  
                .setHeader(KafkaHeaders.TOPIC, "test-topic") // 在header中放置topic的名字  
                .build();  
        kafkaTemplate.send(message);  
    }
    
}
  1. 发送ProduceRecord对象
@Component  
public class EventProducer {  
  
    // 加入spring-kafka依赖 + .yml配置信息,Springboot自动配置好了kafka,自动装配好了KafkaTemplate这个Bean  
    @Resource  
    private KafkaTemplate<String, String> kafkaTemplate;  

    public void sendEvent() {  
        // Headers里面是放一些信息(key-value键值对),到时候消费者接收到该消息后,可以拿到Headers里面放的信息  
        Headers headers = new RecordHeaders();  
        headers.add("phone", "15349850538".getBytes(StandardCharsets.UTF_8));  
        headers.add("orderId", "0D1234523452345".getBytes(StandardCharsets.UTF_8));  
          
        ProducerRecord<String, String> record = new ProducerRecord<>(  
                "test-topic", 0, System.currentTimeMillis(), "k1", "hello kafka", headers  
        );  
        kafkaTemplate.send(record);  
    }  
}
  1. 发送指定分区的消息
@Component  
public class EventProducer {  
  
    // 加入spring-kafka依赖 + .yml配置信息,Springboot自动配置好了kafka,自动装配好了KafkaTemplate这个Bean  
    @Resource  
    private KafkaTemplate<String, String> kafkaTemplate;  

    public void sendEvent4() {
    // String topic, Integer partition, Long timestamp, K key, V data
        kafkaTemplate.send("test-topic", 0, System.currentTimeMillis(), "k2", "hello kafka");  
    }  
}
  1. 发送默认topic消息
@Component  
public class EventProducer {  
  
    // 加入spring-kafka依赖 + .yml配置信息,Springboot自动配置好了kafka,自动装配好了KafkaTemplate这个Bean  
    @Resource  
    private KafkaTemplate<String, String> kafkaTemplate;  

    public void sendEvent() {  
        // Integer partition, Long timestamp, K key, V data  
        kafkaTemplate.sendDefault(0, System.currentTimeMillis(), "k3", "hello kafka");  
    }  
}

同时还需要在application.yml中配置默认topic

spring:  
  kafka:  
    # 配置模板默认的主题topic名称  
    template:  
      default-topic: default-topic

获取生产者消息发送结果

  • .send()方法和.sendDefault()方法都返回CompletableFuture<SendResult<K, V>>
  • CompletableFuture是Java 8中引入的一个类,用于异步编程,它表示一个异步计算的结果,这个特性使得调用者不必等待操作完成就能继续执行其他任务,从而提高了应用程序的响应速度和吞吐量
  • 使用 CompletableFuture,.send() 方法可以立即返回一个表示异步操作结果的未来对象,而不是等待操作完成,这样,调用线程可以继续执行其他任务,而不必等待消息发送完成。当消息发送完成时(无论是成功还是失败),CompletableFuture会相应地更新其状态,并允许我们通过回调、阻塞等方式来获取操作结果;
  • 方法一:调用CompletableFutureget()方法,同步阻塞等待发送结果
@Component  
public class EventProducer {  
  
    // 加入spring-kafka依赖 + .yml配置信息,Springboot自动配置好了kafka,自动装配好了KafkaTemplate这个Bean  
    @Resource  
    private KafkaTemplate<String, String> kafkaTemplate;  

    public void sendEvent() throws ExecutionException, InterruptedException {  
        // Integer partition, Long timestamp, K key, V data  
        CompletableFuture<SendResult<String, String>> completableFuture  
                = kafkaTemplate.sendDefault(0, System.currentTimeMillis(), "k3", "hello kafka");  
  
        // 怎么拿到结果,通过CompletableFuture这个类拿结果,这个类里面有很多方法  
        // 1. 阻塞等待的方式拿结果  
        SendResult<String, String> sendResult = completableFuture.get();  
  
        if (sendResult.getRecordMetadata() != null) {  
            // 此时kafka这个服务器确认接收到了这个消息  
            System.out.println("消息发送成功:" + sendResult.getRecordMetadata().toString());  
        }  
        System.out.println("producerRecord:" + sendResult.getProducerRecord());  
    }  
  
}
  • 方法二:使用thenAccept(), thenApply(), thenRun()等方法来注册回调函数,回调函数将在CompeletableFuture完成时被执行
@Component  
public class EventProducer {  
  
    // 加入spring-kafka依赖 + .yml配置信息,Springboot自动配置好了kafka,自动装配好了KafkaTemplate这个Bean  
    @Resource  
    private KafkaTemplate<String, String> kafkaTemplate;  

    public void sendEvent7() {  
        // Integer partition, Long timestamp, K key, V data  
        CompletableFuture<SendResult<String, String>> completableFuture  
                = kafkaTemplate.sendDefault(0, System.currentTimeMillis(), "k3", "hello kafka");  
  
        // 怎么拿到结果,通过CompletableFuture这个类拿结果,这个类里面有很多方法  
        // 2. 非阻塞的方式拿结果  
        completableFuture.thenAccept((sendResult) -> {  
            if (sendResult.getRecordMetadata() != null) {  
                // 此时kafka这个服务器确认接收到了这个消息  
                System.out.println("消息发送成功:" + sendResult.getRecordMetadata().toString());  
            }  
            System.out.println("producerRecord:" + sendResult.getProducerRecord());  
        }).exceptionally((exception) -> {  
            exception.printStackTrace();  
            // 做失败的处理  
            return null;  
        });  
    }  
  
}

生产者发送对象消息

  1. 注入kafkaTemplate,记得修改K V
@Resource  
private KafkaTemplate<Object, Object> template;
  1. 发送消息代码
@Data  
@Builder  
@AllArgsConstructor  
@NoArgsConstructor  
public class User {  
  
    private int id;  
  
    private String phone;  
  
    private Date birthDay;  
}
public void sendEvent() {  
    User user = User.builder()  
            .id(1)  
            .phone("15349850538")  
            .birthDay(new Date())  
            .build();  
    // 分区为null,则让kafka自己去决定把消息发送哪个分区  
    template.sendDefault(null, System.currentTimeMillis(), "k3", user);  
}
  1. 配置application.yml,指定消息key和消息value的编码(序列化)方式
spring:   
  kafka:  
    bootstrap-servers: 192.168.237.105:9092  
  
    # 配置生产者 (有24个配置)  
    producer:  
      # 默认是StringSerializer.class序列化  
      key-serializer: org.apache.kafka.common.serialization.StringSerializer  
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

kafka核心概念:Replica副本

  • Replica:副本,为实现备份功能,保证集群中的某个节点发生故障时,该节点上的partition数据不丢失,且 Kafka仍然能够继续工作,Kafka提供了副本机制,一个topic的每个分区都有1个或多个副本;
  • Replica副本分为Leader Replica和Follower Replica:
    • Leader:每个分区多个副本中的“主”副本,生产者发送数据以及消费者消费数据,都是来自leader副本;
    • Follower:每个分区多个副本中的“从”副本,实时从leader副本中同步数据,保持和leader副本数据的同步,leader副本发生故障时,某个follower副本会成为新的leader副本;
  • 设置副本个数不能为0,也不能大于节点个数,否则将不能创建Topic;
指定topic的分区和副本

执行代码时指定分区和副本

  • kafkaTemplate.send("topic", message)
  • 直接使用send()方法发送消息时,kafka会帮我们自动完成topic的创建工作,但这种情况下创建的topic默认只有一个分区,分区有1个副本,也就是有它自己本身的副本,没有额外的副本备份;
  • 我们可以在项目中新建一个配置类专门用来初始化topic;
@Configuration  
public class KafkaConfig {  
  
    @Bean  
    public NewTopic newTopic() {  
        // 创建一个名为heTopic的Topic并设置分区数为5,分区副本数为1  
        return new NewTopic("heTopic", 5, (short) 1);  
    }  
  
    // 对topic进行更新  
    @Bean  
    public NewTopic updateNewTopic() {  
        // 创建一个名为heTopic的Topic并设置分区数为5,分区副本数为1
        // 如果要修改分区数,只需修改配置值重启项目即可,修改分区数并不会导致数据的丢失,但是分区数只能增大不能减小
        return new NewTopic("heTopic", 9, (short) 1);  
    }  
}

生产者发送对象消息的分区策略

消息发送到哪个分区?是什么策略?

生产者写入消息到topic,Kafka将依据不同的策略将数据分配到不同的分区中;

  1. 默认分配策略:BuiltInPartitioner
    • 有key:Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
    • 没有key:是使用随机数 % numPartitions
  2. 轮询分配策略:RoundRobinPartitioner (接口:Partitioner)
  3. 自定义分配策略:我们自己定义

指定生产者写入消息到topic时的分配策略:

  • 轮询分配策略:RoundRobinPartitioner (接口:Partitioner)
@Configuration  
public class KafkaConfig {  
  
    @Value("${spring.kafka.bootstrap-servers}")  
    private String bootstrapServers;  
  
    @Value("${spring.kafka.producer.key-serializer}")  
    private String keySerializer;  
  
    @Value("${spring.kafka.producer.value-serializer}")  
    private String valueSerializer;  

    /**  
     * 生产者创建工厂  
     * @return  
     */    public ProducerFactory<String, ?> producerFactory() {  
        return new DefaultKafkaProducerFactory<>(producerConfigs());  
    }  
  
    /**  
     * 生产者相关配置  
     * @return  
     */    public Map<String, Object> producerConfigs() {  
        Map<String, Object> props = new HashMap<>();  
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);  
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);  
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);  
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class);  
        return props;  
    }  
  
    /**  
     * kafkaTemplate 覆盖默认配置类中的kafkaTemplate  
     * @return     */    @Bean  
    public KafkaTemplate<String, ?> kafkaTemplate() {  
        return new KafkaTemplate<>(producerFactory());  
    }  
}
  • 自定义分配策略:XxxPartitioner (接口:Partitioner)
@Configuration  
public class KafkaConfig {  
  
    @Value("${spring.kafka.bootstrap-servers}")  
    private String bootstrapServers;  
  
    @Value("${spring.kafka.producer.key-serializer}")  
    private String keySerializer;  
  
    @Value("${spring.kafka.producer.value-serializer}")  
    private String valueSerializer;  

    /**  
     * 生产者创建工厂  
     * @return  
     */    public ProducerFactory<String, ?> producerFactory() {  
        return new DefaultKafkaProducerFactory<>(producerConfigs());  
    }  
  
    /**  
     * 生产者相关配置  
     * @return  
     */    public Map<String, Object> producerConfigs() {  
        Map<String, Object> props = new HashMap<>();  
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);  
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);  
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);  
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomerPartitionerConfig.class);   
        return props;  
    }  
  
    /**  
     * kafkaTemplate 覆盖默认配置类中的kafkaTemplate  
     * @return     */    @Bean  
    public KafkaTemplate<String, ?> kafkaTemplate() {  
        return new KafkaTemplate<>(producerFactory());  
    }  
}
public class CustomerPartitionerConfig implements Partitioner {  
  
    private AtomicInteger nextPartition = new AtomicInteger(0);  
  
    @Override  
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {  
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);  
        int numPartitions = partitions.size();  
  
        if (key == null) {  
            // 使用轮询方式选择分区  
            int next = nextPartition.getAndIncrement();  
            if (next >= numPartitions) {  
                nextPartition.compareAndSet(next, 0);  
            }  
            System.out.println("分区值:" + next);  
            return next;  
        } else {  
            // 如果key不为null,则使用默认的分区策略  
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;  
        }  
    }  
  
    @Override  
    public void close() {  
  
    }  
    @Override  
    public void configure(Map<String, ?> configs) {  
  
    }  
}

生产者发送消息流程

image.png

拦截生产者发送的消息

  • 自定义拦截器拦截消息的发送;
  • 实现ProducerInterceptor<K, V>接口;
public class CustomerProducerInterceptor implements ProducerInterceptor<String, Object> {  
  
    /**  
     * 发送消息时,会先调用该方法,对消息进行拦截,可以在拦截中对消息进行处理,如记录日志等操作  
     * @param record the record from client or the record returned by the previous interceptor in the chain of interceptors.  
     * @return     */    @Override  
    public ProducerRecord<String, Object> onSend(ProducerRecord record) {  
        System.out.println("拦截消息:" + record.toString());  
        return record;  
    }  
  
    /**  
     * 服务器收到消息后的一个确认  
     * @param metadata The metadata for the record that was sent (i.e. the partition and offset).  
     *                 If an error occurred, metadata will contain only valid topic and maybe     *                 partition. If partition is not given in ProducerRecord and an error occurs     *                 before partition gets assigned, then partition will be set to RecordMetadata.NO_PARTITION.     *                 The metadata may be null if the client passed null record to     *                 {@link org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord)}.  
     * @param exception The exception thrown during processing of this record. Null if no error occurred.  
     */    
    @Override  
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {  
        if (null != metadata) {  
            System.out.println("服务器收到了该消息:" + metadata.offset());  
        } else {  
            System.out.println("消息发送失败了, exception = " + exception.getMessage());  
        }  
    }  
  
    @Override  
    public void close() {  
  
    }  
    @Override  
    public void configure(Map<String, ?> configs) {  
  
    }
}
@Configuration  
public class KafkaConfig {  
  
    @Value("${spring.kafka.bootstrap-servers}")  
    private String bootstrapServers;  
  
    @Value("${spring.kafka.producer.key-serializer}")  
    private String keySerializer;  
  
    @Value("${spring.kafka.producer.value-serializer}")  
    private String valueSerializer;  
  
    /**  
     * 生产者创建工厂  
     * @return  
     */    
    public ProducerFactory<String, ?> producerFactory() {  
        return new DefaultKafkaProducerFactory<>(producerConfigs());  
    }  
  
    /**  
     * 生产者相关配置  
     * @return  
     */    
    public Map<String, Object> producerConfigs() {  
        Map<String, Object> props = new HashMap<>();  
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);  
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);  
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);  
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomerPartitionerConfig.class);  
  
        // 添加一个拦截器  
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomerProducerInterceptor.class.getName());  
        return props;  
    }  
  
    /**  
     * kafkaTemplate 覆盖默认配置类中的kafkaTemplate  
     * @return     
     */    
    @Bean  
    public KafkaTemplate<String, ?> kafkaTemplate() {  
        return new KafkaTemplate<>(producerFactory());  
    }  
}

获取生产者发送的消息

获取生产者发送的字符串消息
@Component  
public class EventConsumer {  
  
    // 采用监听的方式接收事件 (消息、数据)  
    @KafkaListener(topics = "helloTopic", groupId = "helloGroup")  
    public void onEvent(@Payload String event,  
                        @Header(value = KafkaHeaders.RECEIVED_TOPIC) String topic,  
                        @Header(value = KafkaHeaders.RECEIVED_PARTITION) String partition) {  
        System.out.println("读取到的事件:" + event + ", topic : " + topic + ", partition: " + partition);  
    }  
}

注解:

  • @Payload:标记该参数是消息体的内容
  • @Header:标记该参数是消息头的内容
@Component  
public class EventConsumer {  
  
    // 采用监听的方式接收事件 (消息、数据)  
    @KafkaListener(topics = "helloTopic", groupId = "helloGroup")  
    public void onEvent(@Payload String event,  
                        @Header(value = KafkaHeaders.RECEIVED_TOPIC) String topic,  
                        @Header(value = KafkaHeaders.RECEIVED_PARTITION) String partition,  
                        ConsumerRecord<String, String> record) {  
        System.out.println("读取到的事件:" + event + ", topic : " + topic + ", partition: " + partition);  
        System.out.println("读取到的事件:" + record.toString());  
    }  
}
获取生产者发送的对象消息

需要将对象转换成JSON数据,否则会报包不被信任的异常。

public class JSONUtils {  
  
    // 创建对象映射工具类  
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();  
  
    /**  
     * 将对象转换成JSON  
     * @param object  
     * @return  
     * @throws JsonProcessingException  
     */  
    public static String toJSON(Object object) throws JsonProcessingException {  
        // 把对象以字符串的形式写出去,就变成了json  
        return OBJECT_MAPPER.writeValueAsString(object);  
    }  
  
    /**  
     * 将JSON转换成对象  
     * @param json  
     * @param clazz  
     * @return  
     * @param <T>  
     * @throws JsonProcessingException  
     */  
    public static <T> T toBean(String json, Class<T> clazz) throws JsonProcessingException {  
        return OBJECT_MAPPER.readValue(json, clazz);  
    }  
}
@Component  
public class EventProducer {  
  
    //加入了spring-kafka依赖 + .yml配置信息,springboot自动配置好了kafka,自动装配好了KafkaTemplate这个Bean  
    @Resource  
    private KafkaTemplate<String, String> kafkaTemplate;  

    public void sendEvent() throws JsonProcessingException {  
        User user = User.builder()  
                .id(1209)  
                .phone("1235324234")  
                .birthDay(new Date())  
                .build();  
        String userJSON = JSONUtils.toJSON(user);  
        kafkaTemplate.send("helloTopic", userJSON);  
    }  
}
@Component  
public class EventConsumer {  

    @KafkaListener(topics = "helloTopic", groupId = "helloGroup")  
    public void onEvent(String userJSON,  
                        @Header(value = KafkaHeaders.RECEIVED_TOPIC) String topic,  
                        @Header(value = KafkaHeaders.RECEIVED_PARTITION) String partition,  
                        ConsumerRecord<String, String> record) throws JsonProcessingException {  
        User user = JSONUtils.toBean(userJSON, User.class);  
        System.out.println("读取到的事件:" + user + ", topic : " + topic + ", partition: " + partition);  
        System.out.println("读取到的事件:" + record.toString());  
    }
    
}
通过占位符接收消息
@Component  
public class EventConsumer {  
  
    @KafkaListener(topics = "${kafka.topic.name}", groupId = "${kafka.consumer.group}")  
    public void onEvent(String userJSON,  
                        @Header(value = KafkaHeaders.RECEIVED_TOPIC) String topic,  
                        @Header(value = KafkaHeaders.RECEIVED_PARTITION) String partition,  
                        ConsumerRecord<String, String> record) throws JsonProcessingException {  
        User user = JSONUtils.toBean(userJSON, User.class);  
        System.out.println("读取到的事件:" + user + ", topic : " + topic + ", partition: " + partition);  
        System.out.println("读取到的事件:" + record.toString());  
    }  
}
# 自定义配置,不是框架提供的  
kafka:  
  topic:  
    name: helloTopic  
  consumer:  
    group: helloGroup
手动消息确认
@Component  
public class EventConsumer {  
  
    // 采用监听的方式接收事件 (消息、数据)  
    @KafkaListener(topics = "${kafka.topic.name}", groupId = "${kafka.consumer.group}")  
    public void onEvent(String userJSON,  
                        @Header(value = KafkaHeaders.RECEIVED_TOPIC) String topic,  
                        @Header(value = KafkaHeaders.RECEIVED_PARTITION) String partition,  
                        ConsumerRecord<String, String> record,  
                        Acknowledgment ack) throws JsonProcessingException {  
        User user = JSONUtils.toBean(userJSON, User.class);  
        System.out.println("读取到的事件:" + user + ", topic : " + topic + ", partition: " + partition);  
        System.out.println("读取到的事件:" + record.toString());  
  
        ack.acknowledge(); // 手动确认消息,告诉kafka服务器,该消息我已经收到了,默认情况下kafka是自动确认  
    }  
}
spring:  
  # kafka连接地址 (ip + port)  
  kafka:  
    bootstrap-servers: 192.168.237.105:9092   
    # 配置消息监听器  
    listener:  
      # 开启消息监听的手动确认模式  
      ack-mode: manual

默认情况下,Kafka消费者消费消息后会自动发送确认信息给Kafka服务器,表示消息已经被成功消费。但在某些场景下,我们希望在消息处理成功后再发送确认,或者在消息处理失败时选择不发送确认,以便Kafka能够重新发送该消息;

指定topic、partition、offset消费
@Component  
public class EventConsumer {  

    @KafkaListener(groupId = "${kafka.consumer.group}", topicPartitions = {  
            @TopicPartition(  
                    topic = "${kafka.topic.name}",  
                    partitions = {"0", "1", "2"},  
                    partitionOffsets = {  
                            @PartitionOffset(partition = "3", initialOffset = "3"),  
                            @PartitionOffset(partition = "4", initialOffset = "3")  
                    })  
    })  
    public void onEvent(String userJSON,  
                        @Header(value = KafkaHeaders.RECEIVED_TOPIC) String topic,  
                        @Header(value = KafkaHeaders.RECEIVED_PARTITION) String partition,  
                        ConsumerRecord<String, String> record,  
                        Acknowledgment ack) {  
        try {  
            // 收到消息后,处理业务  
            User user = JSONUtils.toBean(userJSON, User.class);  
            System.out.println("读取到的事件:" + user + ", topic : " + topic + ", partition: " + partition);  
            System.out.println("读取到的事件:" + record.toString());  
  
            // 业务处理完成,给kafka服务器确认  
            ack.acknowledge(); // 手动确认消息,告诉kafka服务器,该消息我已经收到了,默认情况下kafka是自动确认  
        } catch (Exception e) {  
            e.printStackTrace();  
        }  
    }  
}
@Component  
public class EventProducer {  
  
    //加入了spring-kafka依赖 + .yml配置信息,springboot自动配置好了kafka,自动装配好了KafkaTemplate这个Bean  
    @Resource  
    private KafkaTemplate<String, String> kafkaTemplate;  
  
    public void sendEvent() throws JsonProcessingException {  
        for (int i = 0; i < 25; i++) {  
            User user = User.builder()  
                    .id(i)  
                    .phone("1235324234" + i)  
                    .birthDay(new Date())  
                    .build();  
            String userJSON = JSONUtils.toJSON(user);  
            kafkaTemplate.send("helloTopic", "k" + i, userJSON);  
        }  
    }  
  
}
@Configuration  
public class KafkaConfig {  
  
    @Bean  
    public NewTopic newTopic() {  
        // 创建一个名为heTopic的Topic并设置分区数为5,分区副本数为1  
        return new NewTopic("helloTopic", 5, (short) 1);  
    }  
  
}

消费者批量消费消息

  1. 设置application.properties开启批量消费;
spring:  
  application:  
    # 应用名称  
    name: spring-boot-03-kafka-base  
  
  # kafka连接地址 (ip + port)  kafka:  
    bootstrap-servers: 192.168.237.105:9092  
  
    # 配置消息监听器  
    listener:  
      # 设置批量消费,默认是单个消息消费  
      type: batch  
  
    consumer:  
      # 批量消费每次最多消费多少条消息  
      max-poll-records: 20
      # 从第一条消息开始接收  
  auto-offset-reset: earliest
  1. 接收消息时用LIst来接收
@Component  
public class EventConsumer {  
  
    @KafkaListener(topics = {"batchTopic"}, groupId = "batchGroup2")  
    public void onEvent(List<ConsumerRecord<String, String>> records) {  
        System.out.println("批量消费,records.size() = " + records.size() + ",records = " + records);  
    }  
  
}
@Component  
public class EventProducer {  
  
    //加入了spring-kafka依赖 + .yml配置信息,springboot自动配置好了kafka,自动装配好了KafkaTemplate这个Bean  
    @Resource  
    private KafkaTemplate<String, String> kafkaTemplate;  
  
    public void sendEvent() throws JsonProcessingException {  
        for (int i = 0; i < 125; i++) {  
            User user = User.builder()  
                    .id(i)  
                    .phone("1235324234" + i)  
                    .birthDay(new Date())  
                    .build();  
            String userJSON = JSONUtils.toJSON(user);  
            kafkaTemplate.send("batchTopic", "k" + i, userJSON);  
        }  
    }  
  
}

消费消息时的消息拦截

在消息消费之前,我们可以通过配置拦截器对消息进行拦截,在消息被实际处理之前对其进行一些操作,例如记录日志、修改消息内容或执行一些安全检查等;

  1. 实现kafka的ConsumerInterceptor拦截器接口
/**  
 * 自定义的消费者拦截器  
 */  
public class CustomerConsumerInterceptor implements ConsumerInterceptor<String, String> {  
  
    /**  
     * 在消费消息之前执行  
     *  
     * @param records records to be consumed by the client or records returned by the previous interceptors in the list.  
     * @return     
     */    
    @Override  
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {  
        System.out.println("onConsumer方法执行:" + records);  
        return records;  
    }  
  
    /**  
     * 消息拿到之后,提交offset之前执行该方法  
     *  
     * @param offsets A map of offsets by partition with associated metadata  
     */    
    @Override  
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {  
        System.out.println("onCommit方法执行:" + offsets);  
    }  
  
    @Override  
    public void close() {  
  
    }  
    @Override  
    public void configure(Map<String, ?> configs) {  
  
    }  
}
  1. 在kafka消费者的ConsumerFactory配置中注册这个拦截器
@Configuration  
public class KafkaConfig {  
  
    @Value("${spring.kafka.bootstrap-servers}")  
    private String bootstrapServers;  
  
    @Value("${spring.kafka.consumer.key-deserializer}")  
    private String keyDeserializer;  
  
    @Value("${spring.kafka.consumer.value-deserializer}")  
    private String valueDeserializer;  
  
    /**  
     * 消费者相关配置  
     *  
     * @return     
     */    
    public Map<String, Object> consumerConfigs() {  
        Map<String, Object> props = new HashMap<>();  
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);  
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);  
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);  
  
        // 添加一个消费者拦截器  
        props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomerConsumerInterceptor.class.getName());  
  
        return props;  
    }  
  
    /**  
     * 消费者创建工厂  
     * @return  
     */    
    public ConsumerFactory<String, String> consumerFactory() {  
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());  
    }  
  
}
  1. 监听消息时使用我们的监听器容器工厂Bean
@Configuration  
public class KafkaConfig {  
  
    @Value("${spring.kafka.bootstrap-servers}")  
    private String bootstrapServers;  
  
    @Value("${spring.kafka.consumer.key-deserializer}")  
    private String keyDeserializer;  
  
    @Value("${spring.kafka.consumer.value-deserializer}")  
    private String valueDeserializer;  
  
    /**  
     * 消费者相关配置  
     *  
     * @return     
     */    
    public Map<String, Object> consumerConfigs() {  
        Map<String, Object> props = new HashMap<>();  
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);  
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);  
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);  
  
        // 添加一个消费者拦截器  
        props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomerConsumerInterceptor.class.getName());  
  
        return props;  
    }  
  
    /**  
     * 消费者创建工厂  
     * @return  
     */    
    @Bean  
    public ConsumerFactory<String, String> ourConsumerFactory() {  
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());  
    }  
  
    @Bean  
    public KafkaListenerContainerFactory<?> ourKafkaListenerContainerFactory(ConsumerFactory<String, String> ourConsumerFactory) {  
        ConcurrentKafkaListenerContainerFactory<String, String> concurrentKafkaListenerContainerFactory = new ConcurrentKafkaListenerContainerFactory<>();  
        concurrentKafkaListenerContainerFactory.setConsumerFactory(ourConsumerFactory);  
        return concurrentKafkaListenerContainerFactory;  
    }  
  
}
@Component  
public class EventConsumer {  
  
    @KafkaListener(topics = {"batchTopic"}, groupId = "batchGroup2", containerFactory = "ourKafkaListenerContainerFactory")  
    public void onEvent(ConsumerRecord<String, String> record) {  
        System.out.println("消息消费,records = " + record);  
    }  
  
}

创建生产者

@Component  
public class EventProducer {  
  
    //加入了spring-kafka依赖 + .yml配置信息,springboot自动配置好了kafka,自动装配好了KafkaTemplate这个Bean  
    @Resource  
    private KafkaTemplate<String, String> kafkaTemplate;  
  
    public void sendEvent() throws JsonProcessingException {  
        User user = User.builder()  
                .id(1028)  
                .phone("1235324234312")  
                .birthDay(new Date())  
                .build();  
        String userJSON = JSONUtils.toJSON(user);  
        kafkaTemplate.send("interceptorTopic", "k", userJSON);  
    }  
  
}

消息转发

消息转发就是应用A从TopicA接收到消息,经过处理后转发到TopicB,再由应用B监听接收该消息,即一个应用处理完成后将该消息转发至其他应用处理,这在实际开发中,是可能存在这样的需求的;

@Component  
public class EventConsumer {  
  
    @KafkaListener(topics = {"topicA"}, groupId = "aGroup")  
    @SendTo(value = "topicB")  
    public String onEventA(ConsumerRecord<String, String> record) {  
        System.out.println("消息A消费,records = " + record);  
        return record.value() + "--forward message";  
    }  
  
    @KafkaListener(topics = {"topicB"}, groupId = "bGroup")  
    public void onEventB(ConsumerRecord<String, String> record) {  
        System.out.println("消息B消费,records = " + record);  
    }  
  
}

消息消费的分区策略

Kafka消费消息时的分区策略:是指Kafka主题topic中哪些分区应该由哪些消费者来消费;
image.png

Kafka有多种分区分配策略,默认的分区分配策略是RangeAssignor,除了RangeAssignor策略外,Kafka还有其他分区分配策略:

  • RoundRobinAssignor
  • StickyAssignor
  • CooperativeStickyAssignor

这些策略各有特点,可以根据实际的应用场景和需求来选择适合的分区分配策略
image.png

RangeAssignor分区策略

Kafka默认的消费分区分配策略:RangeAssignor;假设如下:

  • 一个主题myTopic有10个分区;(p0 - p9)
  • 一个消费者组内有3个消费者:consumer1、consumer2、consumer3;

RangeAssignor消费分区策略:RangeAssignor策略是根据消费者组内的消费者数量和主题的分区数量,来均匀地为每个消费者分配分区。

  1. 计算每个消费者应得的分区数: 分区总数(10)/ 消费者数量(3)= 3 … 余1;
    • 每个消费者理论上应该得到3个分区,但由于有余数1,所以前1个消费者会多得到一个分区;
    • consumer1(作为第一个消费者)将得到 3 + 1 = 4 个分区;
    • consumer2 和 consumer3 将各得到 3 个分区;
  2. 具体分配: 分区编号从0到9,按照编号顺序为消费者分配分区:
    • consumer1 将分配得到分区 0、1、2、3;
    • consumer2 将分配得到分区 4、5、6;
    • consumer3 将分配得到分区 7、8、9;
@Component  
public class EventConsumer {  
  
    @KafkaListener(topics = {"myTopic"}, groupId = "myGroup", concurrency = "3")  
    public void onEventA(ConsumerRecord<String, String> record) {  
        System.out.println(Thread.currentThread().getName() + " -->消息消费,records = " + record);  
    }  
  
}
@Component  
public class EventProducer {  
  
    //加入了spring-kafka依赖 + .yml配置信息,springboot自动配置好了kafka,自动装配好了KafkaTemplate这个Bean  
    @Resource  
    private KafkaTemplate<String, String> kafkaTemplate;  
  
    public void sendEvent() throws JsonProcessingException {  
        for (int i = 0; i < 100; i++) {  
            User user = User.builder().id(1028+i).phone("1370909090"+i).birthDay(new Date()).build();  
            String userJSON = JSONUtils.toJSON(user);  
            kafkaTemplate.send("myTopic", "k" + i, userJSON);  
        }  
    }  
  
}
@Configuration  
public class KafkaConfig {  
  
    @Bean  
    public NewTopic newTopic() {  
        // 创建一个名为heTopic的Topic并设置分区数为5,分区副本数为1  
        return new NewTopic("myTopic", 10, (short) 1);  
    }  
  
}
RoundRobinAssignor分区策略

继续以前面的例子数据,采用RoundRobinAssignor策略进行测试,得到的结果如下:

  • consumer1:0、3、6、9
  • consumer2:1、4、7
  • consumer3:2、5、8
@Configuration  
public class KafkaConfig {  
  
    @Value("${spring.kafka.bootstrap-servers}")  
    private String bootstrapServers;  
  
    @Value("${spring.kafka.consumer.value-deserializer}")  
    private String valueDeserializer;  
  
    @Value("${spring.kafka.consumer.value-deserializer}")  
    private String keyDeserializer;  
  
    @Value("${spring.kafka.consumer.auto-offset-reset}")  
    private String autoOffsetReset;  
  
    @Bean  
    public NewTopic newTopic() {  
        // 创建一个名为heTopic的Topic并设置分区数为5,分区副本数为1  
        return new NewTopic("myTopic", 10, (short) 1);  
    }  
  
    /**  
     * 消费者相关配置  
     *  
     * @return  
     */  
    public Map<String, Object> consumerConfigs() {  
        Map<String, Object> props = new HashMap<>();  
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);  
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);  
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);  
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);  
  
        // 指定使用轮询的消息消费区分策略  
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());  
  
        return props;  
    }  
  
    /**  
     * 消费者创建工厂  
     * @return  
     */  
    @Bean  
    public ConsumerFactory<String, String> ourConsumerFactory() {  
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());  
    }  
  
    /**  
     * 创建监听器容器工厂  
     *  
     * @param ourConsumerFactory  
     * @return  
     */  
    @Bean  
    public KafkaListenerContainerFactory<?> ourKafkaListenerContainerFactory(ConsumerFactory<String, String> ourConsumerFactory) {  
        ConcurrentKafkaListenerContainerFactory<String, String> concurrentKafkaListenerContainerFactory = new ConcurrentKafkaListenerContainerFactory<>();  
        concurrentKafkaListenerContainerFactory.setConsumerFactory(ourConsumerFactory);  
        return concurrentKafkaListenerContainerFactory;  
    }  
  
}
@Component  
public class EventConsumer {  
  
    @KafkaListener(topics = {"myTopic"}, groupId = "myGroup2", concurrency = "3", containerFactory = "ourKafkaListenerContainerFactory")  
    public void onEventA(ConsumerRecord<String, String> record) {  
        System.out.println(Thread.currentThread().getId() + " -->消息消费,records = " + record);  
    }  
  
}
@Component  
public class EventProducer {  
  
    //加入了spring-kafka依赖 + .yml配置信息,springboot自动配置好了kafka,自动装配好了KafkaTemplate这个Bean  
    @Resource  
    private KafkaTemplate<String, String> kafkaTemplate;  
  
    public void sendEvent() throws JsonProcessingException {  
        for (int i = 0; i < 100; i++) {  
            User user = User.builder().id(1028+i).phone("1370909090"+i).birthDay(new Date()).build();  
            String userJSON = JSONUtils.toJSON(user);  
            kafkaTemplate.send("myTopic", "k" + i, userJSON);  
        }  
    }  
  
}
StickyAssignor消费分区策略
  • 尽可能保持消费者与分区之间的分配关系不变,即使消费组的消费者成员发生变化,减少不必要的分区重分配;
  • 尽量保持现有的分区分配不变,仅对新加入的消费者或离开的消费者进行分区调整。这样,大多数消费者可以继续消费它们之前消费的分区,只有少数消费者需要处理额外的分区;所以叫“粘性”分配;
CooperativeStickyAssignor消费分区策略
  • 与 StickyAssignor 类似,但增加了对协作式重新平衡的支持,即消费者可以在它离开消费者组之前通知协调器,以便协调器可以预先计划分区迁移,而不是在消费者突然离开时立即进行分区重分配;

Kafka事件(消息、数据)的存储

  • kafka的所有事件(消息、数据)都存储在/tmp/kafka-logs目录中,可通过log.dirs=/tmp/kafka-logs配置;
  • Kafka的所有事件(消息、数据)都是以日志文件的方式来保存;
  • Kafka一般都是海量的消息数据,为了避免日志文件过大,日志文件被存放在多个日志目录下,日志目录的命名规则为:<topic_name>-<partition_id>
  • 比如创建一个名为 firstTopic 的 topic,其中有 3 个 partition,那么在 kafka 的数据目录(/tmp/kafka-log)中就有 3 个目录,firstTopic-0firstTopic-1firstTopic-2
    • 00000000000000000000.index 消息索引文件
    • 00000000000000000000.log 消息数据文件
    • 00000000000000000000.timeindex 消息的时间戳索引文件
    • 00000000000000000006.snapshot 快照文件,生产者发生故障或重启时能够恢复并继续之前的操作
    • leader-epoch-checkpoint 记录每个分区当前领导者的epoch以及领导者开始写入消息时的起始偏移量
    • partition.metadata 存储关于特定分区的元数据(metadata)信息
  • 每次消费一个消息并且提交以后,会保存当前消费到的最近的一个offset;
  • 在kafka中,有一个__consumer_offsets的topic, 消费者消费提交的offset信息会写入到 该topic中,__consumer_offsets保存了每个consumer group某一时刻提交的offset信息,__consumer_offsets默认有50个分区;
  • consumer_group 保存在哪个分区中的计算公式:Math.abs(“groupid”.hashCode())%groupMetadataTopicPartitionCount ;

Offset详解

  1. 生产者Offset
    • 生产者发送一条消息到Kafka的broker的某个topic下某个partition中;
    • Kafka内部会为每条消息分配一个唯一的offset,该offset就是该消息在partition中的位置;
  2. 消费者Offset
    • 消费者offset是消费者需要知道自己已经读取到哪个位置了,接下来需要从哪个位置开始继续读取消息;
    • 每个消费者组(Consumer Group)中的消费者都会独立地维护自己的offset,当消费者从某个partition读取消息时,它会记录当前读取到的offset,这样,即使消费者崩溃或重启,它也可以从上次读取的位置继续读取,而不会重复读取或遗漏消息;(注意:消费者offset需要消费消息并提交后才记录offset

image.png

每个消费者组启动开始监听消息,默认从消息的最新的位置开始监听消息,即把最新的位置作为消费者offset;

  • 分区中还没有发送过消息,则最新的位置就是0;
  • 分区中已经发送过消息,则最新的位置就是生产者offset的下一个位置;
    消费者消费消息后,如果不提交确认(ack),则offset不更新,提交了才更新;
    命令行命令:./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group osGroup --describe
    结论: 消费者从什么位置开始消费,就看消费者的offset是多少,消费者offset是多少,它启动后,可以通过上面的命令查看;

原文地址:https://blog.csdn.net/qq_51313170/article/details/143783374

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!