Skip to content

Commit 96c4f08

Browse files
author
Maksym Kotliar
committed
init
0 parents  commit 96c4f08

File tree

11 files changed

+339
-0
lines changed

11 files changed

+339
-0
lines changed

.gitignore

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
*~
2+
/composer.lock
3+
/composer.phar
4+
/phpunit.xml
5+
/vendor/
6+
/.idea/

Async/ElasticaPopulateProcessor.php

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
<?php
2+
namespace Enqueue\ElasticaBundle\Async;
3+
4+
use Enqueue\Psr\Context;
5+
use Enqueue\Psr\Message;
6+
use Enqueue\Psr\Processor;
7+
use Enqueue\Util\JSON;
8+
use FOS\ElasticaBundle\Provider\ProviderRegistry;
9+
10+
class ElasticaPopulateProcessor implements Processor
11+
{
12+
/**
13+
* @var ProviderRegistry
14+
*/
15+
private $providerRegistry;
16+
17+
/**
18+
* @param ProviderRegistry $providerRegistry
19+
*/
20+
public function __construct(ProviderRegistry $providerRegistry)
21+
{
22+
$this->providerRegistry = $providerRegistry;
23+
}
24+
25+
/**
26+
* {@inheritdoc}
27+
*/
28+
public function process(Message $message, Context $context)
29+
{
30+
if (false == $message->getReplyTo()) {
31+
return self::REJECT;
32+
}
33+
34+
if ($message->isRedelivered()) {
35+
$replyMessage = $context->createMessage(false);
36+
$replyQueue = $context->createQueue($message->getReplyTo());
37+
$context->createProducer()->send($replyQueue, $replyMessage);
38+
39+
return self::REJECT;
40+
}
41+
42+
$options = JSON::decode($message->getBody());
43+
44+
$provider = $this->providerRegistry->getProvider($options['indexName'], $options['typeName']);
45+
$provider->populate(null, $options);
46+
47+
$this->sendReply($context, $message->getReplyTo(), true);
48+
49+
return self::ACK;
50+
}
51+
52+
/**
53+
* @param Context $context
54+
* @param string $replyTo
55+
* @param bool $message
56+
*/
57+
private function sendReply(Context $context, $replyTo, $message)
58+
{
59+
$replyMessage = $context->createMessage($message);
60+
$replyQueue = $context->createQueue($replyTo);
61+
$context->createProducer()->send($replyQueue, $replyMessage);
62+
}
63+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
<?php
2+
namespace Enqueue\ElasticaBundle\DependencyInjection\Compiler;
3+
4+
use Enqueue\ElasticaBundle\Elastica\AsyncDoctrineOrmProvider;
5+
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
6+
use Symfony\Component\DependencyInjection\ContainerBuilder;
7+
use Symfony\Component\DependencyInjection\Reference;
8+
9+
class AsyncProviderCompilerPass implements CompilerPassInterface
10+
{
11+
/**
12+
* {@inheritdoc}
13+
*/
14+
public function process(ContainerBuilder $container)
15+
{
16+
foreach ($container->getExtensionConfig('fos_elastica') as $config) {
17+
foreach ($config['indexes'] as $index => $indexData) {
18+
foreach ($indexData['types'] as $type => $typeData) {
19+
if ('orm' != $typeData['persistence']['driver']) {
20+
continue;
21+
}
22+
23+
$providerId = sprintf('fos_elastica.provider.%s.%s', $index, $type);
24+
if (false == $container->hasDefinition($providerId)) {
25+
continue;
26+
}
27+
28+
$provider = $container->getDefinition($providerId);
29+
$provider->setClass(AsyncDoctrineOrmProvider::class);
30+
$provider->addMethodCall('setContext', [new Reference('enqueue.transport.context')]);
31+
}
32+
}
33+
}
34+
}
35+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
<?php
2+
3+
namespace Enqueue\ElasticaBundle\DependencyInjection;
4+
5+
use Symfony\Component\Config\FileLocator;
6+
use Symfony\Component\DependencyInjection\ContainerBuilder;
7+
use Symfony\Component\DependencyInjection\Loader\YamlFileLoader;
8+
use Symfony\Component\HttpKernel\DependencyInjection\Extension;
9+
10+
class EnqueueElasticaExtension extends Extension
11+
{
12+
/**
13+
* {@inheritdoc}
14+
*/
15+
public function load(array $configs, ContainerBuilder $container)
16+
{
17+
$loader = new YamlFileLoader($container, new FileLocator(__DIR__.'/../Resources/config'));
18+
$loader->load('services.yml');
19+
}
20+
}

Elastica/AsyncDoctrineOrmProvider.php

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
<?php
2+
namespace Enqueue\ElasticaBundle\Listener;
3+
4+
use Enqueue\Psr\Context;
5+
use Enqueue\Util\JSON;
6+
use FOS\ElasticaBundle\Doctrine\ORM\Provider;
7+
8+
class AsyncDoctrineOrmProvider extends Provider
9+
{
10+
private $batchSize;
11+
12+
/**
13+
* @var Context
14+
*/
15+
private $context;
16+
17+
/**
18+
* @param Context $context
19+
*/
20+
public function setContext(Context $context)
21+
{
22+
$this->context = $context;
23+
}
24+
25+
/**
26+
* {@inheritDoc}
27+
*/
28+
protected function doPopulate($options, \Closure $loggerClosure = null)
29+
{
30+
$this->batchSize = null;
31+
if ($options['real_populate']) {
32+
$this->batchSize = $options['offset'] + $options['batch_size'];
33+
34+
return parent::doPopulate($options, $loggerClosure);
35+
}
36+
37+
$queryBuilder = $this->createQueryBuilder($options['query_builder_method']);
38+
$nbObjects = $this->countObjects($queryBuilder);
39+
$offset = $options['offset'];
40+
41+
$queue = $this->context->createQueue('fos_elastica.populate');
42+
$resultQueue = $this->context->createTemporaryQueue();
43+
$consumer = $this->context->createConsumer($resultQueue);
44+
45+
$producer = $this->context->createProducer();
46+
47+
$nbMessages = 0;
48+
for (; $offset < $nbObjects; $offset += $options['batch_size']) {
49+
$options['offset'] = $offset;
50+
$options['real_populate'] = true;
51+
$message = $this->context->createMessage(JSON::encode($options));
52+
$message->setReplyTo($resultQueue->getQueueName());
53+
$producer->send($queue, $message);
54+
55+
$nbMessages++;
56+
}
57+
58+
$limitTime = time() + 180;
59+
while ($nbMessages) {
60+
if ($message = $consumer->receive(20000)) {
61+
if (null !== $loggerClosure) {
62+
$loggerClosure($options['batch_size'], $nbObjects);
63+
}
64+
65+
$consumer->acknowledge($message);
66+
67+
$nbMessages--;
68+
69+
$limitTime = time() + 180;
70+
}
71+
72+
if (time() > $limitTime) {
73+
throw new \LogicException(sprintf('No response in %d seconds', 180));
74+
}
75+
}
76+
}
77+
78+
/**
79+
* {@inheritDoc}
80+
*/
81+
protected function countObjects($queryBuilder)
82+
{
83+
return $this->batchSize ? $this->batchSize : parent::countObjects($queryBuilder);
84+
}
85+
86+
/**
87+
* {@inheritDoc}
88+
*/
89+
protected function configureOptions()
90+
{
91+
parent::configureOptions();
92+
93+
$this->resolver->setDefaults([
94+
'real_populate' => false,
95+
]);
96+
}
97+
}

EnqueueElasticaBundle.php

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
<?php
2+
3+
namespace Enqueue\ElasticaBundle;
4+
5+
use Enqueue\ElasticaBundle\DependencyInjection\Compiler\AsyncProviderCompilerPass;
6+
use Symfony\Component\DependencyInjection\ContainerBuilder;
7+
use Symfony\Component\HttpKernel\Bundle\Bundle;
8+
9+
class EnqueueElasticaBundle extends Bundle
10+
{
11+
/**
12+
* {@inheritdoc}
13+
*/
14+
public function build(ContainerBuilder $container)
15+
{
16+
$container->addCompilerPass(new AsyncProviderCompilerPass());
17+
}
18+
}

LICENSE

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
Copyright (c) 2017 Kotliar Maksym
2+
3+
Permission is hereby granted, free of charge, to any person obtaining a copy
4+
of this software and associated documentation files (the "Software"), to deal
5+
in the Software without restriction, including without limitation the rights
6+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7+
copies of the Software, and to permit persons to whom the Software is furnished
8+
to do so, subject to the following conditions:
9+
10+
The above copyright notice and this permission notice shall be included in all
11+
copies or substantial portions of the Software.
12+
13+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16+
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17+
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19+
THE SOFTWARE.
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
<?php
2+
namespace Enqueue\ElasticaBundle\Listener;
3+
4+
use Enqueue\Psr\Context;
5+
use FOS\ElasticaBundle\Event\IndexPopulateEvent;
6+
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
7+
8+
class PurgeFosElasticPopulateQueueListener implements EventSubscriberInterface
9+
{
10+
/**
11+
* @var Context
12+
*/
13+
private $context;
14+
15+
/**
16+
* @param Context $context
17+
*/
18+
public function __construct(Context $context)
19+
{
20+
$this->context = $context;
21+
}
22+
23+
public function onPreIndexPopulate(IndexPopulateEvent $event)
24+
{
25+
if (method_exists($this->context, 'purge')) {
26+
$queue = $this->context->createQueue('fos_elastica.populate');
27+
28+
$this->context->purge($queue);
29+
30+
sleep(20);
31+
}
32+
}
33+
34+
/**
35+
* {@inheritdoc}
36+
*/
37+
public static function getSubscribedEvents()
38+
{
39+
return [
40+
IndexPopulateEvent::PRE_INDEX_POPULATE => 'onPreIndexPopulate',
41+
];
42+
}
43+
}

README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
# Enqueue Elastica Bundle
2+
3+
## License
4+
5+
It is released under the [MIT License](LICENSE).

Resources/config/services.yml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
services:
2+
enqueue_elastica.async.elastica_populate_processor:
3+
class: 'Enqueue\ElasticaBundle\Async\ElasticaPopulateProcessor'
4+
arguments:
5+
- '@fos_elastica.provider_registry'
6+
7+
app.listener.purge_fos_elastic_populate_queue:
8+
class: 'Enqueue\ElasticaBundle\Listener\PurgeFosElasticPopulateQueueListener'
9+
arguments:
10+
- '@enqueue.transport.context'
11+
tags:
12+
- { name: 'kernel.event_subscriber' }

0 commit comments

Comments
 (0)