springboot整合kafka
springboot整合kafka
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the Spring Boot + Kafka sample application. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.4.0</version>
    <relativePath/> <!-- lookup parent from repository -->
  </parent>
  <groupId>cn.lhz</groupId>
  <artifactId>kafka</artifactId>
  <version>0.0.1</version>
  <name>kafka</name>
  <description>kafka</description>
  <url/>
  <licenses>
    <license/>
  </licenses>
  <developers>
    <developer/>
  </developers>
  <scm>
    <connection/>
    <developerConnection/>
    <tag/>
    <url/>
  </scm>
  <properties>
    <!-- Java level; jdk.version is a project-defined property reused by the compiler settings below -->
    <java.version>21</java.version>
    <jdk.version>21</jdk.version>
    <maven.compiler.source>${jdk.version}</maven.compiler.source>
    <maven.compiler.target>${jdk.version}</maven.compiler.target>
    <maven.compiler.compilerVersion>${jdk.version}</maven.compiler.compilerVersion>
    <maven.compiler.encoding>utf-8</maven.compiler.encoding>
    <project.build.sourceEncoding>utf-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <!-- NOTE(review): test failures are ignored AND tests are skipped entirely; remove both for real projects -->
    <maven.test.failure.ignore>true</maven.test.failure.ignore>
    <maven.test.skip>true</maven.test.skip>
  </properties>
  <dependencies>
    <!-- Dependencies without a <version> rely on the version management inherited from the parent POM -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Disabled alternative; the spring-kafka artifact below is used instead -->
    <!-- <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-kafka</artifactId>
    </dependency>-->
    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
    </dependency>
    <!-- Knife4j API documentation UI; its version is pinned explicitly here -->
    <dependency>
      <groupId>com.github.xiaoymin</groupId>
      <artifactId>knife4j-openapi3-jakarta-spring-boot-starter</artifactId>
      <version>4.5.0</version>
    </dependency>
  </dependencies>
  <build>
    <!-- The built artifact is named after <name> (kafka) rather than artifactId-version -->
    <finalName>${project.name}</finalName>
    <plugins>
      <!-- Compiler level -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.13.0</version>
        <configuration>
          <!-- Character encoding used when compiling sources -->
          <encoding>UTF-8</encoding>
          <!-- JDK version to compile against -->
          <source>${jdk.version}</source>
          <target>${jdk.version}</target>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
        <configuration>
          <!-- Lombok is excluded from the repackaged application archive -->
          <excludes>
            <exclude>
              <groupId>org.projectlombok</groupId>
              <artifactId>lombok</artifactId>
            </exclude>
          </excludes>
        </configuration>
      </plugin>
    </plugins>
  </build>
  <!-- Dependencies and plugins are both resolved through the Aliyun public mirror -->
  <repositories>
    <repository>
      <id>public</id>
      <name>aliyun nexus</name>
      <url>https://maven.aliyun.com/repository/public</url>
      <releases>
        <enabled>true</enabled>
      </releases>
    </repository>
  </repositories>
  <pluginRepositories>
    <pluginRepository>
      <id>public</id>
      <name>aliyun nexus</name>
      <url>https://maven.aliyun.com/repository/public</url>
      <releases>
        <enabled>true</enabled>
      </releases>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
    </pluginRepository>
  </pluginRepositories>
</project>
application.yml配置kafka
本案例使用最少的配置,在
application.yml
中添加Kafka的连接配置:
# Minimal Spring Boot configuration for the Kafka sample.
spring:
  application:
    name: springboot-kafka
  kafka:
    # Comma-separated list of Kafka brokers to connect to
    bootstrap-servers: lihaozhe01:9092,lihaozhe02:9092,lihaozhe03:9092
    consumer:
      group-id: lihaozhe
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
# springdoc-openapi / Swagger UI settings
springdoc:
  swagger-ui:
    path: /swagger-ui
    tags-sorter: alpha
    operations-sorter: alpha
  api-docs:
    path: /v3/api-docs
  group-configs:
    - group: 'default'
      paths-to-match: '/**'
      packages-to-scan: cn.lhz.controller
# Knife4j enhancements; this section can be omitted when they are not needed
knife4j:
  enable: true
  setting:
    language: zh_cn
  basic:
    # Enable HTTP basic authentication for the documentation pages
    enable: true
    # Basic-auth user name
    username: admin
    # Basic-auth password (sample value; NOTE(review): do not commit real credentials)
    password: lihaozhe
Kafka配置类
创建
KafkaConfig.java
配置类,可在其中创建Kafka主题(示例中创建主题等相关 Bean 的代码已注释,按需启用):
package cn.lhz.config;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
/**
 * Kafka configuration class; {@code @EnableKafka} is the only active configuration here.
 *
 * <p>No beans are currently declared: the producer factory, template and topic
 * definitions below are kept only as commented-out examples.
 * NOTE(review): presumably the {@code KafkaTemplate} injected elsewhere comes from
 * Spring Boot auto-configuration driven by {@code application.yml} — confirm.
 *
 * @author 李昊哲
 * @version 1.0.0
 */
@Configuration
@EnableKafka
public class KafkaConfig {
  // Example: building the producer factory and template by hand.
  // @Bean
  // public ProducerFactory<String, String> producerFactory() {
  // Map<String, Object> configProps = new HashMap<>();
  // // producer configuration properties go here
  // return new DefaultKafkaProducerFactory<>(configProps);
  // }
  //
  // @Bean
  // public KafkaTemplate<String, String> kafkaTemplate() {
  // return new KafkaTemplate<>(producerFactory());
  // }
  // Example: declaring the "lihaozhe" topic with 1 partition and replication factor 1.
  // @Bean
  // public NewTopic myTopic() {
  // return new NewTopic("lihaozhe", 1, (short) 1);
  // }
}
Kafka消费者
创建
KafkaConsumer.java
消费者类,用于从Kafka主题接收消息:以下两段代码二选一
- 第一个案例代码只获取
value
值
- 第二个案例代码获取了
ConsumerRecord
完整信息
获取value
package cn.lhz.service;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
/**
 * Kafka listener variant that receives only the value of each record.
 *
 * @author 李昊哲
 * @version 1.0.0
 */
@Service
public class KafkaConsumer {

  /**
   * Prints the value of each record received from the {@code lihaozhe} topic
   * to standard output.
   *
   * @param value value of the consumed record
   */
  @KafkaListener(topics = "lihaozhe", groupId = "lihaozhe")
  public void consume(String value) {
    // Build the line first, then write it in a single call.
    final String line = String.format("Consumed: %s\n", value);
    System.out.print(line);
  }
}
获取ConsumerRecord
package cn.lhz.service;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
/**
 * Kafka listener variant that receives the complete {@link ConsumerRecord}.
 *
 * @author 李昊哲
 * @version 1.0.0
 */
@Service
public class KafkaConsumer {

  /**
   * Prints topic, partition, key and value of each record received from the
   * {@code lihaozhe} topic to standard output.
   *
   * @param record the consumed record, including its metadata
   */
  @KafkaListener(topics = "lihaozhe", groupId = "lihaozhe")
  public void consume(ConsumerRecord<String, String> record) {
    final String topic = record.topic();
    final int partition = record.partition();
    final String key = record.key();
    final String payload = record.value();
    System.out.format("Consumed: %s-%d-%s:%s\n", topic, partition, key, payload);
  }
}
Kafka生产者
package cn.lhz.service;
import lombok.RequiredArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
/**
 * Thin service wrapper around {@link KafkaTemplate} for publishing string messages.
 *
 * @author 李昊哲
 * @version 1.0.0
 */
@Service
public class KafkaProducer {

  private final KafkaTemplate<String, String> kafkaTemplate;

  /**
   * Creates the producer service.
   *
   * @param kafkaTemplate template used to publish records
   */
  public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
    this.kafkaTemplate = kafkaTemplate;
  }

  /**
   * Sends a value without a key to the given topic.
   *
   * <p>The object returned by {@code KafkaTemplate.send} is not inspected.
   *
   * @param topic Kafka topic to publish to
   * @param value record value
   */
  public void sendMessage(String topic, String value) {
    this.kafkaTemplate.send(topic, value);
  }

  /**
   * Sends a key/value pair to the given topic.
   *
   * <p>The object returned by {@code KafkaTemplate.send} is not inspected.
   *
   * @param topic Kafka topic to publish to
   * @param key   record key
   * @param value record value
   */
  public void sendMessage(String topic, String key, String value) {
    this.kafkaTemplate.send(topic, key, value);
  }
}
创建控制器发送消息
package cn.lhz.controller;
import cn.lhz.service.KafkaProducer;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST endpoints that publish messages to Kafka through {@link KafkaProducer}.
 *
 * <p>NOTE(review): both endpoints echo the caller-supplied path variable back in the
 * response body; depending on content negotiation this may be rendered as HTML by a
 * browser — confirm whether the response should be restricted to plain text or escaped.
 *
 * @author 李昊哲
 * @version 1.0.0
 */
@RestController
public class KafkaController {

  /** Topic every endpoint of this controller publishes to. */
  private static final String TOPIC = "lihaozhe";

  private final KafkaProducer kafkaProducer;

  /**
   * Creates the controller.
   *
   * @param kafkaProducer service used to publish the messages
   */
  public KafkaController(KafkaProducer kafkaProducer) {
    this.kafkaProducer = kafkaProducer;
  }

  /**
   * Publishes a value-only record.
   *
   * @param message record value taken from the request path
   * @return the message prefixed with a fixed label
   */
  @GetMapping("/send/{message}")
  public String sendMessage(@PathVariable("message") String message) {
    kafkaProducer.sendMessage(TOPIC, message);
    return reply(message);
  }

  /**
   * Publishes a record with both key and value.
   *
   * @param key     record key taken from the request path
   * @param message record value taken from the request path
   * @return the message prefixed with a fixed label
   */
  @GetMapping("/send/{key}/{message}")
  public String sendMessage(@PathVariable("key") String key,
                            @PathVariable("message") String message) {
    kafkaProducer.sendMessage(TOPIC, key, message);
    return reply(message);
  }

  /** Builds the response body shared by both endpoints. */
  private static String reply(String message) {
    return "消息:" + message;
  }
}
配置OpenApi
package cn.lhz.config;
import io.swagger.v3.oas.models.OpenAPI;
import io.swagger.v3.oas.models.info.Contact;
import io.swagger.v3.oas.models.info.Info;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.web.context.WebServerInitializedEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.net.Inet4Address;
import java.net.UnknownHostException;
/**
 * Declares the OpenAPI metadata bean and, once the web server is initialized,
 * logs the URLs of the generated API documentation pages.
 *
 * @author 李昊哲
 * @version 1.0.0
 */
@Configuration
@Slf4j
public class OpenApiConfig implements ApplicationListener<WebServerInitializedEvent> {

  /**
   * Builds the OpenAPI description (title, description, contact, version) for this application.
   *
   * @return the populated {@link OpenAPI} model
   */
  @Bean
  public OpenAPI springOpenAPI() {
    Contact contact = new Contact();
    contact.setName("李昊哲");
    contact.setUrl("https://space.bilibili.com/480308139");
    contact.setEmail("646269678@qq.com");
    // Documentation pages: http://localhost:8080/doc.html
    //                      http://localhost:8080/swagger-ui/index.html
    Info info = new Info()
        .title("SpringBoot Kafka API")
        .description("SpringBoot Kafka Simple Application")
        .contact(contact)
        .version("1.0.0");
    return new OpenAPI().info(info);
  }

  /**
   * Logs the documentation URLs using the local host address and the port the web server
   * reports. If the local host address cannot be resolved, a warning (with the exception)
   * is logged through the class logger and no URLs are printed.
   *
   * @param event event carrying the initialized web server and its application context
   */
  @Override
  public void onApplicationEvent(WebServerInitializedEvent event) {
    // Only the host lookup can throw, so the try block is limited to it.
    final String hostAddress;
    try {
      hostAddress = Inet4Address.getLocalHost().getHostAddress();
    } catch (UnknownHostException e) {
      // Use the logger instead of printStackTrace so the failure reaches the configured log output.
      log.warn("Unable to resolve the local host address; API documentation URLs are not logged", e);
      return;
    }
    final int port = event.getWebServer().getPort();
    // NOTE(review): getApplicationName() is used as the URL path prefix here; for servlet
    // web contexts it is expected to be the context path (empty for the root) — confirm.
    final String contextPath = event.getApplicationContext().getApplicationName();
    log.info("项目启动启动成功!接口文档地址: http://{}:{}{}/doc.html", hostAddress, port, contextPath);
    log.info("项目启动启动成功!接口文档地址: http://{}:{}{}/swagger-ui/index.html", hostAddress, port, contextPath);
  }
}
启动项目测试
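使用“获取value”的消费者时,访问 http://localhost:8080/send/hello%20world ,控制台输出
Consumed: hello world
使用“获取ConsumerRecord”的消费者时,访问 http://localhost:8080/send/hello/world ,控制台输出
Consumed: lihaozhe-0-hello:world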
原文地址:https://blog.csdn.net/qq_24330181/article/details/143961064
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!