Skip to content

Commit 8119cc4

Browse files
committed
[doctrine] Sync index with doctrine orm object change.
1 parent 82f6560 commit 8119cc4

File tree

4 files changed

+204
-7
lines changed

4 files changed

+204
-7
lines changed
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
<?php
2+
namespace Enqueue\ElasticaBundle\Doctrine;
3+
4+
use Doctrine\Common\Persistence\Event\LifecycleEventArgs;
5+
use Enqueue\ElasticaBundle\Queue\Commands;
6+
use Enqueue\Util\JSON;
7+
use Interop\Queue\PsrContext;
8+
use Doctrine\Common\EventSubscriber;
9+
10+
final class SyncIndexWithObjectChangeListener implements EventSubscriber
11+
{
12+
/**
13+
* @var PsrContext
14+
*/
15+
private $context;
16+
17+
/**
18+
* @var string
19+
*/
20+
private $modelClass;
21+
22+
/**
23+
* @var array
24+
*/
25+
private $config;
26+
27+
public function __construct(PsrContext $context, $modelClass, array $config)
28+
{
29+
$this->context = $context;
30+
$this->modelClass = $modelClass;
31+
$this->config = $config;
32+
}
33+
34+
public function postUpdate(LifecycleEventArgs $args)
35+
{
36+
if ($args->getObject() instanceof $this->modelClass) {
37+
$this->sendUpdateIndexMessage('update', $args);
38+
}
39+
}
40+
41+
public function postPersist(LifecycleEventArgs $args)
42+
{
43+
if ($args->getObject() instanceof $this->modelClass) {
44+
$this->sendUpdateIndexMessage('insert', $args);
45+
}
46+
}
47+
48+
public function preRemove(LifecycleEventArgs $args)
49+
{
50+
if ($args->getObject() instanceof $this->modelClass) {
51+
$this->sendUpdateIndexMessage('remove', $args);
52+
}
53+
}
54+
55+
public function getSubscribedEvents()
56+
{
57+
return [
58+
'postPersist',
59+
'postUpdate',
60+
'preRemove',
61+
];
62+
}
63+
64+
/**
65+
* @param string $action
66+
* @param LifecycleEventArgs $args
67+
*/
68+
private function sendUpdateIndexMessage($action, LifecycleEventArgs $args)
69+
{
70+
$object = $args->getObject();
71+
72+
$rp = new \ReflectionProperty($object, $this->config['identifier']);
73+
$rp->setAccessible(true);
74+
$id = $rp->getValue($object);
75+
$rp->setAccessible(false);
76+
77+
$queue = $this->context->createQueue(Commands::SYNC_INDEX_WITH_DOCTRINE_ORM_OBJECT_CHANGE);
78+
79+
$message = $this->context->createMessage(JSON::encode([
80+
'action' => $action,
81+
'modelClass' => $this->modelClass,
82+
'id' => $id,
83+
'indexName' => $this->config['indexName'],
84+
'typeName' => $this->config['typeName'],
85+
]));
86+
87+
$this->context->createProducer()->send($queue, $message);
88+
}
89+
}

Queue/Commands.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ final class Commands
55
{
66
const POPULATE = 'fos_elastica_populate';
77

8+
const SYNC_INDEX_WITH_DOCTRINE_ORM_OBJECT_CHANGE = 'fos_elastica_sync_index_with_doctrine_orm_object_change';
9+
810
private function __construct()
911
{
1012
}

Queue/PopulateProcessor.php

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,21 +4,15 @@
44
use Enqueue\Client\CommandSubscriberInterface;
55
use Enqueue\Consumption\QueueSubscriberInterface;
66
use Enqueue\Consumption\Result;
7-
use FOS\ElasticaBundle\Persister\Event\Events;
8-
use FOS\ElasticaBundle\Persister\Event\PostInsertObjectsEvent;
9-
use FOS\ElasticaBundle\Persister\Event\PreInsertObjectsEvent;
107
use FOS\ElasticaBundle\Persister\InPlacePagerPersister;
118
use FOS\ElasticaBundle\Persister\PagerPersisterRegistry;
12-
use FOS\ElasticaBundle\Provider\PagerInterface;
139
use FOS\ElasticaBundle\Provider\PagerProviderRegistry;
1410
use Interop\Queue\PsrContext;
1511
use Interop\Queue\PsrMessage;
1612
use Interop\Queue\PsrProcessor;
1713
use Enqueue\Util\JSON;
18-
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
19-
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
2014

21-
class PopulateProcessor implements PsrProcessor, CommandSubscriberInterface, QueueSubscriberInterface
15+
final class PopulateProcessor implements PsrProcessor, CommandSubscriberInterface, QueueSubscriberInterface
2216
{
2317
/**
2418
* @var PagerProviderRegistry
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
<?php
2+
namespace Enqueue\ElasticaBundle\Queue;
3+
4+
use Enqueue\Client\CommandSubscriberInterface;
5+
use Enqueue\Consumption\QueueSubscriberInterface;
6+
use Enqueue\Util\JSON;
7+
use FOS\ElasticaBundle\Persister\ObjectPersisterInterface;
8+
use FOS\ElasticaBundle\Provider\IndexableInterface;
9+
use Interop\Queue\PsrContext;
10+
use Interop\Queue\PsrMessage;
11+
use Interop\Queue\PsrProcessor;
12+
use Symfony\Bridge\Doctrine\RegistryInterface;
13+
14+
final class SyncIndexWithDoctrineORMObjectChangeProcessor implements PsrProcessor, CommandSubscriberInterface, QueueSubscriberInterface
15+
{
16+
/**
17+
* @var ObjectPersisterInterface
18+
*/
19+
private $objectPersister;
20+
21+
/**
22+
* @var IndexableInterface
23+
*/
24+
private $indexable;
25+
26+
/**
27+
* @var RegistryInterface
28+
*/
29+
private $doctrine;
30+
31+
public function __construct(RegistryInterface $doctrine, ObjectPersisterInterface $objectPersister, IndexableInterface $indexable)
32+
{
33+
$this->objectPersister = $objectPersister;
34+
$this->indexable = $indexable;
35+
$this->doctrine = $doctrine;
36+
}
37+
38+
/**
39+
* {@inheritdoc}
40+
*/
41+
public function process(PsrMessage $message, PsrContext $context)
42+
{
43+
$data = JSON::decode($message->getBody());
44+
45+
if (false == isset($data['action'], $data['modelClass'], $data['id'], $data['indexName'], $data['typeName'])) {
46+
return self::REJECT;
47+
}
48+
49+
$indexName = $data['indexName'];
50+
$typeName = $data['typeName'];
51+
52+
$objectRepository = $this->doctrine->getManagerForClass($data['modelClass'])->getRepository($data['modelClass']);
53+
54+
switch ($data['action']) {
55+
case 'update':
56+
if (false == $object = $objectRepository->find($data['id'])) {
57+
$this->objectPersister->deleteById($data['id']);
58+
59+
return self::REJECT;
60+
}
61+
62+
if ($this->objectPersister->handlesObject($object)) {
63+
if ($this->indexable->isObjectIndexable($indexName, $typeName, $object)) {
64+
$this->objectPersister->replaceOne($object);
65+
} else {
66+
$this->objectPersister->deleteOne($object);
67+
}
68+
}
69+
70+
break;
71+
case 'insert':
72+
if (false == $object = $objectRepository->find($data['id'])) {
73+
$this->objectPersister->deleteById($data['id']);
74+
75+
return self::REJECT;
76+
}
77+
78+
if ($this->objectPersister->handlesObject($object) && $this->indexable->isObjectIndexable($indexName, $typeName, $object)) {
79+
$this->objectPersister->insertOne($object);
80+
}
81+
82+
break;
83+
case 'delete':
84+
$this->objectPersister->deleteById($data['id']);
85+
86+
break;
87+
default:
88+
return self::REJECT;
89+
}
90+
}
91+
92+
/**
93+
* {@inheritdoc}
94+
*/
95+
public static function getSubscribedCommand()
96+
{
97+
return [
98+
'processorName' => Commands::SYNC_INDEX_WITH_DOCTRINE_ORM_OBJECT_CHANGE,
99+
'queueName' => Commands::SYNC_INDEX_WITH_DOCTRINE_ORM_OBJECT_CHANGE,
100+
'queueNameHardcoded' => true,
101+
'exclusive' => true,
102+
];
103+
}
104+
105+
/**
106+
* {@inheritdoc}
107+
*/
108+
public static function getSubscribedQueues()
109+
{
110+
return [Commands::SYNC_INDEX_WITH_DOCTRINE_ORM_OBJECT_CHANGE];
111+
}
112+
}

0 commit comments

Comments
 (0)