当前位置:首页 > PHP教程 > php高级应用 > 列表

php中socket服务的模型下的编程方式(同步和异步)

发布:smiling 来源: PHP粉丝网  添加日期:2018-09-18 09:27:45 浏览: 评论:0 

前面我们花了一段时间来搭建高性能的socket服务,可以同时处理大量的连接,但这是在没有具体业务的情况下。

如果我们启用了一个单进程的server,但里面的一个业务耗时1秒,那么在这1秒内是阻塞的,后续的请求会等待,如果并发三个请求,那么三个请求的执行时间会分别昌1秒,2秒,3秒.提高并发的方法有以下几种:

1:多启动进程,提高并发数

2:优化业务,减少耗时间相当于减少阻塞时间,提高并发数

3:异步编程,避免阻塞,提高并发数

这里我们重点介绍第三种方法,以访问第三方http为例。

代码如下:

  1. //同步读取 
  2. function get_data_blocking(){ 
  3.     $socket = stream_socket_client("tcp://test.raventech.cn:80"$errno$errstr, 6); 
  4.     fwrite($socket"GET /sleep1.php HTTP/1.0\r\nHost: test.raventech.cn\r\nAccept: */*\r\n\r\n"); 
  5.     $str = ""
  6.     while (!feof($socket)) { 
  7.         $str .= fgets($socket, 1024); 
  8.     } 
  9.     fclose($socket); 
  10.     return $str
  11.  
  12. //异步读取 
  13. function get_data_unblocking(){ 
  14.     $socket = stream_socket_client("tcp://test.raventech.cn:80"$errno$errstr, 6); 
  15.     stream_set_blocking($socket, 0); 
  16.     fwrite($socket"GET /sleep1.php HTTP/1.0\r\nHost: test.raventech.cn\r\nAccept: */*\r\n\r\n"); 
  17.     $write  = NULL; 
  18.     $except = NULL; 
  19.     while$socket ){ 
  20.         $read   = array($socket); 
  21.         $num_changed_streams = stream_select($read$write$except, 0); 
  22.         if ( $num_changed_streams > 0 ) { 
  23.             foreach($read as $r){ 
  24.                 $str = fread($r,2048); 
  25.                 fclose($socket); 
  26.                 $socket = false; 
  27.                 return $str
  28.             } 
  29.         } 
  30.         usleep(100); 
  31.     } 
  32.  
  33. //真正的异步读取--利用server的IO复用事件来提高并发 
  34. class Get_data_event{ 
  35.  
  36.     public $onMessage = null; 
  37.     private $str=''
  38.  
  39.     function __construct(&$server){ 
  40.         $socket = stream_socket_client("tcp://test.xtgxiso.cn:80"$errno$errstr, 6); 
  41.         stream_set_blocking($socket, 0); 
  42.         fwrite($socket"GET /sleep1.php HTTP/1.0\r\nHost: test.xtgxiso.cn\r\nAccept: */*\r\n\r\n"); 
  43.         $server->add_socket($socketarray($this'read')); 
  44.     } 
  45.  
  46.     public function read($socket){ 
  47.         while (1) { 
  48.             $buffer = fread($socket, 1024); 
  49.             if ($buffer === '' || $buffer === false) { 
  50.                 break
  51.             } 
  52.             $this->str .= $buffer
  53.         } 
  54.         if$this->onMessage && $this->str ) { 
  55.             call_user_func($this->onMessage, $this->str); 
  56.         } 
  57.         $this->str = ''
  58.         return false; 
  59.     } 
  60.  
  61.  
  62. /** 
  63.  * 单进程IO复用select 
  64.  */ 
  65. class Xtgxiso_server 
  66.     public $socket = false; 
  67.     public $master = array(); 
  68.     public $onConnect = null; 
  69.     public $onMessage = null; 
  70.     public $other_socket_callback = array(); 
  71.  
  72.     function __construct($host="0.0.0.0",$port=1215) 
  73.     { 
  74.         $this->socket = stream_socket_server("tcp://".$host.":".$port,$errno$errstr); 
  75.         if (!$this->socket) die($errstr."--".$errno); 
  76.         stream_set_blocking($this->socket,0); 
  77.         $id = (int)$this->socket; 
  78.         $this->master[$id] = $this->socket; 
  79.     } 
  80.  
  81.     public function add_socket($socket,$callback){ 
  82.         $id = (int)$socket
  83.         $this->master[$id] = $socket
  84.         $this->other_socket_callback[$id] = $callback
  85.     } 
  86.  
  87.     public function run(){ 
  88.         $read = $this->master; 
  89.         $receive = array(); 
  90.         echo  "start run...\n"
  91.         while ( 1 ) { 
  92.             $read = $this->master; 
  93.             //echo  "waiting...\n"; 
  94.             $mod_fd = @stream_select($read$_w = NULL, $_e = NULL, 60); 
  95.             if ($mod_fd === FALSE) { 
  96.                 break
  97.             } 
  98.             foreach ( $read as $k => $v ) { 
  99.                 $id = (int)$v
  100.                 if ( $v === $this->socket ) { 
  101.                     //echo "new conn\n"; 
  102.                     $conn = stream_socket_accept($this->socket); 
  103.                     if ($this->onConnect) { 
  104.                         call_user_func($this->onConnect, $conn); 
  105.                     } 
  106.                     $id = (int)$conn
  107.                     $this->master[$id] = $conn
  108.                 } else if ( @$this->other_socket_callback[$id] ){ 
  109.                     call_user_func_array($this->other_socket_callback[$id], array($v)); 
  110.                 } else { 
  111.                     //echo "read data\n"; 
  112.                     if ( !isset($receive[$k]) ){ 
  113.                         $receive[$k]=""
  114.                     } 
  115.                     $buffer = fread($v, 1024); 
  116.                     //echo $buffer."\n"; 
  117.                     if ( strlen($buffer) === 0 ) { 
  118.                         if ( $this->onClose ){ 
  119.                             call_user_func($this->onClose,$v); 
  120.                         } 
  121.                         fclose($v); 
  122.                         $id = (int)$v
  123.                         unset($this->master[$id]); 
  124.                     } else if ( $buffer === FALSE ) { 
  125.                         if ( $this->onClose ){ 
  126.                             call_user_func($this->onClose, $this->master[$key_to_del]); 
  127.                         } 
  128.                         fclose($v); 
  129.                         $id = (int)$v
  130.                         unset($this->master[$id]); 
  131.                     } else { 
  132.                         $pos = strpos($buffer"\r\n\r\n"); 
  133.                         if ( $pos === false) { 
  134.                             $receive[$k] .= $buffer
  135.                             //echo "received:".$buffer.";not all package,continue recdiveing\n"; 
  136.                         }else
  137.                             $receive[$k] .= trim(substr ($buffer,0,$pos+4)); 
  138.                             $buffer = substr($buffer,$pos+4); 
  139.                             if($this->onMessage) { 
  140.                                 call_user_func($this->onMessage,$v,$receive[$k]); 
  141.                             } 
  142.                             $receive[$k]=''
  143.                         } 
  144.                     } 
  145.                 } 
  146.             } 
  147.             usleep(10000); 
  148.         } 
  149.     } 
  150.  
  151.  
  152. $server =  new Xtgxiso_server(); 
  153.  
  154. $server->onConnect = function($conn){ 
  155.     echo "onConnect -- accepted " . stream_socket_get_name($conn,true) . "\n"
  156. }; 
  157.  
  158. $server->onMessage = function($conn,$msguse ( $server ) { 
  159.     /* 
  160.     $respone ="";//响应内容 
  161.     $respone = "HTTP/1.1 200 OK\r\n"; 
  162.     $respone .= "Server: openresty\r\n"; 
  163.     $respone .= "Content-Type: text/html; charset=utf-8\r\n"; 
  164.     $body = time().rand(111111,999999); 
  165.     $len = strlen($body); 
  166.     $respone .= "Content-Length:$len\r\n"; 
  167.     $respone .= "Connection: close\r\n"; 
  168.     $respone .= "\r\n$body\r\n\r\n"; 
  169.     echo "onMessage --" . $msg . "\n"; 
  170.     */ 
  171.  
  172.     //同步读取 
  173.     //$respone = get_data_blocking(); 
  174.     //fwrite($conn,$respone); 
  175.  
  176.     //异步读取 
  177.     //$respone = get_data_unblocking(); 
  178.     //fwrite($conn,$respone); 
  179.  
  180.     //真正异步 
  181.     $data = new Get_data_event($server); 
  182.     $data->onMessage = function($struse($conn){ 
  183.         fwrite($conn,$str); 
  184.     }; 
  185.  
  186. }; 
  187.  
  188. $server->onClose = function($conn){ 
  189.     echo "onClose --" . "\n"
  190. }; 
  191.  
  192. $server->run(); 

第三方服务sleep1.php的代码比较简单:

  1. sleep(1);//模拟耗时 
  2. echo "OK"

通过以上代码示例,我们分别注释运行 同步读取,异步读取,真正异步,来观察server的并发.测试方法可以写个test.html来模拟三个并发.

  1. <script src="http://127.0.0.1:1215/?id=1"></script> 
  2. <script src="http://127.0.0.1:1215/?id=2"></script> 
  3. <script src="http://127.0.0.1:1215/?id=3"></script> 

通过测试发现,真正异步的是并发的,每个请求耗时1秒,这样我们总算明白什么是真正的非阻塞异步编程了,关键就在共用IO复用.

Tags: php模型 socket

分享到: