自学内容网 自学内容网

PHP使用RabbitMQ(正常连接与开启SSL验证后的连接)

代码中包含了PHP在一般情况下使用方法和RabbitMQ开启了SSL验证后的使用方法(我这边消费队列是使用接口请求的方式,每次只从中取出一条)

安装amqp扩展

PHP使用RabbitMQ前,需要安装amqp扩展,之前文章中介绍了Windows环境PHP安装amqp扩展的方法:windows环境PHP使用RabbitMq安装amqp扩展_windows mq扩展安装-CSDN博客

Linux中安装amqp扩展:

### 先进入/usr/local目录下,下载两个文件到此目录(我的PHP版本是7.2):

wget -c https://github.com/alanxz/rabbitmq-c/releases/download/v0.8.0/rabbitmq-c-0.8.0.tar.gz

wget -c http://pecl.php.net/get/amqp-1.9.3.tgz

### 若使用的docker,将上面下载的两个包 拷贝到容器内【 docker cp ./文件 dockerID:/usr/local】,然后执行下面命令即可

### 解压rabbitmq-c-0.8.0.tar.gz

tar zxf rabbitmq-c-0.8.0.tar.gz

cd /usr/local/rabbitmq-c-0.8.0

./configure --prefix=/usr/local/rabbitmq-c-0.8.0

make && make install

### 然后解压 amqp-1.9.3.tgz 解压后amqp-1.9.3文件下内还有个amqp-1.9.3文件夹,将内部的amqp-1.9.3目录拷贝到/usr/local/下,执行下列命令:

cd /usr/local/amqp-1.9.3

/usr/local/bin/phpize

./configure --with-php-config=/usr/local/bin/php-config --with-amqp --with-librabbitmq-dir=/usr/local/rabbitmq-c-0.8.0

cp /usr/local/rabbitmq-c-0.8.0/librabbitmq/amqp_ssl_socket.h /usr/local/amqp-1.9.3/

make && make install

### 最后修改php.ini    加上配置:

extension = amqp.so 

安装后,执行php -m 显示amqp  即表明扩展安装成功!

加载PHP代码的扩展包

然后需要加载代码的扩展包,比较方便快捷的方法是使用composer 加载扩展包

composer require php-amqplib/php-amqplib

若想指定版本:composer require php-amqplib/php-amqplib:版本

具体使用哪个版本可以在此链接内查询:https://packagist.org/packages/php-amqplib/php-amqplib

示例代码(包含开启了SSL的连接方式)

<?php

namespace common\helpers;

use models\setting\Log;
use PhpAmqpLib\Connection\AMQPSSLConnection;
use PhpAmqpLib\Exception\AMQPTimeoutException;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class AmqpHelper
{
    /**
     * rabbitMq 未开启ssl验证 消费者
     * @return false|string|void
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     * @throws \AMQPQueueException
     * @time 2024/12/2 13:43
     * @author zsh
     */
    public static function consumerResult()
    {
        //队列配置信息
        $configParams = array(
            'host' => \Yii::$app->params['cotaTct']['queueHost'],
            'port' => \Yii::$app->params['cotaTct']['queuePort'],
            'login' => \Yii::$app->params['cotaTct']['queueLogin'],
            'password' => \Yii::$app->params['cotaTct']['queuePassword'],
            'vhost' => \Yii::$app->params['cotaTct']['queueVhost']
        );

        $conn = new \AMQPConnection($configParams);
        if (!$conn->connect()) {
            die("连接rabbitmq失败!\n");
        }

        //建立信道
        $channel = new \AMQPChannel($conn);

        // 创建队列
        $q = new \AMQPQueue($channel);

        $queueName = \Yii::$app->params['cotaTct']['queueName']; //队列名
        $q->setName($queueName);
        $q->setFlags(AMQP_DURABLE);             // 持久化


        // 绑定交换机与队列,并指定路由键
        $q->bind(\Yii::$app->params['cotaTct']['exchange'], \Yii::$app->params['cotaTct']['routingKey']);

        // 消息获取
        $ret = $q->get(AMQP_AUTOACK);

        if ($ret) {
//            echo "\nget data:\n";
//            var_dump($ret->getBody());
//            var_dump(json_decode($ret->getBody(), true));
            $conn->disconnect();
            return $ret->getBody();

        }else{
            $conn->disconnect();
            return false;
        }
    }

    /**
     * rabbitMq 开启ssl了验证 消费者
     * @return mixed|string|void
     * @throws \ErrorException
     * @time 2024/12/2 13:44
     * @author zsh
     */
    public static function sslConsumerResult()
    {
        $configParams = array(
            'host' => \Yii::$app->params['cotaTct']['prodQueueHost'],
            'port' => \Yii::$app->params['cotaTct']['prodQueuePort'],
            'login' => \Yii::$app->params['cotaTct']['prodQueueLogin'],
            'password' => \Yii::$app->params['cotaTct']['prodQueuePassword'],
            'vhost' => \Yii::$app->params['cotaTct']['queueVhost']
        );

        // 创建SSL连接时忽略证书验证
        $ssl_options = array(
            'verify_peer' => false,
            'verify_peer_name' => false,
        );

        $connection = new AMQPSSLConnection(
            $configParams['host'],
            $configParams['port'],
            $configParams['login'],
            $configParams['password'],
            $configParams['vhost'],
            $ssl_options);

        if (!$connection->isConnected()) {
            die("连接rabbitmq失败!\n");
        }
//        echo '链接成功...';

        $queueName = \Yii::$app->params['cotaTct']['queueName']; //队列名
        $exchange = \Yii::$app->params['cotaTct']['exchange'];
        $routingKey = \Yii::$app->params['cotaTct']['routingKey'];

        $channel = $connection->channel();

        // 声明交换器
        $channel->exchange_declare($exchange, 'topic', false, true, false);

        // 获取系统生成的消息队列名称,这里也可以指定一个队列名称
        $channel->queue_declare($queueName, false, true, false, false);

        // 将队列名与交换器名进行绑定,并指定routing_key(路由键值)
        $channel->queue_bind($queueName,$exchange,$routingKey);

        $message = '';
        // 定义收到消息回调函数
        $callback = function ($msg) use (&$message) {
//            echo 'Message:'.$msg->body;
            $message = $msg->body;
            // 手动确认消息是否正常消费
            $msg->delivery_info['channel']->basic_Ack($msg->delivery_info['delivery_tag']);
        };

        // 设置消费成功后才能继续进行下一个消费
        $channel->basic_qos(null, 1, null);

        // 开启消费no_ack=false,设置为手动应答
        $channel->basic_consume($queueName, '', false, false, false, false, $callback);

        // 循环进行消费
//        while ($channel->is_consuming()) {
//            try {
//                $channel->wait(null, false, $timeout = 10);
//            }catch (AMQPTimeoutException $ex){
//                // 没有消息可处理,退出循环
//                echo $ex->getMessage();
//                break;
//            }
//        }

        if ($channel->is_consuming()) {
            try {
                $channel->wait(null, false, $timeout = 5);
            }catch (AMQPTimeoutException $ex){
                // 没有消息可处理,退出循环
                echo $ex->getMessage();
            }
        }

        //关闭连接
        $channel->close();
        $connection->close();

        $return = $message;
        unset($message);
        $message = null;

        return $return;
    }

    /**
     * rabbitMq 未开启ssl验证 生产者
     * @return mixed|string|void
     * @throws \ErrorException
     * @time 2024/12/2 13:44
     * @author zsh
     */
    public static function producer($message)
    {
        $configParams = array(
            'host' => \Yii::$app->params['cotaTct']['queueHost'],
            'port' => \Yii::$app->params['cotaTct']['queuePort'],
            'login' => \Yii::$app->params['cotaTct']['queueLogin'],
            'password' => \Yii::$app->params['cotaTct']['queuePassword'],
            'vhost' => \Yii::$app->params['cotaTct']['queueVhost']
        );
        $exchangeName = \Yii::$app->params['cotaTct']['producerExchange'];

        try {
            $conn = new AMQPStreamConnection($configParams['host'], $configParams['port'], $configParams['login'], $configParams['password']);

            //创建channel
            $channel = $conn->channel();
            $channel->exchange_declare($exchangeName,'fanout',false,true,false);

            $messageData = new AMQPMessage($message);
            $channel->basic_publish($messageData, $exchangeName);

            $channel->close();
            $conn->close();

            return true;
        }catch (\Exception $e){
            Log::error('AMQP队列错误:'.$e,'AMQP');
            return false;
        }
    }

    /**
     * rabbitMq 开启了ssl验证 生产者
     * @return mixed|string|void
     * @throws \ErrorException
     * @time 2024/12/2 13:44
     * @author zsh
     */
    public static function sslProducer($message)
    {
        $configParams = array(
            'host' => \Yii::$app->params['cotaTct']['prodQueueHost'],
            'port' => \Yii::$app->params['cotaTct']['prodQueuePort'],
            'login' => \Yii::$app->params['cotaTct']['prodQueueLogin'],
            'password' => \Yii::$app->params['cotaTct']['prodQueuePassword'],
            'vhost' => \Yii::$app->params['cotaTct']['queueVhost']
        );
        $exchangeName = \Yii::$app->params['cotaTct']['producerExchange'];

        // 创建SSL连接时忽略证书验证
        $ssl_options = array(
            'verify_peer' => false,
            'verify_peer_name' => false,
        );

        try {
            $conn = new AMQPSSLConnection(
                $configParams['host'],
                $configParams['port'],
                $configParams['login'],
                $configParams['password'],
                $configParams['vhost'],
                $ssl_options);

            if (!$conn->isConnected()) {
                die("连接rabbitmq失败!\n");
            }

            //创建channel
            $channel = $conn->channel();
            $channel->exchange_declare($exchangeName,'fanout',false,true,false);

            $messageData = new AMQPMessage($message);
            $channel->basic_publish($messageData, $exchangeName);

            $channel->close();
            $conn->close();

            return true;
        }catch (\Exception $e){
            Log::error('AMQP队列错误:'.$e,'AMQP');
            return false;
        }
    }
}


原文地址:https://blog.csdn.net/qq_32737755/article/details/144241576

免责声明:本站文章内容转载自网络资源,如侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!