Laravel中Kafka的使用详解
发布:smiling 来源: PHP粉丝网 添加日期:2022-04-20 11:48:21 浏览: 评论:0
本文并没有kafka的安装教程,本文是针对已经安装kafka及其配置好kafka的php拓展并且使用laravel框架进行开发项目,配置一个可供laravel框架使用的生产及消费者类。
以下代码修改自本站的YII框架关于kafka类的代码,经过测试使用在本人的项目中,可正常运行,larvael版本:5.6 代码放置larvael框架位置:app/Tools/Kafka.php
- <?php
 - namespace App\Tools;
 - use Illuminate\Config\Repository;
 - use Illuminate\Support\Facades\DB;
 - use Monolog\Logger;
 - use Monolog\Handler\StreamHandler;
 - use Illuminate\Http\Request;
 - class Kafka
 - {
 - public $broker_list = '127.0.0.1';//配置kafka,可以用逗号隔开多个kafka
 - public $topic = 'test';//管道名称
 - public $partition = 0;
 - protected $producer = null;
 - protected $consumer = null;
 - public function __construct()
 - {
 - if (emptyempty($this->broker_list)) {
 - throw new InvalidConfigException("broker not config");
 - }
 - $rk = new \RdKafka\Producer();
 - if (emptyempty($rk)) {
 - throw new InvalidConfigException("producer error");
 - }
 - $rk->setLogLevel(LOG_DEBUG);
 - if (!$rk->addBrokers($this->broker_list)) {
 - throw new InvalidConfigException("producer error");
 - }
 - $this->producer = $rk;
 - }
 - /**
 - * 生产者
 - * @param array $messages
 - * @return mixed
 - */
 - public function send($messages = [],$topic)
 - {
 - $topic = $this->producer->newTopic($topic);
 - return $topic->produce(RD_KAFKA_PARTITION_UA, $this->partition, json_encode($messages));
 - }
 - /**
 - * 消费者
 - */
 - public function consumer($object, $callback){
 - $conf = new \RdKafka\Conf();
 - $conf->set('group.id', 0);
 - $conf->set('metadata.broker.list', $this->broker_list);
 - $topicConf = new \RdKafka\TopicConf();
 - $topicConf->set('auto.offset.reset', 'smallest');
 - $conf->setDefaultTopicConf($topicConf);
 - $consumer = new \RdKafka\KafkaConsumer($conf);
 - $consumer->subscribe([$this->topic]);
 - echo "waiting for messages.....\n";
 - while(true) {
 - $message = $consumer->consume(120*1000);
 - switch ($message->err) {
 - case RD_KAFKA_RESP_ERR_NO_ERROR:
 - echo "message payload....";
 - $object->$callback($message->payload);
 - break;
 - }
 - sleep(1);
 - }
 - }
 - }
 - ?>
 
在控制器中如何使用:
首先再头部导入这个类:use App\Tools\Kafka;
下面是使用生产者实例:
- public function test(){
 - $topic = 'tool';//输入使用管道名称
 - $data['shop_id'] = 58;
 - $data['bar_code']=586;
 - $data['goods_num'] = 1;
 - $data['goods_unit'] = '个';
 - $Kafka = new Kafka();
 - $Error_Msg = $Kafka->send($data,$topic);//传入数组会自动转换json
 - var_dump($Error_Msg);
 - }
 
下面是消费者实例,消费者我这里使用了的是php脚本进行的操作:
- <?php
 - $conf = new RdKafka\Conf();
 - $conf->set('group.id', 'myConsumerGroup');
 - $rk = new RdKafka\Consumer($conf);
 - $rk->addBrokers("localhost:9092");
 - $topicConf = new RdKafka\TopicConf();
 - $topicConf->set('auto.commit.interval.ms', 100);
 - $topicConf->set('offset.store.method', 'file');
 - $topicConf->set('offset.store.path', sys_get_temp_dir());
 - $topicConf->set('auto.offset.reset', 'smallest');
 - $topic = $rk->newTopic("tool", $topicConf);//读取的管道
 - // Start consuming partition 0
 - $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
 - while (true) {
 - $message = $topic->consume(0, 120*10000);
 - switch ($message->err) {
 - case RD_KAFKA_RESP_ERR_NO_ERROR:
 - //没有错误打印信息
 - $message = json_decode(json_encode($message),true);
 - $data = json_decode($message['payload'],true);
 - var_dump($data);
 - break;
 - case RD_KAFKA_RESP_ERR__PARTITION_EOF:
 - echo "等待接收信息\n";
 - break;
 - case RD_KAFKA_RESP_ERR__TIMED_OUT:
 - echo "超时\n";
 - break;
 - default:
 - throw new \Exception($message->errstr(), $message->err);
 - break;
 - }
 - sleep(1);
 - }
 - ?>
 
Tags: Kafka
- 上一篇:YII2 全局异常处理深入讲解
 - 下一篇:最后一页
 
推荐文章
热门文章
最新评论文章
- 写给考虑创业的年轻程序员(10)
 - PHP新手上路(一)(7)
 - 惹恼程序员的十件事(5)
 - PHP邮件发送例子,已测试成功(5)
 - 致初学者:PHP比ASP优秀的七个理由(4)
 - PHP会被淘汰吗?(4)
 - PHP新手上路(四)(4)
 - 如何去学习PHP?(2)
 - 简单入门级php分页代码(2)
 - php中邮箱email 电话等格式的验证(2)
 
