当前位置:首页 > CMS教程 > Thinkphp > 列表

TP5使用RabbitMQ实现消息队列的项目实践

发布:smiling 来源: PHP粉丝网  添加日期:2023-10-16 18:26:28 浏览: 评论:0 

在使用 RabbitMQ 之前,你要安装好 RabbitMQ 服务,具体安装方法可以参考 windows下安装RabbitMQ。

1、安装扩展

进入TP5 更目录下,输入命令安装:

composer require php-amqplib/php-amqplib

2、自定义命令

TP5 的自定义命令,这里也简单说下。

第一步:

创建命令类文件,新建 application/api/command/Test.php。

  1. <?php 
  2. namespace app\api\command; 
  3. use think\console\Command; 
  4. use think\console\Input; 
  5. use think\console\Output; 
  6. /** 
  7.  * 自定义命令测试 
  8.  */ 
  9. class Test extends Command 
  10.     /** 
  11.      * 配置 
  12.      */ 
  13.     protected function configure() 
  14.     { 
  15.         // 设置命令的名称和描述 
  16.         $this->setName('test')->setDescription('这是一个测试命令'); 
  17.     } 
  18.     /** 
  19.      * 执行 
  20.      */ 
  21.     protected function execute(Input $input, Output $output
  22.     { 
  23.         $output->writeln("测试命令"); 
  24.     } 

这个文件定义了一个叫test的命令,备注为 这是一个测试命令,执行命令会输出:test command。

第二步:

配置 command.php文件,在 application/command.php文件中添加命令。

  1. <?php 
  2. return [ 
  3.     'app\api\command\Test'
  4. ]; 

第三步:

测试命令,在项目根目录下输入命令:

php think test

回车运行之后输出:

test command

到这里,自定义命令就结束了,test命令就自定义成功了。

3、rabbitmq服务端

下来我们自定义 RabbitMQ 启动命令,守护进程运行,启动 rabbirmq 服务端接收消息。

在 application/api/command 目录下,新建 Ramq.php 文件,在执行命令的方法中,调用 RabbitMQ 启动守护进程方法即可。

  1. <?php 
  2. namespace app\api\command; 
  3. use PhpAmqpLib\Connection\AMQPStreamConnection; 
  4. use think\console\Command; 
  5. use think\console\Input; 
  6. use think\console\Output; 
  7. /** 
  8.  * RabbitMq 启动命令 
  9.  */ 
  10. class Ramq extends Command 
  11.     protected $consumerTag = 'customer'
  12.     protected $exchange = 'xcuser'
  13.     protected $queue = 'xcmsg'
  14.     protected function configure() 
  15.     { 
  16.         $this->setName('ramq')->setDescription('rabbitmq'); 
  17.     } 
  18.     protected function execute(Input $input, Output $output
  19.     { 
  20.         $output->writeln("消息队列开始"); 
  21.         $this->start(); 
  22.         // 指令输出 
  23.         $output->writeln('消费队列结束'); 
  24.     } 
  25.     /** 
  26.      * 关闭 
  27.      */ 
  28.     function shutdown($channel$connection
  29.     { 
  30.         $channel->close(); 
  31.         $connection->close(); 
  32.     } 
  33.     /** 
  34.      * 回调处理信息 
  35.      */ 
  36.     function process_message($message
  37.     { 
  38.         if ($message->body !== 'quit') { 
  39.             echo $message->body; 
  40.         } 
  41.         //手动应答 
  42.         $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']); 
  43.         if ($message->body === 'quit') { 
  44.             $message->delivery_info['channel']->basic_cancel($message->delivery_info['consumer_tag']); 
  45.         } 
  46.     } 
  47.     /** 
  48.      * 启动 守护进程运行 
  49.      */ 
  50.     public function start() 
  51.     { 
  52.         $host = '127.0.0.1'
  53.         $port = 5672; 
  54.         $user = 'guest'
  55.         $pwd = 'guest'
  56.         $vhost = '/'
  57.         $connection = new AMQPStreamConnection($host$port$user$pwd$vhost); 
  58.         $channel = $connection->channel(); 
  59.         $channel->queue_declare($this->queue, false, true, false, false); 
  60.         $channel->exchange_declare($this->exchange, 'direct', false, true, false); 
  61.         $channel->queue_bind($this->queue, $this->exchange); 
  62.         $channel->basic_consume($this->queue, $this->consumerTag, false, false, false, false, array($this'process_message')); 
  63.         register_shutdown_function(array($this'shutdown'), $channel$connection); 
  64.         while (count($channel->callbacks)) { 
  65.             $channel->wait(); 
  66.         } 
  67.     } 

在application/command.php文件中,添加rabbitmq自定义命令。

  1. return [ 
  2.     'app\api\command\Ramq',// rabbitmq 
  3. ]; 

4、发送端

最后,我们再写发送消息的控制器,实现消息队列,具体代码如下:

  1. <?php 
  2. namespace app\api\controller; 
  3. use PhpAmqpLib\Connection\AMQPStreamConnection; 
  4. use PhpAmqpLib\Message\AMQPMessage; 
  5. use think\Controller; 
  6. /** 
  7.  * 发送端 
  8.  */ 
  9. class MessageQueue extends Controller 
  10.     const exchange = 'xcuser'
  11.     const queue = 'xcmsg'
  12.     /** 
  13.      * 发送消息 
  14.      */ 
  15.     public function pushMessage($data
  16.     { 
  17.         $host = '127.0.0.1'
  18.         $port = 5672; 
  19.         $user = 'guest'
  20.         $pwd = 'guest'
  21.         $vhost = '/'
  22.         $connection = new AMQPStreamConnection($host$port$user$pwd$vhost); 
  23.         $channel = $connection->channel(); 
  24.         $channel->exchange_declare(self::exchange, 'direct', false, true, false); 
  25.         $channel->queue_declare(self::queue, false, true, false, false); 
  26.         $channel->queue_bind(self::queue, self::exchange); 
  27.         $messageBody = $data
  28.         $message = new AMQPMessage($messageBodyarray('content_type' => 'text/plain''delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)); 
  29.         $channel->basic_publish($message, self::exchange); 
  30.         $channel->close(); 
  31.         $connection->close(); 
  32.         echo 'ok'
  33.     } 
  34.     /** 
  35.      * 执行 
  36.      */ 
  37.     public function index() 
  38.     { 
  39.         $data = json_encode(['msg' => '测试数据''id' => '15']); 
  40.         $this->pushMessage($data); 
  41.     } 

5、验证

先执行自定义命令,启动 rabbitmq 守护进程。在项目更目录下打开命令行,输入下面命令:

php think ramq

然后在浏览器访问发送信息的方法,http://你的域名/api/message/index,你发送一次消息,在命令行就会输出一条消息。这样我们就用 RabbitMQ 实现了一个简单的消息队列。

Tags: RabbitMQ TP5消息队列

分享到: