Skip to content
This repository was archived by the owner on Jun 10, 2022. It is now read-only.

Commit 041d9bf

Browse files
committed
fixed socket loop null
1 parent 76db54d commit 041d9bf

File tree

4 files changed

+21
-7
lines changed

4 files changed

+21
-7
lines changed

src/CommonSocket.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ abstract class CommonSocket
9191
*/
9292
private $saslMechanismProvider = null;
9393

94-
private $loop = null;
94+
protected $loop = null;
9595

9696
/**
9797
* __construct

src/Consumer/State.php

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ class State
2424
const STATUS_PROCESS = 8;
2525
const STATUS_FINISH = 16;
2626

27-
private const CLEAN_REQUEST_STATE = [
27+
private static $cleanRequestState = [
2828
self::REQUEST_METADATA => [],
2929
self::REQUEST_GETGROUP => [],
3030
self::REQUEST_JOINGROUP => [],
@@ -38,7 +38,17 @@ class State
3838

3939
private $callStatus = [];
4040

41-
private $requests = self::CLEAN_REQUEST_STATE;
41+
private $requests = [
42+
self::REQUEST_METADATA => [],
43+
self::REQUEST_GETGROUP => [],
44+
self::REQUEST_JOINGROUP => [],
45+
self::REQUEST_SYNCGROUP => [],
46+
self::REQUEST_HEARTGROUP => [],
47+
self::REQUEST_OFFSET => ['interval' => 2000],
48+
self::REQUEST_FETCH => ['interval' => 100],
49+
self::REQUEST_FETCH_OFFSET => ['interval' => 2000],
50+
self::REQUEST_COMMIT_OFFSET => ['norepeat' => true],
51+
];
4252

4353
private $loop = null;
4454

@@ -103,10 +113,10 @@ public function stop()
103113
$this->removeWatchers();
104114

105115
$this->callStatus = [];
106-
$this->requests = self::CLEAN_REQUEST_STATE;
116+
$this->requests = self::$cleanRequestState;
107117
}
108118

109-
private function removeWatchers(): void
119+
private function removeWatchers()
110120
{
111121
foreach (array_keys($this->requests) as $request) {
112122
if ($this->requests[$request]['watcher'] === null) {

src/Producer.php

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
11
<?php
22
namespace Kafka;
33

4+
use \Kafka\Loop;
5+
46
class Producer
57
{
68
use \Psr\Log\LoggerAwareTrait;
79
use \Kafka\LoggerTrait;
810

911
private $process = null;
1012

13+
private $loop = null;
14+
1115
/**
1216
* __construct
1317
*
@@ -22,6 +26,7 @@ public function __construct(callable $producer = null)
2226
} else {
2327
$this->process = new \Kafka\Producer\Process($producer);
2428
}
29+
$this->loop = Loop::getInstance();
2530
}
2631

2732
/**
@@ -39,7 +44,7 @@ public function send($data = true)
3944
if (is_bool($data)) {
4045
$this->process->start();
4146
if ($data) {
42-
\Amp\Loop::run();
47+
$this->loop->run();
4348
}
4449
} else {
4550
return $this->process->send($data);

tests/Base/SocketTest.php

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -446,7 +446,6 @@ private function mockStreamSocketClient($host, $port, $config = null, $sasl = nu
446446
->setMethods($mockMethod)
447447
->setConstructorArgs([$host, $port, $config, $sasl])
448448
->getMock();
449-
$socket->loop = \Kafka\Loop::getInstance();
450449

451450
$socket->method('createSocket')
452451
->will($this->returnCallback(function ($remoteSocket, $context, &$errno, &$error) {

0 commit comments

Comments
 (0)