当前位置:首页 > PHP教程 > php应用 > 列表

PHP 简单实现延时操作

发布:smiling 来源: PHP粉丝网  添加日期:2022-07-04 09:17:16 浏览: 评论:0 

场景:在业务中有时会碰到延迟操作,如下单后半小时未支付则取消订单、下单后十五分钟未支付则发短信提醒等等。那这样的需求如何去实现呢。

实现方式

第一个简单的方式就是用一个后台进程死循环去查订单,根据下单时间去做不同的操作

第二种就是使用消息队列的定时消息,下单之后发送定时消息,不同的定时队列去处理不同的逻辑

第三种可以使用框架提供的一些既有功能去做

实现代码

我们以订单创建15分钟后未支付,给用户发送邮件为场景进行学习

准备工作:

简单的订单表:order

各种需要的composer包

rabbitMq本地服务

开通阿里云RocketMq服务

第一种

代码逻辑很简单就直接死循环就行了

启动这个脚本进程,可以用supervisor配置

部分代码

  1. //创建订单的逻辑/** 
  2.  
  3.  * 随机创建订单 
  4.  
  5.  */$order = [ 
  6.  
  7.     'order_number' => mt_rand(100,10000).date("YmdHis"), 
  8.  
  9.     'user_id' => mt_rand(1, 100), 
  10.  
  11.     'order_amount' => mt_rand(100, 1000),]; 
  12.  
  13.     /**@var $manager Illuminate\Database\Capsule\Manager **/ 
  14.  
  15.     $conn = $manager;$insertResult = $conn::table("order"
  16.  
  17.     ->insert($order);print_r($insertResult); 

延迟处理逻辑

  1. while(true) { 
  2.  
  3.     // 未支付订单列表 
  4.  
  5.     $orderList = $conn::table("order"
  6.  
  7.         ->where("created_time",  '<='date("Y-m-d H:i:s"strtotime("-15 minutes"))) 
  8.  
  9.         ->where('sended_need_pay_notify''=', 2) 
  10.  
  11.         ->where('status''=', 1) 
  12.  
  13.         ->select(['user_id''id']) 
  14.  
  15.         ->orderBy("id"'asc'
  16.  
  17.         ->get(); 
  18.  
  19.     $orderList = json_decode(json_encode($orderList), true); 
  20.  
  21.     foreach ($orderList as $orderInfo) { 
  22.  
  23.         sendEmail($orderInfo['user_id']); 
  24.  
  25.         $conn::table('order'
  26.  
  27.             ->where('id''='$orderInfo['id']) 
  28.  
  29.             ->update(['sended_need_pay_notify' => 1]); 
  30.  
  31.         logs("update-success-orderId-"$orderInfo['id']."-userId-".$orderInfo['user_id']); 
  32.  
  33.     } 
  34.  
  35.     sleep(10);} 
执行处理脚本

gaoz@nobodyMBP delay_mq_demo % php first_while_handler.php

send email to 73 success ...

2020-06-24 11:37:36:update-success-orderId-3-userId-73

这种方式吧实现简单,但是不优雅,同时大批量订单产生也会遇到问题。

第二种

比如使用阿里云的MQ服务,目前rocketMq与rabbitMq版本支持延迟消息,但是rabbit的延时消息收费太高了

这里先使用rocketMq的延迟消息去实现

需要开通阿里云的服务

  1. // 创建订单的逻辑try 
  2.  
  3.         { 
  4.  
  5.  
  6.  
  7.             /** 
  8.  
  9.              * 随机创建订单 
  10.  
  11.              */ 
  12.  
  13.             $order = [ 
  14.  
  15.                 'order_number' => mt_rand(100,10000).date("YmdHis"), 
  16.  
  17.                 'user_id' => mt_rand(1100), 
  18.  
  19.                 'order_amount' => mt_rand(1001000), 
  20.  
  21.             ]; 
  22.  
  23.  
  24.  
  25.             /**@var $manager Illuminate\Database\Capsule\Manager **/ 
  26.  
  27.             $conn = $manager; 
  28.  
  29.  
  30.  
  31.             $insertId = $conn::table("order"
  32.  
  33.                 ->insertGetId($order); 
  34.  
  35.  
  36.  
  37.             $body = json_encode(['order_id' => $insertId, 'created_time' => date("Y-m-d H:i:s")]); 
  38.  
  39.             $publishMessage = new TopicMessage( 
  40.  
  41.                 $body            ); 
  42.  
  43.             // 设置消息KEY 
  44.  
  45.             $publishMessage->setMessageKey("MessageKey"); 
  46.  
  47.  
  48.  
  49.             // 定时消息, 定时时间为3分钟后 
  50.  
  51.             $publishMessage->setStartDeliverTime(time() * 1000 + 3 * 60 * 1000); 
  52.  
  53.  
  54.  
  55.             $result = $this->producer->publishMessage($publishMessage); 
  56.  
  57.  
  58.  
  59.             print "Send mq message success. msgId is:" . $result->getMessageId() . ", bodyMD5 is:" . $result 
  60.  
  61.             - 
  62.  
  63.             >getMessageBodyMD5() . "\n"
  64.  
  65.         } catch (\Exception $e) { 
  66.  
  67.             print_r($e->getMessage() . "\n"); 
  68.  
  69.         } 

消费逻辑 同样是在消费者中处理

  1. foreach ($messages as $message) { 
  2.  
  3.                 $receiptHandles[] = $message->getReceiptHandle(); 
  4.  
  5.  
  6.  
  7.                 $messageBody = $message->getMessageBody(); 
  8.  
  9.  
  10.  
  11.                 $orderInfo = json_decode($messageBody, true); 
  12.  
  13.                 if (!emptyempty($orderInfo['order_id'])) { 
  14.  
  15.                     $orderId = $orderInfo['order_id']; 
  16.  
  17.  
  18.  
  19.                     /**@var $manager Illuminate\Database\Capsule\Manager * */ 
  20.  
  21.                     $conn = $manager
  22.  
  23.                     $orderInfo = $conn::table("order"
  24.  
  25.                         ->select(['id''user_id']) 
  26.  
  27.                         ->where('id''='$orderId
  28.  
  29.                         ->where('status''=', 1) 
  30.  
  31.                         ->first(); 
  32.  
  33.                     if (!emptyempty($orderInfo)) { 
  34.  
  35.                         $orderInfo = json_decode(json_encode($orderInfo), true); 
  36.  
  37.                         sendEmail($orderInfo['user_id']); 
  38.  
  39.                         $conn::table('order'
  40.  
  41.                             ->where('id''='$orderInfo['id']) 
  42.  
  43.                             ->update(['sended_need_pay_notify' => 1]); 
  44.  
  45.                         logs("update-success-orderId-" . $orderInfo['id'] .  
  46.  
  47.                         "-userId-" . $orderInfo['user_id']); 
  48.  
  49.                     } 
  50.  
  51.                 } 
  52.  
  53.             } 

启动生产一条消息

gaoz@nobodyMBP delay_mq_demo % php rocket_mq_handler_producer.php

Send mq message success. msgId is:76CF2135696C3D4EAC698A9FA1E1879D, bodyMD5

is:63448B50AA7B8AF47B07AA7CE807E3D3

gaoz@nobodyMBP delay_mq_demo %

启动消费者慢慢等待

gaoz@nobodyMBP delay_mq_demo % php rocket_mq_handler_consumer.php

No message, contine long polling!RequestId:5EF752583441411C74869BA9

No message, contine long polling!RequestId:5EF7525B3441411C74869FE2

No message, contine long polling!RequestId:5EF7525E3441411C7486A42C

No message, contine long polling!RequestId:5EF752613441411C7486A7D9

consume finish, messages:send email to 95 success ...2020-06-27 12:08:05:update-success-orderId-8-userId-95

Array(

 
    [0] => 76CF2135696C3D4EAC698A9FA1E1879D-MCAxNTkzMjY2NzkxNDM5IDMwMDAwMCAzIDAgYmpzaGFyZTUtMDggNSAw)

ack

这种方式有现有的服务可以使用,减少开发时间

第三种 使用rabbitMq去实现

查阅文档没有找到rabbitMq支持延迟队列的原生功能,但是可以通过消息的ttl+死信队列实现

私信队列就是用来存放没有被消费或者消费失败等消息的队列

当设置消息的有效期内没有被消费消息就会被转发到死信队列

通过设置消息的有效期实现延时功能

  1. // 生产者$exchange = 'order15min_notify_exchange'; 
  2.  
  3. $queue = 'order15minx_notify_queue';$dlxExchange = "dlx_order15min_exchange"
  4.  
  5. $dlxQueue = "dlx_order15min_queue"
  6.  
  7. $connection = new AMQPStreamConnection(getenv('RABBIT_HOST'), getenv('RABBIT_PORT'), getenv("RABBIT_USER"), getenv("RABBIT_PASS"), getenv("RABBIT_VHOST")); 
  8.  
  9. $channel = $connection->channel();$channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false); 
  10.  
  11. $channel->exchange_declare($dlxExchange, AMQPExchangeType::DIRECT, false, true, false);// 设置队列的过期时间// 正常队列$table = new \PhpAmqpLib\Wire\AMQPTable();// 消息有效期$table->set('x-message-ttl', 3*60*1000);$table->set("x-dead-letter-exchange", $dlxExchange);$channel->queue_declare($queue, false, true, false, false, false, $table);$channel->queue_bind($queue, $exchange);// 死信队列$channel->queue_declare($dlxQueue, false, true, false, false, false);$channel->queue_bind($dlxQueue, $dlxExchange);/** 
  12.  
  13.  * 随机创建订单 
  14.  
  15.  */$order = [ 
  16.  
  17.     'order_number' => mt_rand(100,10000).date("YmdHis"), 
  18.  
  19.     'user_id' => mt_rand(1, 100), 
  20.  
  21.     'order_amount' => mt_rand(100, 1000),];/**@var $manager Illuminate\Database\Capsule\Manager **/$conn = $manager;$insertId = $conn::table("order"
  22.  
  23.     ->insertGetId($order);$messageBody = json_encode(['order_id' => $insertId'created_time' => date("Y-m-d H:i:s")]); 
  24.  
  25.     $message = new AMQPMessage($messageBodyarray('content_type' => 'text/plain''delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)); 
  26.  
  27.     $channel->basic_publish($message$exchange); 

消费者

  1. $dlxExchange = "dlx_order15min_exchange";$dlxQueue = "dlx_order15min_queue"
  2.  
  3. $connection = new AMQPStreamConnection(getenv('RABBIT_HOST'), getenv('RABBIT_PORT'), getenv("RABBIT_USER"), getenv("RABBIT_PASS"), getenv("RABBIT_VHOST")); 
  4.  
  5. $channel = $connection->channel(); 
  6.  
  7. $channel->queue_declare($dlxQueue, false, true, false, false);$channel->exchange_declare($dlxExchange, AMQPExchangeType::DIRECT, false, true, false); 
  8.  
  9. $channel->queue_bind($dlxQueue$dlxExchange);/** 
  10.  
  11.  * @param \PhpAmqpLib\Message\AMQPMessage $message 
  12.  
  13.  */function process_message($message){ 
  14.  
  15.     echo "\n--------\n"
  16.  
  17.     echo $message->body; 
  18.  
  19.     echo "\n--------\n"
  20.  
  21.  
  22.  
  23.     $orderInfo = json_decode($message->body, true); 
  24.  
  25.     if (!emptyempty($orderInfo['order_id'])) { 
  26.  
  27.         $orderId = $orderInfo['order_id']; 
  28.  
  29.  
  30.  
  31.         /**@var $conn Illuminate\Database\Capsule\Manager * */ 
  32.  
  33.         $conn = getdb(); 
  34.  
  35.         $orderInfo = $conn::table("order"
  36.  
  37.             ->select(['id''user_id']) 
  38.  
  39.             ->where('id''='$orderId
  40.  
  41.             ->where('status''=', 1) 
  42.  
  43.             ->first(); 
  44.  
  45.         if (!emptyempty($orderInfo)) { 
  46.  
  47.             $orderInfo = json_decode(json_encode($orderInfo), true); 
  48.  
  49.             sendEmail($orderInfo['user_id']); 
  50.  
  51.             $conn::table('order'
  52.  
  53.                 ->where('id''='$orderInfo['id']) 
  54.  
  55.                 ->update(['sended_need_pay_notify' => 1]); 
  56.  
  57.             logs("update-success-orderId-" . $orderInfo['id'] . "-userId-" . $orderInfo['user_id']); 
  58.  
  59.         } 
  60.  
  61.  
  62.  
  63.     } 
  64.  
  65.     $message->delivery_info['channel']->basic_ack( 
  66.  
  67.         $message->delivery_info['delivery_tag']);}$channel->basic_consume($dlxQueue$consumerTag, false, false, false, false, 'process_message'); 

启动消费者

  1. gaoz@nobodyMBP delay_mq_demo % php rabbit_mq_handler_consumer.php 
  2.  
  3. -------- 
  4.  
  5. {"order_id":7,"created_time":"2020-06-27 11:50:08"
  6.  
  7. -------- 
  8.  
  9. send email to 2 success ... 
  10.  
  11. 2020-06-27 11:56:55:update-success-orderId-7-userId-2 

分别启动消费者、生产者就可以了,这里面消息的流转可以看到

PHP 简单实现延时操作

消息先进入到正常队列,过期后进入了死信队列而被消费

第四种

使用laravel自带的Queue去实现

这里没有整理详细代码,后面更新出来

可以查看官方文档 队列《Laravel 5.7 中文文档》

代码示例:github.com/nobody05/delay_mq_demo

Tags: PHP延时操作

分享到: