当前位置:首页 > PHP教程 > php高级应用 > 列表

php socket服务的模型以及实现 多进程IO复用libevent

发布:smiling 来源: PHP粉丝网  添加日期:2018-09-18 09:30:06 浏览: 评论:0 

本文采用端口复用(SO_REUSEPORT)技术,这样就可以很好地解决惊群问题和 stream_socket_server 的性能瓶颈问题。

  /**
   * Multi-process, I/O-multiplexed TCP line server built on the libevent extension.
   *
   * The constructor daemonizes the calling process. run() then forks
   * $process_num workers and supervises them. Each worker binds its own
   * listening socket on the same host/port with the so_reuseport context
   * option (PHP 7+ recommended) and serves many connections from a single
   * libevent loop.
   *
   * Protocol: newline-terminated text messages. Every complete message is
   * handed to $onMessage; the message "quit" closes the connection.
   */
  class Xtgxiso_server
  {
      /** Number of bytes requested from a buffered event per read call. */
      const READ_CHUNK_SIZE = 4096;

      /** @var resource|false Listening socket of the current worker. */
      public $socket = false;

      /** @var array Stream resources (listener and client connections) keyed by (int) resource id. */
      public $master = array();

      /** @var callable|null Called as onConnect($connection) after a client is accepted. */
      public $onConnect = null;

      /** @var callable|null Called as onMessage($connection, $message) for every complete message. */
      public $onMessage = null;

      /** @var callable|null Called as onClose($connection) right before a connection is closed. */
      public $onClose = null;

      /** @var int Number of worker processes forked by run(). */
      public $process_num = 2;

      /** @var array Buffered-event resources keyed by connection id. */
      public $buffer = array();

      /** @var array Worker pids supervised by the master, keyed by pid. */
      private $pids = array();

      /** @var array Bytes received so far that do not yet form a complete message, keyed by connection id. */
      public $receive = array();

      /** @var string Address the workers listen on. */
      private $host = '127.0.0.1';

      /** @var int TCP port the workers listen on. */
      private $port = 1215;

      /**
       * Daemonize the current process and remember the listen address.
       *
       * The process that calls the constructor exits; execution continues in
       * the forked child, which becomes a session leader detached from the
       * controlling terminal.
       *
       * @param string $host Address to listen on.
       * @param int    $port TCP port to listen on.
       */
      public function __construct($host = "0.0.0.0", $port = 1215)
      {
          $pid = pcntl_fork();
          if ($pid === -1) {
              // pcntl_fork() returns -1 when the child could not be created.
              die("could not fork");
          }
          if ($pid) {
              // Parent: pcntl_fork() returned the child's pid. Leave the work to the child.
              exit();
          }

          // Child (pcntl_fork() returned 0): detach from the current terminal.
          if (posix_setsid() === -1) {
              die("could not detach from terminal");
          }
          umask(0);

          $this->host = $host;
          $this->port = $port;
      }

      /**
       * Fork one worker process.
       *
       * In the master the child's pid is recorded for supervision. In the child
       * the event loop is run and the process exits afterwards, so a worker can
       * never fall back into the master's supervision loop.
       */
      private function start_worker_process()
      {
          $pid = pcntl_fork();
          switch ($pid) {
              case -1:
                  echo "fork error\r\n";
                  exit(1);
              case 0:
                  // A worker supervises nothing; drop the pid table copied from the master.
                  $this->pids = array();
                  $this->worker_loop();
                  exit(0);
              default:
                  $this->pids[$pid] = $pid;
                  break;
          }
      }

      /**
       * Worker body: bind a listening socket with so_reuseport and dispatch
       * libevent events until the loop ends.
       */
      private function worker_loop()
      {
          $context = stream_context_create(array('socket' => array('so_reuseport' => 1)));
          $this->socket = stream_socket_server(
              "tcp://" . $this->host . ":" . $this->port,
              $errno,
              $errstr,
              STREAM_SERVER_BIND | STREAM_SERVER_LISTEN,
              $context
          );
          if (!$this->socket) {
              die($errstr . "--" . $errno);
          }
          stream_set_blocking($this->socket, 0);
          $this->master[(int) $this->socket] = $this->socket;

          $base = event_base_new();
          $event = event_new();
          // Bind the callback to this instance: ev_accept() uses $this, so a
          // class-name callable would be invoked without an object.
          event_set($event, $this->socket, EV_READ | EV_PERSIST, array($this, 'ev_accept'), $base);
          event_base_set($event, $base);
          event_add($event);

          echo posix_getpid() . " start run...\n";
          event_base_loop($base);
      }

      /**
       * Master loop: start the workers, then poll once per second and replace
       * every worker that has exited. Never returns.
       */
      public function run()
      {
          for ($i = 1; $i <= $this->process_num; $i++) {
              $this->start_worker_process();
          }

          while (1) {
              foreach ($this->pids as $pid) {
                  if (!$pid) {
                      continue;
                  }
                  $res = pcntl_waitpid($pid, $status, WNOHANG);
                  if ($res == -1 || $res > 0) {
                      // Forget the dead worker before spawning its replacement so
                      // that a reused pid cannot be removed by mistake.
                      unset($this->pids[$pid]);
                      $this->start_worker_process();
                  }
              }
              sleep(1);
          }
      }

      /**
       * libevent callback: the listening socket is readable, accept one client
       * and attach a buffered event to it.
       *
       * @param resource $socket Listening socket.
       * @param int      $flag   Event flags that fired.
       * @param resource $base   Event base passed as callback argument to event_set().
       */
      public function ev_accept($socket, $flag, $base)
      {
          // The listener is non-blocking; a failed accept is silenced and skipped.
          $connection = @stream_socket_accept($socket);
          if (!$connection) {
              return;
          }
          echo posix_getpid() . " -- accepted " . stream_socket_get_name($connection, true) . "\n";

          stream_set_blocking($connection, 0);
          $id = (int) $connection;
          if ($this->onConnect) {
              call_user_func($this->onConnect, $connection);
          }

          $buffer = event_buffer_new(
              $connection,
              array($this, 'ev_read'),
              array($this, 'ev_write'),
              array($this, 'ev_error'),
              $id
          );
          event_buffer_base_set($buffer, $base);
          event_buffer_timeout_set($buffer, 30, 30);
          event_buffer_watermark_set($buffer, EV_READ, 0, 0xffffff);
          event_buffer_priority_set($buffer, 10);
          event_buffer_enable($buffer, EV_READ | EV_PERSIST);

          $this->master[$id] = $connection;
          $this->buffer[$id] = $buffer;
          $this->receive[$id] = '';
      }

      /**
       * libevent callback: data is available on a connection.
       *
       * Drains the buffered event, appends the bytes to the per-connection
       * receive buffer and dispatches every complete newline-terminated message
       * (trimmed) to $onMessage. Bytes after the last newline are kept for the
       * next call, so messages may arrive split or batched in any way.
       *
       * @param resource $buffer Buffered event of the connection.
       * @param int      $id     Connection id given to event_buffer_new().
       */
      public function ev_read($buffer, $id)
      {
          if (!isset($this->receive[$id])) {
              $this->receive[$id] = '';
          }

          while (true) {
              $read = event_buffer_read($buffer, self::READ_CHUNK_SIZE);
              if ($read === '' || $read === false) {
                  break;
              }
              $this->receive[$id] .= $read;

              while (($pos = strpos($this->receive[$id], "\n")) !== false) {
                  $message = trim(substr($this->receive[$id], 0, $pos));
                  $this->receive[$id] = (string) substr($this->receive[$id], $pos + 1);

                  if ($this->onMessage) {
                      call_user_func($this->onMessage, $this->master[$id], $message);
                  }
                  if ($message === "quit") {
                      echo "client close conn\n";
                      $this->close_connection($id);
                      // The connection is gone; anything still buffered is discarded.
                      return;
                  }
              }
          }
      }

      /**
       * libevent callback: the output buffer of a connection was flushed.
       *
       * @param resource $buffer Buffered event of the connection.
       * @param int      $id     Connection id given to event_buffer_new().
       */
      public function ev_write($buffer, $id)
      {
          echo "$id -- " . "\n";
      }

      /**
       * libevent callback: error, EOF or timeout on a connection. Logs the
       * error flags and releases the connection.
       *
       * @param resource $buffer Buffered event of the connection.
       * @param int      $error  Error flags reported by libevent.
       * @param int      $id     Connection id given to event_buffer_new().
       */
      public function ev_error($buffer, $error, $id)
      {
          echo "ev_error - " . $error . "\n";
          $this->close_connection($id);
      }

      /**
       * Notify $onClose, free the buffered event, close the stream and forget
       * all per-connection state. Does nothing for an unknown id.
       *
       * @param int $id Connection id.
       */
      private function close_connection($id)
      {
          if (!isset($this->master[$id])) {
              return;
          }
          $connection = $this->master[$id];

          // Notify while the stream is still open so the handler can inspect it.
          if ($this->onClose) {
              call_user_func($this->onClose, $connection);
          }
          if (isset($this->buffer[$id])) {
              event_buffer_disable($this->buffer[$id], EV_READ | EV_WRITE);
              event_buffer_free($this->buffer[$id]);
          }
          if (is_resource($connection)) {
              fclose($connection);
          }
          unset($this->master[$id], $this->buffer[$id], $this->receive[$id]);
      }
  }

  // Bootstrap: create the server, attach the event handlers, then hand control
  // to the master loop.
  $server = new Xtgxiso_server();

  // Greet every newly accepted client.
  $server->onConnect = function($conn){
      echo "onConnect -- accepted " . stream_socket_get_name($conn,true) . "\n";
      fwrite($conn,"conn success\n");
  };

  // Echo every complete message back to its sender.
  $server->onMessage = function($conn,$msg){
      echo "onMessage --" . $msg . "\n";
      fwrite($conn,"received ".$msg."\n");
  }; //phpfensi.com

  // Log the peer address of a connection that is being closed.
  $server->onClose = function($conn){
      echo "onClose --" . stream_socket_get_name($conn,true) . "\n";
  };

  $server->run();

经过多次服务模型的演变,我们基本实现了一个高性能的服务模型!

Tags: socket模型 多进程IO libevent

分享到:

相关文章