Kafka | RabbitMQ | RocketMQ | ActiveMQ 的区别和入门案例
springboot,vue,springcloudalibaba课程视频,有需要可以看看
<!-- springboot,springboot整合redis,整合rocketmq视频: -->
https://www.bilibili.com/video/BV1nkmRYSErk/?vd_source=14d27ec13a4737c281b7c79463687112
<!-- springcloudalibaba,openfeign,nacos,gateway,sso视频:-->
https://www.bilibili.com/video/BV1cFDEYWEkY/?vd_source=14d27ec13a4737c281b7c79463687112
<!-- vue+springboot前后端分离视频:-->
https://www.bilibili.com/video/BV1JLSEYJETc/?vd_source=14d27ec13a4737c281b7c79463687112
<!-- shiro视频:-->
https://www.bilibili.com/video/BV1YVUmYJEPi/?vd_source=14d27ec13a4737c281b7c79463687112
以下是常用MQ消息中间件的区别,以表格形式展示:
特性/MQ | Kafka | RabbitMQ | RocketMQ | ActiveMQ |
---|---|---|---|---|
开发语言 | Scala | Erlang | Java | Java |
支持的协议 | 自定义(基于TCP) | AMQP | 自定义 | OpenWire、STOMP、REST、XMPP、AMQP |
消息存储 | 磁盘;支持大量堆积 | 内存、磁盘;支持少量堆积 | 磁盘;支持大量堆积 | 内存、磁盘、数据库;支持少量堆积 |
消息事务 | 支持 | 支持 | 不支持 | 支持 |
负载均衡 | 支持 | 支持的不好 | 支持 | 可基于zookeeper实现负载均衡 |
集群方式 | 天然的‘Leader-Slave’无状态集群 | 支持简单集群,对高级集群模式支持不好 | 常用多对’Master-Slave’模式 | 支持简单集群模式,对高级集群模式支持不好 |
管理界面 | 一般 | 好 | 有管理后台 | 一般 |
可用性 | 非常高(分布式) | 高(主从) | 非常高(分布式) | 高(主从) |
消息重复 | 支持at least once、at most once | 支持at least once、at most once | 支持at least once | 支持at least once |
吞吐量TPS | 极大 | 比较大 | 大 | 比较大 |
订阅形式和消息分发 | 发布订阅模式 | direct、topic、Headers和fanout | 发布订阅模式 | 点对点(p2p)、广播(发布-订阅) |
接下来是每个MQ的入门案例代码:
Kafka入门案例
生产者代码:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class KafkaProducerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
for (int i = 0; i < 10; i++) {
producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), "Hello Kafka " + i));
}
}
}
}
消费者代码:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
consumer.subscribe(Collections.singletonList("my-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
}
}
RabbitMQ入门案例
生产者代码:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class RabbitMQProducer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
}
}
消费者代码:
import com.rabbitmq.client.*;
public class RabbitMQConsumer {
private static final String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}
}
RocketMQ入门案例
生产者代码:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class RocketMQProducerExample {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message("TopicTest", "TagA", "OrderID001", "Hello RocketMQ".getBytes());
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
producer.shutdown();
}
}
消费者代码:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
public class RocketMQConsumerExample {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
ActiveMQ入门案例
当然可以,以下是ActiveMQ的生产者和消费者的基本示例代码。
ActiveMQ 生产者代码示例
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ActiveMQProducer {
public static void main(String[] args) {
// 连接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// 建立连接
try (Connection connection = connectionFactory.createConnection()) {
connection.start();
// 创建会话,使用非事务性会话和自动确认消息
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建队列
Destination destination = session.createQueue("TestQueue");
// 创建消息生产者
MessageProducer producer = session.createProducer(destination);
// 创建文本消息
TextMessage message = session.createTextMessage("Hello, ActiveMQ!");
// 发送消息
producer.send(message);
System.out.println("Sent: " + message.getText());
// 清理资源
producer.close();
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
ActiveMQ 消费者代码示例
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ActiveMQConsumer {
public static void main(String[] args) {
// 连接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// 建立连接
try (Connection connection = connectionFactory.createConnection()) {
connection.start();
// 创建会话,使用非事务性会话和自动确认消息
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建队列
Destination destination = session.createQueue("TestQueue");
// 创建消息消费者
MessageConsumer consumer = session.createConsumer(destination);
// 接收消息
Message message = consumer.receive();
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
System.out.println("Received: " + textMessage.getText());
}
// 清理资源
consumer.close();
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
在这两个示例中,生产者连接到ActiveMQ服务器,创建一个名为"TestQueue"的队列,并向其中发送一条消息。消费者也连接到同一个队列,并等待接收消息。当生产者发送消息后,消费者会接收并打印这条消息。
确保在运行这些代码之前,你的ActiveMQ服务器已经启动,并且监听在默认的61616端口上。如果ActiveMQ服务器配置有所不同,请相应地修改连接字符串。
完结!
原文地址:https://blog.csdn.net/QQ903275718/article/details/144376230
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!