当前位置:首页 > PHP教程 > php高级应用 > 列表

PHP+RabbitMQ实现消息队列的完整代码

发布:smiling 来源: PHP粉丝网  添加日期:2021-11-13 22:29:26 浏览: 评论:0 

这篇文章主要给大家介绍了关于利用PHP+RabbitMQ实现消息队列的相关资料,文中通过示例代码介绍的非常详细,对大家学习或者使用PHP具有一定的参考学习价值,需要的朋友们下面来一起学习学习吧。

前言

为什么使用RabbitMq而不是ActiveMq或者RocketMq?

首先,从业务上来讲,我并不要求消息的100%接受率,并且,我需要结合php开发,RabbitMq相较RocketMq,延迟较低(微妙级)。至于ActiveMq,貌似问题较多。RabbitMq对各种语言的支持较好,所以选择RabbitMq。

先安装PHP对应的RabbitMQ,这里用的是 php_amqp 不同的扩展实现方式会有细微的差异.

php扩展地址: http://pecl.php.net/package/amqp

具体以官网为准  http://www.rabbitmq.com/getstarted.html

介绍

config.php 配置信息

BaseMQ.php MQ基类

ProductMQ.php 生产者类

ConsumerMQ.php 消费者类

Consumer2MQ.php 消费者2(可有多个)

config.php

  1. <?php 
  2. return [ 
  3.  //配置 
  4.  'host' => [ 
  5.   'host' => '127.0.0.1'
  6.   'port' => '5672'
  7.   'login' => 'guest'
  8.   'password' => 'guest'
  9.   'vhost'=>'/'
  10.  ], 
  11.  //交换机 
  12.  'exchange'=>'word'
  13.  //路由 
  14.  'routes' => [], 
  15. ]; 

