自学内容网 自学内容网

Java版Flink使用指南——将消息写入到RabbitMQ的队列中


《Java版Flink使用指南——从RabbitMQ中队列中接入消息流》一文中,我们介绍了如何使用Java在Flink中读取RabbitMQ中的数据,并将其写入日志中。本文将通过代码产生一些数据,然后将它们写入到另外一个RabbitMQ队列中。

新建工程

我们在IntelliJ中新建一个工程SinkToRabbitMQ。
Archetype填入:org.apache.flink:flink-quickstart-java
版本填入与Flink的版本:1.19.1
在这里插入图片描述

新增依赖

在pom.xml中新增RabbitMQ连接器

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-rabbitmq</artifactId>
<version>3.0.1-1.17</version>
</dependency>

编码

自动产生数据

这段代码将产生两个字符串数据,后续这些数据会被写入到RabbitMQ的队列中。

List<String> data = new ArrayList<>();
data.add("Hello, World!");
data.add("Hello, Flink!");
DataStream<String> stream = env.fromCollection(data);

写入RabbitMQ

不同于《Java版Flink使用指南——从RabbitMQ中队列中接入消息流》中创建RMQSource用来接收RabbitMQ队列中数据,这次我们创建RMQSink用来发布数据。

String sinkQueueName = "data.to.rbtmq"; // name of the queue to send data to
String host = "172.21.112.140"; // IP of the rabbitmq server
int port = 5672;
String username = "admin";
String password = "fangliang";
String virtualHost = "/";
int parallelism = 1;

RMQConnectionConfig rmqConnectionConfig = new RMQConnectionConfig.Builder()
.setHost(host)
.setPort(port)
.setUserName(username)
.setPassword(password)
.setVirtualHost(virtualHost)
.build();

RMQSink<String> stringRMQSink = new RMQSink<>(rmqConnectionConfig, sinkQueueName, new SimpleStringSchema());
stream.addSink(stringRMQSink).name(username + "'s sink to " + sinkQueueName).setParallelism(parallelism);

测试

打包、提交并运行任务
在这里插入图片描述
然后在RabbitMQ的后台可以看到收到两条消息
在这里插入图片描述
其内容也是我们之前在代码中生成的内容
在这里插入图片描述

工程代码

https://github.com/f304646673/FlinkDemo


原文地址:https://blog.csdn.net/breaksoftware/article/details/140062974

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!