diff --git a/bin/resque b/bin/resque index 1d604851..da0f3e9d 100755 --- a/bin/resque +++ b/bin/resque @@ -9,15 +9,15 @@ $files = array( __DIR__ . '/../vendor/autoload.php', ); -$found = false; +$loader = null; foreach ($files as $file) { if (file_exists($file)) { - require_once $file; + $loader = require_once $file; break; } } -if (!class_exists('Composer\Autoload\ClassLoader', false)) { +if (!($loader instanceof Composer\Autoload\ClassLoader)) { die( 'You need to set up the project dependencies using the following commands:' . PHP_EOL . 'curl -s http://getcomposer.org/installer | php' . PHP_EOL . @@ -25,6 +25,9 @@ if (!class_exists('Composer\Autoload\ClassLoader', false)) { ); } +use Resque\Reserver\ReserverFactory; +use Resque\Reserver\UnknownReserverException; + $QUEUE = getenv('QUEUE'); if(empty($QUEUE)) { die("Set QUEUE env var containing the list of queues to work.\n"); @@ -41,10 +44,11 @@ $REDIS_BACKEND = getenv('REDIS_BACKEND'); // A redis database number $REDIS_BACKEND_DB = getenv('REDIS_BACKEND_DB'); if(!empty($REDIS_BACKEND)) { - if (empty($REDIS_BACKEND_DB)) + if (empty($REDIS_BACKEND_DB)) { Resque::setBackend($REDIS_BACKEND); - else + } else { Resque::setBackend($REDIS_BACKEND, $REDIS_BACKEND_DB); + } } $logLevel = false; @@ -67,13 +71,32 @@ if($APP_INCLUDE) { require_once $APP_INCLUDE; } +// re-register the composer autoloader so that we use Resque here, in case APP_INCLUDE depends on a different version +$loader->register(true); + // See if the APP_INCLUDE containes a logger object, // If none exists, fallback to internal logger if (!isset($logger) || !is_object($logger)) { $logger = new Resque_Log($logLevel); } -$BLOCKING = getenv('BLOCKING') !== FALSE; +$reserverFactory = new ReserverFactory($logger); +Resque_Worker::setReserverFactory($reserverFactory); + +$queues = explode(',', $QUEUE); +if (!is_array($queues)) { + $queues = array($queues); +} + +$reserver = null; +try { + $reserver = $reserverFactory->createReserverFromEnvironment($queues); + $logger->notice('Using reserver {reserver}', array('reserver' => $reserver->getName())); +} catch (UnknownReserverException $exception) { + $logger->emergency("Could not create reserver: {error}", ['error' => $exception->getMessage()]); + die; +} + $interval = 5; $INTERVAL = getenv('INTERVAL'); @@ -102,19 +125,17 @@ if($count > 1) { } // Child, start the worker else if(!$pid) { - $queues = explode(',', $QUEUE); - $worker = new Resque_Worker($queues); + $worker = new Resque_Worker($reserver, $queues); $worker->setLogger($logger); $logger->log(Psr\Log\LogLevel::NOTICE, 'Starting worker {worker}', array('worker' => $worker)); - $worker->work($interval, $BLOCKING); + $worker->work($interval); break; } } } // Start a single worker else { - $queues = explode(',', $QUEUE); - $worker = new Resque_Worker($queues); + $worker = new Resque_Worker($reserver, $queues); $worker->setLogger($logger); $PIDFILE = getenv('PIDFILE'); @@ -124,6 +145,5 @@ else { } $logger->log(Psr\Log\LogLevel::NOTICE, 'Starting worker {worker}', array('worker' => $worker)); - $worker->work($interval, $BLOCKING); + $worker->work($interval); } -?> diff --git a/composer.json b/composer.json index b12fa291..e45aabe6 100644 --- a/composer.json +++ b/composer.json @@ -37,5 +37,10 @@ "psr-0": { "Resque": "lib" } + }, + "autoload-dev": { + "psr-0": { + "Resque": "test/" + } } } diff --git a/composer.lock b/composer.lock index 0f431b90..128cee07 100644 --- a/composer.lock +++ b/composer.lock @@ -4,21 +4,20 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#composer-lock-the-lock-file", "This file is @generated automatically" ], - "hash": "41124ffd15a15b52947e430b92b8f10f", "content-hash": "11906622d4e017ff6807c6dff51f208d", "packages": [ { "name": "colinmollenhour/credis", - "version": "1.7", + "version": "1.8.1", "source": { "type": "git", "url": "https://github.com/colinmollenhour/credis.git", - "reference": "74b2b703da5c58dc07fb97e8954bc63280b469bf" + "reference": "215810e7161748a99dbc37020d38068a80aa0805" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/colinmollenhour/credis/zipball/74b2b703da5c58dc07fb97e8954bc63280b469bf", - "reference": "74b2b703da5c58dc07fb97e8954bc63280b469bf", + "url": "https://api.github.com/repos/colinmollenhour/credis/zipball/215810e7161748a99dbc37020d38068a80aa0805", + "reference": "215810e7161748a99dbc37020d38068a80aa0805", "shasum": "" }, "require": { @@ -44,7 +43,7 @@ ], "description": "Credis is a lightweight interface to the Redis key-value store which wraps the phpredis library when available for better performance.", "homepage": "https://github.com/colinmollenhour/credis", - "time": "2016-03-24 15:50:52" + "time": "2017-03-25T03:27:34+00:00" }, { "name": "psr/log", @@ -82,7 +81,7 @@ "psr", "psr-3" ], - "time": "2012-12-21 11:40:51" + "time": "2012-12-21T11:40:51+00:00" } ], "packages-dev": [ @@ -145,20 +144,20 @@ "testing", "xunit" ], - "time": "2014-09-02 10:13:14" + "time": "2014-09-02T10:13:14+00:00" }, { "name": "phpunit/php-file-iterator", - "version": "1.4.1", + "version": "1.4.2", "source": { "type": "git", "url": "https://github.com/sebastianbergmann/php-file-iterator.git", - "reference": "6150bf2c35d3fc379e50c7602b75caceaa39dbf0" + "reference": "3cc8f69b3028d0f96a9078e6295d86e9bf019be5" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/sebastianbergmann/php-file-iterator/zipball/6150bf2c35d3fc379e50c7602b75caceaa39dbf0", - "reference": "6150bf2c35d3fc379e50c7602b75caceaa39dbf0", + "url": "https://api.github.com/repos/sebastianbergmann/php-file-iterator/zipball/3cc8f69b3028d0f96a9078e6295d86e9bf019be5", + "reference": "3cc8f69b3028d0f96a9078e6295d86e9bf019be5", "shasum": "" }, "require": { @@ -192,7 +191,7 @@ "filesystem", "iterator" ], - "time": "2015-06-21 13:08:43" + "time": "2016-10-03T07:40:28+00:00" }, { "name": "phpunit/php-text-template", @@ -233,29 +232,34 @@ "keywords": [ "template" ], - "time": "2015-06-21 13:50:34" + "time": "2015-06-21T13:50:34+00:00" }, { "name": "phpunit/php-timer", - "version": "1.0.8", + "version": "1.0.9", "source": { "type": "git", "url": "https://github.com/sebastianbergmann/php-timer.git", - "reference": "38e9124049cf1a164f1e4537caf19c99bf1eb260" + "reference": "3dcf38ca72b158baf0bc245e9184d3fdffa9c46f" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/sebastianbergmann/php-timer/zipball/38e9124049cf1a164f1e4537caf19c99bf1eb260", - "reference": "38e9124049cf1a164f1e4537caf19c99bf1eb260", + "url": "https://api.github.com/repos/sebastianbergmann/php-timer/zipball/3dcf38ca72b158baf0bc245e9184d3fdffa9c46f", + "reference": "3dcf38ca72b158baf0bc245e9184d3fdffa9c46f", "shasum": "" }, "require": { - "php": ">=5.3.3" + "php": "^5.3.3 || ^7.0" }, "require-dev": { - "phpunit/phpunit": "~4|~5" + "phpunit/phpunit": "^4.8.35 || ^5.7 || ^6.0" }, "type": "library", + "extra": { + "branch-alias": { + "dev-master": "1.0-dev" + } + }, "autoload": { "classmap": [ "src/" @@ -277,7 +281,7 @@ "keywords": [ "timer" ], - "time": "2016-05-12 18:03:57" + "time": "2017-02-26T11:10:40+00:00" }, { "name": "phpunit/php-token-stream", @@ -327,7 +331,7 @@ "keywords": [ "tokenizer" ], - "time": "2014-03-03 05:10:30" + "time": "2014-03-03T05:10:30+00:00" }, { "name": "phpunit/phpunit", @@ -400,7 +404,7 @@ "testing", "xunit" ], - "time": "2014-10-17 09:04:17" + "time": "2014-10-17T09:04:17+00:00" }, { "name": "phpunit/phpunit-mock-objects", @@ -449,20 +453,20 @@ "mock", "xunit" ], - "time": "2013-01-13 10:24:48" + "time": "2013-01-13T10:24:48+00:00" }, { "name": "symfony/yaml", - "version": "v2.8.12", + "version": "v2.8.19", "source": { "type": "git", "url": "https://github.com/symfony/yaml.git", - "reference": "e7540734bad981fe59f8ef14b6fc194ae9df8d9c" + "reference": "286d84891690b0e2515874717e49360d1c98a703" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/yaml/zipball/e7540734bad981fe59f8ef14b6fc194ae9df8d9c", - "reference": "e7540734bad981fe59f8ef14b6fc194ae9df8d9c", + "url": "https://api.github.com/repos/symfony/yaml/zipball/286d84891690b0e2515874717e49360d1c98a703", + "reference": "286d84891690b0e2515874717e49360d1c98a703", "shasum": "" }, "require": { @@ -498,7 +502,7 @@ ], "description": "Symfony Yaml Component", "homepage": "https://symfony.com", - "time": "2016-09-02 01:57:56" + "time": "2017-03-20T09:41:44+00:00" } ], "aliases": [], diff --git a/lib/Resque/Reserver/AbstractReserver.php b/lib/Resque/Reserver/AbstractReserver.php new file mode 100644 index 00000000..54cde8b6 --- /dev/null +++ b/lib/Resque/Reserver/AbstractReserver.php @@ -0,0 +1,59 @@ +logger = $logger; + $this->queues = $queues; + } + + /** + * {@inheritDoc} + */ + public function getQueues() + { + if (in_array('*', $this->queues)) { + $queues = Resque::queues(); + sort($queues); + return $queues; + } + + return $this->queues; + } + + /** + * {@inheritDoc} + */ + public function waitAfterReservationAttempt() + { + return true; + } + + /** + * {@inheritDoc} + */ + public function getName() + { + $name = get_class($this); + $name = str_replace(__NAMESPACE__, '', $name); + return trim($name, '\\'); + } +} diff --git a/lib/Resque/Reserver/BlockingListPopReserver.php b/lib/Resque/Reserver/BlockingListPopReserver.php new file mode 100644 index 00000000..3008a77a --- /dev/null +++ b/lib/Resque/Reserver/BlockingListPopReserver.php @@ -0,0 +1,59 @@ +timeout = $timeout; + parent::__construct($logger, $queues); + } + + /** + * {@inheritDoc} + */ + public function reserve() + { + $job = Resque_Job::reserveBlocking($this->getQueues(), $this->timeout); + if ($job) { + $this->logger->info("[{reserver}] Found job on queue '{queue}'", array( + 'queue' => $job->queue, + 'reserver' => $this->getName(), + )); + return $job; + } + return null; + } + + /** + * {@inheritDoc} + */ + public function waitAfterReservationAttempt() + { + return false; + } +} diff --git a/lib/Resque/Reserver/QueueOrderReserver.php b/lib/Resque/Reserver/QueueOrderReserver.php new file mode 100644 index 00000000..8c6a0dbc --- /dev/null +++ b/lib/Resque/Reserver/QueueOrderReserver.php @@ -0,0 +1,41 @@ +getQueues() as $queue) { + $this->logger->debug("[{reserver}] Checking queue '{queue}' for jobs", array( + 'queue' => $queue, + 'reserver' => $this->getName(), + )); + + $job = Resque_Job::reserve($queue); + if ($job) { + $this->logger->info("[{reserver}] Found job on queue '{queue}'", array( + 'queue' => $queue, + 'reserver' => $this->getName(), + )); + return $job; + } + } + + return null; + } +} diff --git a/lib/Resque/Reserver/RandomQueueOrderReserver.php b/lib/Resque/Reserver/RandomQueueOrderReserver.php new file mode 100644 index 00000000..1072243a --- /dev/null +++ b/lib/Resque/Reserver/RandomQueueOrderReserver.php @@ -0,0 +1,21 @@ +logger = $logger; + } + + /** + * Creates a reserver given its name in snake case format. + * + * @param string $name + * @return ReserverInterface + * @throws UnknownReserverException + */ + public function createReserverFromName($name, array $queues) + { + $parts = explode('_', $name); + $parts = array_map(function ($word) { + return ucfirst(strtolower($word)); + }, $parts); + + $methodName = 'create' . implode('', $parts) . 'Reserver'; + + if (!method_exists($this, $methodName)) { + throw new UnknownReserverException("Unknown reserver '$name' - could not find factory method $methodName"); + } + + return $this->$methodName($queues); + } + + /** + * Creates a reserver based off the environment configuration. + * + * The following environment vars are checked (in this order): + * - BLOCKING: Creates a BlockingListPopReserver (any non empty value) + * - RESERVER: Creates a reserver specified in snake case format without the reserver suffix, eg. 'random_queue_order' + * + * If neither var is specified, the default resever (QueueOrderReserver) is created. + * + * @param array $queues + * @return ReserverInterface + * @throws UnknownReserverException If the reserver specified in RESERVER could not be found. + */ + public function createReserverFromEnvironment(array $queues) + { + if (!empty(getenv('BLOCKING'))) { + $reserver = $this->createBlockingListPopReserver($queues); + } elseif (getenv('RESERVER') !== false) { + $reserver = $this->createReserverFromName((string)getenv('RESERVER'), $queues); + } else { + $reserver = $this->createDefaultReserver($queues); + } + + return $reserver; + } + + /** + * Creates the default reserver. + * + * @param array $queues + * @return ReserverInterface + */ + public function createDefaultReserver(array $queues) + { + return $this->createReserverFromName(self::DEFAULT_RESERVER, $queues); + } + + /** + * @param array $queues + * @return QueueOrderReserver + */ + public function createQueueOrderReserver(array $queues) + { + return new QueueOrderReserver($this->logger, $queues); + } + + /** + * @param array $queues + * @return RandomQueueOrderReserver + */ + public function createRandomQueueOrderReserver(array $queues) + { + return new RandomQueueOrderReserver($this->logger, $queues); + } + + /** + * @param array $queues + * @return BlockingListPopReserver + */ + public function createBlockingListPopReserver(array $queues) + { + $timeout = getenv('BPLOP_TIMEOUT'); + if ($timeout === false) { + $timeout = getenv('INTERVAL'); + } + + if ($timeout === false || $timeout < 0) { + $timeout = BlockingListPopReserver::DEFAULT_TIMEOUT; + } + + return new BlockingListPopReserver($this->logger, $queues, (int)$timeout); + } +} diff --git a/lib/Resque/Reserver/ReserverInterface.php b/lib/Resque/Reserver/ReserverInterface.php new file mode 100644 index 00000000..6e151e51 --- /dev/null +++ b/lib/Resque/Reserver/ReserverInterface.php @@ -0,0 +1,40 @@ +logger = new Resque_Log(); + $this->reserver = $reserver; + $this->logger = new Resque_Log(); if(!is_array($queues)) { $queues = array($queues); @@ -76,6 +91,16 @@ public function __construct($queues) $this->id = $this->hostname . ':'.getmypid() . ':' . implode(',', $this->queues); } + /** + * Sets the reserver factory instance. Used by the find() method to create worker instances. + * + * @param ReserverFactory $reserverFactory + */ + public static function setReserverFactory(ReserverFactory $reserverFactory) + { + self::$reserverFactory = $reserverFactory; + } + /** * Return all workers known to Resque as instantiated instances. * @return array @@ -119,7 +144,9 @@ public static function find($workerId) list($hostname, $pid, $queues) = explode(':', $workerId, 3); $queues = explode(',', $queues); - $worker = new self($queues); + + $reserver = self::$reserverFactory->createDefaultReserver($queues); + $worker = new self($reserver, $queues); $worker->setId($workerId); return $worker; } @@ -142,7 +169,7 @@ public function setId($workerId) * * @param int $interval How often to check for new jobs across the queues. */ - public function work($interval = Resque::DEFAULT_INTERVAL, $blocking = false) + public function work($interval = Resque::DEFAULT_INTERVAL) { $this->updateProcLine('Starting'); $this->startup(); @@ -154,36 +181,25 @@ public function work($interval = Resque::DEFAULT_INTERVAL, $blocking = false) // Attempt to find and reserve a job $job = false; - if(!$this->paused) { - if($blocking === true) { - $this->logger->log(Psr\Log\LogLevel::INFO, 'Starting blocking with timeout of {interval}', array('interval' => $interval)); - $this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with blocking timeout ' . $interval); - } else { - $this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with interval ' . $interval); - } + if (!$this->paused) { + $this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with interval ' . $interval); - $job = $this->reserve($blocking, $interval); + $job = $this->reserve(); + } else { + $this->updateProcLine('Paused'); } - if(!$job) { + if (!$job) { // For an interval of 0, break now - helps with unit testing etc - if($interval == 0) { + if ($interval == 0) { break; } - if($blocking === false) - { - // If no job was found, we sleep for $interval before continuing and checking again + // If no job was found, we sleep for $interval before continuing and checking again + if ($this->reserver->waitAfterReservationAttempt()) { $this->logger->log(Psr\Log\LogLevel::INFO, 'Sleeping for {interval}', array('interval' => $interval)); - if($this->paused) { - $this->updateProcLine('Paused'); - } - else { - $this->updateProcLine('Waiting for ' . implode(',', $this->queues)); - } - - usleep($interval * 1000000); - } + usleep($interval * 1000000); + } continue; } @@ -252,33 +268,11 @@ public function perform(Resque_Job $job) /** * @param bool $blocking * @param int $timeout - * @return object|boolean Instance of Resque_Job if a job is found, false if not. + * @return object|boolean Instance of Resque_Job if a job is found, false if not. */ - public function reserve($blocking = false, $timeout = null) + public function reserve() { - $queues = $this->queues(); - if(!is_array($queues)) { - return; - } - - if($blocking === true) { - $job = Resque_Job::reserveBlocking($queues, $timeout); - if($job) { - $this->logger->log(Psr\Log\LogLevel::INFO, 'Found job on {queue}', array('queue' => $job->queue)); - return $job; - } - } else { - foreach($queues as $queue) { - $this->logger->log(Psr\Log\LogLevel::INFO, 'Checking {queue} for jobs', array('queue' => $queue)); - $job = Resque_Job::reserve($queue); - if($job) { - $this->logger->log(Psr\Log\LogLevel::INFO, 'Found job on {queue}', array('queue' => $job->queue)); - return $job; - } - } - } - - return false; + return $this->reserver->reserve() ?: false; } /** diff --git a/test/Resque/Tests/EventTest.php b/test/Resque/Tests/EventTest.php index 6e102cf4..02149e2d 100644 --- a/test/Resque/Tests/EventTest.php +++ b/test/Resque/Tests/EventTest.php @@ -1,4 +1,7 @@ createDefaultReserver(array('jobs')); + // Register a worker to test with - $this->worker = new Resque_Worker('jobs'); - $this->worker->setLogger(new Resque_Log()); + $this->worker = new Resque_Worker($reserver, 'jobs'); + $this->worker->setLogger($logger); $this->worker->registerWorker(); } diff --git a/test/Resque/Tests/JobStatusTest.php b/test/Resque/Tests/JobStatusTest.php index d751c37f..accd375e 100644 --- a/test/Resque/Tests/JobStatusTest.php +++ b/test/Resque/Tests/JobStatusTest.php @@ -1,4 +1,7 @@ createDefaultReserver(array('jobs')); + // Register a worker to test with - $this->worker = new Resque_Worker('jobs'); - $this->worker->setLogger(new Resque_Log()); + $this->worker = new Resque_Worker($reserver, 'jobs'); + $this->worker->setLogger($logger); } public function testJobStatusCanBeTracked() @@ -103,4 +110,4 @@ public function testRecreatedJobWithTrackingStillTracksStatus() $newJob = Resque_Job::reserve('jobs'); $this->assertEquals(Resque_Job_Status::STATUS_WAITING, $newJob->getStatus()); } -} \ No newline at end of file +} diff --git a/test/Resque/Tests/JobTest.php b/test/Resque/Tests/JobTest.php index fb55d13b..3797f6a1 100644 --- a/test/Resque/Tests/JobTest.php +++ b/test/Resque/Tests/JobTest.php @@ -1,5 +1,7 @@ createDefaultReserver(array('jobs')); + // Register a worker to test with - $this->worker = new Resque_Worker('jobs'); - $this->worker->setLogger(new Resque_Log()); + $this->worker = new Resque_Worker($reserver, 'jobs'); + $this->worker->setLogger($logger); $this->worker->registerWorker(); } @@ -153,7 +159,7 @@ public function testInvalidJobThrowsException() $job->worker = $this->worker; $job->perform(); } - + public function testJobWithSetUpCallbackFiresSetUp() { $payload = array( @@ -165,10 +171,10 @@ public function testJobWithSetUpCallbackFiresSetUp() ); $job = new Resque_Job('jobs', $payload); $job->perform(); - + $this->assertTrue(Test_Job_With_SetUp::$called); } - + public function testJobWithTearDownCallbackFiresTearDown() { $payload = array( @@ -180,7 +186,7 @@ public function testJobWithTearDownCallbackFiresTearDown() ); $job = new Resque_Job('jobs', $payload); $job->perform(); - + $this->assertTrue(Test_Job_With_TearDown::$called); } @@ -329,7 +335,7 @@ public function testDequeueItemWithArg() $this->assertEquals(Resque::dequeue($queue, $test), 1); #$this->assertEquals(Resque::size($queue), 1); } - + public function testDequeueSeveralItemsWithArgs() { // GIVEN @@ -340,11 +346,11 @@ public function testDequeueSeveralItemsWithArgs() Resque::enqueue($queue, 'Test_Job_Dequeue9', $removeArgs); Resque::enqueue($queue, 'Test_Job_Dequeue9', $removeArgs); $this->assertEquals(Resque::size($queue), 3); - + // WHEN $test = array('Test_Job_Dequeue9' => $removeArgs); $removedItems = Resque::dequeue($queue, $test); - + // THEN $this->assertEquals($removedItems, 2); $this->assertEquals(Resque::size($queue), 1); diff --git a/test/Resque/Tests/Reserver/AbstractReserverTest.php b/test/Resque/Tests/Reserver/AbstractReserverTest.php new file mode 100644 index 00000000..34b1828a --- /dev/null +++ b/test/Resque/Tests/Reserver/AbstractReserverTest.php @@ -0,0 +1,59 @@ +assertEquals($this->reserverName, $this->getReserver()->getName()); + } + + public function testGetQueuesReturnsConfiguredQueues() + { + $queues = array( + 'queue_' . rand(1, 100), + 'queue_' . rand(101, 200), + 'queue_' . rand(201, 300), + ); + $this->assertEquals($queues, $this->getReserver($queues)->getQueues()); + } + + public function testGetQueuesWithAsterixQueueReturnsAllQueuesFromRedisInSortedOrder() + { + $queues = array( + 'queue_b', + 'queue_c', + 'queue_d', + 'queue_a', + ); + + // register queues in redis + foreach ($queues as $queue) { + Resque::redis()->sadd('queues', $queue); + } + + $expected = array( + 'queue_a', + 'queue_b', + 'queue_c', + 'queue_d', + ); + + $this->assertEquals($expected, $this->getReserver(array('*'))->getQueues()); + } +} diff --git a/test/Resque/Tests/Reserver/BlockingListPopReserverTest.php b/test/Resque/Tests/Reserver/BlockingListPopReserverTest.php new file mode 100644 index 00000000..9664db54 --- /dev/null +++ b/test/Resque/Tests/Reserver/BlockingListPopReserverTest.php @@ -0,0 +1,101 @@ +assertFalse($this->getReserver()->waitAfterReservationAttempt()); + } + + public function testReserverWhenNoJobsEnqueuedReturnsNull() + { + $queues = array( + 'queue_1', + 'queue_2', + 'queue_3', + ); + + $redisQueues = array( + 'queue:queue_1', + 'queue:queue_2', + 'queue:queue_3', + ); + + // hhvm doesn't respect the timeout arg for blpop, so we need to mock this command + // https://github.com/facebook/hhvm/issues/6286 + $redis = $this->getMockBuilder('\Resque_Redis') + ->disableOriginalConstructor() + ->setMethods(['__call']) + ->getMock(); + + $redis + ->expects($this->once()) + ->method('__call') + ->with($this->equalTo('blpop'), $this->equalTo(array($redisQueues, 1))) + ->will($this->returnValue(null)); + + $originalRedis = Resque::$redis; + + Resque::$redis = $redis; + + $this->assertNull($this->getReserver($queues)->reserve()); + + Resque::$redis = $originalRedis; + } + + public function testReserveCallsBlpopWithTimeout() + { + $timeout = rand(1, 100); + + $queues = array( + 'high', + 'medium', + 'low', + ); + + $redisQueues = array( + 'queue:high', + 'queue:medium', + 'queue:low', + ); + + $payload = array('class' => 'Test_Job'); + $item = array('resque:queue:high', json_encode($payload)); + + $redis = $this->getMockBuilder('\Resque_Redis') + ->disableOriginalConstructor() + ->setMethods(['__call']) + ->getMock(); + + $redis + ->expects($this->once()) + ->method('__call') + ->with($this->equalTo('blpop'), $this->equalTo(array($redisQueues, $timeout))) + ->will($this->returnValue($item)); + + $originalRedis = Resque::$redis; + + Resque::$redis = $redis; + + $job = $this->getReserver($queues, $timeout)->reserve(); + $this->assertEquals('high', $job->queue); + $this->assertEquals($payload, $job->payload); + + Resque::$redis = $originalRedis; + } +} diff --git a/test/Resque/Tests/Reserver/QueueOrderReserverTest.php b/test/Resque/Tests/Reserver/QueueOrderReserverTest.php new file mode 100644 index 00000000..c19e2d1e --- /dev/null +++ b/test/Resque/Tests/Reserver/QueueOrderReserverTest.php @@ -0,0 +1,74 @@ +assertTrue($this->getReserver()->waitAfterReservationAttempt()); + } + + public function testReserverWhenNoJobsEnqueuedReturnsNull() + { + $queues = array( + 'queue_1', + 'queue_2', + 'queue_3', + ); + $this->assertNull($this->getReserver($queues)->reserve()); + } + + public function testReserveReservesJobsInSpecifiedQueueOrder() + { + $queues = array( + 'high', + 'medium', + 'low', + ); + $reserver = $this->getReserver($queues); + + // Queue the jobs in a different order + Resque::enqueue('low', 'Low_Job_1'); + Resque::enqueue('high', 'High_Job_1'); + Resque::enqueue('medium', 'Medium_Job_1'); + Resque::enqueue('medium', 'Medium_Job_2'); + Resque::enqueue('high', 'High_Job_2'); + Resque::enqueue('low', 'Low_Job_2'); + + // Now check we get the jobs back in the right order + $job = $reserver->reserve(); + $this->assertEquals('high', $job->queue); + $this->assertEquals('High_Job_1', $job->payload['class']); + + $job = $reserver->reserve(); + $this->assertEquals('high', $job->queue); + $this->assertEquals('High_Job_2', $job->payload['class']); + + $job = $reserver->reserve(); + $this->assertEquals('medium', $job->queue); + $this->assertEquals('Medium_Job_1', $job->payload['class']); + + $job = $reserver->reserve(); + $this->assertEquals('medium', $job->queue); + $this->assertEquals('Medium_Job_2', $job->payload['class']); + + $job = $reserver->reserve(); + $this->assertEquals('low', $job->queue); + $this->assertEquals('Low_Job_1', $job->payload['class']); + + $job = $reserver->reserve(); + $this->assertEquals('low', $job->queue); + $this->assertEquals('Low_Job_2', $job->payload['class']); + } +} diff --git a/test/Resque/Tests/Reserver/RandomQueueOrderReserverTest.php b/test/Resque/Tests/Reserver/RandomQueueOrderReserverTest.php new file mode 100644 index 00000000..a74d8cff --- /dev/null +++ b/test/Resque/Tests/Reserver/RandomQueueOrderReserverTest.php @@ -0,0 +1,134 @@ +assertEquals('RandomQueueOrderReserver', $this->getReserver()->getName()); + } + + public function testWaitAfterReservationAttemptReturnsTrue() + { + $this->assertTrue($this->getReserver()->waitAfterReservationAttempt()); + } + + private function assertQueuesAreShuffled(RandomQueueOrderReserver $reserver, array $queues) + { + // retrieve the queues 20 times + $shuffledQueues = array(); + for ($x = 0; $x < 20; $x++) { + $shuffledQueues[] = $reserver->getQueues(); + } + + $ordered = 0; + foreach ($shuffledQueues as $shuffled) { + // check if the order + if ($shuffled === $queues) { + $ordered++; + } + + // check that the shuffled queues contain all the right elements though + sort($shuffled); + $this->assertEquals($queues, $shuffled); + } + + // if all the shuffled queues were actually returned in sorted order then the shuffling is (unlikely) to be working + $this->assertNotEquals(20, $ordered, "queues were ordered 20 times; queues not shuffled correctly"); + } + + public function testGetQueuesReturnsConfiguredQueuesInShuffledOrder() + { + $queues = array( + 'queue_a', + 'queue_b', + 'queue_c', + 'queue_d', + 'queue_e', + 'queue_f', + ); + + $reserver = $this->getReserver($queues); + + $this->assertQueuesAreShuffled($reserver, $queues); + } + + public function testGetQueuesWithAsterixQueueReturnsAllQueuesFromRedisInShuffledOrder() + { + $queues = array( + 'queue_a', + 'queue_b', + 'queue_c', + 'queue_d', + 'queue_e', + 'queue_f', + ); + + // register queues in redis + foreach ($queues as $queue) { + Resque::redis()->sadd('queues', $queue); + } + + $reserver = $this->getReserver(array('*')); + + $this->assertQueuesAreShuffled($reserver, $queues); + } + + public function testReserverWhenNoJobsEnqueuedReturnsNull() + { + $queues = array( + 'queue_1', + 'queue_2', + 'queue_3', + ); + $this->assertNull($this->getReserver($queues)->reserve()); + } + + public function testReserveReservesJobsFromRandomQueue() + { + $queues = array( + 'queue_a', + 'queue_b', + 'queue_c', + 'queue_d', + 'queue_e', + 'queue_f', + ); + + $reserver = $this->getReserver($queues); + + $jobsPerQueue = 5; + + // enqueue a bunch of jobs in each queue + foreach ($queues as $queue) { + for ($x = 0; $x < $jobsPerQueue; $x++) { + $queuesForAllJobs[] = $queue; + Resque::enqueue($queue, 'Test_Job'); + } + } + + $totalJobs = count($queues) * $jobsPerQueue; + + // track the queue for each reserved job + $reservedQueues = array(); + for ($x = 0; $x < $totalJobs; $x++) { + $job = $reserver->reserve(); + $this->assertNotNull($job); + $reservedQueues[] = $job->queue; + } + + // if jobs are reserved randomly, then $queueOrder shouldn't be ordered + $orderedQueues = $reservedQueues; + sort($orderedQueues); + $this->assertNotEquals($orderedQueues, $reservedQueues, "queues were ordered; queues not shuffled correctly"); + } +} diff --git a/test/Resque/Tests/Reserver/ReserverFactoryTest.php b/test/Resque/Tests/Reserver/ReserverFactoryTest.php new file mode 100644 index 00000000..e20106a9 --- /dev/null +++ b/test/Resque/Tests/Reserver/ReserverFactoryTest.php @@ -0,0 +1,111 @@ +getFactory()->createReserverFromName('foo', array()); + } + + public function createReserverFromNameDataProvider() + { + return array( + array('queue_order', '\Resque\Reserver\QueueOrderReserver'), + array('RANDOM_QUEUE_ORDER', '\Resque\Reserver\RandomQueueOrderReserver'), + array('Blocking_List_Pop', '\Resque\Reserver\BlockingListPopReserver'), + ); + } + + /** + * @dataProvider createReserverFromNameDataProvider + */ + public function testCreateReserverFromNameCreatesExpectedReserver($name, $expectedReserver) + { + $queues = array( + 'queue_a', + 'queue_b', + 'queue_c', + 'queue_d', + ); + + $reserver = $this->getFactory()->createReserverFromName($name, $queues); + $this->assertInstanceOf($expectedReserver, $reserver); + + // account for shuffling by RandomQueueOrderReserver + $actualQueues = $reserver->getQueues(); + sort($actualQueues); + + $this->assertEquals($queues, $actualQueues); + } + + public function testCreateDefaultReserverCreatesExpectedReserver() + { + $reserver = $this->getFactory()->createDefaultReserver(array()); + $this->assertInstanceOf('\Resque\Reserver\QueueOrderReserver', $reserver); + } + + public function createReserverFromEnvironmentDataProvider() + { + return array( + array(array('BLOCKING=1'), '\Resque\Reserver\BlockingListPopReserver'), + array(array('BLOCKING=0', 'RESERVER=random_queue_order'), '\Resque\Reserver\RandomQueueOrderReserver'), + array(array('BLOCKING=', 'RESERVER=random_queue_order'), '\Resque\Reserver\RandomQueueOrderReserver'), + array(array('RESERVER=Queue_Order'), '\Resque\Reserver\QueueOrderReserver'), + array(array(), '\Resque\Reserver\QueueOrderReserver'), + ); + } + + /** + * @dataProvider createReserverFromEnvironmentDataProvider + */ + public function testCreateReserverFromEnvironmentCreatesExpectedReserver($env, $expectedReserver) + { + $queues = array( + 'queue_a', + 'queue_b', + 'queue_c', + 'queue_d', + ); + + foreach ($env as $var) { + putenv($var); + } + + $reserver = $this->getFactory()->createReserverFromEnvironment($queues); + $this->assertInstanceOf($expectedReserver, $reserver); + + // account for shuffling by RandomQueueOrderReserver + $actualQueues = $reserver->getQueues(); + sort($actualQueues); + + $this->assertEquals($queues, $actualQueues); + + putenv('BLOCKING'); + putenv('RESERVER'); + } + + /** + * @expectedException Resque\Reserver\UnknownReserverException + * @expectedExceptionMessage Unknown reserver 'foobar' + */ + public function testCreateReserverFromEnvironmentThrowsExceptionForUnknownReserver() + { + putenv('RESERVER=foobar'); + $this->getFactory()->createReserverFromEnvironment(array()); + putenv('RESERVER'); + } +} diff --git a/test/Resque/Tests/WorkerTest.php b/test/Resque/Tests/WorkerTest.php index 93c0621a..e0115dd3 100644 --- a/test/Resque/Tests/WorkerTest.php +++ b/test/Resque/Tests/WorkerTest.php @@ -1,4 +1,7 @@ createDefaultReserver($queues); + + $worker = new Resque_Worker($reserver, $queues); + $worker->setLogger($logger); + + return $worker; + } + public function testWorkerRegistersInList() { - $worker = new Resque_Worker('*'); - $worker->setLogger(new Resque_Log()); + $worker = $this->getWorker(array('*')); $worker->registerWorker(); // Make sure the worker is in the list @@ -23,8 +44,7 @@ public function testGetAllWorkers() $num = 3; // Register a few workers for($i = 0; $i < $num; ++$i) { - $worker = new Resque_Worker('queue_' . $i); - $worker->setLogger(new Resque_Log()); + $worker = $this->getWorker(array('queue_' . $i)); $worker->registerWorker(); } @@ -34,8 +54,7 @@ public function testGetAllWorkers() public function testGetWorkerById() { - $worker = new Resque_Worker('*'); - $worker->setLogger(new Resque_Log()); + $worker = $this->getWorker(array('*')); $worker->registerWorker(); $newWorker = Resque_Worker::find((string)$worker); @@ -49,8 +68,7 @@ public function testInvalidWorkerDoesNotExist() public function testWorkerCanUnregister() { - $worker = new Resque_Worker('*'); - $worker->setLogger(new Resque_Log()); + $worker = $this->getWorker(array('*')); $worker->registerWorker(); $worker->unregisterWorker(); @@ -61,8 +79,7 @@ public function testWorkerCanUnregister() public function testPausedWorkerDoesNotPickUpJobs() { - $worker = new Resque_Worker('*'); - $worker->setLogger(new Resque_Log()); + $worker = $this->getWorker(array('*')); $worker->pauseProcessing(); Resque::enqueue('jobs', 'Test_Job'); $worker->work(0); @@ -72,8 +89,7 @@ public function testPausedWorkerDoesNotPickUpJobs() public function testResumedWorkerPicksUpJobs() { - $worker = new Resque_Worker('*'); - $worker->setLogger(new Resque_Log()); + $worker = $this->getWorker(array('*')); $worker->pauseProcessing(); Resque::enqueue('jobs', 'Test_Job'); $worker->work(0); @@ -85,11 +101,10 @@ public function testResumedWorkerPicksUpJobs() public function testWorkerCanWorkOverMultipleQueues() { - $worker = new Resque_Worker(array( + $worker = $this->getWorker(array( 'queue1', - 'queue2' + 'queue2', )); - $worker->setLogger(new Resque_Log()); $worker->registerWorker(); Resque::enqueue('queue1', 'Test_Job_1'); Resque::enqueue('queue2', 'Test_Job_2'); @@ -103,12 +118,11 @@ public function testWorkerCanWorkOverMultipleQueues() public function testWorkerWorksQueuesInSpecifiedOrder() { - $worker = new Resque_Worker(array( + $worker = $this->getWorker(array( 'high', 'medium', - 'low' + 'low', )); - $worker->setLogger(new Resque_Log()); $worker->registerWorker(); // Queue the jobs in a different order @@ -129,8 +143,7 @@ public function testWorkerWorksQueuesInSpecifiedOrder() public function testWildcardQueueWorkerWorksAllQueues() { - $worker = new Resque_Worker('*'); - $worker->setLogger(new Resque_Log()); + $worker = $this->getWorker(array('*')); $worker->registerWorker(); Resque::enqueue('queue1', 'Test_Job_1'); @@ -145,8 +158,7 @@ public function testWildcardQueueWorkerWorksAllQueues() public function testWorkerDoesNotWorkOnUnknownQueues() { - $worker = new Resque_Worker('queue1'); - $worker->setLogger(new Resque_Log()); + $worker = $this->getWorker(array('queue1')); $worker->registerWorker(); Resque::enqueue('queue2', 'Test_Job'); @@ -156,8 +168,7 @@ public function testWorkerDoesNotWorkOnUnknownQueues() public function testWorkerClearsItsStatusWhenNotWorking() { Resque::enqueue('jobs', 'Test_Job'); - $worker = new Resque_Worker('jobs'); - $worker->setLogger(new Resque_Log()); + $worker = $this->getWorker(array('jobs')); $job = $worker->reserve(); $worker->workingOn($job); $worker->doneWorking(); @@ -166,8 +177,7 @@ public function testWorkerClearsItsStatusWhenNotWorking() public function testWorkerRecordsWhatItIsWorkingOn() { - $worker = new Resque_Worker('jobs'); - $worker->setLogger(new Resque_Log()); + $worker = $this->getWorker(array('jobs')); $worker->registerWorker(); $payload = array( @@ -189,8 +199,7 @@ public function testWorkerErasesItsStatsWhenShutdown() Resque::enqueue('jobs', 'Test_Job'); Resque::enqueue('jobs', 'Invalid_Job'); - $worker = new Resque_Worker('jobs'); - $worker->setLogger(new Resque_Log()); + $worker = $this->getWorker(array('jobs')); $worker->work(0); $worker->work(0); @@ -201,19 +210,16 @@ public function testWorkerErasesItsStatsWhenShutdown() public function testWorkerCleansUpDeadWorkersOnStartup() { // Register a good worker - $goodWorker = new Resque_Worker('jobs'); - $goodWorker->setLogger(new Resque_Log()); + $goodWorker = $this->getWorker(array('jobs')); $goodWorker->registerWorker(); $workerId = explode(':', $goodWorker); // Register some bad workers - $worker = new Resque_Worker('jobs'); - $worker->setLogger(new Resque_Log()); + $worker = $this->getWorker(array('jobs')); $worker->setId($workerId[0].':1:jobs'); $worker->registerWorker(); - $worker = new Resque_Worker(array('high', 'low')); - $worker->setLogger(new Resque_Log()); + $worker = $this->getWorker(array('high', 'low')); $worker->setId($workerId[0].':2:high,low'); $worker->registerWorker(); @@ -228,15 +234,13 @@ public function testWorkerCleansUpDeadWorkersOnStartup() public function testDeadWorkerCleanUpDoesNotCleanUnknownWorkers() { // Register a bad worker on this machine - $worker = new Resque_Worker('jobs'); - $worker->setLogger(new Resque_Log()); + $worker = $this->getWorker(array('jobs')); $workerId = explode(':', $worker); $worker->setId($workerId[0].':1:jobs'); $worker->registerWorker(); // Register some other false workers - $worker = new Resque_Worker('jobs'); - $worker->setLogger(new Resque_Log()); + $worker = $this->getWorker(array('jobs')); $worker->setId('my.other.host:1:jobs'); $worker->registerWorker(); @@ -252,8 +256,7 @@ public function testDeadWorkerCleanUpDoesNotCleanUnknownWorkers() public function testWorkerFailsUncompletedJobsOnExit() { - $worker = new Resque_Worker('jobs'); - $worker->setLogger(new Resque_Log()); + $worker = $this->getWorker(array('jobs')); $worker->registerWorker(); $payload = array( @@ -269,8 +272,7 @@ public function testWorkerFailsUncompletedJobsOnExit() public function testBlockingListPop() { - $worker = new Resque_Worker('jobs'); - $worker->setLogger(new Resque_Log()); + $worker = $this->getWorker(array('jobs')); $worker->registerWorker(); Resque::enqueue('jobs', 'Test_Job_1'); @@ -290,4 +292,4 @@ public function testBlockingListPop() $this->assertEquals(2, $i); } -} \ No newline at end of file +} diff --git a/test/bootstrap.php b/test/bootstrap.php index a4b68377..6e71ceb7 100644 --- a/test/bootstrap.php +++ b/test/bootstrap.php @@ -8,7 +8,8 @@ */ $loader = require __DIR__ . '/../vendor/autoload.php'; -$loader->add('Resque_Tests', __DIR__); + +use Resque\Reserver\ReserverFactory; define('TEST_MISC', realpath(__DIR__ . '/misc/')); define('REDIS_CONF', TEST_MISC . '/redis.conf'); @@ -37,6 +38,9 @@ Resque::setBackend('localhost:' . $matches[1]); +$reserverFactory = new ReserverFactory(new Resque_Log()); +Resque_Worker::setReserverFactory($reserverFactory); + // Shutdown function killRedis($pid) {