BaseMQ.php

  1. <?php 
  2. /** 
  3.  * Created by PhpStorm. 
  4.  * User: pc 
  5.  * Date: 2018/12/13 
  6.  * Time: 14:11 
  7.  */ 
  8.  
  9. namespace MyObjSummary\rabbitMQ; 
  10.  
  11. /** Member 
  12.  *  AMQPChannel 
  13.  *  AMQPConnection 
  14.  *  AMQPEnvelope 
  15.  *  AMQPExchange 
  16.  *  AMQPQueue 
  17.  * Class BaseMQ 
  18.  * @package MyObjSummary\rabbitMQ 
  19.  */ 
  20. class BaseMQ 
  21.  /** MQ Channel 
  22.   * @var \AMQPChannel 
  23.   */ 
  24.  public $AMQPChannel ; 
  25.  
  26.  /** MQ Link 
  27.   * @var \AMQPConnection 
  28.   */ 
  29.  public $AMQPConnection ; 
  30.  
  31.  /** MQ Envelope 
  32.   * @var \AMQPEnvelope 
  33.   */ 
  34.  public $AMQPEnvelope ; 
  35.  
  36.  /** MQ Exchange 
  37.   * @var \AMQPExchange 
  38.   */ 
  39.  public $AMQPExchange ; 
  40.  
  41.  /** MQ Queue 
  42.   * @var \AMQPQueue 
  43.   */ 
  44.  public $AMQPQueue ; 
  45.  
  46.  /** conf 
  47.   * @var 
  48.   */ 
  49.  public $conf ; 
  50.  
  51.  /** exchange 
  52.   * @var 
  53.   */ 
  54.  public $exchange ; 
  55.  
  56.  /** link 
  57.   * BaseMQ constructor. 
  58.   * @throws \AMQPConnectionException 
  59.   */ 
  60.  public function __construct() 
  61.  { 
  62.   $conf = require 'config.php' ; 
  63.   if(!$conf
  64.    throw new \AMQPConnectionException('config error!'); 
  65.   $this->conf  = $conf['host'] ; 
  66.   $this->exchange = $conf['exchange'] ; 
  67.   $this->AMQPConnection = new \AMQPConnection($this->conf); 
  68.   if (!$this->AMQPConnection->connect()) 
  69.    throw new \AMQPConnectionException("Cannot connect to the broker!\n"); 
  70.  } 
  71.  
  72.  /** 
  73.   * close link 
  74.   */ 
  75.  public function close() 
  76.  { 
  77.   $this->AMQPConnection->disconnect(); 
  78.  } 
  79.  
  80.  /** Channel 
  81.   * @return \AMQPChannel 
  82.   * @throws \AMQPConnectionException 
  83.   */ 
  84.  public function channel() 
  85.  { 
  86.   if(!$this->AMQPChannel) { 
  87.    $this->AMQPChannel = new \AMQPChannel($this->AMQPConnection); 
  88.   } 
  89.   return $this->AMQPChannel; 
  90.  } 
  91.  
  92.  /** Exchange 
  93.   * @return \AMQPExchange 
  94.   * @throws \AMQPConnectionException 
  95.   * @throws \AMQPExchangeException 
  96.   */ 
  97.  public function exchange() 
  98.  { 
  99.   if(!$this->AMQPExchange) { 
  100.    $this->AMQPExchange = new \AMQPExchange($this->channel()); 
  101.    $this->AMQPExchange->setName($this->exchange); 
  102.   } 
  103.   return $this->AMQPExchange ; 
  104.  } 
  105.  
  106.  /** queue 
  107.   * @return \AMQPQueue 
  108.   * @throws \AMQPConnectionException 
  109.   * @throws \AMQPQueueException 
  110.   */ 
  111.  public function queue() 
  112.  { 
  113.   if(!$this->AMQPQueue) { 
  114.    $this->AMQPQueue = new \AMQPQueue($this->channel()); 
  115.   } 
  116.   return $this->AMQPQueue ; 
  117.  } 
  118.  
  119.  /** Envelope 
  120.   * @return \AMQPEnvelope 
  121.   */ 
  122.  public function envelope() 
  123.  { 
  124.   if(!$this->AMQPEnvelope) { 
  125.    $this->AMQPEnvelope = new \AMQPEnvelope(); 
  126.   } 
  127.   return $this->AMQPEnvelope; 
  128.  } 

ProductMQ.php

  1. <?php 
  2. //生产者 P 
  3. namespace MyObjSummary\rabbitMQ; 
  4. require 'BaseMQ.php'
  5. class ProductMQ extends BaseMQ 
  6.  private $routes = ['hello','word']; //路由key 
  7.  
  8.  /** 
  9.   * ProductMQ constructor. 
  10.   * @throws \AMQPConnectionException 
  11.   */ 
  12.  public function __construct() 
  13.  { 
  14.   parent::__construct(); 
  15.  } 
  16.  
  17.  /** 只控制发送成功 不接受消费者是否收到 
  18.   * @throws \AMQPChannelException 
  19.   * @throws \AMQPConnectionException 
  20.   * @throws \AMQPExchangeException 
  21.   */ 
  22.  public function run() 
  23.  { 
  24.   //频道 
  25.   $channel = $this->channel(); 
  26.   //创建交换机对象 
  27.   $ex = $this->exchange(); 
  28.   //消息内容 
  29.   $message = 'product message '.rand(1,99999); 
  30.   //开始事务 
  31.   $channel->startTransaction(); 
  32.   $sendEd = true ; 
  33.   foreach ($this->routes as $route) { 
  34.    $sendEd = $ex->publish($message$route) ; 
  35.    echo "Send Message:".$sendEd."\n"
  36.   } 
  37.   if(!$sendEd) { 
  38.    $channel->rollbackTransaction(); 
  39.   } 
  40.   $channel->commitTransaction(); //提交事务 
  41.   $this->close(); 
  42.   die ; 
  43.  } 
  44. try{ 
  45.  (new ProductMQ())->run(); 
  46. }catch (\Exception $exception){ 
  47.  var_dump($exception->getMessage()) ; 

ConsumerMQ.php

  1. <?php 
  2. //消费者 C 
  3. namespace MyObjSummary\rabbitMQ; 
  4. require 'BaseMQ.php'
  5. class ConsumerMQ extends BaseMQ 
  6.  private $q_name = 'hello'//队列名 
  7.  private $route = 'hello'//路由key 
  8.  
  9.  /** 
  10.   * ConsumerMQ constructor. 
  11.   * @throws \AMQPConnectionException 
  12.   */ 
  13.  public function __construct() 
  14.  { 
  15.   parent::__construct(); 
  16.  } 
  17.  
  18.  /** 接受消息 如果终止 重连时会有消息 
  19.   * @throws \AMQPChannelException 
  20.   * @throws \AMQPConnectionException 
  21.   * @throws \AMQPExchangeException 
  22.   * @throws \AMQPQueueException 
  23.   */ 
  24.  public function run() 
  25.  { 
  26.  
  27.   //创建交换机 
  28.   $ex = $this->exchange(); 
  29.   $ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型 
  30.   $ex->setFlags(AMQP_DURABLE); //持久化 
  31.   //echo "Exchange Status:".$ex->declare()."\n"; 
  32.  
  33.   //创建队列 
  34.   $q = $this->queue(); 
  35.   //var_dump($q->declare());exit(); 
  36.   $q->setName($this->q_name); 
  37.   $q->setFlags(AMQP_DURABLE); //持久化 
  38.   //echo "Message Total:".$q->declareQueue()."\n"; 
  39.  
  40.   //绑定交换机与队列,并指定路由键 
  41.   echo 'Queue Bind: '.$q->bind($this->exchange, $this->route)."\n"
  42.  
  43.   //阻塞模式接收消息 
  44.   echo "Message:\n"
  45.   while(True){ 
  46.    $q->consume(function ($envelope,$queue){ 
  47.     $msg = $envelope->getBody(); 
  48.     echo $msg."\n"//处理消息 
  49.     $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答 
  50.    }); 
  51.    //$q->consume('processMessage', AMQP_AUTOACK); //自动ACK应答 
  52.   } 
  53.   $this->close(); 
  54.  } 
  55. try{ 
  56.  (new ConsumerMQ)->run(); 
  57. }catch (\Exception $exception){ 
  58.  var_dump($exception->getMessage()) ; 
  59. }

Tags: PHP+RabbitMQ PHP消息队列

分享到: