likang 2 jaren geleden
bovenliggende
commit
67e7125657
1 gewijzigde bestanden met toevoegingen van 27 en 0 verwijderingen
  1. 27 0
      catch/api/service/mysqlToKafuka.php

+ 27 - 0
catch/api/service/mysqlToKafuka.php

@@ -108,6 +108,7 @@ class mysqlToKafuka
      */
     public function toKafuka($data){
 
+<<<<<<< HEAD
         // $conf = new RdKafka\Conf();
         // $conf->setDrMsgCb(function ($kafka, $message) {
         //     file_put_contents("./dr_cb.log", var_export($message, true).PHP_EOL, FILE_APPEND);
@@ -132,6 +133,32 @@ class mysqlToKafuka
             
         //     $rk->poll(50);
         // }
+=======
+        $conf = new RdKafka\Conf();
+        $conf->setDrMsgCb(function ($kafka, $message) {
+            file_put_contents("./dr_cb.log", var_export($message, true).PHP_EOL, FILE_APPEND);
+        });
+        $conf->setErrorCb(function ($kafka, $err, $reason) {
+                file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND);
+        });
+
+        $rk = new RdKafka\Producer($conf);
+        $rk->setLogLevel(LOG_DEBUG);
+        $rk->addBrokers("127.0.0.1");
+        $cf = new RdKafka\TopicConf();
+        $cf->set('request.required.acks', 0);
+        $topic = $rk->newTopic("test", $cf);
+        $option = 'qkl';
+        for ($i = 0; $i < 20; $i++) {
+             $topic->produce(RD_KAFKA_PARTITION_UA, 0, "qkl . $i", $option);
+        }
+        $len = $rk->getOutQLen();
+            while ($len > 0) {
+            $len = $rk->getOutQLen();
+            
+            $rk->poll(50);
+        }
+>>>>>>> d836e7393f81fe9e3002d4b1e17a4169fd485999