自学内容网 自学内容网

springboot整合kafka

springboot整合kafka

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.4.0</version>
    <relativePath/> <!-- lookup parent from repository -->
  </parent>
  <groupId>cn.lhz</groupId>
  <artifactId>kafka</artifactId>
  <version>0.0.1</version>
  <name>kafka</name>
  <description>kafka</description>
  <url/>
  <licenses>
    <license/>
  </licenses>
  <developers>
    <developer/>
  </developers>
  <scm>
    <connection/>
    <developerConnection/>
    <tag/>
    <url/>
  </scm>
  <properties>
    <java.version>21</java.version>
    <jdk.version>21</jdk.version>
    <maven.compiler.source>${jdk.version}</maven.compiler.source>
    <maven.compiler.target>${jdk.version}</maven.compiler.target>
    <maven.compiler.compilerVersion>${jdk.version}</maven.compiler.compilerVersion>
    <maven.compiler.encoding>utf-8</maven.compiler.encoding>
    <project.build.sourceEncoding>utf-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <maven.test.failure.ignore>true</maven.test.failure.ignore>
    <maven.test.skip>true</maven.test.skip>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-kafka</artifactId>
     </dependency>-->
    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
    </dependency>
    <dependency>
      <groupId>com.github.xiaoymin</groupId>
      <artifactId>knife4j-openapi3-jakarta-spring-boot-starter</artifactId>
      <version>4.5.0</version>
    </dependency>
  </dependencies>

  <build>
    <finalName>${project.name}</finalName>
    <plugins>
      <!-- 编译级别 -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.13.0</version>
        <configuration>
          <!-- 设置编译字符编码 -->
          <encoding>UTF-8</encoding>
          <!-- 设置编译jdk版本 -->
          <source>${jdk.version}</source>
          <target>${jdk.version}</target>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
        <configuration>
          <excludes>
            <exclude>
              <groupId>org.projectlombok</groupId>
              <artifactId>lombok</artifactId>
            </exclude>
          </excludes>
        </configuration>
      </plugin>
    </plugins>
  </build>
  <repositories>
    <repository>
      <id>public</id>
      <name>aliyun nexus</name>
      <url>https://maven.aliyun.com/repository/public</url>
      <releases>
        <enabled>true</enabled>
      </releases>
    </repository>
  </repositories>
  <pluginRepositories>
    <pluginRepository>
      <id>public</id>
      <name>aliyun nexus</name>
      <url>https://maven.aliyun.com/repository/public</url>
      <releases>
        <enabled>true</enabled>
      </releases>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
    </pluginRepository>
  </pluginRepositories>
</project>

application.yml配置kafka

本案例使用最少的配置,在application.yml中添加Kafka的连接配置:

spring:
  application:
    name: springboot-kafka
  kafka:
    bootstrap-servers: lihaozhe01:9092,lihaozhe02:9092,lihaozhe03:9092
    consumer:
      group-id: lihaozhe
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
#是否激活 swagger true or false
springdoc:
  swagger-ui:
    path: /swagger-ui
    tags-sorter: alpha
    operations-sorter: alpha
  api-docs:
    path: /v3/api-docs
  group-configs:
    - group: 'default'
      paths-to-match: '/**'
      packages-to-scan: cn.lhz.controller
# knife4j的增强配置,不需要增强可以不配
knife4j:
  enable: true
  setting:
    language: zh_cn
  basic:
    # 启用基本认证
    enable: true
    # 设置用户名
    username: admin
    # 设置密码
    password: lihaozhe
Kafka配置类

创建KafkaConfig.java配置类,用于创建Kafka主题:

package cn.lhz.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * @author 李昊哲
 * @version 1.0.0
 */
@Configuration
@EnableKafka
public class KafkaConfig {
//  @Bean
//  public ProducerFactory<String, String> producerFactory() {
//    Map<String, Object> configProps = new HashMap<>();
//    // 配置属性
//    return new DefaultKafkaProducerFactory<>(configProps);
//  }
//
//  @Bean
//  public KafkaTemplate<String, String> kafkaTemplate() {
//    return new KafkaTemplate<>(producerFactory());
//  }

//  @Bean
//  public NewTopic myTopic() {
//    return new NewTopic("lihaozhe", 1, (short) 1);
//  }
}
Kafka消费者

创建KafkaConsumer.java消费者类,用于从Kafka主题接收消息:

以下两段代码二选一

  • 第一个案例代码只获取 value
  • 第二个案例代码获取了 ConsumerRecord 完整信息
获取value
package cn.lhz.service;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

