From 2fc4cda746665e9fbf1ec3fbdd0855e6658fcc01 Mon Sep 17 00:00:00 2001 From: zouyi Date: Wed, 30 Jan 2019 13:06:01 +0800 Subject: [PATCH 1/2] Update Process.php somehow, after each consume, its offest remain to be `0`, then I found only one offset in messages are `0` or `1`, to tell the server I've done my job,I force to commit the biggset offset in the message sets. --- src/Consumer/Process.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Consumer/Process.php b/src/Consumer/Process.php index 00061fa5..9766bbcb 100644 --- a/src/Consumer/Process.php +++ b/src/Consumer/Process.php @@ -651,7 +651,7 @@ public function succFetch(array $result, int $fd): void foreach ($part['messages'] as $message) { $this->messages[$topic['topicName']][$part['partition']][] = $message; - $offset = $message['offset']; + $offset = $offset > $message['offset'] ? $offset : $message['offset']; } $consumerOffset = ($part['highwaterMarkOffset'] > $offset) ? ($offset + 1) : $offset; From 1b9b14e1be48c353f325e7f9a7487c6834195ead Mon Sep 17 00:00:00 2001 From: zouyi Date: Wed, 30 Jan 2019 22:28:45 +0800 Subject: [PATCH 2/2] Update Process.php --- src/Consumer/Process.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Consumer/Process.php b/src/Consumer/Process.php index 9766bbcb..254ae0b7 100644 --- a/src/Consumer/Process.php +++ b/src/Consumer/Process.php @@ -651,7 +651,7 @@ public function succFetch(array $result, int $fd): void foreach ($part['messages'] as $message) { $this->messages[$topic['topicName']][$part['partition']][] = $message; - $offset = $offset > $message['offset'] ? $offset : $message['offset']; + $offset = $offset > $message['offset'] ? $offset + 1 : $message['offset']; } $consumerOffset = ($part['highwaterMarkOffset'] > $offset) ? ($offset + 1) : $offset;