自学内容网 自学内容网

springboot 配置Kafka 关闭自启动连接

springboot 配置Kafka 关闭自启动连接

在Spring Boot应用程序中,默认情况下,Kafka监听器容器会在应用程序启动时自动开始连接到Kafka broker。如果你希望禁用这种自动启动行为,可以通过配置来实现。以下是几种常见的方法:

方法一:使用 @ConditionalOnProperty

你可以使用条件注解来控制Kafka监听器容器的启动。通过设置一个属性来决定是否启用Kafka监听器。
步骤:

  1. 定义配置属性: 在你的application.yml或application.properties文件中添加一个自定义属性,用于控制Kafka监听器的启用状态。
   spring:
     kafka:
       enabled: false
  1. 使用 @ConditionalOnProperty 注解: 在你的Kafka监听器类上使用@ConditionalOnProperty注解,根据配置属性来决定是否启用该监听器。
   import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
   import org.springframework.kafka.annotation.KafkaListener;
   import org.springframework.stereotype.Component;

   @Component
   @ConditionalOnProperty(name = "spring.kafka.enabled", havingValue = "true")
   public class MyKafkaListener {

       @KafkaListener(topics = "your-topic-name", groupId = "your-group-id")
       public void listen(String message) {
           System.out.println("Received Message: " + message);
       }
   }
   

方法二:手动管理Kafka监听器容器

另一种方法是手动管理Kafka监听器容器的生命周期,而不是依赖于Spring Boot的自动配置。
步骤:

  1. 禁用自动配置: 在你的主应用程序类或配置类上排除KafkaAutoConfiguration。
   import org.springframework.boot.SpringApplication;
   import org.springframework.boot.autoconfigure.SpringBootApplication;
   import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;

   @SpringBootApplication(exclude = KafkaAutoConfiguration.class)
   public class DeviceExchangeApplication {

       public static void main(String[] args) {
           long startTime = System.currentTimeMillis();
           System.out.println("-----------> 数据交换链[device-exchange]启动...");
           SpringApplication.run(DeviceExchangeApplication.class, args);
           System.out.println("-----------> 数据交换链[device-exchange]启动成功,耗时:" + (System.currentTimeMillis() - startTime) + "毫秒");
       }
   }
   
  1. 手动创建和管理Kafka监听器容器: 创建并管理Kafka监听器容器,以便在需要的时候手动启动它们。
   import org.apache.kafka.clients.consumer.ConsumerConfig;
   import org.apache.kafka.common.serialization.StringDeserializer;
   import org.springframework.beans.factory.annotation.Autowired;
   import org.springframework.context.annotation.Bean;
   import org.springframework.context.annotation.Configuration;
   import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
   import org.springframework.kafka.core.ConsumerFactory;
   import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
   import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
   import org.springframework.kafka.listener.MessageListenerContainer;

   import java.util.HashMap;
   import java.util.Map;

   @Configuration
   public class KafkaConfig {

       @Autowired
       private MyKafkaListener myKafkaListener;

       @Bean
       public Map<String, Object> consumerConfigs() {
           Map<String, Object> props = new HashMap<>();
           props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
           props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
           props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
           props.put(ConsumerConfig.GROUP_ID_CONFIG, "your-group-id");
           return props;
       }

       @Bean
       public ConsumerFactory<String, String> consumerFactory() {
           return new DefaultKafkaConsumerFactory<>(consumerConfigs());
       }

       @Bean
       public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
           ConcurrentKafkaListenerContainerFactory<String, String> factory =
                   new ConcurrentKafkaListenerContainerFactory<>();
           factory.setConsumerFactory(consumerFactory());
           return factory;
       }

       @Bean
       public MessageListenerContainer kafkaListenerContainer() {
           ConcurrentMessageListenerContainer<String, String> container =
                   kafkaListenerContainerFactory()
                           .createContainer("your-topic-name");
           container.setupMessageListener(myKafkaListener::listen);
           return container;
       }
   }
   
  1. 手动启动Kafka监听器容器: 在需要的时候手动启动Kafka监听器容器。
   import org.springframework.beans.factory.annotation.Autowired;
   import org.springframework.boot.CommandLineRunner;
   import org.springframework.kafka.listener.MessageListenerContainer;
   import org.springframework.stereotype.Component;

   @Component
   public class KafkaStarter implements CommandLineRunner {

       @Autowired
       private MessageListenerContainer kafkaListenerContainer;

       @Override
       public void run(String... args) throws Exception {
           // 手动启动Kafka监听器容器
           kafkaListenerContainer.start();
       }
   }
   

方法三:使用 autoStartup=false

你可以在Kafka监听器容器的配置中设置autoStartup=false,这样它就不会在应用程序启动时自动启动。
步骤:

  1. 配置 autoStartup=false: 在你的Kafka监听器配置中设置autoStartup=false。
   import org.springframework.kafka.annotation.KafkaListener;
   import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
   import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
   import org.springframework.kafka.listener.MessageListenerContainer;
   import org.springframework.stereotype.Component;

   @Component
   public class MyKafkaListener {

       @KafkaListener(id = "myListener", topics = "your-topic-name", autoStartup = "false")
       public void listen(String message) {
           System.out.println("Received Message: " + message);
       }
   }
   
  1. 手动启动Kafka监听器容器: 使用MessageListenerContainer接口的手动启动方法。
   import org.springframework.beans.factory.annotation.Autowired;
   import org.springframework.kafka.listener.MessageListenerContainer;
   import org.springframework.stereotype.Component;

   @Component
   public class KafkaStarter {

       @Autowired
       private MessageListenerContainer myListenerContainer;

       public void startKafkaListener() {
           myListenerContainer.start();
       }
   }
   

总结
通过上述三种方法,你可以有效地控制Kafka监听器容器的自动启动行为。选择适合你项目需求的方法来实现即可。通常情况下,使用@ConditionalOnProperty是最简单和灵活的方式。

结语

以上答案来自大模型,第二种和第三种都比较麻烦,最后采用了第一种方式在所有的消费类上加了@ConditionalOnProperty(name = "spring.kafka.enabled", havingValue = "true"),启动就很快了,KafkaAdmin 和 KafkaConsumer就没有自动启动了。用kafkaTemplate发送消息还是会去连接Kafka服务器,不影响正常使用。
注意:必须是所有的消费类必须加,不然就不会起作用。
主要场景:一般线上部署环境才会去连接kafka,本地开发的时候 不一定要去连,所以想暂时关闭一下


原文地址:https://blog.csdn.net/jr126/article/details/144607266

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!