/**
 * @author 李昊哲
 * @version 1.0.0
 */
@Service
public class KafkaConsumer {
  /**
   * 从 topic 中获取 value
   *
   * @param value topic 中获取 value
   */
  @KafkaListener(topics = "lihaozhe", groupId = "lihaozhe")
  public void consume(String value) {
    System.out.printf("Consumed: %s\n", value);
  }
}

获取ConsumerRecord
package cn.lhz.service;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

/**
 * @author 李昊哲
 * @version 1.0.0
 */
@Service
public class KafkaConsumer {
  /**
   * 从 topic 中获取 ConsumerRecord
   *
   * @param record topic 中获取 ConsumerRecord
   */
  @KafkaListener(topics = "lihaozhe", groupId = "lihaozhe")
  public void consume(ConsumerRecord<String, String> record) {
    System.out.printf("Consumed: %s-%d-%s:%s\n", record.topic(), record.partition(), record.key(), record.value());
  }
}

Kafka生产者
package cn.lhz.service;

import lombok.RequiredArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

/**
 * @author 李昊哲
 * @version 1.0.0
 */
@Service
@RequiredArgsConstructor
public class KafkaProducer {
  private final KafkaTemplate<String, String> kafkaTemplate;

  /**
   * 或者 topic 中的 value
   *
   * @param topic kafka topic
   * @param value kafka topic 中的 value
   */
  public void sendMessage(String topic, String value) {
    kafkaTemplate.send(topic, value);
  }

  /**
   * 或者 topic 中的 value
   *
   * @param topic kafka topic
   * @param key   kafka topic 中的 key
   * @param value kafka topic 中的 value
   */
  public void sendMessage(String topic, String key, String value) {
    kafkaTemplate.send(topic, key, value);
  }
}

创建控制器发送消息
package cn.lhz.controller;

import cn.lhz.service.KafkaProducer;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author 李昊哲
 * @version 1.0.0
 */
@RestController
@RequiredArgsConstructor
public class KafkaController {
  private final KafkaProducer kafkaProducer;

  @GetMapping("/send/{message}")
  public String sendMessage(@PathVariable(value = "message") String message) {
    // 只发送 value
    kafkaProducer.sendMessage("lihaozhe", message);
    return "消息:" + message;
  }

  @GetMapping("/send/{key}/{message}")
  public String sendMessage(@PathVariable(value = "key") String key,
                            @PathVariable(value = "message") String message) {
    // 只发送 key 和 value
    kafkaProducer.sendMessage("lihaozhe", key, message);
    return "消息:" + message;
  }
}
配置OpenApi
package cn.lhz.config;

import io.swagger.v3.oas.models.OpenAPI;
import io.swagger.v3.oas.models.info.Contact;
import io.swagger.v3.oas.models.info.Info;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.web.context.WebServerInitializedEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.net.Inet4Address;
import java.net.UnknownHostException;

/**
 * @author 李昊哲
 * @version 1.0.0
 */
@Configuration
@Slf4j
public class OpenApiConfig implements ApplicationListener<WebServerInitializedEvent> {
  @Bean
  public OpenAPI springOpenAPI() {
    Contact contact = new Contact();
    contact.setName("李昊哲");
    contact.setUrl("https://space.bilibili.com/480308139");
    contact.setEmail("646269678@qq.com");
    // 访问路径:http://localhost:8080/doc.html
    // 访问路径:http://localhost:8080/swagger-ui/index.html
    return new OpenAPI().info(new Info()
        .title("SpringBoot Kafka API")
        .description("SpringBoot Kafka Simple Application")
        .contact(contact)
        .version("1.0.0"));
  }

  @Override
  public void onApplicationEvent(WebServerInitializedEvent event) {
    try {
      //获取IP
      String hostAddress = Inet4Address.getLocalHost().getHostAddress();
      //获取端口号
      int port = event.getWebServer().getPort();
      //获取应用名
      String applicationName = event.getApplicationContext().getApplicationName();
      // TODO:这个localhost改成host地址
      log.info("项目启动启动成功!接口文档地址: http://{}:{}{}/doc.html", hostAddress, port, applicationName);
      log.info("项目启动启动成功!接口文档地址: http://{}:{}{}/swagger-ui/index.html", hostAddress, port, applicationName);
    } catch (UnknownHostException e) {
      e.printStackTrace();
    }
  }
}

启动项目测试

springboot kafka

控制台输出

Consumed: hello world

springboot kafka

控制台输出

Consumed: lihaozhe-0-hello:world

原文地址:https://blog.csdn.net/qq_24330181/article/details/143961064

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!