|
@@ -0,0 +1,186 @@
|
|
|
+<?php
|
|
|
+/*
|
|
|
+ * @Author: 李康
|
|
|
+ * @Date: 2022-09-28 16:48:06
|
|
|
+ * @LastEditors: ‘likang’ zmcoding
|
|
|
+ * @LastEditTime: 2022-09-30 15:06:44
|
|
|
+ * @FilePath: /kafka_oracle/app/test/controller/Test.php
|
|
|
+ * @Description:
|
|
|
+ *
|
|
|
+ * Copyright (c) 2022 by ‘likang’ zmcoding, All Rights Reserved.
|
|
|
+ */
|
|
|
+
|
|
|
+namespace app\test\controller;
|
|
|
+
|
|
|
+use app\BaseController;
|
|
|
+use think\facade\Db;
|
|
|
+
|
|
|
+class Test extends BaseController
|
|
|
+{
|
|
|
+ //public static $brokerList = '116.62.220.88:9092';
|
|
|
+ /**
|
|
|
+ * @description: 发送kafka记录
|
|
|
+ * @return {*}
|
|
|
+ */
|
|
|
+ public function sent()
|
|
|
+ {
|
|
|
+ date_default_timezone_set('PRC');
|
|
|
+ $config = \Kafka\ProducerConfig::getInstance();
|
|
|
+
|
|
|
+ /* Topic的元信息刷新的间隔 */
|
|
|
+ $config->setMetadataRefreshIntervalMs(10000);
|
|
|
+
|
|
|
+ /* 设置broker的地址 */
|
|
|
+ $config->setMetadataBrokerList('192.168.1.106:9092');
|
|
|
+ /* 设置broker的代理版本 */
|
|
|
+ $config->setBrokerVersion('1.0.0');
|
|
|
+ /* 只需要leader确认消息 */
|
|
|
+ $config->setRequiredAck(1);
|
|
|
+ /* 选择异步 */
|
|
|
+ $config->setIsAsyn(false);
|
|
|
+ /* 每500毫秒发送消息 */
|
|
|
+ $config->setProduceInterval(500);
|
|
|
+ /* 创建⼀个⽣产者实例 */
|
|
|
+ $producer = new \Kafka\Producer();
|
|
|
+
|
|
|
+ for($i = 0; $i < 10; $i++ ) {
|
|
|
+
|
|
|
+ $producer->send([
|
|
|
+ [
|
|
|
+ 'topic' => 'test1',
|
|
|
+ 'value' => 'test'.$i,
|
|
|
+ ],
|
|
|
+ ]);
|
|
|
+ }
|
|
|
+ echo '发送成功</br>';
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @description: 接收记录
|
|
|
+ * @return {*}
|
|
|
+ */
|
|
|
+ public function consumer()
|
|
|
+ {
|
|
|
+ $config = \Kafka\ConsumerConfig::getInstance();
|
|
|
+ $config->setMetadataRefreshIntervalMs(10000);
|
|
|
+ $config->setMetadataBrokerList('localhost:9092');
|
|
|
+ $config->setGroupId('test');
|
|
|
+ $config->setBrokerVersion('1.0.0');
|
|
|
+ $config->setTopics(['test']);
|
|
|
+
|
|
|
+ $consumer = new \Kafka\Consumer();
|
|
|
+ //读取kafka内容并存储在数据库中
|
|
|
+ $consumer->start(function($topic, $part, $message) {
|
|
|
+ $json_data = json_decode($message,true);
|
|
|
+ debug_log('接收记录',$message);
|
|
|
+ if(empty($json_data))
|
|
|
+ {
|
|
|
+ debug_log('接收记录',"解析失败");
|
|
|
+ return ;
|
|
|
+ }
|
|
|
+ $mac = Db::name('ADM_DEV')->where('LOGIN_NAME',$json_data['mac'])->find();
|
|
|
+ if(empty($mac))
|
|
|
+ {
|
|
|
+ debug_log('接收记录',"查找不到该mac地址");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ if($json_data['methond']=='track')
|
|
|
+ {
|
|
|
+ $this->track($json_data);
|
|
|
+ }
|
|
|
+ elseif($json_data['methond']=='login')
|
|
|
+ {
|
|
|
+ $this->login($json_data);
|
|
|
+ }
|
|
|
+ elseif($json_data['methond']=='heartbeat')
|
|
|
+ {
|
|
|
+ $this->heartbeat($json_data);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ debug_log('接收记录',"类型错误");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ debug_log('接收记录',"上传成功");
|
|
|
+
|
|
|
+ });
|
|
|
+
|
|
|
+ }
|
|
|
+ //如果是轨迹则执行该方法
|
|
|
+ private function track($data)
|
|
|
+ {
|
|
|
+ $mac = $data['mac'];
|
|
|
+ $content = [];
|
|
|
+ if($data['gps']['locationState']=="A")
|
|
|
+ {
|
|
|
+ $content['GPS_X']=$data['gps']['lat'];
|
|
|
+ $content['GPS_Y']=$data['gps']['lng'];
|
|
|
+ }
|
|
|
+ Db::name('ADM_DEV')->where('LOGIN_NAME',$mac)->update($content);
|
|
|
+ foreach($data['labels'] as $item)
|
|
|
+ {
|
|
|
+ $lable = [];
|
|
|
+ $lable['RF_FLAGID']=$item['id'];
|
|
|
+ $lable['RF_ID']=$mac;
|
|
|
+ $lable['RF_DATE'] = $this->parsings($item['time']);
|
|
|
+ if($item['event']['entry']==1)
|
|
|
+ {
|
|
|
+ $lable['RF_STAT']=1;
|
|
|
+ }
|
|
|
+ elseif($item['event']['leave']==1)
|
|
|
+ {
|
|
|
+ $lable['RF_STAT']=2;
|
|
|
+ }else
|
|
|
+ {
|
|
|
+ $lable['RF_STAT'] = 0;
|
|
|
+
|
|
|
+ }
|
|
|
+ Db::name('W_DW_RF_RECORD')->insert($lable);
|
|
|
+
|
|
|
+ }
|
|
|
+ }
|
|
|
+ //如果是登录则执行该方法
|
|
|
+ private function login($data)
|
|
|
+ {
|
|
|
+ $mac = $data['mac'];
|
|
|
+ $content = [];
|
|
|
+ if($data['gps']['locationState']=="A")
|
|
|
+ {
|
|
|
+ $content['GPS_X']=$data['gps']['lat'];
|
|
|
+ $content['GPS_Y']=$data['gps']['lng'];
|
|
|
+ }
|
|
|
+ $content['IS_ONLINE']=1;
|
|
|
+ Db::name('ADM_DEV')->where('LOGIN_NAME',$mac)->update($content);
|
|
|
+ }
|
|
|
+ //如果是心跳则执行该方法
|
|
|
+ private function heartbeat($data)
|
|
|
+ {
|
|
|
+ $mac = $data['mac'];
|
|
|
+ $content = [];
|
|
|
+ $content['IS_ONLINE']=1;
|
|
|
+ Db::name('ADM_DEV')->where('LOGIN_NAME',$mac)->update($content);
|
|
|
+
|
|
|
+ }
|
|
|
+ //解析时间
|
|
|
+ private function parsings($date)
|
|
|
+ {
|
|
|
+ $getdate = '20'.$date;
|
|
|
+ $getdate = str_split($getdate,2);
|
|
|
+ $resout =$getdate[0].$getdate[1]."-".$getdate[2]."-".$getdate[3]." ".$getdate[4].":".$getdate[5].":".$getdate[6];
|
|
|
+ return $resout;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+}
|