当前位置:首页 > CMS教程 > 其它CMS > 列表

Laravel中Kafka的使用详解

发布:smiling 来源: PHP粉丝网  添加日期:2022-04-20 11:48:21 浏览: 评论:0 

本文并没有kafka的安装教程,本文是针对已经安装kafka及其配置好kafka的php拓展并且使用laravel框架进行开发项目,配置一个可供laravel框架使用的生产及消费者类。

以下代码修改自本站的YII框架关于kafka类的代码,经过测试使用在本人的项目中,可正常运行,larvael版本:5.6 代码放置larvael框架位置:app/Tools/Kafka.php

  1. <?php 
  2. namespace App\Tools; 
  3.    
  4. use Illuminate\Config\Repository; 
  5.    
  6. use Illuminate\Support\Facades\DB; 
  7. use Monolog\Logger; 
  8. use Monolog\Handler\StreamHandler; 
  9.    
  10. use Illuminate\Http\Request; 
  11.    
  12. class Kafka 
  13.   public $broker_list = '127.0.0.1';//配置kafka,可以用逗号隔开多个kafka 
  14.   public $topic = 'test';//管道名称 
  15.   public $partition = 0; 
  16.    
  17.   protected $producer = null; 
  18.   protected $consumer = null; 
  19.    
  20.   public function __construct() 
  21.   { 
  22.     if (emptyempty($this->broker_list)) { 
  23.       throw new InvalidConfigException("broker not config"); 
  24.     } 
  25.     $rk = new \RdKafka\Producer(); 
  26.     if (emptyempty($rk)) { 
  27.       throw new InvalidConfigException("producer error"); 
  28.     } 
  29.     $rk->setLogLevel(LOG_DEBUG); 
  30.     if (!$rk->addBrokers($this->broker_list)) { 
  31.       throw new InvalidConfigException("producer error"); 
  32.     } 
  33.     $this->producer = $rk
  34.   } 
  35.    
  36.   /** 
  37.    * 生产者 
  38.    * @param array $messages 
  39.    * @return mixed 
  40.    */ 
  41.   public function send($messages = [],$topic
  42.   { 
  43.     $topic = $this->producer->newTopic($topic); 
  44.     return $topic->produce(RD_KAFKA_PARTITION_UA, $this->partition, json_encode($messages)); 
  45.   } 
  46.    
  47.   /** 
  48.    * 消费者 
  49.    */ 
  50.   public function consumer($object$callback){ 
  51.     $conf = new \RdKafka\Conf(); 
  52.     $conf->set('group.id', 0); 
  53.     $conf->set('metadata.broker.list'$this->broker_list); 
  54.    
  55.     $topicConf = new \RdKafka\TopicConf(); 
  56.     $topicConf->set('auto.offset.reset''smallest'); 
  57.    
  58.     $conf->setDefaultTopicConf($topicConf); 
  59.    
  60.     $consumer = new \RdKafka\KafkaConsumer($conf); 
  61.    
  62.     $consumer->subscribe([$this->topic]); 
  63.    
  64.     echo "waiting for messages.....\n"
  65.     while(true) { 
  66.       $message = $consumer->consume(120*1000); 
  67.       switch ($message->err) { 
  68.         case RD_KAFKA_RESP_ERR_NO_ERROR: 
  69.           echo "message payload...."
  70.           $object->$callback($message->payload); 
  71.           break
  72.       } 
  73.       sleep(1); 
  74.     } 
  75.   } 
  76. ?> 

在控制器中如何使用:

首先再头部导入这个类:use App\Tools\Kafka;

下面是使用生产者实例:

  1. public function test(){ 
  2.    
  3.    $topic = 'tool';//输入使用管道名称 
  4.    $data['shop_id'] = 58; 
  5.    $data['bar_code']=586; 
  6.    $data['goods_num'] = 1; 
  7.    $data['goods_unit'] = '个'
  8.    
  9. $Kafka = new Kafka(); 
  10. $Error_Msg = $Kafka->send($data,$topic);//传入数组会自动转换json 
  11. var_dump($Error_Msg); 
  12.    
  13.    
  14.   } 

下面是消费者实例,消费者我这里使用了的是php脚本进行的操作:

  1. <?php 
  2.    
  3. $conf = new RdKafka\Conf(); 
  4.    
  5. $conf->set('group.id''myConsumerGroup'); 
  6.    
  7. $rk = new RdKafka\Consumer($conf); 
  8. $rk->addBrokers("localhost:9092"); 
  9.    
  10. $topicConf = new RdKafka\TopicConf(); 
  11. $topicConf->set('auto.commit.interval.ms', 100); 
  12. $topicConf->set('offset.store.method''file'); 
  13. $topicConf->set('offset.store.path', sys_get_temp_dir()); 
  14. $topicConf->set('auto.offset.reset''smallest'); 
  15.    
  16. $topic = $rk->newTopic("tool"$topicConf);//读取的管道 
  17.    
  18. // Start consuming partition 0 
  19. $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED); 
  20.    
  21. while (true) { 
  22.   $message = $topic->consume(0, 120*10000); 
  23.   switch ($message->err) { 
  24.     case RD_KAFKA_RESP_ERR_NO_ERROR: 
  25.     //没有错误打印信息 
  26.       $message = json_decode(json_encode($message),true); 
  27.       $data = json_decode($message['payload'],true); 
  28.       var_dump($data); 
  29.       break
  30.     case RD_KAFKA_RESP_ERR__PARTITION_EOF: 
  31.       echo "等待接收信息\n"
  32.       break
  33.     case RD_KAFKA_RESP_ERR__TIMED_OUT: 
  34.       echo "超时\n"
  35.       break
  36.     default
  37.       throw new \Exception($message->errstr(), $message->err); 
  38.       break
  39.   } 
  40.  sleep(1); 
  41.    
  42. ?>

Tags: Kafka

分享到: