setMetadataRefreshIntervalMs(10000); /* 设置broker的地址 */ $config->setMetadataBrokerList('192.168.1.106:9092'); /* 设置broker的代理版本 */ $config->setBrokerVersion('1.0.0'); /* 只需要leader确认消息 */ $config->setRequiredAck(1); /* 选择异步 */ $config->setIsAsyn(false); /* 每500毫秒发送消息 */ $config->setProduceInterval(500); /* 创建⼀个⽣产者实例 */ $producer = new \Kafka\Producer(); for($i = 0; $i < 10; $i++ ) { $producer->send([ [ 'topic' => 'test1', 'value' => 'test'.$i, ], ]); } echo '发送成功
';
    }

    /**
     * Consume device messages from Kafka and persist them.
     *
     * Configures the shared \Kafka\ConsumerConfig instance (broker
     * localhost:9092, group "test", topic "test") and starts a consumer whose
     * callback, for every message:
     *   1. decodes the payload as a JSON object,
     *   2. checks that the reporting device exists in ADM_DEV (LOGIN_NAME = mac),
     *   3. dispatches on the payload's "methond" field to track(), login() or
     *      heartbeat().
     * Malformed or unknown messages are logged through debug_log() and dropped
     * so that one bad message does not raise an error inside the callback.
     *
     * @return void
     */
    public function consumer()
    {
        $config = \Kafka\ConsumerConfig::getInstance();
        $config->setMetadataRefreshIntervalMs(10000);
        $config->setMetadataBrokerList('localhost:9092');
        $config->setGroupId('test');
        $config->setBrokerVersion('1.0.0');
        $config->setTopics(['test']);

        $consumer = new \Kafka\Consumer();

        // Read each Kafka message and store its content in the database.
        $consumer->start(function ($topic, $part, $message) {
            // NOTE(review): kafka-php 0.2.x appears to hand the callback an array
            // with the payload under ['message']['value'] rather than a plain
            // string; accept both shapes - confirm against the installed version.
            $payload = $message;
            if (is_array($message) && isset($message['message']['value'])) {
                $payload = $message['message']['value'];
            }

            $json_data = is_string($payload) ? json_decode($payload, true) : null;
            debug_log('接收记录', $message);

            // Only a non-empty JSON object/array is usable; scalars such as 123
            // decode successfully but cannot be indexed by key.
            if (empty($json_data) || !is_array($json_data)) {
                debug_log('接收记录', "解析失败");
                return;
            }

            // The mac is used as a query value, so it must be present and scalar.
            // NOTE(review): a nested array here would presumably be read by the
            // query builder as a condition expression - confirm.
            if (!isset($json_data['mac']) || !is_scalar($json_data['mac'])) {
                debug_log('接收记录', "查找不到该mac地址");
                return;
            }

            $mac = Db::name('ADM_DEV')->where('LOGIN_NAME', $json_data['mac'])->find();
            if (empty($mac)) {
                debug_log('接收记录', "查找不到该mac地址");
                return;
            }

            // NOTE(review): "methond" is the key exactly as the original code
            // reads it; presumably it mirrors the device protocol - confirm
            // before renaming. Strict comparison keeps non-string values
            // (e.g. integer 0 on PHP < 8) from matching a handler.
            $method = isset($json_data['methond']) ? $json_data['methond'] : null;

            if ($method === 'track') {
                $this->track($json_data);
            } elseif ($method === 'login') {
                $this->login($json_data);
            } elseif ($method === 'heartbeat') {
                $this->heartbeat($json_data);
            } else {
                debug_log('接收记录', "类型错误");
                return;
            }

            debug_log('接收记录', "上传成功");
        });
    }

    /**
     * Handle a "track" message.
     *
     * When the GPS block reports locationState "A", lat/lng are written to the
     * device row in ADM_DEV (GPS_X = lat, GPS_Y = lng). Every entry of
     * $data['labels'] is then inserted into W_DW_RF_RECORD with RF_STAT
     * 1 (event.entry == 1), 2 (event.leave == 1) or 0 (neither).
     *
     * @param array $data Decoded message; must contain 'mac'.
     * @return void
     */
    private function track($data)
    {
        $mac = $data['mac'];

        $gps = (isset($data['gps']) && is_array($data['gps'])) ? $data['gps'] : [];
        $content = [];
        if (
            isset($gps['locationState'], $gps['lat'], $gps['lng'])
            && $gps['locationState'] === 'A'
            && is_scalar($gps['lat'])
            && is_scalar($gps['lng'])
        ) {
            $content['GPS_X'] = $gps['lat'];
            $content['GPS_Y'] = $gps['lng'];
        }

        // Without a valid fix there is nothing to write; skip the empty UPDATE.
        if (!empty($content)) {
            Db::name('ADM_DEV')->where('LOGIN_NAME', $mac)->update($content);
        }

        $labels = (isset($data['labels']) && is_array($data['labels'])) ? $data['labels'] : [];
        foreach ($labels as $item) {
            // A label needs a scalar id and timestamp to form a record.
            if (
                !is_array($item)
                || !isset($item['id'], $item['time'])
                || !is_scalar($item['id'])
                || !is_scalar($item['time'])
            ) {
                debug_log('接收记录', "标签数据不完整");
                continue;
            }

            $event = (isset($item['event']) && is_array($item['event'])) ? $item['event'] : [];
            // Loose == is kept on purpose: the flag may arrive as 1, "1" or true.
            if (isset($event['entry']) && $event['entry'] == 1) {
                $stat = 1;
            } elseif (isset($event['leave']) && $event['leave'] == 1) {
                $stat = 2;
            } else {
                $stat = 0;
            }

            Db::name('W_DW_RF_RECORD')->insert([
                'RF_FLAGID' => $item['id'],
                'RF_ID'     => $mac,
                'RF_DATE'   => $this->parsings($item['time']),
                'RF_STAT'   => $stat,
            ]);
        }
    }

    /**
     * Handle a "login" message.
     *
     * Marks the device online (IS_ONLINE = 1) in ADM_DEV and, when the GPS
     * block reports locationState "A", also stores lat/lng as GPS_X/GPS_Y.
     *
     * @param array $data Decoded message; must contain 'mac'.
     * @return void
     */
    private function login($data)
    {
        $mac = $data['mac'];

        $gps = (isset($data['gps']) && is_array($data['gps'])) ? $data['gps'] : [];
        $content = [];
        if (
            isset($gps['locationState'], $gps['lat'], $gps['lng'])
            && $gps['locationState'] === 'A'
            && is_scalar($gps['lat'])
            && is_scalar($gps['lng'])
        ) {
            $content['GPS_X'] = $gps['lat'];
            $content['GPS_Y'] = $gps['lng'];
        }
        $content['IS_ONLINE'] = 1;

        Db::name('ADM_DEV')->where('LOGIN_NAME', $mac)->update($content);
    }

    /**
     * Handle a "heartbeat" message: mark the device online in ADM_DEV.
     *
     * @param array $data Decoded message; must contain 'mac'.
     * @return void
     */
    private function heartbeat($data)
    {
        $mac = $data['mac'];
        $content = ['IS_ONLINE' => 1];

        Db::name('ADM_DEV')->where('LOGIN_NAME', $mac)->update($content);
    }

    /**
     * Convert a compact device timestamp into "YYYY-MM-DD hh:mm:ss" form.
     *
     * The value is prefixed with "20" and cut into two-character groups, so a
     * 12-character input "YYMMDDhhmmss" becomes "20YY-MM-DD hh:mm:ss". Input
     * that is too short yields empty fields instead of raising
     * undefined-offset errors.
     *
     * @param string $date Compact timestamp as sent by the device.
     * @return string Formatted date-time string.
     */
    private function parsings($date)
    {
        $stamp = '20' . $date;

        // substr() may return false past the end of the string on PHP < 8;
        // the (string) cast normalises that to ''.
        return sprintf(
            '%s-%s-%s %s:%s:%s',
            (string) substr($stamp, 0, 4),
            (string) substr($stamp, 4, 2),
            (string) substr($stamp, 6, 2),
            (string) substr($stamp, 8, 2),
            (string) substr($stamp, 10, 2),
            (string) substr($stamp, 12, 2)
        );
    }
}