NoticeAction.class.php 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272
  1. <?php
  2. class NoticeAction extends Action {
  /**
   * Sends a test broadcast notification to every device on every platform.
   *
   * The JPush response is printed on success; when the request fails the
   * caught JPush exception is printed instead. Nothing is returned.
   *
   * @return void
   */
  public function jgpush( ){
      // Read the credentials from configuration, like jpushMsg() and
      // pushFromKafka() do, instead of keeping secrets in the source file.
      $client = new \JPush\Client( C('JPUSH_APP_KEY'), C('JPUSH_MASTER_SECRET') );

      $push_payload = $client->push()
          ->setPlatform('all')
          ->addAllAudience()
          ->setNotificationAlert('车辆异动告警');

      try {
          $response = $push_payload->send();
          print_r($response);
      } catch (\JPush\Exceptions\APIConnectionException $e) {
          // The request could not be delivered to the JPush API.
          print $e;
      } catch (\JPush\Exceptions\APIRequestException $e) {
          // The JPush API answered with an error.
          print $e;
      }
  }
  /**
   * Consumes alarm messages from the Kafka topic "gps_alarm_msg_queue" and
   * forwards every payload that decodes as JSON to jpushMsg().
   *
   * The method runs an endless consume loop; it only ends when an exception
   * is thrown.
   *
   * @return void
   * @throws \Exception On an unexpected rebalance event or consume error.
   */
  public function pushFromKafka( ){
      $conf = new RdKafka\Conf();

      // Optional rebalance callback that logs partition assignments. Kafka
      // redistributes partitions whenever a consumer joins or leaves the
      // group, and this callback fires on each redistribution.
      $conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
          switch ($err) {
              case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                  echo "Assign: ";
                  var_dump($partitions);
                  $kafka->assign($partitions);
                  break;
              case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                  echo "Revoke: ";
                  var_dump($partitions);
                  $kafka->assign(NULL);
                  break;
              default:
                  // Report the readable error text, not just the numeric code.
                  throw new \Exception(rd_kafka_err2str($err), $err);
          }
      });

      // Consumers sharing a group.id split the topic's partitions between
      // them, so running more consumers than partitions brings no benefit.
      $conf->set('group.id', 'myConsumerGroup');

      // Initial list of Kafka brokers.
      $conf->set('metadata.broker.list', '127.0.0.1');

      $topicConf = new RdKafka\TopicConf();
      // Where to start when there is no stored offset or the stored offset is
      // out of range: 'smallest' starts from the beginning.
      $topicConf->set('auto.offset.reset', 'smallest');
      // Use this configuration for subscribed/assigned topics.
      $conf->setDefaultTopicConf($topicConf);

      $consumer = new RdKafka\KafkaConsumer($conf);
      $consumer->subscribe(['gps_alarm_msg_queue']);

      // The client does not depend on the message, so build it once instead
      // of once per consumed message.
      $jpush_client = new \JPush\Client( C('JPUSH_APP_KEY'), C('JPUSH_MASTER_SECRET') );

      while (true) {
          $message = $consumer->consume(120*1000);
          switch ($message->err) {
              case RD_KAFKA_RESP_ERR_NO_ERROR:
                  // Payloads that do not decode as JSON are skipped silently.
                  if( json_decode($message->payload, true) ){
                      // Fence, locked-vehicle movement, low battery and stolen
                      // alarms as well as broadcasts are pushed through JPush.
                      // jpushMsg() expects the raw JSON string.
                      $this->jpushMsg( $jpush_client, $message->payload );
                  }
                  break;
              case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                  echo "No more messages; will wait for more\n";
                  break;
              case RD_KAFKA_RESP_ERR__TIMED_OUT:
                  echo "Timed out\n";
                  break;
              default:
                  throw new \Exception($message->errstr(), $message->err);
          }
      }
  }
  /**
   * Builds a JPush notification from an alarm message and sends it.
   *
   * Broadcast messages are sent to every device. Messages of a single-push
   * alarm type are sent to the registration id stored for the vehicle that
   * matches "device_number". Every attempted push, successful or not, is
   * recorded through saveLog().
   *
   * Example payloads:
   *   {"type":2,"title":"...","content":"...","device_number":"FFFFFF123122"}
   *   {"type":9,"title":"...","content":"..."}
   *
   * @param \JPush\Client|null $client   JPush client; when empty one is created from configuration.
   * @param string|array       $msg_data JSON document (or an already decoded array) holding "type",
   *                                     "title", "content" and, for single pushes, "device_number".
   * @return void
   */
  public function jpushMsg( $client, $msg_data ){
      $single_push_type = array(
          C('LOCK_VEHICLE_ALARM'),    // 1 - locked-vehicle alarm
          C('STOLEN_ALARM'),          // 2 - stolen alarm
          C('FENCE_ALARM'),           // 3 - electronic fence alarm
          C('LOWWER_BATTERY_ALARM'),  // 4 - low battery alarm
      );

      // Keep the raw input: once decoding fails the decoded value is useless
      // for the error message.
      $raw_msg  = $msg_data;
      $msg_data = is_array($raw_msg) ? $raw_msg : json_decode($raw_msg, true);
      if( !is_array($msg_data) || !$msg_data ){
          echo 'invalid message data format!' . (is_string($raw_msg) ? $raw_msg : json_encode($raw_msg)) . PHP_EOL;
          return;
      }

      if( empty($client) ){
          $client = new \JPush\Client( C('JPUSH_APP_KEY'), C('JPUSH_MASTER_SECRET') );
      }

      $type          = isset($msg_data['type']) ? $msg_data['type'] : null;
      $device_number = isset($msg_data['device_number']) ? $msg_data['device_number'] : null;

      // Look up the vehicle (and with it the JPush registration id) by device
      // number; the row is cached for one minute.
      $vehicle_info = null;
      if( in_array($type, $single_push_type) && $device_number ){
          $cache_key    = 'jpush_vinfo_' . $device_number;
          $vehicle_info = S($cache_key);
          if( !$vehicle_info ){
              $where        = array('DeviceNumber|GpsDeviceNumber' => $device_number);
              $fields       = 'CityId,LicensePlate,FullName,JgClientRegistrationId';
              $vehicle_info = M('jms_vehicle')->field($fields)->where($where)->find();
              S($cache_key, $vehicle_info, 60);
          }
      }

      // Decide the audience before talking to the JPush API so that messages
      // which cannot be delivered do not cost a request.
      $is_broadcast = ( $type == C('BROADCASTING') ); // 9 - broadcast
      if( !$is_broadcast && !$vehicle_info ){
          // Unknown type or no matching vehicle.
          var_dump('出错了:' . json_encode($msg_data, JSON_UNESCAPED_UNICODE));
          return;
      }
      if( !$is_broadcast ){
          // saveLog() reads the vehicle data from the message.
          $msg_data['vehicle_info'] = $vehicle_info;
      }

      // Target platforms.
      $platform = array('ios', 'android');
      // Text shown in the notification bar.
      $alert = $msg_data['content'];
      // Android specific settings.
      $android_notification = array(
          'title'      => $msg_data['title'],
          'builder_id' => 2, // notification bar style
      );
      // iOS specific settings.
      $ios_notification = array(
          'sound'             => 'default',
          'badge'             => '+1',
          'content-available' => true
      );
      // Optional settings.
      $options = array(
          'sendno'       => 100,
          'time_to_live' => 100,
      );

      try {
          // getCid() is itself an API request, so it belongs inside the try
          // block: a failure here must be logged, not abort the caller.
          $cid = $client->push()->getCid();
          $push_payload = $client->push()
              ->setCid($cid['body']['cidlist'][0])
              ->setPlatform( $platform )
              ->iosNotification($alert, $ios_notification)
              ->androidNotification($alert, $android_notification)
              ->options($options);

          if( $is_broadcast ){
              $push_payload->addAllAudience();
          }else{
              $push_payload->addRegistrationId($vehicle_info['JgClientRegistrationId']);
          }

          $response = $push_payload->send();
          $msg_data['response'] = array(
              'code'     => $response['http_code'],
              'resp_msg' => 'ok',
          );
          $this->saveLog($msg_data);
          print_r($response);
      } catch (\JPush\Exceptions\APIConnectionException $e) {
          $this->logPushFailure($msg_data, $e);
      } catch (\JPush\Exceptions\APIRequestException $e) {
          $this->logPushFailure($msg_data, $e);
      }
  }

  /**
   * Records a failed push attempt through saveLog() and prints the exception.
   *
   * @param array      $msg_data Decoded message the push was built from.
   * @param \Exception $e        Exception raised by the JPush client.
   * @return void
   */
  private function logPushFailure( $msg_data, $e ){
      $detail      = $e->__toString();
      $bracket_pos = strpos($detail, '[');

      $msg_data['response'] = array(
          // Not every JPush exception is guaranteed to expose getHttpCode()
          // (connection failures have no HTTP status); fall back to the
          // generic exception code instead of triggering a fatal error.
          'code'     => method_exists($e, 'getHttpCode') ? $e->getHttpCode() : $e->getCode(),
          // Keep the text from the first '[' on; without a bracket keep it all.
          'resp_msg' => $bracket_pos === false ? $detail : substr($detail, $bracket_pos),
      );
      $this->saveLog($msg_data);
      print $e;
  }
  /**
   * Stores the outcome of a push attempt in the "jms_baojing_message" table.
   *
   * @param array $msg_data Decoded message with "type", "title", "content" and
   *                        "response" ("code" / "resp_msg"); messages that are
   *                        not broadcasts also provide "device_number" and
   *                        "vehicle_info".
   * @return mixed Whatever the model's createAdd() call returns.
   */
  public function saveLog( $msg_data ){
      $message_model = M('jms_baojing_message');

      $record = array();
      $record['ID']         = create_guid();
      $record['Type']       = $msg_data['type'];
      $record['Title']      = $msg_data['title'];
      $record['Comment']    = $msg_data['content'];
      $record['SendStatus'] = $msg_data['response']['code'];
      $record['AddTime']    = date('Y-m-d H:i:s');
      $record['RespMsg']    = $msg_data['response']['resp_msg'];

      // Everything except a broadcast is tied to one vehicle; add its details.
      $is_broadcast = ( $msg_data['type'] == C('BROADCASTING') );
      if( !$is_broadcast ){
          $record['CityId']       = $msg_data['vehicle_info']['CityId'];
          $record['DeviceNumber'] = $msg_data['device_number'];
          $record['FullName']     = $msg_data['vehicle_info']['FullName'];
          $record['LicensePlate'] = $msg_data['vehicle_info']['LicensePlate'];
      }

      return $message_model->createAdd($record);
  }
  /**
   * Publishes one message to the Kafka topic "gps_alarm_msg_queue".
   *
   * @param string|array|object $msg_data Message payload; arrays and objects are JSON-encoded first.
   * @return bool True when the message left the producer queue; false when the
   *              rdkafka extension is missing, the payload is empty, or the
   *              queue could not be drained in time.
   */
  public function kafkaProducer( $msg_data ){
      if (!extension_loaded('rdkafka')){
          echo 'rdkafka extension is not installed!!'.PHP_EOL;
          return false;
      }

      // Validate the payload before any producer is set up.
      if( empty($msg_data) ){
          var_dump($msg_data);
          return false;
      }
      if( is_array($msg_data) || is_object($msg_data) ){
          $msg_data = json_encode($msg_data);
      }

      $rk = new RdKafka\Producer();
      $rk->setLogLevel(LOG_DEBUG);
      $rk->addBrokers("127.0.0.1");
      $topic = $rk->newTopic('gps_alarm_msg_queue');

      // RD_KAFKA_PARTITION_UA lets librdkafka choose the partition.
      $topic->produce(RD_KAFKA_PARTITION_UA, 0, $msg_data);
      $rk->poll(0);

      // produce() only queues the message. The producer goes out of scope
      // when this method returns, so wait for delivery first or the message
      // may be lost.
      if( method_exists($rk, 'flush') ){
          return $rk->flush(10000) === RD_KAFKA_RESP_ERR_NO_ERROR;
      }

      // Older php-rdkafka releases have no flush(): poll until the queue is
      // empty, giving up after roughly ten seconds.
      for( $attempt = 0; $attempt < 200 && $rk->getOutQLen() > 0; $attempt++ ){
          $rk->poll(50);
      }
      return $rk->getOutQLen() === 0;
  }
  /**
   * Publishes a sample broadcast message through kafkaProducer(), for manual
   * testing of the producer/consumer chain.
   *
   * @return void
   */
  public function mockProduce( ){
      // Same multi-line JSON document as before, assembled line by line.
      $broadcast_json = implode("\n", array(
          '{',
          '"type":9,',
          '"title":"群推消息",',
          '"content":"这是一个广播"',
          '}',
      ));
      $this->kafkaProducer($broadcast_json);
  }
  235. }