pub_func.php 971 B

123456789101112131415161718192021222324252627282930313233343536373839
  1. <?php
  2. function kafkaProducer( $topic, $msg_data ){
  3. if (!extension_loaded('rdkafka')){
  4. echo 'rdkafka extension is not installed!!'.PHP_EOL;
  5. return false;
  6. }
  7. /********************* 初始化生产者配置项 start **************************/
  8. //考勤记录分析结果生产者
  9. $rk = new RdKafka\Producer();
  10. $rk->setLogLevel(LOG_DEBUG);
  11. $rk->addBrokers("127.0.0.1");
  12. $start = microtime(true);
  13. $topic = $rk->newTopic($topic);
  14. /********************* 初始化生产者配置项 end **************************/
  15. if( empty($msg_data) ){
  16. return;
  17. }
  18. if( is_array($msg_data) || is_object($msg_data) ){ // 对象、数组转换
  19. $msg_data = json_encode($msg_data);
  20. }
  21. $topic->produce(RD_KAFKA_PARTITION_UA, 0, $msg_data);
  22. var_dump($rk->poll(0));
  23. }
  24. function mockProduce( ){
  25. $msg_data = '{
  26. "type":9,
  27. "title":"群推消息",
  28. "content":"这是一个广播"
  29. }';
  30. kafkaProducer('gps_location_data',$msg_data);
  31. }