git vor 2 Jahren
Ursprung
Commit
60285eb835

+ 20 - 0
Home/Common/func.php

@@ -106,4 +106,24 @@
 		file_put_contents($file, '[' . date('Y-m-d H:i:s') . ']' . $data . PHP_EOL,FILE_APPEND);
 	}
 	
+ 
+	function sendSmsAli( $params ){
+		$config = new Darabonba\OpenApi\Models\Config([
+				// 必填,您的 AccessKey ID
+				"accessKeyId" =>'LTAIOSQ4y09Jxbf3',
+				// 必填,您的 AccessKey Secret
+				"accessKeySecret" =>'LQm05sOLBss87lFG6jx9iq4lzKCAqA'
+		]);
+		// 访问的域名
+		$config->endpoint = "dysmsapi.aliyuncs.com";
+		$client =new \AlibabaCloud\SDK\Dysmsapi\V20170525\Dysmsapi($config);
+		$sendSmsRequest = new \AlibabaCloud\SDK\Dysmsapi\V20170525\Models\SendSmsRequest($params);
+		$runtime = new \AlibabaCloud\Tea\Utils\Utils\RuntimeOptions([]);
+		$res=$client->sendSmsWithOptions($sendSmsRequest, $runtime);
+		$reponse = get_object_vars($res->body);
+		return $reponse;
+			 
+		
+	}
+	
 

+ 190 - 0
Home/Lib/Action/NingboFangdaoSyncDataAction.class.php

@@ -0,0 +1,190 @@
+<?php
+
+
+
+class NingboFangdaoSyncDataAction extends Action {
+			
+
+
+ 
+	public  function kafka_index(  ){
+		
+		$broker_list = C('KAFKA_BROKER_LIST');
+		
+		if (empty($broker_list)) {
+			exit("KAFKA_BROKER_LIST must be config!".PHP_EOL);
+		}
+		$group = C('SYNC_DATA_KAFKA_GROUP');
+		if (empty($group)) {
+			exit("SYNC_DATA_KAFKA_GROUP must be config!".PHP_EOL);
+		}
+		$topics = C('SYNC_DATA_KAFKA_TOPIC');
+		if (empty($topics)) {
+			exit("SYNC_DATA_KAFKA_TOPIC must be config!".PHP_EOL);
+		}
+		$topics = explode(',',$topics);
+		
+		// 从 topic :rlstation_rfid_location 取轨迹
+		$conf = new RdKafka\Conf();
+		// Set a rebalance callback to log partition assignments (optional)(当有新的消费进程加入或者退出消费组时,kafka 会自动重新分配分区给消费者进程,这里注册了一个回调函数,当分区被重新分配时触发)
+		$conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
+		    switch ($err) {
+		        case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
+		            echo "Assign: ";
+		            var_dump($partitions);
+		            $kafka->assign($partitions);
+		            break;
+		
+		         case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
+		             echo "Revoke: ";
+		             var_dump($partitions);
+		             $kafka->assign(NULL);
+		             break;
+		
+		         default:
+		            throw new \Exception($err);
+		    }
+		});
+		// Configure the group.id. All consumer with the same group.id will consume( 配置groud.id 具有相同 group.id 的consumer 将会处理不同分区的消息,所以同一个组内的消费者数量如果订阅了一个topic, 那么消费者进程的数量多于这个topic 分区的数量是没有意义的。)
+		// different partitions.
+		$conf->set('group.id', $group);
+		// Initial list of Kafka brokers(添加 kafka集群服务器地址)
+		
+		$conf->set('metadata.broker.list', $broker_list);
+		$topicConf = new RdKafka\TopicConf();
+		// Set where to start consuming messages when there is no initial offset in
+		// offset store or the desired offset is out of range.
+		// 'smallest': start from the beginning
+		$topicConf->set('auto.offset.reset', 'latest');
+		// Set the configuration to use for subscribed/assigned topics
+		$conf->setDefaultTopicConf($topicConf);
+		$consumer = new RdKafka\KafkaConsumer($conf);
+		
+		
+		
+		// 订阅轨迹数据topic
+		$consumer->subscribe($topics);
+		
+		$config = C('ORACLE_CONFIG');
+		if (empty($config)) {
+			exit("ORACLE_CONFIG must be config!".PHP_EOL);
+		}
+		$host= $config['host'];
+		$port=  $config['port'];
+		$instance_name= $config['instance_name'];
+		$username= $config['username'];
+		$password=  $config['password'];
+		
+		/*
+		$host= '192.168.100.23';
+		$port=   '1521';
+		$instance_name= 'helowin';
+		$username=  'DSSC3';
+		$password=  'Rliandssc3';
+		*/
+		$conn = oci_connect($username, $password, $host.':'.$port.'/'. $instance_name,'AL32UTF8');
+		if (!$conn) {
+		   $e = oci_error();
+		   trigger_error(htmlentities($e['message'], ENT_QUOTES), E_USER_ERROR);
+		}
+		
+		while (true) {
+			//var_dump($conn);
+		    $message = $consumer->consume(120*1000);
+		
+		    switch ($message->err) {
+		        case RD_KAFKA_RESP_ERR_NO_ERROR:
+						
+						 $data = json_decode($message->payload,true);
+						 if( $data ){
+							
+							$res=$this->addDataToNingbo($data,$conn);
+							if(!$res['success']){
+								throw new \Exception($res['message']);
+							}
+		
+							 //$this->addRfidDataToRenlian($data);
+						 }
+		            break;
+		        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
+		            //echo "No more messages; will wait for more\n";
+		            break;
+		        case RD_KAFKA_RESP_ERR__TIMED_OUT:
+		            echo "Timed out\n";
+		            break;
+		        default:
+					    echo "default break";
+						$this->debug_log( 'default_Log', $message->errstr() );
+						$this->debug_log( 'default_Log', $message->err );
+		            break;
+		    }
+		}
+	}
+	
+ 
+	private  function addDataToNingbo( $data, $conn ){
+		$type_arr=explode('_',$data['DATA_TYPE']);
+		if($type_arr[0]=='vehicle'){
+			 switch ($type_arr['1']) {
+		      case 'save':
+				   $sql='declare
+		            tagId number;
+		        	begin
+		            INSERT INTO DSSC3.W_DW_RFID_TAGS("ID", "RFID_SN","INSTALLER","INSTA_DATE") VALUES (DSSC3.SEQ_W_DW_RFID_TAGS.nextval, \''.$data['RFID_SN'].'\',\''.$data['INSTALLER'].'\', TO_DATE(\''.$data['INSTA_DATE'].'\', \'SYYYY-MM-DD HH24:MI:SS\'))  returning ID into tagId;
+		            INSERT INTO DSSC3.W_DW_NON_MOTOR("ID", "RFID_ID","PLATE_NO","CAR_BRAND","CAR_TYPE") VALUES (DSSC3.SEQ_W_DW_RFID_TAGS.nextval, tagId,\''.$data['PLATE_NO'].'\',\''.$data['CAR_BRAND'].'\',\''.$data['CAR_TYPE'].'\');
+		        	end;';
+		        $stid = oci_parse($conn, $sql);
+		        $r = oci_execute($stid);
+		        oci_free_statement($stid);
+		        break;        
+		      case 'update':
+					//更新
+		        $sql='UPDATE DSSC3.W_DW_NON_MOTOR o,DSSC3.W_DW_RFID_TAGS s SET s.RFID_SN = \''.$params['RFID_SN'].'\' WHERE s.PLATE_NO =\''.$params['OLD_NO'].'\'  AND o.rfid_id = s.id  ';
+		        $stid = oci_parse($conn, $sql);
+		        $r = oci_execute($stid);
+		        $sql='UPDATE DSSC3.W_DW_NON_MOTOR  SET PLATE_NO = \''.$params['PLATE_NO'].'\',CAR_BRAND=\''.$params['CAR_BRAND'].'\',CAR_TYPE=\''.$params['CAR_TYPE'].'\' WHERE PLATE_NO = \''.$params['OLD_NO'].'\' ';
+		        $stid2 = oci_parse($conn, $sql);
+		        $r2 = oci_execute($stid2);
+		      	  break;
+				case 'delete':
+				 	$rfid_ids=[]
+		 		   $sql='SELECT RFID_ID FROM DSSC3.W_DW_NON_MOTOR  WHERE PLATE_NO in ('.$data['PLATE_NO'].')';
+		     	  $stid = oci_parse($conn, $sql);
+		     	  $r = oci_execute($stid);
+		  		 	while($row = oci_fetch_array($stid, OCI_ASSOC+OCI_RETURN_NULLS)) {
+		            $rfid_ids[]=$row['RFID_ID'];
+		  			}
+				 	$sql='DELETE FROM DSSC3.W_DW_RFID_TAGS WHERE ID in ('.$rfid_ids.')';
+				 	$stid = oci_parse($conn, $sql);
+				 	$r = oci_execute($stid);
+				 	$sql='DELETE FROM DSSC3.W_DW_NON_MOTOR WHERE PLATE_NO in ('.$data['PLATE_NO'].')';
+				 	$stid = oci_parse($conn, $sql);
+				 	$r = oci_execute($stid);
+				 	break;
+		      default:
+		     	  break;
+		   }
+		}elseif($type_arr[0]=='station'){
+			 switch ($type_arr['1']) {
+		      case 'save':
+				   M('stations')->createAdd($data);
+		        break;        
+		      case 'update':
+					$where=array(
+						'id'=>$data['update_id'];
+					);
+				 	M('stations')->createSave($where,$data);
+		      	  break;
+				case 'delete':
+				 	//18610,18609
+				 	$deleteIds=explode(',',$date['delete_id']);
+				 	M('stations')->where(['id'=>['in',$$deleteIds]])->delete();
+				 	break;
+		      default:
+		     	  break;
+		   }
+		}
+	}
+	
+
+}

+ 71 - 21
Home/Lib/Action/RouteRfidKafkaAction.class.php

@@ -290,7 +290,7 @@ class RouteRfidKafkaAction extends Action {
 	}
 	
  
-	private  function addControlAlarm( $data ){
+	private  function addControlAlarm( $data, $params ){
 		$save_data=array(
 						'plate_no'=>$data['plate_no'],
 						'rfid_sn'=>$data['RF_FLAGID'],
@@ -301,6 +301,38 @@ class RouteRfidKafkaAction extends Action {
 						'state'=>1,
 		);
 		$res=M('control_alarm')->createAdd($save_data);
+		if($res && $params['is_sent'] && $params['phone']){
+			$time=time()-$params['sent_limit'];
+			$where=[
+				'recipient'=>$params['phone'],
+				'type'=>$data['alarm_type'],
+				'created_at'=>array('GT',$time)
+			];
+			$count=M('sms_send_log')->where($where)->count();
+			if($count){
+				return $res;
+			}
+			$sent_data = [
+		        "signName" => "任联科技",
+		        "templateCode" => "SMS_273620690",
+		        "phoneNumbers" => $params['phone'],
+		        "templateParam" => json_encode(['name' => $data['plate_no'],'time'=>date('Y-m-d H:i:s',$data['time']),'content'=>'在'.$data['address'].'处'.$data['remark']])
+		  	];
+			$response=sendSmsAli($sent_data);
+			$sms_save_data=[
+				'recipient'=>$params['phone'],
+				'type'=>$data['alarm_type'],
+				'created_at'=>time(),
+			];
+			if($response['code']=='ok'){
+				$sms_save_data['sent_result']='发送成功';
+			}else{
+				$sms_save_data['sent_result']=$response['message'];
+			}
+			M('sms_send_log')->createAdd($sms_save_data);
+		}
+		
+		
 		return $res;
 	}
 	
@@ -316,7 +348,7 @@ class RouteRfidKafkaAction extends Action {
 			//存在布控标签 并在时间内
 			if($ve_con && ($data['time']>$ve_con['start_time']) && ($data['time']<$ve_con['end_time'])){
 				$data['alarm_type']='control';
-				$this->addControlAlarm($data);
+				$this->addControlAlarm($data,$ve_con);
 			}
 		
 			//检测区域布控
@@ -341,7 +373,7 @@ class RouteRfidKafkaAction extends Action {
 								//在黑名单内 告警
 								$data['alarm_type']='forbid_in';
 								$data['remark']='驶入黑名单禁入区域';
-								$this->addControlAlarm($data);
+								$this->addControlAlarm($data,$sta_con);
 							}
 						}else{
 							//白名单的不禁止   
@@ -349,7 +381,7 @@ class RouteRfidKafkaAction extends Action {
 								//不在白名单内 告警
 								$data['alarm_type']='forbid_in';
 								$data['remark']='驶入禁入区域';
-								$this->addControlAlarm($data);
+								$this->addControlAlarm($data,$sta_con);
 							}
 						}
 						
@@ -360,14 +392,14 @@ class RouteRfidKafkaAction extends Action {
 								//在黑名单内 告警
 							$data['remark']='驶入活动区域';
 							$data['alarm_type']='activity_in';
-							$this->addControlAlarm($data);
+							$this->addControlAlarm($data,$sta_con);
 						}
 					}
 				}else{
 					//无黑白名单布控  全部禁止
 					$data['remark']='驶入禁入区域';
 					$data['alarm_type']='forbid_in';
-					$this->addControlAlarm($data);
+					$this->addControlAlarm($data,$sta_con);
 				}
 			}
 	}
@@ -477,6 +509,7 @@ class RouteRfidKafkaAction extends Action {
 	
  
 	public  function test(  ){
+		
 		$topic = C('ROUTE_INDEX_KAFKA_TOPIC');
 		
 		
@@ -597,25 +630,42 @@ class RouteRfidKafkaAction extends Action {
 	
  
 	public  function test3(  ){
-		//统计表数据添加
-		for($i=0;$i<5000;$i++){
-			$to_cond=array(
-				'mac'=>'AABBCCDD',
-				'date'=>date('Y-m-d',time())
-			);
-			if(!M('station_passing')->where($to_cond)->count()){
-				$total_data=array(
-					'address'=>'测试AAAAA',
-					'mac'=>'AABBCCDD',
-					'date'=>date('Y-m-d',time()),
-					'num'=>1,
-				);
-				$res=M('station_passing')->createAdd($total_data);
+		$params=[
+			'is_sent'=>1,
+			'phone'=>'15706857065',
+			'sent_limit'=>'600',
+		];
+		if( $params['is_sent'] && $params['phone']){
+			$time=time()-$params['sent_limit'];
+			$where=[
+				'recipient'=>$params['phone'],
+				'type'=>'test',
+				'created_at'=>array('GT',$time)
+			];
+			
+			$sent_data = [
+		        "signName" => "任联科技",
+		        "templateCode" => "SMS_273620690",
+		        "phoneNumbers" => $params['phone'],
+		        "templateParam" => json_encode(['name' => '测试A3123','time'=>date('Y-m-d H:i:s',time()),'content'=>'在滨江区华城街道处超速行驶'])
+		  	];
+			$response=sendSmsAli($sent_data);
+			$sms_save_data=[
+				'recipient'=>$params['phone'],
+				'type'=>'test',
+				'created_at'=>time(),
+			];
+			if($response['code']=='ok'){
+				$sms_save_data['sent_result']='发送成功';
 			}else{
-				$res=M('station_passing')->where($to_cond)->setInc('num');
+				$sms_save_data['sent_result']=$response['message'];
 			}
+			var_dump($sms_save_data);
+			//M('sms_send_log')->createAdd($sms_save_data);
 		}
 		
+		
+		return $res;
 	}
 	
  

+ 71 - 3
Home/Lib/Action/V1Action.class.php

@@ -7,7 +7,7 @@ class V1Action extends Action {
 
 
  
-	public  function kafka2createFile(  ){
+	public  function kafka2createFile_route(  ){
 		ini_set("memory_limit", "1024M");
 		
 		$broker_list = C('KAFKA_BROKER_LIST');
@@ -92,7 +92,7 @@ class V1Action extends Action {
 			switch ($message->err) {
 				case RD_KAFKA_RESP_ERR_NO_ERROR:
 					$locationPack .= ($message->payload) . PHP_EOL;
-					$fileName = $localDir . '/HC_' . $createTime . '.dat';
+					$fileName = $localDir . '/route_' . $createTime . '.dat';
 					$runTime = time() - $timeFram;
 					$sum++;
 					if($sum % 10000 ==0){
@@ -119,7 +119,7 @@ class V1Action extends Action {
 				//echo "No more messages; will wait for more".PHP_EOL;
 				break;
 				case RD_KAFKA_RESP_ERR__TIMED_OUT:
-					$fileName = $localDir . '/HC_' . $createTime . '.dat';
+					$fileName = $localDir . '/route_' . $createTime . '.dat';
 					if ($locationPack != '') {
 						$datRes = $this->writeRouteFile($fileName, $locationPack);
 						if ($datRes) {
@@ -247,5 +247,73 @@ class V1Action extends Action {
 		return $list;
 	}
 	
+ 
+	public  function redis2createFile_station_vehicle(  ){
+		ini_set("memory_limit", "1024M");
+		$redis = Redis('ningbo_fangdao_sync_data','queue');
+		/**模拟数据
+		for($i = 0;$i<23000;$i++){
+			$redis->add('{"PLATE_NO":"余姚698951","RFID_SN":"031FABDC","CAR_TYPE":"1","CAR_BRAND":"1","NAME":"叶春红","ID_CARD_NUMBER":"330219197001274302","MOBILE_NUMBER":"13958363623","type":1,"creator_id":1,"OLD_NO":"余姚698951","DATA_TYPE":"vehicle_update"}');
+		}
+		exit;
+		*/
+		$localDir = C('FTP_LOCAL_DIR');
+		$timeFram = time();
+		$createTime = date('Ymd_His', $timeFram);
+		$fileTimeInterval = C('FTP_FILE_CREATE_INTERVAL');
+		if(!$fileTimeInterval){
+			$fileTimeInterval = 30;
+		}
+		$sum = 0;//消息数据数量
+		$locationPack = '';
+		$total = 0; //总数10000个数  消息总数 = total*10000+endsum
+		$endsum = 0; //最后一次数量
+		while (true) {
+			$message = $redis->pop();
+			if(!$message){
+				echo 'waiting for data from redis'.PHP_EOL;
+				$fileName = $localDir . '/station_' . $createTime . '.dat';
+				if ($locationPack != '') {
+					$datRes = $this->writeRouteFile($fileName, $locationPack);
+					if ($datRes) {
+						$endsum = $sum;
+						$locationPack = '';
+						$sum = 0;
+					}
+					$md5Res = $this->createRouteMD5file($fileName);
+					$timeFram = time();
+					$createTime = date('Ymd_His', $timeFram);
+					echo "Timed out\n";
+					echo "消息总数total:" . ($total * 10000 + $endsum) . PHP_EOL;
+				}
+				sleep(3);
+			}else{
+				$locationPack .=  json_encode($message,JSON_UNESCAPED_UNICODE) . PHP_EOL;
+				$fileName = $localDir . '/station_' . $createTime . '.dat';
+				$runTime = time() - $timeFram;
+				$sum++;
+				if($sum % 10000 ==0){
+					echo $sum . PHP_EOL;
+				}
+				if ($runTime < $fileTimeInterval) {
+					if ($sum % 10000 == 0) {
+						//echo 'start write routefile...' . PHP_EOL;
+						echo 'locationPack length  ' .strlen($locationPack).PHP_EOL;
+						$datRes = $this->writeRouteFile($fileName, $locationPack);
+						if ($datRes) {
+							$locationPack = '';
+							$sum = 0;
+							$total++;
+						}
+					}
+				} else {
+					$md5Res = $this->createRouteMD5file($fileName);
+					$timeFram = time();
+					$createTime = date('Ymd_His', $timeFram);
+				}
+			}
+		}
+	}
+	
 
 }

+ 3 - 1
composer.json

@@ -13,7 +13,9 @@
 		"zmcoding/php-tp313": "dev-master",
 		"zmcoding/cmp_desutils": "dev-master",
 		"jrtkcoder/redlock-php":"dev-master",
-        "alibabacloud/dm-20151123": "^1.0"
+        "alibabacloud/dm-20151123": "^1.0",
+        "alibabacloud/dysmsapi-20170525": "2.0.15",
+		"zmcoding/ftp-file":"dev-master"
     },
 	"config": {
 		"secure-http": false