|
@@ -201,7 +201,7 @@ class CronAction extends Action {
|
|
|
});
|
|
|
// Configure the group.id. All consumer with the same group.id will consume( 配置groud.id 具有相同 group.id 的consumer 将会处理不同分区的消息,所以同一个组内的消费者数量如果订阅了一个topic, 那么消费者进程的数量多于这个topic 分区的数量是没有意义的。)
|
|
|
// different partitions.
|
|
|
- $conf->set('group.id', 'myConsumerGroup');
|
|
|
+ $conf->set('group.id', 'fenceAlarmMsgGroup');
|
|
|
if( !C('KAFKA_BROKER_LIST') ){
|
|
|
echo 'please set broker list !!! ';
|
|
|
}
|
|
@@ -218,31 +218,31 @@ class CronAction extends Action {
|
|
|
// 订阅轨迹数据topic
|
|
|
$consumer->subscribe(['gps_location_data']);
|
|
|
while (true) {
|
|
|
- $message = $consumer->consume(120*1000);
|
|
|
- switch ($message->err) {
|
|
|
- case RD_KAFKA_RESP_ERR_NO_ERROR:
|
|
|
- // 判断是否超出围栏范围 ,存入 topic:gps_alarm_msg_queue
|
|
|
- $route_info = json_decode($message->payload,true);
|
|
|
- if( empty($route_info) ){
|
|
|
- echo 'empty route info.';
|
|
|
- break;
|
|
|
- }
|
|
|
- $result = $this->produceAcrossAlarmData($route_info);
|
|
|
- if($result){
|
|
|
- echo $result,PHP_EOL;
|
|
|
- debug_log('across_alarm',$result['message']);
|
|
|
- }
|
|
|
- break;
|
|
|
- case RD_KAFKA_RESP_ERR__PARTITION_EOF:
|
|
|
- echo "No more messages; will wait for more\n";
|
|
|
- break;
|
|
|
- case RD_KAFKA_RESP_ERR__TIMED_OUT:
|
|
|
- echo "Timed out\n";
|
|
|
- break;
|
|
|
- default:
|
|
|
- throw new \Exception($message->errstr(), $message->err);
|
|
|
- break;
|
|
|
- }
|
|
|
+ $message = $consumer->consume(120*1000);
|
|
|
+ switch ($message->err) {
|
|
|
+ case RD_KAFKA_RESP_ERR_NO_ERROR:
|
|
|
+ // 判断是否超出围栏范围 ,存入 topic:gps_alarm_msg_queue
|
|
|
+ $route_info = json_decode($message->payload,true);
|
|
|
+ if( empty($route_info) ){
|
|
|
+ echo 'empty route info.';
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ $result = $this->produceAcrossAlarmData($route_info);
|
|
|
+ if($result){
|
|
|
+ echo $result,PHP_EOL;
|
|
|
+ debug_log('across_alarm',$result['message']);
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ case RD_KAFKA_RESP_ERR__PARTITION_EOF:
|
|
|
+ echo "No more messages; will wait for more\n";
|
|
|
+ break;
|
|
|
+ case RD_KAFKA_RESP_ERR__TIMED_OUT:
|
|
|
+ echo "Timed out\n";
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ throw new \Exception($message->errstr(), $message->err);
|
|
|
+ break;
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -451,5 +451,156 @@ class CronAction extends Action {
|
|
|
return $img_file;
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+ private function produceLockAlarmMessage( $route_info ){
|
|
|
+ if( !$route_info['DeviceId'] ){
|
|
|
+ return array('success' => false, 'message' => 'device id is not exists.');
|
|
|
+ }
|
|
|
+ if( !$route_info['Longitude'] ){
|
|
|
+ return array('success' => false, 'message' => 'longitude is not exists.');
|
|
|
+ }
|
|
|
+ if( !$route_info['Latitude'] ){
|
|
|
+ return array('success' => false, 'message' => 'latitude is not exists.');
|
|
|
+ }
|
|
|
+ if( !$route_info['DeviceTime'] ){
|
|
|
+ return array('success' => false, 'message' => 'device time is not exists.');
|
|
|
+ }
|
|
|
+
|
|
|
+ // 从数据库中取出车牌号,缓存1天
|
|
|
+ if( S('plate-'.$route_info['DeviceId']) ){
|
|
|
+ $plate = S('plate-'.$route_info['DeviceId']);
|
|
|
+ }else{
|
|
|
+ $where = array('GpsDeviceNumber'=>$route_info['DeviceId']);
|
|
|
+ $plate = M('jms_vehicle')->where($where)->getField('LicensePlate');
|
|
|
+ S('plate-'.$route_info['DeviceId'], $plate, 24*60*60);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ //redis获取锁车状态
|
|
|
+ $vehicle = Redis('czapp_lock_status','hash');
|
|
|
+ $lockStatus = $vehicle->get($plate);
|
|
|
+ if(!$lockStatus){
|
|
|
+ return array('success' => false, 'message' => $plate.'-'.$route_info['DeviceId'].' 未启用锁车');
|
|
|
+ }
|
|
|
+
|
|
|
+ $lockInfoRedis = Redis('czapp_lockinfo', 'hash');
|
|
|
+ $lockInfoRes = $lockInfoRedis->get($route_info['DeviceId']);
|
|
|
+ if(!$lockInfoRes){
|
|
|
+ return array('success' => false, 'message' => $plate.'-'.$route_info['DeviceId'].' 无法查到redis锁车信息');
|
|
|
+ }
|
|
|
+ $lockInfoRes = json_decode($lockInfoRes, true);
|
|
|
+
|
|
|
+ /*
|
|
|
+ $deviceSleepTime = C('GPS_SLEEP_TIME');//gps休眠时间
|
|
|
+ if(!$deviceSleepTime){
|
|
|
+ $deviceSleepTime = 300;
|
|
|
+ }
|
|
|
+ $timeInterval = time() - strtotime($lockInfoRes['GpsOnlineTime']);
|
|
|
+ if($timeInterval < $deviceSleepTime){//小于休眠时间不报警
|
|
|
+ echo '--3'.PHP_EOL;
|
|
|
+ return array('success' => false, 'message' => $plate.'-'.$route_info['DeviceId'].' 小于休眠时间不告警');
|
|
|
+ }
|
|
|
+ */
|
|
|
+
|
|
|
+ //是否移动
|
|
|
+ $distance = \Jms\Algo\Geometry::getBdDistance($lockInfoRes['GpsLatitude'], $lockInfoRes['GpsLongitude'], $route_info['Latitude'], $route_info['Longitude']) * 1000;
|
|
|
+
|
|
|
+ $configDist = C('LOCK_ALARM_DISTANCE');//锁车后移动距离
|
|
|
+ if(!$configDist){
|
|
|
+ $configDist = 20;
|
|
|
+ }
|
|
|
+
|
|
|
+ $alarm = false;//默认不告警
|
|
|
+ if($distance > $configDist){
|
|
|
+ $alarm = true;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ if($alarm){
|
|
|
+ $alarm_data = array(
|
|
|
+ //"type" => C('FENCE_ALARM'),
|
|
|
+ "type" => \Rlfd\Alarm\PushTypeEnum::LOCK_VEHICLE_ALARM,
|
|
|
+ "title" => "锁车告警",
|
|
|
+ "content" => "车辆 {$plate} 有异常移动,请前往停车处确认是否被盗。",
|
|
|
+ "device_number" => $route_info['DeviceId']
|
|
|
+ );
|
|
|
+ kafkaProducer('gps_alarm_msg_queue', $alarm_data); // 添加到kafka
|
|
|
+ return array('success' => true, 'message' => $plate.'-'.$route_info['DeviceId'].' 添加告警消息到 gps_alarm_msg_queue');
|
|
|
+ }
|
|
|
+ return array('success' => false, 'message' => $plate.'-'.$route_info['DeviceId'].' 无异常');
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ public function lockAlarm2Kafka( ){
|
|
|
+ // 从 topic :gps_location_data 取轨迹
|
|
|
+ $conf = new RdKafka\Conf();
|
|
|
+ // Set a rebalance callback to log partition assignments (optional)(当有新的消费进程加入或者退出消费组时,kafka 会自动重新分配分区给消费者进程,这里注册了一个回调函数,当分区被重新分配时触发)
|
|
|
+ $conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
|
|
|
+ switch ($err) {
|
|
|
+ case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
|
|
|
+ echo "Assign: ";
|
|
|
+ var_dump($partitions);
|
|
|
+ $kafka->assign($partitions);
|
|
|
+ break;
|
|
|
+
|
|
|
+ case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
|
|
|
+ echo "Revoke: ";
|
|
|
+ var_dump($partitions);
|
|
|
+ $kafka->assign(NULL);
|
|
|
+ break;
|
|
|
+
|
|
|
+ default:
|
|
|
+ throw new \Exception($err);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ // Configure the group.id. All consumer with the same group.id will consume( 配置groud.id 具有相同 group.id 的consumer 将会处理不同分区的消息,所以同一个组内的消费者数量如果订阅了一个topic, 那么消费者进程的数量多于这个topic 分区的数量是没有意义的。)
|
|
|
+ // different partitions.
|
|
|
+ $conf->set('group.id', 'lockAlarmMsgGroup');
|
|
|
+ if( !C('KAFKA_BROKER_LIST') ){
|
|
|
+ echo 'please set broker list !!! ';
|
|
|
+ }
|
|
|
+ // Initial list of Kafka brokers(添加 kafka集群服务器地址)
|
|
|
+ $conf->set('metadata.broker.list', C('KAFKA_BROKER_LIST'));
|
|
|
+ $topicConf = new RdKafka\TopicConf();
|
|
|
+ // Set where to start consuming messages when there is no initial offset in
|
|
|
+ // offset store or the desired offset is out of range.
|
|
|
+ // 'smallest': start from the beginning
|
|
|
+ $topicConf->set('auto.offset.reset', 'smallest');
|
|
|
+ // Set the configuration to use for subscribed/assigned topics
|
|
|
+ $conf->setDefaultTopicConf($topicConf);
|
|
|
+ $consumer = new RdKafka\KafkaConsumer($conf);
|
|
|
+ // 订阅轨迹数据topic
|
|
|
+ $consumer->subscribe(['gps_location_data']);
|
|
|
+ while (true) {
|
|
|
+ $message = $consumer->consume(120*1000);
|
|
|
+ switch ($message->err) {
|
|
|
+ case RD_KAFKA_RESP_ERR_NO_ERROR:
|
|
|
+ // 判断是否超出围栏范围 ,存入 topic:gps_alarm_msg_queue
|
|
|
+ $route_info = json_decode($message->payload,true);
|
|
|
+ if( empty($route_info) ){
|
|
|
+ echo 'empty route info.';
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ $lockAlarmRes = $this->produceLockAlarmMessage($route_info);
|
|
|
+ if($lockAlarmRes){
|
|
|
+ echo $lockAlarmRes,PHP_EOL;
|
|
|
+ debug_log('lock_alarm',$lockAlarmRes['message']);
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ case RD_KAFKA_RESP_ERR__PARTITION_EOF:
|
|
|
+ echo "No more messages; will wait for more\n";
|
|
|
+ break;
|
|
|
+ case RD_KAFKA_RESP_ERR__TIMED_OUT:
|
|
|
+ echo "Timed out\n";
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ throw new \Exception($message->errstr(), $message->err);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
|
|
|
}
|