NingboAction.class.php 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289
  1. <?php
  2. class NingboAction extends Action {
  /**
   * Starts the Workerman server for the "Dahua" protocol on 0.0.0.0:1234.
   *
   * Registers the worker callbacks and then hands control to
   * \Workerman\Worker::runAll(). Every decoded frame is dispatched to the
   * method of this class named "fun" . $data['cmd'] (fun80, fun83, ...).
   *
   * Each worker process also runs a timer every 5 minutes that closes
   * connections whose last activity (recvTime) is older than 10 minutes.
   */
  public function dahua( ){
      Vendor('Workerman352.Autoloader');

      $worker = new \Workerman\Worker("Dahua://0.0.0.0:1234");
      $worker->count = 12;
      $worker->reusePort = true;

      $worker->onWorkerStart = function($worker){
          $timeInterval = 60 * 5; // seconds between idle sweeps
          $idleLimit = 10 * 60;   // seconds of silence before a connection is dropped
          \Workerman\Lib\Timer::add($timeInterval, function() use ($idleLimit)
          {
              \Jiaruan\DahuaUtil::rlog("timer exec");
              foreach (\Workerman\Connection\TcpConnection::$connections as $connection) {
                  if (time() - $connection->recvTime > $idleLimit) {
                      \Jiaruan\DahuaUtil::rlog("close idle", $connection->getRemoteIp() . ':' . $connection->getRemotePort());
                      $connection->close();
                  }
              }
          });
          \Jiaruan\DahuaUtil::rlog("workstart");
      };

      $worker->onConnect = function($connection){
          // Start the idle clock for this connection.
          $connection->recvTime = time();
          \Jiaruan\DahuaUtil::rlog('imsync', "[newConn]", $connection->getRemoteIp() . ':' . $connection->getRemotePort());
      };

      $worker->onMessage = function($connection, $data){
          // Refresh the idle clock on every frame; without this the idle sweep
          // measures time since connect and drops connections that are still
          // sending heartbeats.
          $connection->recvTime = time();

          // A frame without a command falls through to the "not exists" log
          // below instead of raising an undefined-index notice.
          $cmd = isset($data['cmd']) ? $data['cmd'] : '';
          $fun = 'fun' . $cmd;
          if (method_exists($this, $fun)){
              $this->$fun($connection, $data);
          } else {
              \Jiaruan\DahuaUtil::rlog("ERR", "fun " . $fun . " not exists");
          }
      };

      $worker->onClose = function($connection){
          // Workerman fires onClose after the connection has been closed, so
          // no explicit close() is needed here; only log the event.
          \Jiaruan\DahuaUtil::rlog('imsync', "[shutConn]", $connection->getRemoteIp() . ':' . $connection->getRemotePort());
      };

      $worker->onError = function($connection, $code, $msg)
      {
          \Jiaruan\DahuaUtil::rlog("[ONERR]", $connection->getRemoteIp() . ':' . $connection->getRemotePort(), $code, $msg);
      };

      $worker->onBufferFull = function($connection)
      {
          \Jiaruan\DahuaUtil::rlog("[ONFULL]", $connection->getRemoteIp() . ':' . $connection->getRemotePort());
      };

      $worker->onBufferDrain = function($connection)
      {
          \Jiaruan\DahuaUtil::rlog("[ONDRAIN]", $connection->getRemoteIp() . ':' . $connection->getRemotePort());
      };

      \Workerman\Worker::runAll();
  }
  /**
   * Handles a heartbeat frame (sample: AAAAFF04B28FFE830000C984).
   *
   * Logs the sender, queues a "heartbeat" event and sends the frame back
   * to the device.
   *
   * @param object $connection Connection the frame arrived on.
   * @param array  $data       Decoded frame; 'mac' is read here.
   */
  private function fun83( $connection, $data ){
      $mac = $data['mac'];
      \Jiaruan\DahuaUtil::rlog("heartbeat", $mac);

      // NOTE(review): the key is spelled 'methond' in every queued event;
      // presumably consumers rely on that spelling, so it is kept as is.
      $event = [
          'methond' => 'heartbeat',
          'mac'     => $mac,
          'time'    => time(),
      ];
      $this->pushQueue($event);

      $connection->send($data);
  }
  /**
   * JSON-encodes an event and produces it to Kafka.
   *
   * The broker list is read from config key KAFKA_BROKER_LIST and the topic
   * name from ROUTE_INDEX_KAFKA_TOPIC. The producer is created once per
   * process (static local) and reused for later calls.
   *
   * @param array $data Event to publish.
   * @return false|null false when the event could not be queued (encoding
   *                    failure, missing configuration, missing rdkafka
   *                    extension); null once the message was handed to the
   *                    producer.
   */
  private function pushQueue( $data ){
      $startTime = microtime(true);

      $payload = json_encode($data);
      if ($payload === false) {
          // Binary fields that are not valid UTF-8 make json_encode() fail;
          // do not publish a bogus empty message in that case.
          \Jiaruan\DahuaUtil::rlog('err', 'pushQueue json_encode fail, code:' . json_last_error());
          return false;
      }
      \Jiaruan\DahuaUtil::rlog('[pushQueue]', $payload);

      $brokerlist = C('KAFKA_BROKER_LIST');
      if (empty($brokerlist)) {
          \Jiaruan\DahuaUtil::rlog("KAFKA_BROKER_LIST must be config!");
          // Without brokers a producer cannot deliver anything.
          return false;
      }

      $topicName = C('ROUTE_INDEX_KAFKA_TOPIC');
      if (empty($topicName)) {
          \Jiaruan\DahuaUtil::rlog("error ROUTE_INDEX_KAFKA_TOPIC empty");
          return false;
      }

      if (!extension_loaded('rdkafka')){
          \Jiaruan\DahuaUtil::rlog('pushToKafka fail,extension of rdkafka has not installed!!');
          return false;
      }

      static $rk;
      if (!$rk){
          $conf = new \RdKafka\Conf();
          $conf->set('metadata.broker.list', $brokerlist);
          $conf->setErrorCb(function($producer, $err, $reason) {
              \Jiaruan\DahuaUtil::rlog('err', 'setErrorCb:' . rd_kafka_err2str($err) . ';' . $reason);
          });
          $conf->setDrMsgCb(function($producer, $msg) {
              if ($msg->err) {
                  \Jiaruan\DahuaUtil::rlog('err', 'setDrMsgCb:' . $msg->errstr());
              } else {
                  \Jiaruan\DahuaUtil::rlog('info', "kafka sent ok.");
              }
          });
          $rk = new \RdKafka\Producer($conf);
      }

      $topic = $rk->newTopic($topicName);
      $topic->produce(RD_KAFKA_PARTITION_UA, 0, $payload);

      // Serve pending delivery/error callbacks without blocking. Previously
      // callbacks only ran once the out-queue exceeded the backlog threshold
      // below, so errors could go unreported for a long time.
      $rk->poll(0);

      \Jiaruan\DahuaUtil::rlog('info', "kafka start poll");

      // When the local queue is heavily backed up, give the producer a short,
      // bounded amount of time to drain before returning.
      $MAX_K = 2;
      $rec = 0;
      while ($rk->getOutQLen() > 30000) {
          $rec++;
          if ($rec > $MAX_K) {
              break;
          }
          $rk->poll(10);
      }

      $runTime = (microtime(true) - $startTime) * 1000;
      if ($rec > $MAX_K) {
          \Jiaruan\DahuaUtil::rlog('WARN', 'push perhaps failure runTime:' . intval($runTime) . 'ms');
      } else {
          \Jiaruan\DahuaUtil::rlog('info', 'push OK runTime:' . intval($runTime) . 'ms');
      }
  }
  /**
   * Handles a login frame
   * (sample: AAAAFF04B28FFF8000010077F5AAAAFF04B290FF8000010018F7).
   *
   * Queues a "login" event built from the first payload byte (station type in
   * bit 7, version in bits 0-5) and the optional location that follows it,
   * then replies with a 26-byte configuration block.
   *
   * Reply layout (byte index: meaning):
   *   0~5   system time as BCD, YYMMDDHHMMSS, year first
   *   6     bit 7: device type, 0 = field base station, 1 = access management
   *         bits 6~0: heartbeat interval in 30 s units, (value + 1) * 30 s
   *   7     RFID leave timeout in field-base-station mode, 2 s units,
   *         (value + 1) * 2 s, at most 510 s (a little over 8 minutes)
   *   8     RF-triggered card-reading duration; 0 = RF always on,
   *         1~254 = (value + 1) * 30 s
   *   9     whitelist control byte; bits 7~1 reserved, bit 0 = whitelist enabled
   *   10~25 reserved
   *
   * @param object $connection Connection the frame arrived on.
   * @param array  $data       Decoded frame; 'buf' and 'mac' are read, and
   *                           ['reply']['buf'] is filled before sending.
   */
  private function fun80( $connection, $data ){
      $payload = $data['buf'];
      $infoByte = ord($payload[0]);

      // Anything after the first byte is the location, right-padded with NULs.
      $location = '';
      if (strlen($payload) > 1) {
          $rawLocation = substr($payload, 1);
          $location = \Jiaruan\DahuaUtil::parseLocation(rtrim($rawLocation, "\0"));
      }

      $reportedType = \Jiaruan\DahuaUtil::getBit($infoByte, [7]);
      $reportedVersion = \Jiaruan\DahuaUtil::getBit($infoByte, ['0-5']);

      $event = [
          'methond'     => 'login',
          'mac'         => $data['mac'],
          'time'        => time(),
          'gps'         => $location,
          'stationType' => $reportedType,
          'version'     => $reportedVersion,
      ];
      $this->pushQueue($event);

      // Build the reply.
      $timeBcd = date('ymdHis'); // e.g. 220928140554

      $heartbeatUnits = 2; // 0 -> (0+1)*30 = 30 s, 2 -> (2+1)*30 = 90 s
      $replyType = 0;
      $typeAndHeartbeat = 0;
      \Jiaruan\DahuaUtil::setBit($typeAndHeartbeat, ['0-6'], $heartbeatUnits);
      \Jiaruan\DahuaUtil::setBit($typeAndHeartbeat, [7], $replyType);

      $leaveTimeout = 0xf;    // f -> (15+1)*2 = 32 s
      $rfDuration = 0;
      $whitelistControl = 0;
      $byte10 = 0xb;          // Ningbo replies with this value; meaning unknown
      $reservedTail = '';

      $data['reply']['buf'] = pack(
          'H12C5a15',
          $timeBcd,
          $typeAndHeartbeat,
          $leaveTimeout,
          $rfDuration,
          $whitelistControl,
          $byte10,
          $reservedTail
      );
      $connection->send($data);
  }
  /**
   * Handles an acknowledgement frame (sample: AAAAFF04B28FFF820000F5D4).
   *
   * Only logs the sender; nothing is queued and no reply is sent.
   *
   * @param object $connection Connection the frame arrived on (unused).
   * @param array  $data       Decoded frame; 'mac' is read here.
   */
  private function fun82( $connection, $data ){
      $mac = $data['mac'];
      \Jiaruan\DahuaUtil::rlog("ack", $mac);
  }
  /**
   * Handles the frame that reports the IDs of the two reader heads
   * (sample: AAAAFFFFFFFC00FE00090AFF04B28FFF04B2904576).
   *
   * Per the original note, a client does not resend this frame after it
   * disconnects and reconnects. The IDs are only logged.
   *
   * @param object $connection Connection the frame arrived on (unused).
   * @param array  $data       Decoded frame; 'buf' and 'mac' are read here.
   */
  private function funfe( $connection, $data ){
      // 1 byte of unknown meaning, then two 4-byte IDs, all as hex strings.
      $fields = unpack('H2unknow/H8id1/H8id2', $data['buf']);

      $firstId = 'id1:' . $fields['id1'];
      $secondId = 'id2:' . $fields['id2'];
      \Jiaruan\DahuaUtil::rlog("ID", $data['mac'], $firstId, $secondId);
  }
  /**
   * Handles a track frame (sample:
   * AAAAFF04B28F008400AA000003410F700096D714220930020104700096D708220930020106700096D714220930020109700096D708220930020111700096D714220930020114B05C).
   *
   * Payload layout: a 5-byte sub-package header (4-byte big-endian record
   * serial number, then one byte holding the sub-package maximum in bits 4-7
   * and the current sub-package in bits 0-3), followed by zero or more
   * 11-byte labels (4-byte RFID, 1 event byte, 6-byte time YYMMDDHHMMSS),
   * optionally followed by a 28-byte NUL-padded location.
   *
   * Queues a "track" event and sends the frame back to the device. Frames
   * flagged with 'lenBug' and frames whose length does not fit the layout
   * are dropped without a reply.
   *
   * @param object $connection Connection the frame arrived on.
   * @param array  $data       Decoded frame; 'buf', 'mac' and the optional
   *                           'lenBug' flag are read here.
   */
  private function fun84( $connection, $data ){
      $subpackageLen = 5; // sub-package header bytes
      $labelLen = 11;     // bytes per label
      $gpsLen = 28;       // bytes of the optional trailing location
      $buf = $data['buf'];

      if (isset($data['lenBug'])) {
          // Frames with a bad length are deliberately not processed for now.
          // To process them, drop the trailing
          // (strlen($buf) - $subpackageLen) % $labelLen bytes and continue
          // instead of returning.
          return;
      }

      \Jiaruan\DahuaUtil::rlog("track", $data['mac']);

      $bufLen = strlen($buf);
      if ($bufLen < $subpackageLen) {
          // Too short to hold the header. This must be checked explicitly
          // because PHP's % on a negative length can still yield 0.
          \Jiaruan\DahuaUtil::rlog('fun84 protocolLen error');
          return;
      }

      $gps = '';
      $bodyLen = $bufLen - $subpackageLen;
      if ($bodyLen % $labelLen === 0) {
          // Labels only, no location.
      } else if ($bodyLen >= $gpsLen && ($bodyLen - $gpsLen) % $labelLen === 0) {
          // Labels followed by a location that is right-padded with NULs.
          $gps = \Jiaruan\DahuaUtil::parseLocation(rtrim(substr($buf, -$gpsLen), "\0"));
          $buf = substr($buf, 0, $bufLen - $gpsLen);
      } else {
          \Jiaruan\DahuaUtil::rlog('fun84 protocolLen error');
          return;
      }

      // Sub-package header.
      $snFields = unpack('N', substr($buf, 0, $subpackageLen - 1));
      $recordSn = $snFields[1];
      $subpackageByte = ord($buf[$subpackageLen - 1]);
      $subpackageMax = \Jiaruan\DahuaUtil::getBit($subpackageByte, ['4-7']);
      $subpackageCrrent = \Jiaruan\DahuaUtil::getBit($subpackageByte, ['0-3']);

      // Labels.
      $labels = substr($buf, $subpackageLen);
      $labelCount = (int) (strlen($labels) / $labelLen);
      $century = substr(date('Y'), 0, 2); // device time carries a 2-digit year
      $allLabel = [];
      for ($i = 0; $i < $labelCount; $i++) {
          $upk = unpack('H8rfid/C1event/H12time', substr($labels, $i * $labelLen, $labelLen));

          $item = [];
          $item['id'] = $upk['rfid'];
          // Event byte:
          //   bit 4: position, 0 = outside, 1 = inside
          //   bit 3: leave event, 0 = none, 1 = left
          //   bit 2: enter event, 0 = none, 1 = entered
          //   bit 1: low-battery event, 0 = none, 1 = low battery
          $item['event']['dec'] = $upk['event'];
          $item['event']['lowBattery'] = \Jiaruan\DahuaUtil::getBit($upk['event'], [1]);
          $item['event']['entry'] = \Jiaruan\DahuaUtil::getBit($upk['event'], [2]);
          $item['event']['leave'] = \Jiaruan\DahuaUtil::getBit($upk['event'], [3]);
          $item['event']['in'] = \Jiaruan\DahuaUtil::getBit($upk['event'], [4]);
          // e.g. 221008155600 -> 1665215760
          $item['time'] = strtotime($century . $upk['time']);

          $allLabel[] = $item;
      }

      $this->pushQueue([
          'methond'          => 'track',
          'mac'              => $data['mac'],
          'gps'              => $gps,
          'labels'           => $allLabel,
          'subpackageMax'    => $subpackageMax,
          'subpackageCrrent' => $subpackageCrrent,
          'recordSn'         => $recordSn,
      ]);

      $connection->send($data);
  }
  233. }