当前位置:首页 > PHP教程 > php高级应用 > 列表

swoole_process实现进程池的方法示例

发布:smiling 来源: PHP粉丝网  添加日期:2021-11-01 14:21:21 浏览: 评论:0 

这篇文章主要介绍了swoole_process实现进程池的方法示例,小编觉得挺不错的,现在分享给大家,也给大家做个参考,一起跟随小编过来看看吧。

swoole —— 重新定义PHP

swoole 的进程之间有两种通信方式,一种是消息队列(queue),另一种是管道(pipe),对swoole_process 的研究在swoole中显得尤为重要。

预备知识

IO多路复用

swoole 中的io多路复用表现为底层的 epoll进程模型,在C语言中表现为 epoll 函数。

epoll 模型下会持续监听自己名下的素有socket 描述符 fd

当触发了 socket 监听的事件时,epoll 函数才会响应,并返回所有监听该时间的 socket 集合

epoll 的本质是阻塞IO,它的优点在于能同事处理大量socket连接

Event loop 事件循环

swoole 对 epoll 实现了一个Reactor线程模型封装,设置了read事件和write事件的监听回调函数。(详见swoole_event_add)

Event loop 是一个Reactor线程,其中运行了一个epoll实例。

通过swoole_event_add将socket描述符的一个事件添加到epoll监听中,事件发生时将执行回调函数

不可用于fpm环境下,因为fpm在任务结束时可能会关掉进程。

swoole_process

基于C语言封装的进程管理模块,方便php来调用内置管道、消息队列接口,方便实现进程间通信

我们在php-fpm.conf配置文件中发现,php-fpm中有两种进程池管理设置。

静态模式 即初始化固定的进程数,当来了一个请求时,从中选取一个进程来处理。

动态模式 指定最小、最大进程数,当请求量过大,进程数不超过最大限制时,新增线程去处理请求

接下来用swoole代码来实现,这里只是为理解swoole_process、进程间通信、定时器等使用,实际情况使用封装好的swoole_server来实现task任务队列池会更方便。

假如有个定时投递的任务队列:

  1. <?php 
  2.  
  3. /** 
  4.  * 动态进程池,类似fpm 
  5.  * 动态新建进程 
  6.  * 有初始进程数,最小进程数,进程不够处理时候新建进程,不超过最大进程数 
  7.  */ 
  8.  
  9. // 一个进程定时投递任务 
  10.  
  11. /** 
  12.  * 1. tick 
  13.  * 2. process及其管道通讯 
  14.  * 3. event loop 事件循环 
  15.  */ 
  16. class processPool 
  17.   private $pool
  18.  
  19.   /** 
  20.    * @var swoole_process[] 记录所有worker的process对象 
  21.    */ 
  22.   private $workers = []; 
  23.  
  24.   /** 
  25.    * @var array 记录worker工作状态 
  26.    */ 
  27.   private $used_workers = []; 
  28.  
  29.   /** 
  30.    * @var int 最小进程数 
  31.    */ 
  32.   private $min_woker_num = 5; 
  33.  
  34.   /** 
  35.    * @var int 初始进程数 
  36.    */ 
  37.   private $start_worker_num = 10; 
  38.  
  39.   /** 
  40.    * @var int 最大进程数 
  41.    */ 
  42.   private $max_woker_num = 20; 
  43.  
  44.   /** 
  45.    * 进程闲置销毁秒数 
  46.    * @var int 
  47.    */ 
  48.   private $idle_seconds = 5; 
  49.  
  50.   /** 
  51.    * @var int 当前进程数 
  52.    */ 
  53.   private $curr_num
  54.  
  55.   /** 
  56.    * 闲置进程时间戳 
  57.    * @var array 
  58.    */ 
  59.   private $active_time = []; 
  60.  
  61.   public function __construct() 
  62.   { 
  63.     $this->pool = new swoole_process(function () { 
  64.       // 循环建立worker进程 
  65.       for ($i = 0; $i < $this->start_worker_num; $i++) { 
  66.         $this->createWorker(); 
  67.       } 
  68.       echo '初始化进程数:' . $this->curr_num . PHP_EOL; 
  69.       // 每秒定时往闲置的worker的管道中投递任务 
  70.       swoole_timer_tick(1000, function ($timer_id) { 
  71.         static $count = 0; 
  72.         $count++; 
  73.         $need_create = true; 
  74.         foreach ($this->used_workers as $pid => $used) { 
  75.           if ($used == 0) { 
  76.             $need_create = false; 
  77.             $this->workers[$pid]->write($count . ' job'); 
  78.             // 标记使用中 
  79.             $this->used_workers[$pid] = 1; 
  80.             $this->active_time[$pid] = time(); 
  81.             break
  82.           } 
  83.         } 
  84.         foreach ($this->used_workers as $pid => $used
  85.           // 如果所有worker队列都没有闲置的,则新建一个worker来处理 
  86.           if ($need_create && $this->curr_num < $this->max_woker_num) { 
  87.             $new_pid = $this->createWorker(); 
  88.             $this->workers[$new_pid]->write($count . ' job'); 
  89.             $this->used_workers[$new_pid] = 1; 
  90.             $this->active_time[$new_pid] = time(); 
  91.           } 
  92.  
  93.         // 闲置超过一段时间则销毁进程 
  94.         foreach ($this->active_time as $pid => $timestamp) { 
  95.           if ((time() - $timestamp) > $this->idle_seconds && $this->curr_num > $this->min_woker_num) { 
  96.             // 销毁该进程 
  97.             if (isset($this->workers[$pid]) && $this->workers[$pid] instanceof swoole_process) { 
  98.               $this->workers[$pid]->write('exit'); 
  99.               unset($this->workers[$pid]); 
  100.               $this->curr_num = count($this->workers); 
  101.               unset($this->used_workers[$pid]); 
  102.               unset($this->active_time[$pid]); 
  103.               echo "{$pid} destroyed\n"
  104.               break
  105.             } 
  106.           } 
  107.         } 
  108.  
  109.         echo "任务{$count}/{$this->curr_num}\n"
  110.  
  111.         if ($count == 20) { 
  112.           foreach ($this->workers as $pid => $worker) { 
  113.             $worker->write('exit'); 
  114.           } 
  115.           // 关闭定时器 
  116.           swoole_timer_clear($timer_id); 
  117.           // 退出进程池 
  118.           $this->pool->exit(0); 
  119.           exit(); 
  120.         } 
  121.       }); 
  122.  
  123.     }); 
  124.  
  125.     $master_pid = $this->pool->start(); 
  126.     echo "Master $master_pid start\n"
  127.  
  128.     while ($ret = swoole_process::wait()) { 
  129.       $pid = $ret['pid']; 
  130.       echo "process {$pid} existed\n"
  131.     } 
  132.   } 
  133.  
  134.   /** 
  135.    * 创建一个新进程 
  136.    * @return int 新进程的pid 
  137.    */ 
  138.   public function createWorker() 
  139.   { 
  140.     $worker_process = new swoole_process(function (swoole_process $worker) { 
  141.       // 给子进程管道绑定事件 
  142.       swoole_event_add($worker->pipe, function ($pipeuse ($worker) { 
  143.         $data = trim($worker->read()); 
  144.         if ($data == 'exit') { 
  145.           $worker->exit(0); 
  146.           exit(); 
  147.         } 
  148.         echo "{$worker->pid} 正在处理 {$data}\n"
  149.         sleep(5); 
  150.         // 返回结果,表示空闲 
  151.         $worker->write("complete"); 
  152.       }); 
  153.     }); 
  154.  
  155.     $worker_pid = $worker_process->start(); 
  156.  
  157.     // 给父进程管道绑定事件 
  158.     swoole_event_add($worker_process->pipe, function ($pipeuse ($worker_process) { 
  159.       $data = trim($worker_process->read()); 
  160.       if ($data == 'complete') { 
  161.         // 标记为空闲 
  162. //        echo "{$worker_process->pid} 空闲了\n"; 
  163.         $this->used_workers[$worker_process->pid] = 0; 
  164.       } 
  165.     }); 
  166.  
  167.     // 保存process对象 
  168.     $this->workers[$worker_pid] = $worker_process
  169.     // 标记为空闲 
  170.     $this->used_workers[$worker_pid] = 0; 
  171.     $this->active_time[$worker_pid] = time(); 
  172.     $this->curr_num = count($this->workers); 
  173.     return $worker_pid
  174.   } 
  175.  
  176.  
  177. new processPool();

Tags: swoole_process php进程池

分享到: