Spring Boot教程之五十六:用 Apache Kafka 消费 JSON 消息
Spring Boot | 如何使用 Apache Kafka 消费 JSON 消息
Apache Kafka 是一个流处理系统,可让您在进程、应用程序和服务器之间发送消息。在本文中,我们将了解如何使用 Apache Kafka 在 Spring Boot 应用程序的控制台上发布 JSON 消息。
为了了解如何创建 Spring Boot 项目,请参阅本文。
工作步骤
步骤 1:
转到Spring 初始化程序并创建具有以下依赖项的启动项目:
Spring for Apache Kafka
步骤 2:
在 IDE 中打开项目并同步依赖项。在本文中,我们将创建一个学生模型,我们将在其中发布学生详细信息。因此,创建一个模型类Student。添加数据成员并创建构造函数并重写toString方法以查看 JSON 格式的消息。以下是学生类的实现:
- 学生模型
// Java program to implement a // student class
// Creating a student class public class Student {
// Data members of the class int id; String firstName; String lastName;
// Constructor of the student // Class public Student() { }
// Parameterized constructor of // the student class public Student(int id, String firstName, String lastName) { this.id = id; this.firstName = firstName; this.lastName = lastName; }
@Override public String toString() { return "Student{" + "id = " + id + ", firstName = '" + firstName + "'" + ", lastName = '" + lastName + "'" + "}"; } } |
步骤 3:
创建一个新的类Config并添加注释@Configuration和@EnableKafka。现在使用 Student 类对象创建 Bean ConsumerFactory和ConcurrentKafkaListenerContainerFactory 。
- 配置类
@EnableKafka @Configuration public class Config {
// Function to establish a connection // between Spring application // and Kafka server @Bean public ConsumerFactory<String, Student> studentConsumer() {
// HashMap to store the configurations Map<String, Object> map = new HashMap<>();
// put the host IP in the map map.put(ConsumerConfig .BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
// put the group ID of consumer in the map map.put(ConsumerConfig .GROUP_ID_CONFIG, "id"); map.put(ConsumerConfig .KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); map.put(ConsumerConfig .VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
// return message in JSON formate return new DefaultKafkaConsumerFactory<>( map, new StringDeserializer(), new JsonDeserializer<>(Student.class)); }
@Bean public ConcurrentKafkaListenerContainerFactory<String, Student> studentListner() { ConcurrentKafkaListenerContainerFactory<String, Student> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(studentConsumer()); return factory; } } |
步骤 4:
创建一个带有@Service注释的KafkaService类。此类将包含用于在控制台上发布消息的侦听器方法。
- KafkaService 类
@Service public class KafkaService {
// Annotation required to listen // the message from Kafka server @KafkaListener(topics = "JsonTopic", groupId = "id", containerFactory = "studentListner") public void publish(Student student) { System.out.println("New Entry: " + student); } } |
步骤 5:
启动 zookeeper 和 Kafka 服务器。现在我们需要创建一个名为JsonTopic的新主题。为此,打开一个新的命令提示符窗口并将目录更改为 Kafka 目录。
步骤6:
现在使用下面给出的命令创建一个新主题:
bin/Kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic topic_name // 用于 mac 和 linux
.\bin\windows\Kafka-topics.bat –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic topic_name // 用于 windows
步骤 7:
现在运行 Kafka 生产者控制台,使用以下命令:
bin/Kafka-console-producer.sh –broker-list localhost:9092 –topic Kafka_Example // 适用于 mac 和 linux
.\bin\windows\Kafka-console-producer.bat –broker-list localhost:9092 –topic Kafka_Example // 适用于 windows
步骤 8:
运行应用程序并在 Kafka 生产器上输入消息并按回车键。
原文地址:https://blog.csdn.net/xt14327/article/details/145064915
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!