当前位置:首页 > PHP教程 > php高级应用 > 列表

PHP基于rabbitmq操作类的生产者和消费者功能示例

发布:smiling 来源: PHP粉丝网  添加日期:2021-10-01 16:46:21 浏览: 评论:0 

这篇文章主要介绍了PHP基于rabbitmq操作类的生产者和消费者功能,结合实例形式分析了基于rabbitmq操作类的生产者和消费者定义与使用方法,需要的朋友可以参考下

本文实例讲述了PHP基于rabbitmq操作类的生产者和消费者功能,分享给大家供大家参考,具体如下:

注意事项:

1、accept.php消费者代码需要在命令行执行

2、'username'=>'asdf','password'=>'123456' 改成自己的帐号和密码

RabbitMQCommand.php操作类代码

  1. <?php 
  2. /* 
  3.  * amqp协议操作类,可以访问rabbitMQ 
  4.  * 需先安装php_amqp扩展 
  5.  */ 
  6. class RabbitMQCommand{ 
  7.   public $configs = array(); 
  8.   //交换机名称 
  9.   public $exchange_name = ''
  10.   //队列名称 
  11.   public $queue_name = ''
  12.   //路由名称 
  13.   public $route_key = ''
  14.   /* 
  15.    * 持久化,默认True 
  16.    */ 
  17.   public $durable = True; 
  18.   /* 
  19.    * 自动删除 
  20.    * exchange is deleted when all queues have finished using it 
  21.    * queue is deleted when last consumer unsubscribes 
  22.    * 
  23.    */ 
  24.   public $autodelete = False; 
  25.   /* 
  26.    * 镜像 
  27.    * 镜像队列,打开后消息会在节点之间复制,有master和slave的概念 
  28.    */ 
  29.   public $mirror = False; 
  30.   private $_conn = Null; 
  31.   private $_exchange = Null; 
  32.   private $_channel = Null; 
  33.   private $_queue = Null; 
  34.   /* 
  35.    * @configs array('host'=>$host,'port'=>5672,'username'=>$username,'password'=>$password,'vhost'=>'/') 
  36.    */ 
  37.   public function __construct($configs = array(), $exchange_name = ''$queue_name = ''$route_key = '') { 
  38.     $this->setConfigs($configs); 
  39.     $this->exchange_name = $exchange_name
  40.     $this->queue_name = $queue_name
  41.     $this->route_key = $route_key
  42.   } 
  43.   private function setConfigs($configs) { 
  44.     if (!is_array($configs)) { 
  45.       throw new Exception('configs is not array'); 
  46.     } 
  47.     if (!($configs['host'] && $configs['port'] && $configs['username'] && $configs['password'])) { 
  48.       throw new Exception('configs is empty'); 
  49.     } 
  50.     if (emptyempty($configs['vhost'])) { 
  51.       $configs['vhost'] = '/'
  52.     } 
  53.     $configs['login'] = $configs['username']; 
  54.     unset($configs['username']); 
  55.     $this->configs = $configs
  56.   } 
  57.   /* 
  58.    * 设置是否持久化,默认为True 
  59.    */ 
  60.   public function setDurable($durable) { 
  61.     $this->durable = $durable
  62.   } 
  63.   /* 
  64.    * 设置是否自动删除 
  65.    */ 
  66.   public function setAutoDelete($autodelete) { 
  67.     $this->autodelete = $autodelete
  68.   } 
  69.   /* 
  70.    * 设置是否镜像 
  71.    */ 
  72.   public function setMirror($mirror) { 
  73.     $this->mirror = $mirror
  74.   } 
  75.   /* 
  76.    * 打开amqp连接 
  77.    */ 
  78.   private function open() { 
  79.     if (!$this->_conn) { 
  80.       try { 
  81.         $this->_conn = new AMQPConnection($this->configs); 
  82.         $this->_conn->connect(); 
  83.         $this->initConnection(); 
  84.       } catch (AMQPConnectionException $ex) { 
  85.         throw new Exception('cannot connection rabbitmq',500); 
  86.       } 
  87.     } 
  88.   } 
  89.   /* 
  90.    * rabbitmq连接不变 
  91.    * 重置交换机,队列,路由等配置 
  92.    */ 
  93.   public function reset($exchange_name$queue_name$route_key) { 
  94.     $this->exchange_name = $exchange_name
  95.     $this->queue_name = $queue_name
  96.     $this->route_key = $route_key
  97.     $this->initConnection(); 
  98.   } 
  99.   /* 
  100.    * 初始化rabbit连接的相关配置 
  101.    */ 
  102.   private function initConnection() { 
  103.     if (emptyempty($this->exchange_name) || emptyempty($this->queue_name) || emptyempty($this->route_key)) { 
  104.       throw new Exception('rabbitmq exchange_name or queue_name or route_key is empty',500); 
  105.     } 
  106.     $this->_channel = new AMQPChannel($this->_conn); 
  107.     $this->_exchange = new AMQPExchange($this->_channel); 
  108.     $this->_exchange->setName($this->exchange_name); 
  109.     $this->_exchange->setType(AMQP_EX_TYPE_DIRECT); 
  110.     if ($this->durable) 
  111.       $this->_exchange->setFlags(AMQP_DURABLE); 
  112.     if ($this->autodelete) 
  113.       $this->_exchange->setFlags(AMQP_AUTODELETE); 
  114.     $this->_exchange->declare(); 
  115.     $this->_queue = new AMQPQueue($this->_channel); 
  116.     $this->_queue->setName($this->queue_name); 
  117.     if ($this->durable) 
  118.       $this->_queue->setFlags(AMQP_DURABLE); 
  119.     if ($this->autodelete) 
  120.       $this->_queue->setFlags(AMQP_AUTODELETE); 
  121.     if ($this->mirror) 
  122.       $this->_queue->setArgument('x-ha-policy''all'); 
  123.     $this->_queue->declare(); 
  124.     $this->_queue->bind($this->exchange_name, $this->route_key); 
  125.   } 
  126.   public function close() { 
  127.     if ($this->_conn) { 
  128.       $this->_conn->disconnect(); 
  129.     } 
  130.   } 
  131.   public function __sleep() { 
  132.     $this->close(); 
  133.     return array_keys(get_object_vars($this)); 
  134.   } 
  135.   public function __destruct() { 
  136.     $this->close(); 
  137.   } 
  138.   /* 
  139.    * 生产者发送消息 
  140.    */ 
  141.   public function send($msg) { 
  142.     $this->open(); 
  143.     if(is_array($msg)){ 
  144.       $msg = json_encode($msg); 
  145.     }else
  146.       $msg = trim(strval($msg)); 
  147.     } 
  148.     return $this->_exchange->publish($msg$this->route_key); 
  149.   } 
  150.   /* 
  151.    * 消费者 
  152.    * $fun_name = array($classobj,$function) or function name string 
  153.    * $autoack 是否自动应答 
  154.    * 
  155.    * function processMessage($envelope, $queue) { 
  156.       $msg = $envelope->getBody(); 
  157.       echo $msg."\n"; //处理消息 
  158.       $queue->ack($envelope->getDeliveryTag());//手动应答 
  159.     } 
  160.    */ 
  161.   public function run($fun_name$autoack = True){ 
  162.     $this->open(); 
  163.     if (!$fun_name || !$this->_queue) return False; 
  164.     while(True){ 
  165.       if ($autoack$this->_queue->consume($fun_name, AMQP_AUTOACK); 
  166.       else $this->_queue->consume($fun_name); 
  167.     } 
  168.   } 

send.php生产者代码

  1. <?php 
  2. set_time_limit(0); 
  3. include_once('RabbitMQCommand.php'); 
  4. $configs = array('host'=>'127.0.0.1','port'=>5672,'username'=>'asdf','password'=>'123456','vhost'=>'/'); 
  5. $exchange_name = 'class-e-1'
  6. $queue_name = 'class-q-1'
  7. $route_key = 'class-r-1'
  8. $ra = new RabbitMQCommand($configs,$exchange_name,$queue_name,$route_key); 
  9. for($i=0;$i<=100;$i++){ 
  10.   $ra->send(date('Y-m-d H:i:s',time())); 
  11. exit(); 

accept.php消费者代码

  1. <?php 
  2. error_reporting(0); 
  3. include_once('RabbitMQCommand.php'); 
  4. $configs = array('host'=>'127.0.0.1','port'=>5672,'username'=>'asdf','password'=>'123456','vhost'=>'/'); 
  5. $exchange_name = 'class-e-1'
  6. $queue_name = 'class-q-1'
  7. $route_key = 'class-r-1'
  8. $ra = new RabbitMQCommand($configs,$exchange_name,$queue_name,$route_key); 
  9. class A{ 
  10.   function processMessage($envelope$queue) { 
  11.     $msg = $envelope->getBody(); 
  12.     $envelopeID = $envelope->getDeliveryTag(); 
  13.     $pid = posix_getpid(); 
  14.     file_put_contents("log{$pid}.log"$msg.'|'.$envelopeID.''."\r\n",FILE_APPEND); 
  15.     $queue->ack($envelopeID); 
  16.   } 
  17. $a = new A(); 
  18. $s = $ra->run(array($a,'processMessage'),false);

Tags: rabbitmq PHP生产者

分享到: