当前位置:首页 > PHP教程 > php高级应用 > 列表

PHP编程中尝试程序并发的几种方式总结

发布:smiling 来源: PHP粉丝网  添加日期:2021-07-20 21:33:26 浏览: 评论:0 

这篇文章主要介绍了PHP编程中尝试程序并发的几种方式总结,这里举了借助yield的异步以及swoole_process的进程创建等例子,PHP本身并不支持多线程并发,需要的朋友可以参考下。

本文大约总结了PHP编程中的五种并发方式:

1.curl_multi_init

文档中说的是 Allows the processing of multiple cURL handles asynchronously. 确实是异步。这里需要理解的是select这个方法,文档中是这么解释的Blocks until there is activity on any of the curl_multi connections.。了解一下常见的异步模型就应该能理解,select, epoll,都很有名

  1. <?php 
  2. // build the individual requests as above, but do not execute them 
  3. $ch_1 = curl_init('https://www.phpfensi.com/'); 
  4. $ch_2 = curl_init('https://www.phpfensi.com/'); 
  5. curl_setopt($ch_1, CURLOPT_RETURNTRANSFER, true); 
  6. curl_setopt($ch_2, CURLOPT_RETURNTRANSFER, true); 
  7.  
  8. // build the multi-curl handle, adding both $ch 
  9. $mh = curl_multi_init(); 
  10. curl_multi_add_handle($mh$ch_1); 
  11. curl_multi_add_handle($mh$ch_2); 
  12.  
  13. // execute all queries simultaneously, and continue when all are complete 
  14. $running = null; 
  15. do { 
  16.   curl_multi_exec($mh$running); 
  17.   $ch = curl_multi_select($mh); 
  18.   if($ch !== 0){ 
  19.     $info = curl_multi_info_read($mh); 
  20.     if($info){ 
  21.       var_dump($info); 
  22.       $response_1 = curl_multi_getcontent($info['handle']); 
  23.       echo "$response_1 \n"
  24.       break
  25.     } 
  26.   } 
  27. while ($running > 0); 
  28.  
  29. //close the handles 
  30. curl_multi_remove_handle($mh$ch_1); 
  31. curl_multi_remove_handle($mh$ch_2); 
  32. curl_multi_close($mh); 

这里我设置的是,select得到结果,就退出循环,并且删除 curl resource, 从而达到取消http请求的目的。

2.swoole_client

swoole_client提供了异步模式,我竟然把这个忘了。这里的sleep方法需要swoole版本大于等于1.7.21, 我还没升到这个版本,所以直接exit也可以。

  1. <?php 
  2. $client = new swoole_client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_ASYNC); 
  3. //设置事件回调函数 
  4. $client->on("connect"function($cli) { 
  5.   $req = "GET / HTTP/1.1\r\n 
  6.   Host: www.phpfensi.com\r\n 
  7.   Connection: keep-alive\r\n 
  8.   Cache-Control: no-cache\r\n 
  9.   Pragma: no-cache\r\n\r\n"; 
  10.  
  11.   for ($i=0; $i < 3; $i++) { 
  12.     $cli->send($req); 
  13.   } 
  14. }); 
  15. $client->on("receive"function($cli$data){ 
  16.   echo "Received: ".$data."\n"
  17.   exit(0); 
  18.   $cli->sleep(); // swoole >= 1.7.21 
  19. }); 
  20. $client->on("error"function($cli){ 
  21.   echo "Connect failed\n"
  22. }); 
  23. $client->on("close"function($cli){ 
  24.   echo "Connection close\n"
  25. }); 
  26. //发起网络连接 
  27. $client->connect('183.207.95.145', 80, 1); 

3.process

哎,竟然差点忘了 swoole_process, 这里就不用 pcntl 模块了,但是写完发现,这其实也不算是中断请求,而是哪个先到读哪个,忽视后面的返回值。

  1. <?php 
  2.  
  3. $workers = []; 
  4. $worker_num = 3;//创建的进程数 
  5. $finished = false; 
  6. $lock = new swoole_lock(SWOOLE_MUTEX); 
  7.  
  8. for($i=0;$i<$worker_num ; $i++){ 
  9.   $process = new swoole_process('process'); 
  10.   //$process->useQueue(); 
  11.   $pid = $process->start(); 
  12.   $workers[$pid] = $process
  13.  
  14. foreach($workers as $pid => $process){ 
  15.   //子进程也会包含此事件 
  16.   swoole_event_add($process->pipe, function ($pipeuse($process$lock, &$finished) { 
  17.     $lock->lock(); 
  18.     if(!$finished){ 
  19.       $finished = true; 
  20.       $data = $process->read(); 
  21.       echo "RECV: " . $data.PHP_EOL; 
  22.     } 
  23.     $lock->unlock(); 
  24.   }); 
  25.  
  26. function process(swoole_process $process){ 
  27.   $response = 'http response'
  28.   $process->write($response); 
  29.   echo $process->pid,"\t",$process->callback .PHP_EOL; 
  30.  
  31. for($i = 0; $i < $worker_num$i++) { 
  32.   $ret = swoole_process::wait(); 
  33.   $pid = $ret['pid']; 
  34.   echo "Worker Exit, PID=".$pid.PHP_EOL; 

4.pthreads

编译pthreads模块时,提示php编译时必须打开ZTS, 所以貌似必须 thread safe 版本才能使用. wamp中多php正好是TS的,直接下了个dll, 文档中的说明复制到对应目录,就在win下测试了。 还没完全理解,查到文章说 php 的 pthreads 和 POSIX pthreads是完全不一样的,代码有些烂,还需要多看看文档,体会一下。

  1. <?php 
  2. class Foo extends Stackable { 
  3.   public $url
  4.   public $response = null; 
  5.   public function __construct(){ 
  6.     $this->url = 'https://www.phpfensi.com'
  7.   } 
  8.   public function run(){} 
  9.  
  10. class Process extends Worker { 
  11.   private $text = ""
  12.   public function __construct($text,$object){ 
  13.     $this->text = $text
  14.     $this->object = $object
  15.   } 
  16.   public function run(){ 
  17.     while (is_null($this->object->response)){ 
  18.       print " Thread {$this->text} is running\n"
  19.       $this->object->response = 'http response'
  20.       sleep(1); 
  21.     } 
  22.   } 
  23.  
  24. $foo = new Foo(); 
  25.  
  26. $a = new Process("A",$foo); 
  27. $a->start(); 
  28.  
  29. $b = new Process("B",$foo); 
  30. $b->start(); 
  31.  
  32. echo $foo->response; 

5.yield

以同步方式书写异步代码:

  1. <?php  
  2.    
  3. class AsyncServer {  
  4.   protected $handler;  
  5.   protected $socket;  
  6.   protected $tasks = [];  
  7.   protected $timers = [];  
  8.    
  9.   public function __construct(callable $handler) {  
  10.     $this->handler = $handler;  
  11.    
  12.     $this->socket = socket_create(AF_INET, SOCK_DGRAM, SOL_UDP);  
  13.     if(!$this->socket) {  
  14.       die(socket_strerror(socket_last_error())."\n");  
  15.     }  
  16.     if (!socket_set_nonblock($this->socket)) {  
  17.       die(socket_strerror(socket_last_error())."\n");  
  18.     }  
  19.     if(!socket_bind($this->socket, "0.0.0.0", 1234)) {  
  20.       die(socket_strerror(socket_last_error())."\n");  
  21.     }  
  22.   }  
  23.    
  24.   public function Run() {  
  25.     while (true) {  
  26.       $now = microtime(true) * 1000;  
  27.       foreach ($this->timers as $time => $sockets) {  
  28.         if ($time > $nowbreak;  
  29.         foreach ($sockets as $one) {  
  30.           list($socket$coroutine) = $this->tasks[$one];  
  31.           unset($this->tasks[$one]);  
  32.           socket_close($socket);  
  33.           $coroutine->throw(new Exception("Timeout"));  
  34.         }  
  35.         unset($this->timers[$time]);  
  36.       }  
  37.    
  38.       $reads = array($this->socket);  
  39.       foreach ($this->tasks as list($socket)) {  
  40.         $reads[] = $socket;  
  41.       }  
  42.       $writes = NULL;  
  43.       $excepts= NULL;  
  44.       if (!socket_select($reads$writes$excepts, 0, 1000)) {  
  45.         continue;  
  46.       }  
  47.    
  48.       foreach ($reads as $one) {  
  49.         $len = socket_recvfrom($one$data, 65535, 0, $ip$port);  
  50.         if (!$len) {  
  51.           //echo "socket_recvfrom fail.\n";  
  52.           continue;  
  53.         }  
  54.         if ($one == $this->socket) {  
  55.           //echo "[Run]request recvfrom succ. data=$data ip=$ip port=$port\n";  
  56.           $handler = $this->handler;  
  57.           $coroutine = $handler($one$data$len$ip$port);  
  58.           if (!$coroutine) {  
  59.             //echo "[Run]everything is done.\n";  
  60.             continue;  
  61.           }  
  62.           $task = $coroutine->current();  
  63.           //echo "[Run]AsyncTask recv. data=$task->data ip=$task->ip port=$task->port timeout=$task->timeout\n";  
  64.           $socket = socket_create(AF_INET, SOCK_DGRAM, SOL_UDP);  
  65.           if(!$socket) {  
  66.             //echo socket_strerror(socket_last_error())."\n";  
  67.             $coroutine->throw(new Exception(socket_strerror(socket_last_error()), socket_last_error()));  
  68.             continue;  
  69.           }  
  70.           if (!socket_set_nonblock($socket)) {  
  71.             //echo socket_strerror(socket_last_error())."\n";  
  72.             $coroutine->throw(new Exception(socket_strerror(socket_last_error()), socket_last_error()));  
  73.             continue;  
  74.           }  
  75.           socket_sendto($socket$task->data, $task->len, 0, $task->ip, $task->port);  
  76.           $deadline = $now + $task->timeout;  
  77.           $this->tasks[$socket] = [$socket$coroutine$deadline];  
  78.           $this->timers[$deadline][$socket] = $socket;  
  79.         } else {  
  80.           //echo "[Run]response recvfrom succ. data=$data ip=$ip port=$port\n";  
  81.           list($socket$coroutine$deadline) = $this->tasks[$one];  
  82.           unset($this->tasks[$one]);  
  83.           unset($this->timers[$deadline][$one]);  
  84.           socket_close($socket);  
  85.           $coroutine->send(array($data$len));  
  86.         }  
  87.       }  
  88.     }  
  89.   }  
  90. }  
  91.    
  92. class AsyncTask {  
  93.   public $data;  
  94.   public $len;  
  95.   public $ip;  
  96.   public $port;  
  97.   public $timeout;  
  98.    
  99.   public function __construct($data$len$ip$port$timeout) {  
  100.     $this->data = $data;  
  101.     $this->len = $len;  
  102.     $this->ip = $ip;  
  103.     $this->port = $port;  
  104.     $this->timeout = $timeout;  
  105.   }  
  106. }  
  107.    
  108. function AsyncSendRecv($req_buf$req_len$ip$port$timeout) {  
  109.   return new AsyncTask($req_buf$req_len$ip$port$timeout);  
  110. }  
  111.    
  112. function RequestHandler($socket$req_buf$req_len$ip$port) {  
  113.   //echo "[RequestHandler] before yield AsyncTask. REQ=$req_buf\n";  
  114.   try {  
  115.     list($rsp_buf$rsp_len) = (yield AsyncSendRecv($req_buf$req_len"127.0.0.1", 2345, 3000));  
  116.   } catch (Exception $ex) {  
  117.     $rsp_buf = $ex->getMessage();  
  118.     $rsp_len = strlen($rsp_buf);  
  119.     //echo "[Exception]$rsp_buf\n";  
  120.   }  
  121.   //echo "[RequestHandler] after yield AsyncTask. RSP=$rsp_buf\n";  
  122.   socket_sendto($socket$rsp_buf$rsp_len, 0, $ip$port);  
  123. }  
  124.    
  125. $server = new AsyncServer(RequestHandler);  
  126. $server->Run();  
  127.    
  128. ?>  

代码解读:

借助PHP内置array能力,实现简单的“超时管理”,以毫秒为精度作为时间分片;

封装AsyncSendRecv接口,调用形如yield AsyncSendRecv(),更加自然;

添加Exception作为错误处理机制,添加ret_code亦可,仅为展示之用。

Tags: PHP程序并发

分享到: