Skip to content

Feature: consumer standoff for service managers #61

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class Configuration extends Component
*/
const DEFAULTS = [
'auto_declare' => true,
'consumer_standoff' => 0,
'connections' => [
[
'name' => self::DEFAULT_CONNECTION_NAME,
Expand Down Expand Up @@ -122,6 +123,7 @@ class Configuration extends Component
];

public $auto_declare = null;
public $consumer_standoff = null;
public $connections = [];
public $producers = [];
public $consumers = [];
Expand Down Expand Up @@ -250,6 +252,10 @@ protected function validateTopLevel()
throw new InvalidConfigException("Option `auto_declare` should be of type boolean.");
}

if (($this->consumer_standoff !== null) && !is_int($this->consumer_standoff)) {
throw new InvalidConfigException("Option `consumer_standoff` should be of type int.");
}

if (!is_array($this->logger)) {
throw new InvalidConfigException("Option `logger` should be of type array.");
}
Expand Down Expand Up @@ -436,6 +442,9 @@ protected function completeWithDefaults()
if (null === $this->auto_declare) {
$this->auto_declare = $defaults['auto_declare'];
}
if (null === $this->consumer_standoff) {
$this->consumer_standoff = $defaults['consumer_standoff'];
}
if (empty($this->logger)) {
$this->logger = $defaults['logger'];
} else {
Expand Down
4 changes: 3 additions & 1 deletion DependencyInjection.php
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,10 @@ protected function registerProducers(Configuration $config)
protected function registerConsumers(Configuration $config)
{
$autoDeclare = $config->auto_declare;
$consumerStandoff = $config->consumer_standoff;
foreach ($config->consumers as $options) {
$serviceAlias = sprintf(Configuration::CONSUMER_SERVICE_NAME, $options['name']);
\Yii::$container->setSingleton($serviceAlias, function () use ($options, $autoDeclare) {
\Yii::$container->setSingleton($serviceAlias, function () use ($options, $autoDeclare, $consumerStandoff) {
/**
* @var $connection AbstractConnection
*/
Expand All @@ -144,6 +145,7 @@ protected function registerConsumers(Configuration $config)
\Yii::$container->invoke([$consumer, 'setIdleTimeoutExitCode'], [$options['idle_timeout_exit_code']]);
\Yii::$container->invoke([$consumer, 'setProceedOnException'], [$options['proceed_on_exception']]);
\Yii::$container->invoke([$consumer, 'setDeserializer'], [$options['deserializer']]);
\Yii::$container->invoke([$consumer, 'setStandoff'], [$consumerStandoff]);

return $consumer;
});
Expand Down
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,13 @@ As PHP daemon especially based upon a framework may be prone to memory leaks, it
#### Auto-declare
By default extension configured in auto-declare mode, which means that on every message published exchanges, queues and bindings will be checked and created if missing. If performance means much to your application you should disable that feature in configuration and use console commands to declare and delete routing schema by yourself.

#### Consumer standoff
PHP processes have a reputation of leaking memory, especially when running as a daemon. It is therefore prudent to limit the runtime of AMQP consumers by frequently reloading them. This is achieved by terminating consumers after handling a maximum number of messages.

However, when using certain service managers (`supervisord`, for example), this presents a problem. If the number of consumed messages is reached within a certain time period, the service manager will consider the consumer to have exited too soon and assume there is a problem with the service. Removing this threshold in the service manager may lead to infinite restart loops.

The chosen solution is to implement a standoff period that is applied before the actual consumer loop is started. This will ensure that the controller action can actually be executed because the bootstrapping has succeeded, but the process waits long enough to satisfy the service manager (the threshold may differ per service manager). The default standoff period is 0 seconds due to the nature of this problem.

Usage
-------------
As the consumer worker will read messages from the queue, execute a callback method and pass a message to it.
Expand Down Expand Up @@ -248,6 +255,7 @@ All configuration options:
```php
$rabbitmq_defaults = [
'auto_declare' => true,
'consumer_standoff' => 0,
'connections' => [
[
'name' => self::DEFAULT_CONNECTION_NAME,
Expand Down
19 changes: 19 additions & 0 deletions components/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ class Consumer extends BaseRabbitMQ

protected $name = 'unnamed';

protected $standoff = 0;

private $id;

private $target;
Expand Down Expand Up @@ -159,6 +161,22 @@ public function getName(): string
return $this->name;
}

/**
* @param int $duration in seconds
*/
public function setStandoff(int $duration)
{
$this->standoff = $duration;
}

/**
* @return int
*/
public function getStandoff(): int
{
return $this->standoff;
}

/**
* Resets the consumed property.
* Use when you want to call start() or consume() multiple times.
Expand Down Expand Up @@ -483,6 +501,7 @@ protected function setup()
$this->routing->declareAll();
}
$this->setQosOptions();
sleep($this->standoff);
$this->startConsuming();
}
}
8 changes: 7 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,11 @@
},
"extra": {
"bootstrap": "mikemadisonweb\\rabbitmq\\DependencyInjection"
}
},
"repositories": [
{
"type": "composer",
"url": "https://asset-packagist.org"
}
]
}