当前位置:首页 > PHP教程 > php应用 > 列表

PHP编程中尝试程序并发的几种方式总结

发布:smiling 来源: PHP粉丝网  添加日期:2019-11-14 14:59:37 浏览: 评论:0 

本文大约总结了PHP编程中的五种并发方式:

1.curl_multi_init

文档中说的是 Allows the processing of multiple cURL handles asynchronously. 确实是异步。这里需要理解的是select这个方法,文档中是这么解释的Blocks until there is activity on any of the curl_multi connections.。了解一下常见的异步模型就应该能理解,select, epoll,都很有名.

  1. // build the individual requests as above, but do not execute them 
  2.  
  3. $ch_1 = curl_init('http://www.jb51.net/'); 
  4.  
  5. $ch_2 = curl_init('http://www.jb51.net/'); 
  6.  
  7. curl_setopt($ch_1, CURLOPT_RETURNTRANSFER, true); 
  8.  
  9. curl_setopt($ch_2, CURLOPT_RETURNTRANSFER, true); 
  10.  
  11.   
  12.  
  13. // build the multi-curl handle, adding both $ch 
  14.  
  15. $mh = curl_multi_init(); 
  16.  
  17. curl_multi_add_handle($mh$ch_1); 
  18.  
  19. curl_multi_add_handle($mh$ch_2); 
  20.  
  21.   
  22.  
  23. // execute all queries simultaneously, and continue when all are complete 
  24.  
  25. $running = null; 
  26.  
  27. do { 
  28.  
  29.   curl_multi_exec($mh$running); 
  30.  
  31.   $ch = curl_multi_select($mh); 
  32.  
  33.   if($ch !== 0){ 
  34.  
  35.     $info = curl_multi_info_read($mh); 
  36.  
  37.     if($info){ 
  38.  
  39.       var_dump($info); 
  40.  
  41.       $response_1 = curl_multi_getcontent($info['handle']); 
  42.  
  43.       echo "$response_1 \n"
  44.  
  45.       break
  46.  
  47.     } 
  48.  
  49.   } 
  50.  
  51. while ($running > 0); 
  52. //phpfensi.com 
  53.  
  54. //close the handles 
  55.  
  56. curl_multi_remove_handle($mh$ch_1); 
  57.  
  58. curl_multi_remove_handle($mh$ch_2); 
  59.  
  60. curl_multi_close($mh); 

这里我设置的是,select得到结果,就退出循环,并且删除 curl resource, 从而达到取消http请求的目的。

2.swoole_client

swoole_client提供了异步模式,我竟然把这个忘了。这里的sleep方法需要swoole版本大于等于1.7.21, 我还没升到这个版本,所以直接exit也可以。

  1. $client = new swoole_client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_ASYNC); 
  2.  
  3. //设置事件回调函数 
  4.  
  5. $client->on("connect"function($cli) { 
  6.  
  7.   $req = "GET / HTTP/1.1\r\n 
  8.  
  9.   Host: www.jb51.net\r\n 
  10.  
  11.   Connection: keep-alive\r\n 
  12.  
  13.   Cache-Control: no-cache\r\n 
  14.  
  15.   Pragma: no-cache\r\n\r\n"; 
  16.  
  17.   
  18.  
  19.   for ($i=0; $i < 3; $i++) { 
  20.  
  21.     $cli->send($req); 
  22.  
  23.   } 
  24.  
  25. }); 
  26.  
  27. $client->on("receive"function($cli$data){ 
  28.  
  29.   echo "Received: ".$data."\n"
  30.  
  31.   exit(0); 
  32.  
  33.   $cli->sleep(); // swoole >= 1.7.21 
  34.  
  35. }); 
  36.  
  37. $client->on("error"function($cli){ 
  38.  
  39.   echo "Connect failed\n"
  40.  
  41. }); 
  42. //phpfensi.com 
  43. $client->on("close"function($cli){ 
  44.  
  45.   echo "Connection close\n"
  46.  
  47. }); 
  48.  
  49. //发起网络连接 
  50.  
  51. $client->connect('183.207.95.145', 80, 1); 

3.process

哎,竟然差点忘了 swoole_process, 这里就不用 pcntl 模块了。但是写完发现,这其实也不算是中断请求,而是哪个先到读哪个,忽视后面的返回值。

  1. $workers = []; 
  2.  
  3. $worker_num = 3;//创建的进程数 
  4.  
  5. $finished = false; 
  6.  
  7. $lock = new swoole_lock(SWOOLE_MUTEX); 
  8.  
  9.   
  10.  
  11. for($i=0;$i<$worker_num ; $i++){ 
  12.  
  13.   $process = new swoole_process('process'); 
  14.  
  15.   //$process->useQueue(); 
  16.  
  17.   $pid = $process->start(); 
  18.  
  19.   $workers[$pid] = $process
  20.  
  21.  
  22.   
  23.  
  24. foreach($workers as $pid => $process){ 
  25.  
  26.   //子进程也会包含此事件 
  27.  
  28.   swoole_event_add($process->pipe, function ($pipeuse($process$lock, &$finished) { 
  29.  
  30.     $lock->lock(); 
  31.  
  32.     if(!$finished){ 
  33.  
  34.       $finished = true; 
  35.  
  36.       $data = $process->read(); 
  37.  
  38.       echo "RECV: " . $data.PHP_EOL; 
  39.  
  40.     } 
  41.  
  42.     $lock->unlock(); 
  43.  
  44.   }); 
  45.  
  46.  
  47.   
  48.  
  49. function process(swoole_process $process){ 
  50.  
  51.   $response = 'http response'
  52.  
  53.   $process->write($response); 
  54.  
  55.   echo $process->pid,"\t",$process->callback .PHP_EOL; 
  56.  
  57.  
  58. //phpfensi.com 
  59. for($i = 0; $i < $worker_num$i++) { 
  60.  
  61.   $ret = swoole_process::wait(); 
  62.  
  63.   $pid = $ret['pid']; 
  64.  
  65.   echo "Worker Exit, PID=".$pid.PHP_EOL; 
  66.  

4.pthreads

编译pthreads模块时,提示php编译时必须打开ZTS, 所以貌似必须 thread safe 版本才能使用. wamp中多php正好是TS的,直接下了个dll, 文档中的说明复制到对应目录,就在win下测试了。 还没完全理解,查到文章说 php 的 pthreads 和 POSIX pthreads是完全不一样的。代码有些烂,还需要多看看文档,体会一下。

  1. class Foo extends Stackable { 
  2.  
  3.   public $url
  4.  
  5.   public $response = null; 
  6.  
  7.   public function __construct(){ 
  8.  
  9.     $this->url = 'http://www.jb51.net'
  10.  
  11.   } 
  12.  
  13.   public function run(){} 
  14.  
  15.  
  16. class Process extends Worker { 
  17.  
  18.   private $text = ""
  19.  
  20.   public function __construct($text,$object){ 
  21.  
  22.     $this->text = $text
  23.  
  24.     $this->object = $object
  25.  
  26.   } 
  27.  
  28.   public function run(){ 
  29.  
  30.     while (is_null($this->object->response)){ 
  31.  
  32.       print " Thread {$this->text} is running\n"
  33.  
  34.       $this->object->response = 'http response'
  35.  
  36.       sleep(1); 
  37.  
  38.     } 
  39.  
  40.   } 
  41.  
  42.  
  43. $foo = new Foo(); 
  44.  
  45. $a = new Process("A",$foo); 
  46.  
  47. $a->start(); 
  48.  
  49. $b = new Process("B",$foo); 
  50.  
  51. $b->start(); 
  52.  
  53. echo $foo->response; 

5.yield

以同步方式书写异步代码:

  1. class AsyncServer {  
  2.  
  3.   protected $handler;  
  4.  
  5.   protected $socket;  
  6.  
  7.   protected $tasks = [];  
  8.  
  9.   protected $timers = [];  
  10.  
  11.    
  12.  
  13.   public function __construct(callable $handler) {  
  14.  
  15.     $this->handler = $handler;  
  16.  
  17.    
  18.  
  19.     $this->socket = socket_create(AF_INET, SOCK_DGRAM, SOL_UDP);  
  20.  
  21.     if(!$this->socket) {  
  22.  
  23.       die(socket_strerror(socket_last_error())."\n");  
  24.  
  25.     }  
  26.  
  27.     if (!socket_set_nonblock($this->socket)) {  
  28.  
  29.       die(socket_strerror(socket_last_error())."\n");  
  30.  
  31.     }  
  32.  
  33.     if(!socket_bind($this->socket, "0.0.0.0", 1234)) {  
  34.  
  35.       die(socket_strerror(socket_last_error())."\n");  
  36.  
  37.     }  
  38.  
  39.   }  
  40.  
  41.    
  42.  
  43.   public function Run() {  
  44.  
  45.     while (true) {  
  46.  
  47.       $now = microtime(true) * 1000;  
  48.  
  49.       foreach ($this->timers as $time => $sockets) {  
  50.  
  51.         if ($time > $nowbreak;  
  52.  
  53.         foreach ($sockets as $one) {  
  54.  
  55.           list($socket$coroutine) = $this->tasks[$one];  
  56.  
  57.           unset($this->tasks[$one]);  
  58.  
  59.           socket_close($socket);  
  60.  
  61.           $coroutine->throw(new Exception("Timeout"));  
  62.  
  63.         }  
  64.  
  65.         unset($this->timers[$time]);  
  66.  
  67.       }  
  68.  
  69.    
  70.  
  71.       $reads = array($this->socket);  
  72.  
  73.       foreach ($this->tasks as list($socket)) {  
  74.  
  75.         $reads[] = $socket;  
  76.  
  77.       }  
  78.  
  79.       $writes = NULL;  
  80.  
  81.       $excepts= NULL;  
  82.  
  83.       if (!socket_select($reads$writes$excepts, 0, 1000)) {  
  84.  
  85.         continue;  
  86.  
  87.       }  
  88.  
  89.    
  90.  
  91.       foreach ($reads as $one) {  
  92.  
  93.         $len = socket_recvfrom($one$data, 65535, 0, $ip$port);  
  94.  
  95.         if (!$len) {  
  96.  
  97.           //echo "socket_recvfrom fail.\n";  
  98.  
  99.           continue;  
  100.  
  101.         }  
  102.  
  103.         if ($one == $this->socket) {  
  104.  
  105.           //echo "[Run]request recvfrom succ. data=$data ip=$ip port=$port\n";  
  106.  
  107.           $handler = $this->handler;  
  108.  
  109.           $coroutine = $handler($one$data$len$ip$port);  
  110.  
  111.           if (!$coroutine) {  
  112.  
  113.             //echo "[Run]everything is done.\n";  
  114.  
  115.             continue;  
  116.  
  117.           }  
  118.  
  119.           $task = $coroutine->current();  
  120.  
  121.           //echo "[Run]AsyncTask recv. data=$task->data ip=$task->ip port=$task->port timeout=$task->timeout\n";  
  122.  
  123.           $socket = socket_create(AF_INET, SOCK_DGRAM, SOL_UDP);  
  124.  
  125.           if(!$socket) {  
  126.  
  127.             //echo socket_strerror(socket_last_error())."\n";  
  128.  
  129.             $coroutine->throw(new Exception(socket_strerror(socket_last_error()), socket_last_error()));  
  130.  
  131.             continue;  
  132.  
  133.           }  
  134.  
  135.           if (!socket_set_nonblock($socket)) {  
  136.  
  137.             //echo socket_strerror(socket_last_error())."\n";  
  138.  
  139.             $coroutine->throw(new Exception(socket_strerror(socket_last_error()), socket_last_error()));  
  140.  
  141.             continue;  
  142.  
  143.           }  
  144.  
  145.           socket_sendto($socket$task->data, $task->len, 0, $task->ip, $task->port);  
  146.  
  147.           $deadline = $now + $task->timeout;  
  148.  
  149.           $this->tasks[$socket] = [$socket$coroutine$deadline];  
  150.  
  151.           $this->timers[$deadline][$socket] = $socket;  
  152.  
  153.         } else {  
  154.  
  155.           //echo "[Run]response recvfrom succ. data=$data ip=$ip port=$port\n";  
  156.  
  157.           list($socket$coroutine$deadline) = $this->tasks[$one];  
  158.  
  159.           unset($this->tasks[$one]);  
  160.  
  161.           unset($this->timers[$deadline][$one]);  
  162.  
  163.           socket_close($socket);  
  164.  
  165.           $coroutine->send(array($data$len));  
  166.  
  167.         }  
  168.  
  169.       }  
  170.  
  171.     }  
  172.  
  173.   }  
  174.  
  175. }  
  176.  
  177.    
  178.  
  179. class AsyncTask {  
  180.  
  181.   public $data;  
  182.  
  183.   public $len;  
  184.  
  185.   public $ip;  
  186.  
  187.   public $port;  
  188.  
  189.   public $timeout;  
  190.  
  191.    
  192.  
  193.   public function __construct($data$len$ip$port$timeout) {  
  194.  
  195.     $this->data = $data;  
  196.  
  197.     $this->len = $len;  
  198.  
  199.     $this->ip = $ip;  
  200.  
  201.     $this->port = $port;  
  202.  
  203.     $this->timeout = $timeout;  
  204.  
  205.   }  
  206.  
  207. }  
  208.  
  209.    
  210.  
  211. function AsyncSendRecv($req_buf$req_len$ip$port$timeout) {  
  212.  
  213.   return new AsyncTask($req_buf$req_len$ip$port$timeout);  
  214.  
  215. }  
  216.  
  217. //phpfensi.com 
  218. function RequestHandler($socket$req_buf$req_len$ip$port) {  
  219.  
  220.   //echo "[RequestHandler] before yield AsyncTask. REQ=$req_buf\n";  
  221.  
  222.   try {  
  223.  
  224.     list($rsp_buf$rsp_len) = (yield AsyncSendRecv($req_buf$req_len"127.0.0.1", 2345, 3000));  
  225.  
  226.   } catch (Exception $ex) {  
  227.  
  228.     $rsp_buf = $ex->getMessage();  
  229.  
  230.     $rsp_len = strlen($rsp_buf);  
  231.  
  232.     //echo "[Exception]$rsp_buf\n";  
  233.  
  234.   }  
  235.  
  236.   //echo "[RequestHandler] after yield AsyncTask. RSP=$rsp_buf\n";  
  237.  
  238.   socket_sendto($socket$rsp_buf$rsp_len, 0, $ip$port);  
  239.  
  240. }  
  241.  
  242. $server = new AsyncServer(RequestHandler);  
  243.  
  244. $server->Run();  

代码解读:

借助PHP内置array能力,实现简单的“超时管理”,以毫秒为精度作为时间分片;

封装AsyncSendRecv接口,调用形如yield AsyncSendRecv(),更加自然;

添加Exception作为错误处理机制,添加ret_code亦可,仅为展示之用。

Tags: PHP编程 PHP程序并发

分享到: