springboot 配置Kafka 关闭自启动连接
这里写自定义目录标题
springboot 配置Kafka 关闭自启动连接
在Spring Boot应用程序中,默认情况下,Kafka监听器容器会在应用程序启动时自动开始连接到Kafka broker。如果你希望禁用这种自动启动行为,可以通过配置来实现。以下是几种常见的方法:
方法一:使用 @ConditionalOnProperty
你可以使用条件注解来控制Kafka监听器容器的启动。通过设置一个属性来决定是否启用Kafka监听器。
步骤:
- 定义配置属性: 在你的application.yml或application.properties文件中添加一个自定义属性,用于控制Kafka监听器的启用状态。
spring:
kafka:
enabled: false
- 使用 @ConditionalOnProperty 注解: 在你的Kafka监听器类上使用@ConditionalOnProperty注解,根据配置属性来决定是否启用该监听器。
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
@ConditionalOnProperty(name = "spring.kafka.enabled", havingValue = "true")
public class MyKafkaListener {
@KafkaListener(topics = "your-topic-name", groupId = "your-group-id")
public void listen(String message) {
System.out.println("Received Message: " + message);
}
}
方法二:手动管理Kafka监听器容器
另一种方法是手动管理Kafka监听器容器的生命周期,而不是依赖于Spring Boot的自动配置。
步骤:
- 禁用自动配置: 在你的主应用程序类或配置类上排除KafkaAutoConfiguration。
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
@SpringBootApplication(exclude = KafkaAutoConfiguration.class)
public class DeviceExchangeApplication {
public static void main(String[] args) {
long startTime = System.currentTimeMillis();
System.out.println("-----------> 数据交换链[device-exchange]启动...");
SpringApplication.run(DeviceExchangeApplication.class, args);
System.out.println("-----------> 数据交换链[device-exchange]启动成功,耗时:" + (System.currentTimeMillis() - startTime) + "毫秒");
}
}
- 手动创建和管理Kafka监听器容器: 创建并管理Kafka监听器容器,以便在需要的时候手动启动它们。
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.MessageListenerContainer;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaConfig {
@Autowired
private MyKafkaListener myKafkaListener;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "your-group-id");
return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public MessageListenerContainer kafkaListenerContainer() {
ConcurrentMessageListenerContainer<String, String> container =
kafkaListenerContainerFactory()
.createContainer("your-topic-name");
container.setupMessageListener(myKafkaListener::listen);
return container;
}
}
- 手动启动Kafka监听器容器: 在需要的时候手动启动Kafka监听器容器。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;
@Component
public class KafkaStarter implements CommandLineRunner {
@Autowired
private MessageListenerContainer kafkaListenerContainer;
@Override
public void run(String... args) throws Exception {
// 手动启动Kafka监听器容器
kafkaListenerContainer.start();
}
}
方法三:使用 autoStartup=false
你可以在Kafka监听器容器的配置中设置autoStartup=false,这样它就不会在应用程序启动时自动启动。
步骤:
- 配置 autoStartup=false: 在你的Kafka监听器配置中设置autoStartup=false。
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;
@Component
public class MyKafkaListener {
@KafkaListener(id = "myListener", topics = "your-topic-name", autoStartup = "false")
public void listen(String message) {
System.out.println("Received Message: " + message);
}
}
- 手动启动Kafka监听器容器: 使用MessageListenerContainer接口的手动启动方法。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;
@Component
public class KafkaStarter {
@Autowired
private MessageListenerContainer myListenerContainer;
public void startKafkaListener() {
myListenerContainer.start();
}
}
总结
通过上述三种方法,你可以有效地控制Kafka监听器容器的自动启动行为。选择适合你项目需求的方法来实现即可。通常情况下,使用@ConditionalOnProperty是最简单和灵活的方式。
结语
以上答案来自大模型,第二种和第三种都比较麻烦,最后采用了第一种方式在所有的消费类上加了@ConditionalOnProperty(name = "spring.kafka.enabled", havingValue = "true")
,启动就很快了,KafkaAdmin 和 KafkaConsumer就没有自动启动了。用kafkaTemplate发送消息还是会去连接Kafka服务器,不影响正常使用。
注意:必须是所有的消费类必须加,不然就不会起作用。
主要场景:一般线上部署环境才会去连接kafka,本地开发的时候 不一定要去连,所以想暂时关闭一下
原文地址:https://blog.csdn.net/jr126/article/details/144607266
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!