当前位置:首页 > PHP教程 > php高级应用 > 列表

PHP使用enqueue/amqp-lib实现rabbitmq任务处理

发布:smiling 来源: PHP粉丝网  添加日期:2024-04-28 11:24:09 浏览: 评论:0 

这篇文章主要为大家详细介绍了PHP如何使用enqueue/amqp-lib实现rabbitmq任务处理,文中的示例代码讲解详细,感兴趣的小伙伴可以学习一下。

一:拓展安装

composer require enqueue/amqp-lib

文档地址:https://github.com/php-enqueue/enqueue-dev/blob/master/docs/transport/amqp_lib.md

二:方法介绍

1:连接rabbitmq

  1. $factory = new AmqpConnectionFactory([ 
  2.     'host' => '192.168.6.88',//host 
  3.     'port' => '5672',//端口 
  4.     'vhost' => '/',//虚拟主机 
  5.     'user' => 'admin',//账号 
  6.     'pass' => 'admin',//密码 
  7. ]); 
  8. $context = $factory->createContext(); 

2:声明主题

  1. //声明并创建主题 
  2. $exchangeName = 'exchange'
  3. $fooTopic = $context->createTopic($exchangeName); 
  4. $fooTopic->setType(AmqpTopic::TYPE_FANOUT); 
  5. $context->declareTopic($fooTopic); 
  6.    
  7. //删除主题 
  8. $context->deleteTopic($fooTopic); 

3:声明队列

  1. //声明并创建队列 
  2. $queueName = 'rabbitmq'
  3. $fooQueue = $context->createQueue($queueName); 
  4. $fooQueue->addFlag(AmqpQueue::FLAG_DURABLE); 
  5. $context->declareQueue($fooQueue); 
  6.    
  7. //删除队列 
  8. $context->deleteQueue($fooQueue); 

4:将队列绑定到主题

$context->bind(new AmqpBind($fooTopic, $fooQueue));

5:发送消息

  1. //向队列发送消息 
  2. $message = $context->createMessage('Hello world!'); 
  3. $context->createProducer()->send($fooQueue$message); 
  4.    
  5. //向队列发送优先消息 
  6. $queueName = 'rabbitmq'
  7. $fooQueue = $context->createQueue(queueName); 
  8. $fooQueue->addFlag(AmqpQueue::FLAG_DURABLE); 
  9. //设置队列的最大优先级 
  10. $fooQueue->setArguments(['x-max-priority' => 10]); 
  11. $context->declareQueue($fooQueue); 
  12.    
  13. $message = $context->createMessage('Hello world!'); 
  14.    
  15. $context->createProducer() 
  16.     ->setPriority(5) //设置优先级,优先级越高,消息越快到达消费者 
  17.     ->send($fooQueue$message); 
  18.    
  19. //向队列发送延时消息 
  20. $message = $context->createMessage('Hello world!'); 
  21.    
  22. $context->createProducer() 
  23.     ->setDelayStrategy(new RabbitMqDlxDelayStrategy()) 
  24.     ->setDeliveryDelay(5000) //消息延时5秒 
  25.     ->send($fooQueue$message); 

6:消费消息【接收消息】

  1. //消费消息 
  2. $consumer = $context->createConsumer($fooQueue); 
  3.    
  4. $message = $consumer->receive(); 
  5.    
  6. // process a message 
  7. //业务代码 
  8.    
  9. $consumer->acknowledge($message);//ack应答,通知rabbitmq成功,删除对应任务 
  10. // $consumer->reject($message);ack应答,通知rabbitmq失败,不删除对应任务 
  11.    
  12.    
  13. //订阅消费者 
  14. $fooConsumer = $context->createConsumer($fooQueue); 
  15.    
  16. $subscriptionConsumer = $context->createSubscriptionConsumer(); 
  17. $subscriptionConsumer->subscribe($fooConsumerfunction(Message $message, Consumer $consumer) { 
  18.     // process message 
  19.     //业务代码 
  20.     $consumer->acknowledge($message);//ack应答,通知rabbitmq成功,删除对应任务 
  21.     // $consumer->reject($message);ack应答,通知rabbitmq失败,不删除对应任务 
  22.    
  23.     return true; 
  24. }); 
  25. $subscriptionConsumer->consume(); 
  26.    
  27. //清除队列消息 
  28. $queueName = 'rabbitmq'
  29. $queue = $context->createQueue($queueName); 
  30. $context->purgeQueue($queue); 

三:简单实现 

1:发送消息

  1. //连接rabbitmq 
  2. $factory = new AmqpConnectionFactory([ 
  3.     'host' => '192.168.6.88'
  4.     'port' => '5672'
  5.     'vhost' => '/'
  6.     'user' => 'admin'
  7.     'pass' => 'admin'
  8.     'persisted' => false, 
  9. ]); 
  10.    
  11. $context = $factory->createContext(); 
  12. //声明主题 
  13. $exchangeName = 'exchange'
  14. $fooTopic = $context->createTopic($exchangeName); 
  15. $fooTopic->setType(AmqpTopic::TYPE_FANOUT); 
  16. $context->declareTopic($fooTopic); 
  17.    
  18. //声明队列 
  19. $queueName = 'rabbitmq'
  20. $fooQueue = $context->createQueue($queueName); 
  21. $fooQueue->addFlag(AmqpQueue::FLAG_DURABLE); 
  22. $context->declareQueue($fooQueue); 
  23.    
  24. //将队列绑定到主题 
  25. $context->bind(new AmqpBind($fooTopic$fooQueue)); 
  26.    
  27. //发送消息到队列 
  28. $message = $context->createMessage('Hello world!'); 
  29.    
  30. $context->createProducer()->send($fooQueue$message); 

2:消费消息

  1. $factory = new AmqpConnectionFactory([ 
  2.     'host' => '192.168.6.88'
  3.     'port' => '5672'
  4.     'vhost' => '/'
  5.     'user' => 'admin'
  6.     'pass' => 'admin'
  7.     'persisted' => false, 
  8. ]); 
  9. $context = $factory->createContext(); 
  10.    
  11.    
  12. $queueName = 'rabbitmq'
  13. $fooQueue = $context->createQueue($queueName); 
  14.    
  15.    
  16.    
  17. $fooConsumer = $context->createConsumer($fooQueue); 
  18.    
  19. $subscriptionConsumer = $context->createSubscriptionConsumer(); 
  20. $subscriptionConsumer->subscribe($fooConsumerfunction(Message $message, Consumer $consumer) { 
  21.     // process message 
  22.     //业务代码 
  23.     $consumer->acknowledge($message);//ack应答,通知rabbitmq成功,删除对应任务 
  24.     // $consumer->reject($message);ack应答,通知rabbitmq失败,不删除对应任务 
  25.    
  26.     return true; 
  27. }); 
  28. $subscriptionConsumer->consume();

Tags: enqueue amqp-lib rabbitmq

分享到: