git hace 2 años
padre
commit
71ab4068d0

+ 24 - 17
Home/Lib/Action/CronftpAction.class.php

@@ -10,42 +10,49 @@ class CronftpAction extends Action {
 	public  function searchFile2Ftp(  ){
 		$dir = C('FTP_LOCAL_DIR');
 		$targetDir = C('FTP_SERVER_DIR');
-		$config = C('FTP_CONFIG');
+		// $config = C('FTP_CONFIG');
 		while (true) {
 			$res = scanTargetFile($dir, 'md5');
 			$files = explode(';', $res);
 			array_pop($files);
-			if(empty($files)){
+			if (empty($files)) {
 				sleep(1);
 			}
 			foreach ($files as $v) {
 				$path_parts = pathinfo($v);
 				//$start = time();
 				//判断是否需要压缩,之前压缩过但没上传成功,需要重新上传
-				$zip_file = $path_parts['dirname'].'/' . $path_parts['filename'] . '.zip';
-				if(file_exists($zip_file)){
+				$zip_file = $path_parts['dirname'] . '/' . $path_parts['filename'] . '.zip';
+				if (file_exists($zip_file)) {
 					$zip = $zip_file;
-				}else{
+				} else {
 					$zip = $this->zip($path_parts['filename'], $path_parts['dirname']);
 				}
-				if ($zip) {//压缩完成
-					if(filesize($zip) <= 0){
+				if ($zip) { //压缩完成
+					if (filesize($zip) <= 0) {
 						@unlink($zip);
 						continue;
 					}
 					//echo '压缩用时:'.(time()-$start).PHP_EOL;
 					$tagetDat = $targetDir . '/' . $path_parts['filename'] . '.zip';
 					//echo $tagetDat . PHP_EOL;
-					//删除本地dat文件
-					@unlink($path_parts['dirname'] . '/' . $path_parts['filename'] . '.dat');
-					$datRes = Zmcoding\FtpFile::getInstance($config)->up_file($zip, $tagetDat);
-					echo "datRes:".$datRes.PHP_EOL;
-					if ($datRes) {
-						$md5Res = Zmcoding\FtpFile::getInstance($config)->up_file($v,  $targetDir . '/' . $path_parts['basename']); //md5文件
-						if ($md5Res) {
-							debug_log('upload_info', json_encode($targetDir . '/' . $path_parts['basename'], JSON_UNESCAPED_UNICODE));
-						}
-						debug_log('upload_info', json_encode($tagetDat, JSON_UNESCAPED_UNICODE));
+		
+					// $datRes = Zmcoding\FtpFile::getInstance($config)->up_file($zip, $tagetDat);
+					// echo "datRes:".$datRes.PHP_EOL;
+					// if ($datRes) {
+					// 	$md5Res = Zmcoding\FtpFile::getInstance($config)->up_file($v,  $targetDir . '/' . $path_parts['basename']); //md5文件
+					// 	if ($md5Res) {
+					// 		debug_log('upload_info', json_encode($targetDir . '/' . $path_parts['basename'], JSON_UNESCAPED_UNICODE));
+					// 	}
+					// 	debug_log('upload_info', json_encode($tagetDat, JSON_UNESCAPED_UNICODE));
+					// }
+					if (rename($zip, $tagetDat)) {
+						echo $zip . "文件移动成功!" . PHP_EOL;
+						//删除本地dat文件
+						@unlink($path_parts['dirname'] . '/' . $path_parts['filename'] . '.dat');
+						@unlink($path_parts['dirname'] . '/' . $path_parts['filename'] . '.md5');
+					} else {
+						echo $zip . "文件移动失败!" . $tagetDat . PHP_EOL;
 					}
 				}
 			}

+ 6 - 2
Home/Lib/Action/ListeningFileCreateAction.class.php

@@ -225,14 +225,18 @@ class ListeningFileCreateAction extends Action {
 					$t=0;
 					//如果文件名称带有sta 代表是基站  否则是代表是轨迹
 					if(strstr(pathinfo($res)["filename"],"station")){
-						$topic=$topic1;
+					
+						$publictopic=$topic1;
+					}else{
+						$publictopic=$topic;
 					}
+			
 					foreach($fuc($res) as $value)
 				   {
 						if(!empty($value))
 						{
 						 try{
-							$topic->produce(RD_KAFKA_PARTITION_UA, 0,$value);
+							$publictopic->produce(RD_KAFKA_PARTITION_UA, 0,$value);
 							$rk->poll(0);
 							$t++;
 						   $i++;

+ 247 - 0
Home/Lib/Action/NbInotifyMonitor.php

@@ -0,0 +1,247 @@
+<?php
+require 'DahuaUtil.php';
+class InotifyMonitor {
+             
+             const MONITOR_EVENT  = IN_MOVED_TO;
+             const EVENT_MASK  = [
+                 IN_ACCESS => 'File was accessed (read)',
+                 IN_MODIFY => 'File was modified',
+                 IN_ATTRIB => 'Metadata changed',
+                 IN_CLOSE_WRITE => 'File opened for writing was closed',
+                 IN_CLOSE_NOWRITE => 'File not opened for writing was closed',
+                 IN_OPEN => 'File was opened',
+                 IN_MOVED_TO => 'File moved into watched directory',
+                 IN_MOVED_FROM => 'File moved out of watched directory',
+                 IN_CREATE => 'File or directory created in watched directory',
+                 IN_DELETE => 'File or directory deleted in watched directory',
+                 IN_DELETE_SELF => 'Watched file or directory was deleted',
+                 IN_MOVE_SELF => 'Watch file or directory was moved',
+                 IN_CLOSE => 'Equals to IN_CLOSE_WRITE | IN_CLOSE_NOWRITE',
+                 IN_MOVE => 'Equals to IN_MOVED_FROM | IN_MOVED_TO',
+                 IN_ALL_EVENTS => 'Bitmask of all the above constants',
+                 IN_UNMOUNT => 'File system containing watched object was unmounted',
+                 IN_Q_OVERFLOW => 'Event queue overflowed (wd is -1 for this event)',
+                 IN_IGNORED => 'Watch was removed (explicitly by inotify_rm_watch() or because file was removed or filesystem unmounted',
+                 IN_ISDIR => 'Subject of this event is a directory',
+                 IN_ONLYDIR => 'Only watch pathname if it is a directory',
+                 IN_DONT_FOLLOW => 'Do not dereference pathname if it is a symlink',
+                 IN_MASK_ADD => 'Add events to watch mask for this pathname if it already exists',
+                 IN_ONESHOT => 'Monitor pathname for one event, then remove from watch list.',
+                 1073741840 => 'High-bit: File not opened for writing was closed',
+                 1073741856 => 'High-bit: File was opened',
+                 1073742080 => 'High-bit: File or directory created in watched directory',
+                 1073742336 => 'High-bit: File or directory deleted in watched directory',
+             ];
+          
+             public $fds  = [];
+              
+             public $paths  = [];
+              
+             public $wds = [];
+              
+             public $timeout  = 3;
+             
+             public $redis=null;
+             
+             public $db=1;
+             public $key='redis_to_kafka';
+           
+             public  function __construct( $paths,$ip='127.0.0.1',$password='',$post=6379,$db='1',$key='redis_to_kafka'){
+                 if (!empty($paths)) {
+                     foreach ($paths as $path) {
+                         if (file_exists($path)) {
+                             if (is_dir($path)) {
+                                 $this->addDir($path);
+                             } else {
+                                 $this->addFile($path);
+                             }
+                         }
+                     }
+                 }
+                
+                $this->redis = new Redis();
+                $this->db=$db;
+                $this->key=$key;
+
+                try{
+                    $this->redis->pconnect($ip,$post,2.5);
+                    $this->redis->auth($password); //设置密码
+                    $this->redis->select($this->db);
+                    $result = $this->redis->ping();
+                    if($result=='+PONG')
+                    {
+                      
+                        DahuaUtil::rlog('redis连接成功');
+                    }
+                    else
+                    {
+                        DahuaUtil::rlog("redis连接失败=>".$result.PHP_EOL);
+                
+                    }
+                    //扫描未存起来的redis文件
+                    DahuaUtil::rlog("扫描没有监听到的文件");
+                    $this->scan_file();
+                    DahuaUtil::rlog('已经调用了清理脚本');
+                }catch (Exception $e){
+                    DahuaUtil::rlog("redis连接异常".$e->getMessage());
+                    throw new Exception($e->getMessage());
+                }
+               
+              
+
+             }
+
+
+             public  function __destruct(  ){
+                 if (!empty($this->fds)) {
+                     foreach ($this->fds as $fd) {
+                         fclose($fd);
+                     }
+                 }
+             }
+              
+           
+             public  function addFile( $file ){
+                 $file = realpath($file);
+                 $fd = inotify_init();
+                 $fid = (int)$fd;
+                 //保存inotify资源
+                 $this->fds[$fid] = $fd;
+                 //设置为非阻塞模式
+                 stream_set_blocking($this->fds[$fid], 0);
+                 //保存文件路径
+                 $this->paths[$fid] = $file;
+                 //保存监控描述�?
+                 $this->wds[$fid] =inotify_add_watch($this->fds[$fid], $file, self::MONITOR_EVENT);
+             }
+              
+           
+             public  function addDir( $dir ){
+                 $dir = realpath($dir);
+                 if ($dh = opendir($dir)) {
+                     //将目录加入监控中
+                     $fd = inotify_init();
+                     //一般文件的资源描述符是一个整形,可以用来当索�?
+                     $fid = (int)$fd;
+                     $this->fds[$fid] = $fd;
+                     stream_set_blocking($this->fds[$fid], 0);
+                     $this->paths[$fid] = $dir;
+                     $this->wds[$fid] = inotify_add_watch($this->fds[$fid], $dir, self::MONITOR_EVENT);
+                     //遍历目录下文
+                     while (($file = readdir($dh)) !== false) {
+                         if ($file == '.' || $file == '..') {
+                             continue;
+                         }
+                         $file = $dir . DIRECTORY_SEPARATOR . $file;
+                         if (is_dir($file)) {
+                             $this->addDir($file);
+                         }
+                     }
+                     closedir($dh);
+                 }
+             }
+              
+           
+             public  function remove( $fid ){
+                 unset($this->paths[$fid]);
+                 fclose($this->fds[$fid]);
+                 unset($this->fds[$fid]);
+             }
+              
+           
+             public  function run(  ){
+
+                echo '我开始运行了'.PHP_EOL;
+
+                
+                 while (true) {
+                     $reads = $this->fds;
+                     $write = [];
+                     $except = [];//异常事件
+                     if (stream_select($reads, $write, $except, $this->timeout) > 0) {
+                         if (!empty($reads)) {
+                          
+                             foreach ($reads as $read) {
+                                 //从可读流中读取数�?
+                                 $events = inotify_read($read);
+                                 //资源描述符,整形
+                                 $fid = (int)$read;
+                                 //获取inotify实例的路�?
+                                 $path = $this->paths[$fid];
+                                 foreach ($events as $event) {
+                                    echo '监听读取'.PHP_EOL;
+                                   
+                                     $file = $path .'/' . $event['name'];
+                                     
+                                     if ($event['mask']==IN_MOVED_TO) {
+                                       
+                                         if(!(!strstr($file,'redis')&&strstr($file,'.zip'))){
+                                           continue; 
+                                         }
+                                         echo 'create file->'.$file.PHP_EOL;
+                                         //写入redis
+                                         try
+                                         {
+                                            $this->set_redis($file);
+                                         }catch(Exception $e)
+                                         {
+                                            DahuaUtil::rlog("redis数据库连接".$e->getMessage());
+                                            throw new Exception($e->getMessage());
+                                         }
+                                       
+                                         
+                                     }
+                                      
+                                     //echo $event['name'], ' --- ', self::EVENT_MASK[$event['mask']], PHP_EOL;
+                                 }
+                             }
+                         }
+                         if (!empty($except)) {
+                            DahuaUtil::rlog("监听异常".json_encode($except));
+                           
+                         }
+                     } else {
+                         //echo '------------------', PHP_EOL;
+                         //echo '当前监控的路径列�?, PHP_EOL;
+                         //print_r($this->paths);
+                         //echo '------------------', PHP_EOL;
+                     }
+                 }
+             }
+              
+             /**
+              * function
+              * 文件目录
+              * @param [type] $data
+              * @return void
+              */
+             public function set_redis($data)
+             {
+                $this->redis->ping();
+                
+                if(strstr($data,'.zip'))
+                {   
+                    if(rename($data,$data.'.redis'))
+                    {
+                        $data=$data.'.redis';
+                        $this->redis->lpush($this->key,$data);
+                    }
+                    
+                  
+                }
+               
+
+             }
+             //扫描目录下所有没有上传到redis文件
+             public function scan_file()
+             {  
+                
+                   shell_exec("/home/wwwroot/nbfd_tp3/nb_clear_file_redis.sh > /dev/null 2>&1 &");
+                 // echo shell_exec("/home/wwwroot/nbfd_tp3/clear_file_redis.sh");
+                   return;
+            }
+            
+
+        }
+
+

+ 394 - 0
Home/Lib/Action/NbListeningFileCreateAction.class.php

@@ -0,0 +1,394 @@
+<?php
+
+
+
+class NbListeningFileCreateAction extends Action {
+			
+
+
+ 
+	public  function redis_to_kafka(  ){
+		include('NbInotifyMonitor.php');
+							//文件夹,ip,密码,端口
+				if(!C('WATCH_REDIS'))
+				{
+					DahuaUtil::rlog('WATCH_REDIS 该常量不存在');
+					$str= "'WATCH_REDIS'=>array(
+								'ip'=>'192.168.1.105',
+								'password'=>'',
+								'port'=>'6379',
+								'db'=>1,
+								'key'=>'redis_to_kafka',
+					 );";
+					DahuaUtil::rlog('WATCH_REDIS 具体格式:'.$str);
+					return;
+				
+				}
+				if(!C('WATCH_FTP_ADDRESS'))
+				{
+					DahuaUtil::rlog('WATCH_FTP_ADDRESS 该常量不存在');
+					$str= "'WATCH_FTP_ADDRESS'=>'/home/renlian'";
+					DahuaUtil::rlog('WATCH_FTP_ADDRESS 具体格式:'.$str);
+					return;
+				}
+				$path = C('WATCH_FTP_ADDRESS');
+				$redis = C('WATCH_REDIS');
+				$test = new InotifyMonitor([$path],$redis['ip'],$redis['password'],$redis['6379'],$redis['db'],$redis['key']);
+				$test->run();
+				
+					
+				 
+				
+			}
+			
+		 
+			public  function redis_to_kafka(  ){
+				include('DahuaUtil.php');
+				ini_set('memory_limit', '1024M');
+				if (!extension_loaded('rdkafka')){
+					DahuaUtil::rlog('pushToKafka fail,extension of rdkafka has not installed!!'.PHP_EOL);
+					DahuaUtil::rlog('请重新安装kafka扩展');
+				   return false;
+				}
+				if (!extension_loaded('redis')){
+								
+					DahuaUtil::rlog('redis fail,extension of rdkafka has not installed!!'.PHP_EOL);
+					DahuaUtil::rlog('请重新安装redis扩展');
+					return false;
+				}
+				
+				if(!C('WATCH_REDIS'))
+				{
+					DahuaUtil::rlog('WATCH_REDIS 该常量不存在');
+					$str= "'WATCH_REDIS'=>array(
+						'ip'=>'192.168.1.105',
+						'password'=>'',
+						'port'=>'6379',
+						'db'=>1,
+						'key'=>'redis_to_kafka',
+					);";
+					DahuaUtil::rlog('WATCH_REDIS 具体格式:'.$str);
+					return;
+				}
+				if(!C('WATCH_FTP_ADDRESS'))
+				{
+					DahuaUtil::rlog('WATCH_FTP_ADDRESS 该常量不存在');
+					$str= "'WATCH_FTP_ADDRESS'=>'/home/renlian'";
+					DahuaUtil::rlog('WATCH_FTP_ADDRESS 具体格式:'.$str);
+					return;
+				
+				}
+				if(!C('WATCH_KAFKA'))
+				{
+					DahuaUtil::rlog('WATCH_KAFKA 该常量不存在');
+					$str="'WATCH_KAFKA'=>array(
+					'address'=>'192.168.1.105:9092',
+					'topic'=>'ningbo_rfid_kafka_topic',
+					);";
+				   DahuaUtil::rlog('WATCH_KAFKA 具体格式:'.$str);
+					return;
+				}
+								
+				$path=C('WATCH_FTP_ADDRESS');
+								//kafka配置
+				$topic =C('WATCH_KAFKA')['topic'];
+				$topic1 =C('WATCH_KAFKA')['topic1'];
+				$kafka_address=C('WATCH_KAFKA')['address'];
+								//redis配置
+				$redis_ip=C('WATCH_REDIS')['ip'];
+				$redis_password=C('WATCH_REDIS')['password'];
+				$redis_port=C('WATCH_REDIS')['port'];
+				$redis_key=C('WATCH_REDIS')['key'];
+				$redis_db=C('WATCH_REDIS')['db'];
+				DahuaUtil::rlog('从redis中获取被扫描的文件存入kafka');
+				DahuaUtil::rlog('开始连接kafka');
+				$conf = new Rdkafka\Conf();
+								//$conf->set('batch.num.messages', 2);
+				$conf->set('metadata.broker.list',$kafka_address);
+				$conf->setErrorCb(function($producer, $msg) {
+					DahuaUtil::rlog(rd_kafka_err2str($err), $errstr);
+				});
+								
+				$conf->setDrMsgCb(function($producer, $msg) {
+					if($msg->err) {
+						DahuaUtil::rlog('Message delivery failed:' . $msg->errstr());
+					}
+				    });
+							
+				$rk = new RdKafka\Producer($conf);
+				$topic = $rk->newTopic($topic);
+				$topic1 = $rk->newTopic($topic1);
+				DahuaUtil::rlog("kafka连接成功");
+				
+				try{
+				
+						$redis = new redis();
+						DahuaUtil::rlog('redis连接中....');
+						$redis->connect($redis_ip,$redis_port);
+						$redis->auth($redis_password);
+						$redis->select($redis_db);
+						$result = $redis->ping();
+						if($result=='+PONG')
+						{
+							DahuaUtil::rlog('redis连接成功');
+						}else
+						{
+							DahuaUtil::rlog('redis连接失败');
+							
+						}
+								
+				}catch(Exception $e)
+				{
+					    DahuaUtil::rlog('redis连接异常',$e->getMessage());
+						throw new Exception($e->getMessage());
+					
+				}
+								
+						$i=0;
+					   $f=0;
+						DahuaUtil::rlog('开始扫描文件');
+				while(true)
+				{
+					$data=null;
+					try{
+						 $res =  $redis->rPop($redis_key);
+							
+					}catch(Exception $e)
+				   {
+							DahuaUtil::rlog('redis取出数据异常'.$e->getMessage());
+							throw new Exception($e->getMessage());
+					}
+				    if(!$res)
+					{
+						sleep(1);
+						continue;
+					}
+					if(!file_exists($res))
+					{
+					    continue;
+					}
+									
+					DahuaUtil::rlog('扫描文件路径为:'.$res);
+					$zip = new ZipArchive();
+					DahuaUtil::rlog('开始解压文件:'.$res);
+					$new_res=str_replace(".zip.redis","redis.zip",$res);
+					//将名字中的.redis取消掉
+					rename($res,$new_res);
+					$res=$new_res;
+					if ($zip->open($res)===true){
+						$zip->extractTo($path);
+						$zip->close();
+						//删除压缩包
+						DahuaUtil::rlog('解压文件成功');
+						unlink($res);
+						DahuaUtil::rlog('删除压缩包');
+						$res=str_replace("redis.zip",".dat",$res);
+						DahuaUtil::rlog('压缩包中的文件为'.$res);
+										
+					}else
+					{
+						DahuaUtil::rlog('解压文件失败,进行跳过');
+						continue;
+					}
+						if(!file_exists($res))
+						{
+							DahuaUtil::rlog('解压中的文件不存在');
+							unlink($res);
+							
+							continue;
+						}
+				
+							$f++;
+						   $start_time = microtime(true);
+							$fuc=function($res)
+							{	
+								try{
+									$handle = fopen($res, 'rb');
+									
+								}catch(Exception $e)
+								{
+										DahuaUtil::rlog('打开文件失败'.$e->getMessage());
+									throw new Exception($e->getMessage());
+								}
+									
+								
+								
+										while (feof($handle)===false) {
+											# code...
+											yield fgets($handle);
+							
+										}
+										fclose($handle);
+							};
+						DahuaUtil::rlog('读取文件成功');
+							$t=0;
+							//如果文件名称带有sta 代表是基站  否则是代表是轨迹
+							if(strstr(pathinfo($res)["filename"],"station")){
+								$topic=$topic1;
+							}
+							foreach($fuc($res) as $value)
+						   {
+								if(!empty($value))
+								{
+								 try{
+									$topic->produce(RD_KAFKA_PARTITION_UA, 0,$value);
+									$rk->poll(0);
+									$t++;
+								   $i++;
+									while ($rk->getOutQLen() >10000) {
+											$rk->poll(10);
+								    }	
+									
+								 }catch(Exception $e)
+								{			
+									 		DahuaUtil::rlog('kafka存取异常'."路径是".$res);
+											DahuaUtil::rlog('kafka存取异常'.$e->getMessage());
+									 
+										 throw new Exception($e->getMessage());
+								}
+									
+								}
+										
+						   }
+									
+									
+							DahuaUtil::rlog('kafaka文件成功:'.$res);
+							$end_time = microtime(true);
+							unlink($res);
+							DahuaUtil::rlog('完成扫描文件:'.$res);
+							DahuaUtil::rlog('删除文件:'.$res);
+						  
+							DahuaUtil::rlog('一个文件处理完成时间:'.($end_time-$start_time).'s');
+							DahuaUtil::rlog('上传的文件有'.$t.'条');
+							DahuaUtil::rlog('总共有了'.$i.'条');
+							DahuaUtil::rlog('总共有的文件数据'.$f.'条');
+							echo '______________________________'.PHP_EOL;
+							echo '______________________________'.PHP_EOL;
+							echo '______________________________'.PHP_EOL;
+				  }				
+	}
+	
+ 
+	public  function scan_no_listening(  ){
+		include('DahuaUtil.php');
+				if(!C('WATCH_REDIS'))
+				{
+							DahuaUtil::rlog('WATCH_REDIS 该常量不存在');
+							$str= "'WATCH_REDIS'=>array(
+								'ip'=>'192.168.1.105',
+								'password'=>'',
+								'port'=>'6379',
+								'db'=>1,
+								'key'=>'redis_to_kafka',
+							);";
+							DahuaUtil::rlog('WATCH_REDIS 具体格式:'.$str);
+							return;
+				
+				}
+				if(!C('WATCH_FTP_ADDRESS'))
+				{
+							DahuaUtil::rlog('WATCH_FTP_ADDRESS 该常量不存在');
+							$str= "'WATCH_FTP_ADDRESS'=>'/home/renlian'";
+							DahuaUtil::rlog('WATCH_FTP_ADDRESS 具体格式:'.$str);
+							return;
+				}
+					
+				$dir = C('WATCH_FTP_ADDRESS');
+				$redis=C('WATCH_REDIS');
+				$ip=$redis['ip'];
+				$post=$redis['port'];
+				$password=$redis['password'];
+				$key=$redis['key'];
+				$db=$redis['db'];
+				
+				if(is_file($dir))
+				{
+					return;
+				}
+				$redis = new Redis();
+				DahuaUtil::rlog('连接redis'. $data);
+				try{
+						$redis->connect($ip,$post,2.5);
+						$redis->auth($password); //设置密码
+						$redis->select($db);
+						$result = $redis->ping();
+						if($result=='+PONG')
+						{
+						             
+						      DahuaUtil::rlog('redis连接成功');
+						}
+						else
+						{
+							DahuaUtil::rlog("redis连接失败=>".$result);
+										
+						}
+						                  
+					}catch (Exception $e){
+						
+						DahuaUtil::rlog("redis连接异常".$e->getMessage());
+						throw new Exception($e->getMessage());
+					
+						
+					 }
+							
+				DahuaUtil::rlog('开始扫描文件夹 '. $dir);
+				$files = scandir($dir);
+				foreach($files as $k=>$filename) {//务必使用!==,防止目录下出现类似文件名“0”等情况
+				     if ($filename != "." && $filename != ".."   &&!strstr($filename,'.zip.redis')&&!strstr($filename,'.tump') &&!strstr($filename,'redis.zip')&&strstr($filename,'.zip') &&is_file($dir.'/'.$filename)) {
+						          $data = $dir.'/'.$filename;
+								  
+								     DahuaUtil::rlog('将文件添加到redis中'. $data);
+						          if(rename($data,$data.'.redis'))
+						          {		
+									      try{
+										     $redis->lpush($key,$data.'.redis');
+									      }catch(Exception $e)
+									     {	
+										     DahuaUtil::rlog("redis连接异常".$e->getMessage());
+											  throw new Exception($e->getMessage());
+										
+										//重新连接redis
+										   }
+						              
+						          }
+						    }
+						                   }
+						   closedir($dir);
+							$redis->close();
+						
+				return;
+	}
+	
+ 
+	public  function start_listening(  ){
+		include('NbInotifyMonitor.php');
+							//文件夹,ip,密码,端口
+				if(!C('WATCH_REDIS'))
+				{
+					DahuaUtil::rlog('WATCH_REDIS 该常量不存在');
+					$str= "'WATCH_REDIS'=>array(
+								'ip'=>'192.168.1.105',
+								'password'=>'',
+								'port'=>'6379',
+								'db'=>1,
+								'key'=>'redis_to_kafka',
+					 );";
+					DahuaUtil::rlog('WATCH_REDIS 具体格式:'.$str);
+					return;
+				
+				}
+				if(!C('WATCH_FTP_ADDRESS'))
+				{
+					DahuaUtil::rlog('WATCH_FTP_ADDRESS 该常量不存在');
+					$str= "'WATCH_FTP_ADDRESS'=>'/home/renlian'";
+					DahuaUtil::rlog('WATCH_FTP_ADDRESS 具体格式:'.$str);
+					return;
+				}
+				$path = C('WATCH_FTP_ADDRESS');
+				$redis = C('WATCH_REDIS');
+				$test = new InotifyMonitor([$path],$redis['ip'],$redis['password'],$redis['6379'],$redis['db'],$redis['key']);
+				$test->run();
+	}
+	
+
+}

+ 1 - 0
Home/Lib/Action/NingboAction.class.php

@@ -41,6 +41,7 @@ class NingboAction extends Action {
 		};
 		
 		$worker->onClose = function($connection){
+			$connection->close();
 			\Jiaruan\DahuaUtil::rlog('imsync', "[shutConn]", $connection->getRemoteIp() . ':' . $connection->getRemotePort());
 		};
 		

+ 8 - 9
Home/Lib/Action/NingboFangdaoSyncDataAction.class.php

@@ -98,10 +98,7 @@ class NingboFangdaoSyncDataAction extends Action {
 						 $data = json_decode($message->payload,true);
 						 if( $data ){
 							
-							$res=$this->addDataToNingbo($data,$conn);
-							if(!$res['success']){
-								throw new \Exception($res['message']);
-							}
+							$this->addDataToNingbo($data,$conn);
 		
 							 //$this->addRfidDataToRenlian($data);
 						 }
@@ -139,25 +136,27 @@ class NingboFangdaoSyncDataAction extends Action {
 		        break;        
 		      case 'update':
 					//更新
-		        $sql='UPDATE DSSC3.W_DW_NON_MOTOR o,DSSC3.W_DW_RFID_TAGS s SET s.RFID_SN = \''.$params['RFID_SN'].'\' WHERE s.PLATE_NO =\''.$params['OLD_NO'].'\'  AND o.rfid_id = s.id  ';
+		        $sql='UPDATE DSSC3.W_DW_NON_MOTOR o,DSSC3.W_DW_RFID_TAGS s SET s.RFID_SN = \''.$data['RFID_SN'].'\' WHERE o.PLATE_NO =\''.$data['OLD_NO'].'\'  AND o.rfid_id = s.id  ';
 		        $stid = oci_parse($conn, $sql);
 		        $r = oci_execute($stid);
-		        $sql='UPDATE DSSC3.W_DW_NON_MOTOR  SET PLATE_NO = \''.$params['PLATE_NO'].'\',CAR_BRAND=\''.$params['CAR_BRAND'].'\',CAR_TYPE=\''.$params['CAR_TYPE'].'\' WHERE PLATE_NO = \''.$params['OLD_NO'].'\' ';
+		        $sql='UPDATE DSSC3.W_DW_NON_MOTOR  SET PLATE_NO = \''.$data['PLATE_NO'].'\',CAR_BRAND=\''.$data['CAR_BRAND'].'\',CAR_TYPE=\''.$data['CAR_TYPE'].'\' WHERE PLATE_NO = \''.$data['OLD_NO'].'\' ';
 		        $stid2 = oci_parse($conn, $sql);
 		        $r2 = oci_execute($stid2);
 		      	 break;
 				case 'delete':
 				 	$rfid_ids=[];
-		 		   $sql='SELECT RFID_ID FROM DSSC3.W_DW_NON_MOTOR  WHERE PLATE_NO in ('.$data['PLATE_NO'].')';
+				   $str=implode('\',\'',$data['PLATE_NO']);
+		 		   $sql='SELECT RFID_ID FROM DSSC3.W_DW_NON_MOTOR  WHERE PLATE_NO in (\''.$str.'\')';
 		     	  $stid = oci_parse($conn, $sql);
 		     	  $r = oci_execute($stid);
 		  		 	while($row = oci_fetch_array($stid, OCI_ASSOC+OCI_RETURN_NULLS)) {
 		            $rfid_ids[]=$row['RFID_ID'];
 		  			}
-				 	$sql='DELETE FROM DSSC3.W_DW_RFID_TAGS WHERE ID in ('.$rfid_ids.')';
+				 	$rfid_str=implode('\',\'',$rfid_ids);
+				 	$sql='DELETE FROM DSSC3.W_DW_RFID_TAGS WHERE ID in (\''.$rfid_str.'\')';
 				 	$stid = oci_parse($conn, $sql);
 				 	$r = oci_execute($stid);
-				 	$sql='DELETE FROM DSSC3.W_DW_NON_MOTOR WHERE PLATE_NO in ('.$data['PLATE_NO'].')';
+				 	$sql='DELETE FROM DSSC3.W_DW_NON_MOTOR WHERE PLATE_NO in (\''.$str.'\')';
 				 	$stid = oci_parse($conn, $sql);
 				 	$r = oci_execute($stid);
 				 	break;

+ 208 - 0
Home/Lib/Action/QuzhouFangdaoSyncDataAction.class.php

@@ -0,0 +1,208 @@
+<?php
+
+
+
+class QuzhouFangdaoSyncDataAction extends Action {
+			
+
+
+ 
+	public  function kafka_index(  ){
+		
+		$broker_list = C('KAFKA_BROKER_LIST');
+		
+		if (empty($broker_list)) {
+			exit("KAFKA_BROKER_LIST must be config!".PHP_EOL);
+		}
+		$group = C('SYNC_DATA_KAFKA_GROUP');
+		if (empty($group)) {
+			exit("SYNC_DATA_KAFKA_GROUP must be config!".PHP_EOL);
+		}
+		$topics = C('SYNC_DATA_KAFKA_TOPIC');
+		if (empty($topics)) {
+			exit("SYNC_DATA_KAFKA_TOPIC must be config!".PHP_EOL);
+		}
+		$topics = explode(',',$topics);
+		
+		// 从 topic :rlstation_rfid_location 取轨迹
+		$conf = new RdKafka\Conf();
+		// Set a rebalance callback to log partition assignments (optional)(当有新的消费进程加入或者退出消费组时,kafka 会自动重新分配分区给消费者进程,这里注册了一个回调函数,当分区被重新分配时触发)
+		$conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
+		    switch ($err) {
+		        case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
+		            echo "Assign: ";
+		            var_dump($partitions);
+		            $kafka->assign($partitions);
+		            break;
+		
+		         case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
+		             echo "Revoke: ";
+		             var_dump($partitions);
+		             $kafka->assign(NULL);
+		             break;
+		
+		         default:
+		            throw new \Exception($err);
+		    }
+		});
+		// Configure the group.id. All consumer with the same group.id will consume( 配置groud.id 具有相同 group.id 的consumer 将会处理不同分区的消息,所以同一个组内的消费者数量如果订阅了一个topic, 那么消费者进程的数量多于这个topic 分区的数量是没有意义的。)
+		// different partitions.
+		$conf->set('group.id', $group);
+		// Initial list of Kafka brokers(添加 kafka集群服务器地址)
+		
+		$conf->set('metadata.broker.list', $broker_list);
+		$topicConf = new RdKafka\TopicConf();
+		// Set where to start consuming messages when there is no initial offset in
+		// offset store or the desired offset is out of range.
+		// 'smallest': start from the beginning
+		$topicConf->set('auto.offset.reset', 'latest');
+		// Set the configuration to use for subscribed/assigned topics
+		$conf->setDefaultTopicConf($topicConf);
+		$consumer = new RdKafka\KafkaConsumer($conf);
+		
+		
+		
+		// 订阅轨迹数据topic
+		$consumer->subscribe($topics);
+		
+		$config = C('ORACLE_CONFIG');
+		if (empty($config)) {
+			exit("ORACLE_CONFIG must be config!".PHP_EOL);
+		}
+		$host= $config['host'];
+		$port=  $config['port'];
+		$instance_name= $config['instance_name'];
+		$username= $config['username'];
+		$password=  $config['password'];
+		
+		/*
+		$host= '192.168.100.23';
+		$port=   '1521';
+		$instance_name= 'helowin';
+		$username=  'DSSC3';
+		$password=  'Rliandssc3';
+		*/
+		$conn = oci_connect($username, $password, $host.':'.$port.'/'. $instance_name,'AL32UTF8');
+		if (!$conn) {
+		   $e = oci_error();
+		   trigger_error(htmlentities($e['message'], ENT_QUOTES), E_USER_ERROR);
+		}
+		
+		while (true) {
+			//var_dump($conn);
+		    $message = $consumer->consume(120*1000);
+		
+		    switch ($message->err) {
+		        case RD_KAFKA_RESP_ERR_NO_ERROR:
+						
+						 $data = json_decode($message->payload,true);
+						 if( $data ){
+							
+							$this->addDataToQuzhou($data,$conn);
+		
+							 //$this->addRfidDataToRenlian($data);
+						 }
+		            break;
+		        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
+		            //echo "No more messages; will wait for more\n";
+		            break;
+		        case RD_KAFKA_RESP_ERR__TIMED_OUT:
+		            echo "Timed out\n";
+		            break;
+		        default:
+					    echo "default break";
+						$this->debug_log( 'default_Log', $message->errstr() );
+						$this->debug_log( 'default_Log', $message->err );
+		            break;
+		    }
+		}
+	}
+	
+ 
+	private  function addDataToQuzhou( $data, $conn ){
+		$type_arr=explode('_',$data['DATA_TYPE']);
+		if($type_arr[0]=='vehicle'){
+			 switch ($type_arr['1']) {
+		      case 'save':
+				   $sql='declare
+		            tagId number;
+						 ownId number;
+		        	begin
+		            INSERT INTO DSSC3.W_DW_RFID_TAGS("ID", "RFID_SN","INSTALLER","INSTA_DATE") VALUES (DSSC3.SEQ_W_DW_RFID_TAGS.nextval, \''.$data['RFID_SN'].'\',\''.$data['INSTALLER'].'\', TO_DATE(\''.$data['INSTA_DATE'].'\', \'SYYYY-MM-DD HH24:MI:SS\'))  returning ID into tagId;
+						 INSERT INTO DSSC3.W_DW_NON_MOTOR_OWNER("ID", "NAME","ID_CARD_NUMBER","MOBILE_NUMBER") VALUES (DSSC3.SEQ_W_DW_NON_MOTOR_OWNER.nextval,\''.$data['NAME'].'\',\''.$data['ID_CARD_NUMBER'].'\',\''.$data['MOBILE_NUMBER'].'\') returning ID into ownId;
+						 INSERT INTO DSSC3.W_DW_NON_MOTOR("ID", "RFID_ID","PLATE_NO","CAR_BRAND","CAR_TYPE","OWNER_ID") VALUES (DSSC3.SEQ_W_DW_RFID_TAGS.nextval, tagId,\''.$data['PLATE_NO'].'\',\''.$data['CAR_BRAND'].'\',\''.$data['CAR_TYPE'].'\',ownId);
+		        	end;';
+		        $stid = oci_parse($conn, $sql);
+		        $r = oci_execute($stid);
+		        oci_free_statement($stid);
+		        break;        
+		      case 'update':
+					//更新
+		        $sql='UPDATE DSSC3.W_DW_NON_MOTOR o,DSSC3.W_DW_RFID_TAGS s SET s.RFID_SN = \''.$data['RFID_SN'].'\' WHERE o.PLATE_NO =\''.$data['OLD_NO'].'\'  AND o.rfid_id = s.id  ';
+		        $stid = oci_parse($conn, $sql);
+		        $r = oci_execute($stid);
+		        $sql='UPDATE DSSC3.W_DW_NON_MOTOR  SET PLATE_NO = \''.$data['PLATE_NO'].'\',CAR_BRAND=\''.$data['CAR_BRAND'].'\',CAR_TYPE=\''.$data['CAR_TYPE'].'\' WHERE PLATE_NO = \''.$data['OLD_NO'].'\' ';
+		        $stid2 = oci_parse($conn, $sql);
+		        $r2 = oci_execute($stid2);
+				 //查车主ID
+		        $vehicle_sql='SELECT o.OWNER_ID FROM DSSC3.W_DW_NON_MOTOR o WHERE o.PLATE_NO =\''.$data['OLD_NO'].'\' ';
+		        $owner_id='';
+		        $stid = oci_parse($conn, $vehicle_sql);
+		        oci_define_by_name($stid, 'OWNER_ID', $owner_id);
+		        oci_execute($stid);
+		        oci_fetch($stid);
+		        $sql='UPDATE DSSC3.W_DW_NON_MOTOR_OWNER SET MOBILE_NUMBER=\''.$data['MOBILE_NUMBER'].'\',NAME=\''.$data['NAME'].'\',ID_CARD_NUMBER=\''.$data['ID_CARD_NUMBER'].'\' WHERE ID = \''.$owner_id.'\' ';
+		        $stid3 = oci_parse($conn, $sql);
+		        $r3 = oci_execute($stid3);
+		      	 break;
+				case 'delete':
+				 	$rfid_ids=[];
+				   $owner_ids=[];
+				   $str=implode('\',\'',$data['PLATE_NO']);
+		 		   $sql='SELECT RFID_ID,OWNER_ID FROM DSSC3.W_DW_NON_MOTOR  WHERE PLATE_NO in (\''.$str.'\')';
+		     	  $stid = oci_parse($conn, $sql);
+		     	  $r = oci_execute($stid);
+		  		 	while($row = oci_fetch_array($stid, OCI_ASSOC+OCI_RETURN_NULLS)) {
+		            $rfid_ids[]=$row['RFID_ID'];
+						 $owner_ids[]=$row['OWNER_ID'];
+		  			}
+				 	$owner_str=implode('\',\'',$owner_ids);
+					$sql='DELETE FROM DSSC3.W_DW_NON_MOTOR_OWNER WHERE ID in ('.$owner_str.')';
+		         $stid = oci_parse($conn, $sql);
+		         $r = oci_execute($stid);
+				 
+				 	$rfid_str=implode('\',\'',$rfid_ids);
+				 	$sql='DELETE FROM DSSC3.W_DW_RFID_TAGS WHERE ID in (\''.$rfid_str.'\')';
+				 	$stid = oci_parse($conn, $sql);
+				 	$r = oci_execute($stid);
+				 	$sql='DELETE FROM DSSC3.W_DW_NON_MOTOR WHERE PLATE_NO in (\''.$str.'\')';
+				 	$stid = oci_parse($conn, $sql);
+				 	$r = oci_execute($stid);
+				 	break;
+		      default:
+		     	  break;
+		   }
+		}elseif($type_arr[0]=='station'){
+			 switch ($type_arr['1']) {
+		      case 'save':
+				 	$data['created_at']=time();
+				   M('stations')->createAdd($data);
+		        break;        
+		      case 'update':
+					$where=array(
+						'shortcode'=>$data['update_code'],
+					);
+				 	M('stations')->createSave($where,$data);
+		      	  break;
+				case 'delete':
+				 	M('stations')->where(['shortcode'=>['in',$data['delete_code']]])->delete();
+				 	break;
+		      default:
+		     	  break;
+		   }
+		}
+		return;
+	}
+	
+
+}

+ 7 - 1
Home/Lib/Action/RouteRfidKafkaAction.class.php

@@ -20,7 +20,7 @@ class RouteRfidKafkaAction extends Action {
 		$station_cond=array('mac'=>$RF_ID);
 		$device_name=M('stations')->where($station_cond)->getField('name');
 		if(!$device_name){
-			return array('success'=>true,'message'=>'addRfidDataToNingbo failed,station not existed!');
+			//return array('success'=>true,'message'=>'addRfidDataToNingbo failed,station not existed!');
 		}
 		if($data['methond']=='heartbeat'){
 			if( ($data['time']<(time()-3600) ) || ($data['time']>(time()+3600) ) ){
@@ -70,6 +70,9 @@ class RouteRfidKafkaAction extends Action {
 				$this->debug_log( 'abnormal_labels', $val);
 				continue;
 			}
+			if(!$val['time']){
+				continue;
+			}
 			$RF_DATE=date('Y-m-d H:i:s',$val['time']);
 			$sql = 'INSERT INTO "DSSC2"."W_DW_RF_RECORD"("ID", "RF_ID", "RF_FLAGID", "RF_DATE", "RF_STAT") VALUES (DSSC2.SEQ_W_DW_RF_RECORD.nextval, \''.$RF_ID.'\', \''.$RF_FLAGID.'\', TO_DATE(\''.$RF_DATE.'\', \'SYYYY-MM-DD HH24:MI:SS\'), \''.$RF_STAT.'\')';
 		 	//var_dump($sql); 
@@ -84,6 +87,9 @@ class RouteRfidKafkaAction extends Action {
 				$this->debug_log( 'insert_oracle_error', $val );
 				return array('success'=>false,'message'=>'addRfidDataToNingbo failed,insert_oracle_error!');
 			}
+			if(!$device_name){
+				continue;
+			}
 			//插入成功就执行统计
 			$handle_data=array(
 				'RF_STAT'=>$RF_STAT,

+ 72 - 53
Home/Lib/Action/V1Action.class.php

@@ -67,22 +67,17 @@ class V1Action extends Action {
 		// Subscribe to topic 'test'
 		$consumer->subscribe($topics);
 		
-		$tmp_list = array();
-		$start = time();
-		$count = 0;
-		
-		
 		//HC_YYYYMMDD_HHIISS.dat
 		//HC_YYYYMMDD_HHIISS.md5
 		$localDir = C('FTP_LOCAL_DIR');
 		$timeFram = time();
-		$createTime = date('Ymd_His', $timeFram);
+		$random = rand(100,999);
+		$createTime = date('Ymd_His', $timeFram).$random;
 		$fileTimeInterval = C('FTP_FILE_CREATE_INTERVAL');
 		if(!$fileTimeInterval){
 			$fileTimeInterval = 30;
 		}
-		// $x = 0;
-		$sum = 0;
+		$sum = 0;//每次写入一个文件的数据条数
 		$locationPack = '';
 		$total = 0; //总数10000个数  消息总数 = total*10000+endsum
 		$endsum = 0; //最后一次数量
@@ -91,48 +86,61 @@ class V1Action extends Action {
 			// $_st  = microtime(TRUE);
 			switch ($message->err) {
 				case RD_KAFKA_RESP_ERR_NO_ERROR:
-					$locationPack .= ($message->payload) . PHP_EOL;
-					$fileName = $localDir . '/route_' . $createTime . '.dat';
-					$runTime = time() - $timeFram;
-					$sum++;
-					if($sum % 10000 ==0){
-						echo $sum . PHP_EOL;
-					}
-					if ($runTime < $fileTimeInterval) {
-						if ($sum % 50000 == 0) {
-							//echo 'start write routefile...' . PHP_EOL;
-							echo 'locationPack length  ' .strlen($locationPack).PHP_EOL;
-							$datRes = $this->writeRouteFile($fileName, $locationPack);
-							if ($datRes) {
-								$locationPack = '';
-								$sum = 0;
-								$total++;
-							}
+				$locationPack .= ($message->payload) . PHP_EOL;
+				$fileName = $localDir . '/route_' . $createTime . '.dat';
+				$runTime = time() - $timeFram;
+				$sum++;
+				if($sum % 10000 ==0){
+					echo $sum . PHP_EOL;
+				}
+				if ($runTime < $fileTimeInterval) {
+					if ($sum % 50000 == 0) {
+						echo "start write routefile to $fileName" . PHP_EOL;
+						//echo 'locationPack length  ' .strlen($locationPack).PHP_EOL;
+						$datRes = $this->writeRouteFile($fileName, $locationPack);
+						if ($datRes) {
+							echo "write success...$sum 条" . PHP_EOL;
+							$locationPack = '';
+							$sum = 0;
+							$total++;
+						}else{
+							$locationPack = '';
+							$sum = 0;
+							echo 'file data write failed ,please check filepath or file permission';
 						}
-					} else {
-						$md5Res = $this->createRouteMD5file($fileName);
-						$timeFram = time();
-						$createTime = date('Ymd_His', $timeFram);
 					}
+				} else {
+					$res = $this->createRouteMD5file($fileName);
+					if($res){
+						echo "file completed" . PHP_EOL;
+					}
+					$timeFram = time();
+					$random = rand(100, 999);
+					$createTime = date('Ymd_His', $timeFram) . $random;
+				}
 				break;
 				case RD_KAFKA_RESP_ERR__PARTITION_EOF:
 				//echo "No more messages; will wait for more".PHP_EOL;
 				break;
 				case RD_KAFKA_RESP_ERR__TIMED_OUT:
-					$fileName = $localDir . '/route_' . $createTime . '.dat';
-					if ($locationPack != '') {
-						$datRes = $this->writeRouteFile($fileName, $locationPack);
-						if ($datRes) {
-							$endsum = $sum;
-							$locationPack = '';
-							$sum = 0;
-						}
+				$fileName = $localDir . '/route_' . $createTime . '.dat';
+				if ($locationPack != '') {
+					echo "start write routefile to $fileName" . PHP_EOL;
+					$datRes = $this->writeRouteFile($fileName, $locationPack);
+					if ($datRes) {
+						$endsum = $sum;
+						$locationPack = '';
+						$sum = 0;
+						echo "write success...$endsum 条" . PHP_EOL;
 					}
-					$md5Res = $this->createRouteMD5file($fileName);
-					$timeFram = time();
-					$createTime = date('Ymd_His', $timeFram);
-					echo "Timed out\n";
-					echo "消息总数total:" . ($total * 50000 + $endsum) . PHP_EOL;
+					$this->createRouteMD5file($fileName);
+					echo "file completed" . PHP_EOL;
+				}
+				$timeFram = time();
+				$random = rand(100,999);
+				$createTime = date('Ymd_His', $timeFram).$random;
+				echo "Timed out\n";
+				echo "消息总数total:" . ($total * 50000 + $endsum) . PHP_EOL;
 				break;
 				default:
 				throw new \Exception($message->errstr(), $message->err);
@@ -147,10 +155,15 @@ class V1Action extends Action {
 		$config = C('FTP_CONFIG');
 		
 		debug_log('route_info', $data);
+		$r = file_put_contents($fileName, $data, FILE_APPEND);
 		
-		$res = Zmcoding\FtpFile::getInstance($config)->writeFile($fileName, $data);
-		
-		return $res;
+		if($r === false){
+			debug_log('route_info', 'file write fail'.$fileName);
+			return false; 
+		}
+		return true;
+		//$res = Zmcoding\FtpFile::getInstance($config)->writeFile($fileName, $data);
+		//return $res;
 	}
 	
  
@@ -163,17 +176,23 @@ class V1Action extends Action {
 			unlink($fileName);
 			return false;
 		}
-		$config = C('FTP_CONFIG');
-		//$writeData = md5_file($fileName);
+		//$config = C('FTP_CONFIG');
+		
+		//$pathInfo = pathinfo($fileName);
+		//$md5File = $pathInfo['dirname'].'/'.$pathInfo['filename'].'.md5';
+		//$res = Zmcoding\FtpFile::getInstance($config)->writeFile($md5File, '');
 		
+		//return $res;
 		$pathInfo = pathinfo($fileName);
 		$md5File = $pathInfo['dirname'].'/'.$pathInfo['filename'].'.md5';
 		
-		//$writeData = $writeData. '  '. $pathInfo['basename'];
+		$r = file_put_contents($md5File, '', FILE_APPEND);
 		
-		$res = Zmcoding\FtpFile::getInstance($config)->writeFile($md5File, '');
-		
-		return $res;
+		if($r === false){
+			debug_log('md5', 'file write fail'.$md5File);
+			return false; 
+		}
+		return true;
 	}
 	
  
@@ -280,7 +299,7 @@ class V1Action extends Action {
 						$locationPack = '';
 						$sum = 0;
 					}
-					$md5Res = $this->createRouteMD5file($fileName);
+					$this->createRouteMD5file($fileName);
 					$timeFram = time();
 					$createTime = date('Ymd_His', $timeFram);
 					echo "Timed out\n";
@@ -307,7 +326,7 @@ class V1Action extends Action {
 						}
 					}
 				} else {
-					$md5Res = $this->createRouteMD5file($fileName);
+					$this->createRouteMD5file($fileName);
 					$timeFram = time();
 					$createTime = date('Ymd_His', $timeFram);
 				}

+ 1 - 0
nb_clear_file_redis.sh

@@ -0,0 +1 @@
+php index.php  nb_listening_file_create/scan_no_listening

+ 1 - 0
nb_listening_add_file.sh

@@ -0,0 +1 @@
+php index.php start nb_listening_file_create/start_listening

+ 2 - 0
nb_redis_to_kafka.sh

@@ -0,0 +1,2 @@
+php index.php start nb_listening_file_create/redis_to_kafka
+