当前位置:首页 > PHP教程 > php高级应用 > 列表

PHP和RabbitMQ实现消息队列的完整代码

发布:smiling 来源: PHP粉丝网  添加日期:2020-02-04 16:27:05 浏览: 评论:0 

本篇文章给大家带来的内容是关于PHP和RabbitMQ实现消息队列的完整代码,有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助。

先安装PHP对应的RabbitMQ,这里用的是 php_amqp 不同的扩展实现方式会有细微的差异.

php扩展地址: http://pecl.php.net/package/amqp

具体以官网为准 http://www.rabbitmq.com/getstarted.html

介绍:

config.php 配置信息

BaseMQ.php MQ基类

ProductMQ.php 生产者类

ConsumerMQ.php 消费者类

Consumer2MQ.php 消费者2(可有多个)

config.php

  1. <?php 
  2.  
  3. return [ 
  4.  
  5.     //配置 
  6.  
  7.     'host' => [ 
  8.  
  9.         'host' => '127.0.0.1'
  10.  
  11.         'port' => '5672'
  12.  
  13.         'login' => 'guest'
  14.  
  15.         'password' => 'guest'
  16.  
  17.         'vhost'=>'/'
  18.  
  19.     ], 
  20.  
  21.     //交换机 
  22. //phpfensi.com 
  23.     'exchange'=>'word'
  24.  
  25.     //路由 
  26.  
  27.     'routes' => [], 
  28.  
  29. ]; 

BaseMQ.php

  1. <?php 
  2.  
  3. /** 
  4.  
  5.  * Created by PhpStorm. 
  6.  
  7.  * User: pc 
  8.  
  9.  * Date: 2018/12/13 
  10.  
  11.  * Time: 14:11 
  12.  
  13.  */ 
  14.  
  15.  
  16.  
  17. namespace MyObjSummary\rabbitMQ; 
  18.  
  19.  
  20.  
  21. /** Member 
  22.  
  23.  *      AMQPChannel 
  24.  
  25.  *      AMQPConnection 
  26.  
  27.  *      AMQPEnvelope 
  28.  
  29.  *      AMQPExchange 
  30.  
  31.  *      AMQPQueue 
  32.  
  33.  * Class BaseMQ 
  34.  
  35.  * @package MyObjSummary\rabbitMQ 
  36.  
  37.  */ 
  38.  
  39. class BaseMQ 
  40.  
  41.  
  42.     /** MQ Channel 
  43.  
  44.      * @var \AMQPChannel 
  45.  
  46.      */ 
  47.  
  48.     public $AMQPChannel ; 
  49.  
  50.  
  51.  
  52.     /** MQ Link 
  53.  
  54.      * @var \AMQPConnection 
  55.  
  56.      */ 
  57.  
  58.     public $AMQPConnection ; 
  59.  
  60.  
  61.  
  62.     /** MQ Envelope 
  63.  
  64.      * @var \AMQPEnvelope 
  65.  
  66.      */ 
  67.  
  68.     public $AMQPEnvelope ; 
  69.  
  70.  
  71.  
  72.     /** MQ Exchange 
  73.  
  74.      * @var \AMQPExchange 
  75.  
  76.      */ 
  77.  
  78.     public $AMQPExchange ; 
  79.  
  80.  
  81.  
  82.     /** MQ Queue 
  83.  
  84.      * @var \AMQPQueue 
  85.  
  86.      */ 
  87.  
  88.     public $AMQPQueue ; 
  89.  
  90.  
  91.  
  92.     /** conf 
  93.  
  94.      * @var 
  95.  
  96.      */ 
  97.  
  98.     public $conf ; 
  99.  
  100.  
  101.  
  102.     /** exchange 
  103.  
  104.      * @var 
  105.  
  106.      */ 
  107.  
  108.     public $exchange ; 
  109.  
  110.  
  111.  
  112.     /** link 
  113.  
  114.      * BaseMQ constructor. 
  115.  
  116.      * @throws \AMQPConnectionException 
  117.  
  118.      */ 
  119.  
  120.     public function __construct() 
  121.  
  122.     { 
  123.  
  124.         $conf =  require 'config.php' ; 
  125.  
  126.         if(!$conf
  127.  
  128.             throw new \AMQPConnectionException('config error!'); 
  129.  
  130.         $this->conf     = $conf['host'] ; 
  131.  
  132.         $this->exchange = $conf['exchange'] ; 
  133.  
  134.         $this->AMQPConnection = new \AMQPConnection($this->conf); 
  135.  
  136.         if (!$this->AMQPConnection->connect()) 
  137.  
  138.             throw new \AMQPConnectionException("Cannot connect to the broker!\n"); 
  139.  
  140.     } 
  141.  
  142.  
  143.  
  144.     /** 
  145.  
  146.      * close link 
  147.  
  148.      */ 
  149.  
  150.     public function close() 
  151.  
  152.     { 
  153.  
  154.         $this->AMQPConnection->disconnect(); 
  155.  
  156.     } 
  157.  
  158.  
  159.  
  160.     /** Channel 
  161.  
  162.      * @return \AMQPChannel 
  163.  
  164.      * @throws \AMQPConnectionException 
  165.  
  166.      */ 
  167.  
  168.     public function channel() 
  169.  
  170.     { 
  171.  
  172.         if(!$this->AMQPChannel) { 
  173.  
  174.             $this->AMQPChannel =  new \AMQPChannel($this->AMQPConnection); 
  175.  
  176.         } 
  177.  
  178.         return $this->AMQPChannel; 
  179.  
  180.     } 
  181.  
  182.  
  183.  
  184.     /** Exchange 
  185.  
  186.      * @return \AMQPExchange 
  187.  
  188.      * @throws \AMQPConnectionException 
  189.  
  190.      * @throws \AMQPExchangeException 
  191.  
  192.      */ 
  193.  
  194.     public function exchange() 
  195.  
  196.     { 
  197.  
  198.         if(!$this->AMQPExchange) { 
  199.  
  200.             $this->AMQPExchange = new \AMQPExchange($this->channel()); 
  201.  
  202.             $this->AMQPExchange->setName($this->exchange); 
  203.  
  204.         } 
  205.  
  206.         return $this->AMQPExchange ; 
  207.  
  208.     } 
  209.  
  210.  
  211.  
  212.     /** queue 
  213.  
  214.      * @return \AMQPQueue 
  215.  
  216.      * @throws \AMQPConnectionException 
  217.  
  218.      * @throws \AMQPQueueException 
  219.  
  220.      */ 
  221.  
  222.     public function queue() 
  223.  
  224.     { 
  225.  
  226.         if(!$this->AMQPQueue) { 
  227.  
  228.             $this->AMQPQueue = new \AMQPQueue($this->channel()); 
  229.  
  230.         } 
  231.  
  232.         return $this->AMQPQueue ; 
  233.  
  234.     } 
  235.  
  236.  
  237.  
  238.     /** Envelope 
  239.  
  240.      * @return \AMQPEnvelope 
  241.  
  242.      */ 
  243.  
  244.     public function envelope() 
  245.  
  246.     { 
  247.  
  248.         if(!$this->AMQPEnvelope) { 
  249.  
  250.             $this->AMQPEnvelope = new \AMQPEnvelope(); 
  251. //phpfensi.com 
  252.         } 
  253.  
  254.         return $this->AMQPEnvelope; 
  255.  
  256.     } 
  257.  

ProductMQ.php

  1. <?php 
  2.  
  3. //生产者 P 
  4.  
  5. namespace MyObjSummary\rabbitMQ; 
  6.  
  7. require 'BaseMQ.php'
  8.  
  9. class ProductMQ extends BaseMQ 
  10.  
  11.  
  12.     private $routes = ['hello','word']; //路由key 
  13.  
  14.  
  15.  
  16.     /** 
  17.  
  18.      * ProductMQ constructor. 
  19.  
  20.      * @throws \AMQPConnectionException 
  21.  
  22.      */ 
  23.  
  24.     public function __construct() 
  25.  
  26.     { 
  27.  
  28.        parent::__construct(); 
  29.  
  30.     } 
  31.  
  32.  
  33.  
  34.     /** 只控制发送成功 不接受消费者是否收到 
  35.  
  36.      * @throws \AMQPChannelException 
  37.  
  38.      * @throws \AMQPConnectionException 
  39.  
  40.      * @throws \AMQPExchangeException 
  41.  
  42.      */ 
  43.  
  44.     public function run() 
  45.  
  46.     { 
  47.  
  48.         //频道 
  49.  
  50.         $channel = $this->channel(); 
  51.  
  52.         //创建交换机对象 
  53.  
  54.         $ex = $this->exchange(); 
  55.  
  56.         //消息内容 
  57.  
  58.         $message = 'product message '.rand(1,99999); 
  59.  
  60.         //开始事务 
  61.  
  62.         $channel->startTransaction(); 
  63.  
  64.         $sendEd = true ; 
  65.  
  66.         foreach ($this->routes as $route) { 
  67.  
  68.             $sendEd = $ex->publish($message$route) ; 
  69.  
  70.             echo "Send Message:".$sendEd."\n"
  71.  
  72.         } 
  73.  
  74.         if(!$sendEd) { 
  75.  
  76.             $channel->rollbackTransaction(); 
  77.  
  78.         } 
  79.  
  80.         $channel->commitTransaction(); //提交事务 
  81.  
  82.         $this->close(); 
  83.  
  84.         die ; 
  85.  
  86.     } 
  87.  
  88.  
  89. try{ 
  90.  
  91.     (new ProductMQ())->run(); 
  92.  
  93. }catch (\Exception $exception){ 
  94.  
  95.     var_dump($exception->getMessage()) ; 
  96.  

ConsumerMQ.php

  1. <?php 
  2.  
  3. //消费者 C 
  4.  
  5. namespace MyObjSummary\rabbitMQ; 
  6.  
  7. require 'BaseMQ.php'
  8.  
  9. class ConsumerMQ extends BaseMQ 
  10.  
  11.  
  12.     private  $q_name = 'hello'//队列名 
  13.  
  14.     private  $route  = 'hello'//路由key 
  15.  
  16.  
  17.  
  18.     /** 
  19.  
  20.      * ConsumerMQ constructor. 
  21.  
  22.      * @throws \AMQPConnectionException 
  23.  
  24.      */ 
  25.  
  26.     public function __construct() 
  27.  
  28.     { 
  29.  
  30.         parent::__construct(); 
  31.  
  32.     } 
  33.  
  34.  
  35.  
  36.     /** 接受消息 如果终止 重连时会有消息 
  37.  
  38.      * @throws \AMQPChannelException 
  39.  
  40.      * @throws \AMQPConnectionException 
  41.  
  42.      * @throws \AMQPExchangeException 
  43.  
  44.      * @throws \AMQPQueueException 
  45.  
  46.      */ 
  47.  
  48.     public function run() 
  49.  
  50.     { 
  51.  
  52.  
  53.  
  54.         //创建交换机 
  55.  
  56.         $ex = $this->exchange(); 
  57.  
  58.         $ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型 
  59.  
  60.         $ex->setFlags(AMQP_DURABLE); //持久化 
  61.  
  62.         //echo "Exchange Status:".$ex->declare()."\n"; 
  63.  
  64.  
  65.  
  66.         //创建队列 
  67.  
  68.         $q = $this->queue(); 
  69.  
  70.         //var_dump($q->declare());exit(); 
  71.  
  72.         $q->setName($this->q_name); 
  73.  
  74.         $q->setFlags(AMQP_DURABLE); //持久化 
  75.  
  76.         //echo "Message Total:".$q->declareQueue()."\n"; 
  77.  
  78.  
  79.  
  80.         //绑定交换机与队列,并指定路由键 
  81.  
  82.         echo 'Queue Bind: '.$q->bind($this->exchange, $this->route)."\n"
  83.  
  84.  
  85.  
  86.         //阻塞模式接收消息 
  87.  
  88.         echo "Message:\n"
  89.  
  90.         while(True){ 
  91.  
  92.             $q->consume(function ($envelope,$queue){ 
  93.  
  94.                 $msg = $envelope->getBody(); 
  95.  
  96.                 echo $msg."\n"//处理消息 
  97.  
  98.                 $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答 
  99.  
  100.             }); 
  101.  
  102.             //$q->consume('processMessage', AMQP_AUTOACK); //自动ACK应答 
  103.  
  104.         } 
  105.  
  106.         $this->close(); 
  107.  
  108.     } 
  109.  
  110.  
  111. try{ 
  112.  
  113.     (new ConsumerMQ)->run(); 
  114.  
  115. }catch (\Exception $exception){ 
  116.  
  117.     var_dump($exception->getMessage()) ; 
  118.  

Tags: RabbitMQ PHP消息队列

分享到: