当前位置:首页 > PHP教程 > php高级应用 > 列表

php使用Swoole实现毫秒级定时任务的方法

发布:smiling 来源: PHP粉丝网  添加日期:2022-03-25 11:46:36 浏览: 评论:0 

这篇文章主要介绍了php使用Swoole实现毫秒级定时任务的方法,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下。

项目开发中,如果有定时任务的业务要求,我们会使用linux的crontab来解决,但是它的最小粒度是分钟级别,如果要求粒度是秒级别的,甚至毫秒级别的,crontab就无法满足,值得庆幸的是swoole提供的强大的毫秒定时器。

应用场景举例

我们可能会遇到这样的场景:

场景一:每隔30秒获取一次本机内存使用率

场景二:2分钟后执行报表发送任务

场景三:每天凌晨2点钟定时请求第三方接口,如果接口有数据返回则停止任务,如果接口由于某种原因没有响应或者没有数据返回则5分钟后继续尝试请求该接口,尝试5次后仍然失败则停止该任务

以上的三个场景我们都可以归纳为定时任务的范畴。

Swoole毫秒定时器

Swoole提供了异步毫秒定时器函数:

swoole_timer_tick(int $msec, callable $callback):设置一个间隔时钟定时器,每隔$msec毫秒执行一次$callback,类似于javascript中的setInterval()。

swoole_timer_after(int $after_time_ms, mixed $callback_function):在指定的时间$after_time_ms后执行$callback_function,类似于javascript的setTimeout()。

swoole_timer_clear(int $timer_id):删除指定id的定时器,类似于javascript的clearInterval()。

解决方案

对于场景一,经常用在系统检测统计方面,实时性要求比较高,但又能控制好频率,多用于后台服务器性能监控,可以生成可视化图表,可以是30秒获取一次内存使用率,也可以是10秒,而crontab最小粒度只能设置为1分钟。

  1. swoole_timer_tick(30000, function($timeruse ($task_id) { // 启用定时器,每30秒执行一次  
  2.   $memPercent = $this->getMemoryUsage(); //计算内存使用率  
  3.   echo date('Y-m-d H:i:s') . '当前内存使用率:'.$memPercent."\n";  
  4. }); 

对于场景二,直接定义xx时间后执行某项任务的话,貌似crontab比较困难,而使用swoole的swoole_timer_after可以实现:

  1. swoole_timer_after(120000, function() use ($str) { //2分钟后执行  
  2.   $this->sendReport(); //发送报表  
  3.   echo "send report, $str\n";  
  4. }); 

对于场景三,用来作尝试请求,请求失败后继续,如果成功则停止请求,用crontab也能解决,但是比较傻,比如设置每隔5分钟请求一次,不管成功会失败都会去执行一次,而用swoole定时器则智能多了。

  1. swoole_timer_tick(5*60*1000, function($timeruse ($url) { // 启用定时器,每5分钟执行一次  
  2.    $rs = $this->postUrl($url);  
  3.  
  4.    if ($rs) {  
  5.      //业务代码...  
  6.      swoole_timer_clear($timer); // 停止定时器  
  7.      echo date('Y-m-d H:i:s'). "请求接口任务执行成功\n";  
  8.    } else {  
  9.      echo date('Y-m-d H:i:s'). "请求接口失败,5分钟后再次尝试\n";  
  10.    }  
  11.  }); 

示例代码

新建文件\src\App\Task.php:

  1. namespace Helloweba\Swoole;  
  2.  
  3. use swoole_server;  
  4.  
  5. /**  
  6. * 任务调度  
  7. */ 
  8. class Task  
  9. {  
  10.   protected $serv;  
  11.   protected $host = '127.0.0.1';  
  12.   protected $port = 9506;  
  13.   // 进程名称  
  14.   protected $taskName = 'swooleTask';  
  15.   // PID路径  
  16.   protected $pidPath = '/run/swooletask.pid';  
  17.   // 设置运行时参数  
  18.   protected $options = [  
  19.     'worker_num' => 4, //worker进程数,一般设置为CPU数的1-4倍   
  20.     'daemonize' => true, //启用守护进程  
  21.     'log_file' => '/data/log/swoole-task.log'//指定swoole错误日志文件  
  22.     'log_level' => 0, //日志级别 范围是0-5,0-DEBUG,1-TRACE,2-INFO,3-NOTICE,4-WARNING,5-ERROR  
  23.     'dispatch_mode' => 1, //数据包分发策略,1-轮询模式  
  24.     'task_worker_num' => 4, //task进程的数量  
  25.     'task_ipc_mode' => 3, //使用消息队列通信,并设置为争抢模式  
  26.   ];  
  27.  
  28.   public function __construct($options = [])  
  29.   {  
  30.     date_default_timezone_set('PRC');  
  31.     // 构建Server对象,监听127.0.0.1:9506端口  
  32.     $this->serv = new swoole_server($this->host, $this->port);  
  33.  
  34.     if (!emptyempty($options)) {  
  35.       $this->options = array_merge($this->options, $options);  
  36.     }  
  37.     $this->serv->set($this->options);  
  38.  
  39.     // 注册事件  
  40.     $this->serv->on('Start', [$this'onStart']);  
  41.     $this->serv->on('Connect', [$this'onConnect']);  
  42.     $this->serv->on('Receive', [$this'onReceive']);  
  43.     $this->serv->on('Task', [$this'onTask']);   
  44.     $this->serv->on('Finish', [$this'onFinish']);  
  45.     $this->serv->on('Close', [$this'onClose']);  
  46.   }  
  47.  
  48.   public function start()  
  49.   {  
  50.     // Run worker  
  51.     $this->serv->start();  
  52.   }  
  53.  
  54.   public function onStart($serv)  
  55.  
  56.   {  
  57.     // 设置进程名  
  58.     cli_set_process_title($this->taskName);  
  59.     //记录进程id,脚本实现自动重启  
  60.     $pid = "{$serv->master_pid}\\n{$serv->manager_pid}";  
  61.     file_put_contents($this->pidPath, $pid);  
  62.   }  
  63.  
  64.   //监听连接进入事件  
  65.   public function onConnect($serv$fd$from_id)  
  66.   {  
  67.     $serv->send( $fd"Hello {$fd}!" );  
  68.   }  
  69.  
  70.   // 监听数据接收事件  
  71.   public function onReceive(swoole_server $serv$fd$from_id$data)  
  72.   {  
  73.     echo "Get Message From Client {$fd}:{$data}\n";  
  74.     //$this->writeLog('接收客户端参数:'.$fd .'-'.$data);  
  75.     $res['result'] = 'success';  
  76.     $serv->send($fd, json_encode($res)); // 同步返回消息给客户端  
  77.     $serv->task($data); // 执行异步任务  
  78.   }  
  79.  
  80.   /**  
  81.    
  82.   * @param $serv swoole_server swoole_server对象  
  83.   * @param $task_id int 任务id  
  84.   * @param $from\id int 投递任务的worker_id  
  85.   * @param $data string 投递的数据  
  86.   */ 
  87.   public function onTask(swoole_server $serv$task_id$from_id$data)  
  88.   {  
  89.     swoole_timer_tick(30000, function($timeruse ($task_id) { // 启用定时器,每30秒执行一次  
  90.       $memPercent = $this->getMemoryUsage();  
  91.       echo date('Y-m-d H:i:s') . '当前内存使用率:'.$memPercent."\n";  
  92.     });  
  93.   }  
  94.  
  95.  
  96.   /**  
  97.   * @param $serv swoole_server swoole_server对象  
  98.   * @param $task_id int 任务id  
  99.   * @param $data string 任务返回的数据  
  100.   */ 
  101.   public function onFinish(swoole_server $serv$task_id$data)  
  102.   {  
  103.     //  
  104.   }  
  105.  
  106.    
  107.   // 监听连接关闭事件  
  108.   public function onClose($serv$fd$from_id) {  
  109.     echo "Client {$fd} close connection\n";  
  110.   }  
  111.  
  112.   public function stop()  
  113.   {  
  114.     $this->serv->stop();  
  115.   }  
  116.  
  117.   private function getMemoryUsage()  
  118.   {  
  119.     // MEMORY  
  120.     if (false === ($str = @file("/proc/meminfo"))) return false;  
  121.     $str = implode(""$str);  
  122.     preg_match_all("/MemTotal\s{0,}\:+\s{0,}([\d\.]+).+?MemFree\s{0,}\:+\s{0,}([\d\.]+).+?Cached\s{0,}\:+\s{0,}([\d\.]+).+?SwapTotal\s{0,}\:+\s{0,}([\d\.]+).+?SwapFree\s{0,}\:+\s{0,}([\d\.]+)/s"$str$buf);  
  123.     //preg_match_all("/Buffers\s{0,}\:+\s{0,}([\d\.]+)/s", $str, $buffers);  
  124.  
  125.     $memTotal = round($buf[1][0]/1024, 2);  
  126.     $memFree = round($buf[2][0]/1024, 2);  
  127.     $memUsed = $memTotal - $memFree;  
  128.     $memPercent = (floatval($memTotal)!=0) ? round($memUsed/$memTotal*100,2):0;  
  129.  
  130.     return $memPercent;  
  131.   }  

我们以场景一为例,在onTask启用定时任务,每隔30秒计算一次内存使用率,实际应用中可以把计算好的内存按时间写入数据库等存储中,然后可以根据前端需求用来渲染成统计图表,如:

php使用Swoole实现毫秒级定时任务的方法

接着服务端代码 public\taskServer.php :

  1. <?php 
  2. require dirname(__DIR__) . '/vendor/autoload.php';  
  3. use Helloweba\Swoole\Task;  
  4. $opt = [  
  5.   'daemonize' => false  
  6. ];  
  7. $ser = new Task($opt);  
  8. $ser->start(); 

客户端代码 public\taskClient.php :

  1. <?php 
  2. class Client  
  3. {  
  4.   private $client;  
  5.   public function __construct() {  
  6.     $this->client = new swoole_client(SWOOLE_SOCK_TCP);  
  7.   }  
  8.   public function connect() {  
  9.     if( !$this->client->connect("127.0.0.1", 9506 , 1) ) {  
  10.       echo "Error: {$this->client->errMsg}[{$this->client->errCode}]\n";  
  11.      }  
  12.     fwrite(STDOUT, "请输入消息 Please input msg:");  
  13.     $msg = trim(fgets(STDIN));  
  14.     $this->client->send( $msg );  
  15.     $message = $this->client->recv();  
  16.     echo "Get Message From Server:{$message}\n";  
  17.   }  
  18. }  
  19. $client = new Client();  
  20. $client->connect(); 

验证效果

1.启动服务端:

php taskServer.php

2.客户端输入:

另开命令行窗口,执行

[root@localhost public]# php taskClient.php

请输入消息 Please input msg:hello

Get Message From Server:{"result":"success"}

[root@localhost public]#

3.服务端返回:

php使用Swoole实现毫秒级定时任务的方法

如果返回上图中的结果,则定时任务正常运行,我们会发现每隔30秒会输出一条信息。

Tags: Swoole php毫秒级定时任务

分享到: