4
4
use Enqueue \Client \CommandSubscriberInterface ;
5
5
use Enqueue \Consumption \QueueSubscriberInterface ;
6
6
use Enqueue \Util \JSON ;
7
- use FOS \ElasticaBundle \Persister \ObjectPersisterInterface ;
7
+ use FOS \ElasticaBundle \Persister \PersisterRegistry ;
8
8
use FOS \ElasticaBundle \Provider \IndexableInterface ;
9
9
use Interop \Queue \PsrContext ;
10
10
use Interop \Queue \PsrMessage ;
14
14
final class SyncIndexWithDoctrineORMObjectChangeProcessor implements PsrProcessor, CommandSubscriberInterface, QueueSubscriberInterface
15
15
{
16
16
/**
17
- * @var ObjectPersisterInterface
17
+ * @var PersisterRegistry
18
18
*/
19
- private $ objectPersister ;
19
+ private $ persisterRegistry ;
20
20
21
21
/**
22
22
* @var IndexableInterface
@@ -28,9 +28,9 @@ final class SyncIndexWithDoctrineORMObjectChangeProcessor implements PsrProcesso
28
28
*/
29
29
private $ doctrine ;
30
30
31
- public function __construct (RegistryInterface $ doctrine , ObjectPersisterInterface $ objectPersister , IndexableInterface $ indexable )
31
+ public function __construct (RegistryInterface $ doctrine , PersisterRegistry $ persisterRegistry , IndexableInterface $ indexable )
32
32
{
33
- $ this ->objectPersister = $ objectPersister ;
33
+ $ this ->persisterRegistry = $ persisterRegistry ;
34
34
$ this ->indexable = $ indexable ;
35
35
$ this ->doctrine = $ doctrine ;
36
36
}
@@ -46,42 +46,43 @@ public function process(PsrMessage $message, PsrContext $context)
46
46
return self ::REJECT ;
47
47
}
48
48
49
- $ indexName = $ data ['indexName ' ];
50
- $ typeName = $ data ['typeName ' ];
49
+ $ index = $ data ['indexName ' ];
50
+ $ type = $ data ['typeName ' ];
51
51
52
- $ objectRepository = $ this ->doctrine ->getManagerForClass ($ data ['modelClass ' ])->getRepository ($ data ['modelClass ' ]);
52
+ $ repository = $ this ->doctrine ->getManagerForClass ($ data ['modelClass ' ])->getRepository ($ data ['modelClass ' ]);
53
+ $ persister = $ this ->persisterRegistry ->getPersister ($ index , $ type );
53
54
54
55
switch ($ data ['action ' ]) {
55
56
case 'update ' :
56
- if (false == $ object = $ objectRepository ->find ($ data ['id ' ])) {
57
- $ this -> objectPersister ->deleteById ($ data ['id ' ]);
57
+ if (false == $ object = $ repository ->find ($ data ['id ' ])) {
58
+ $ persister ->deleteById ($ data ['id ' ]);
58
59
59
60
return self ::REJECT ;
60
61
}
61
62
62
- if ($ this -> objectPersister ->handlesObject ($ object )) {
63
- if ($ this ->indexable ->isObjectIndexable ($ indexName , $ typeName , $ object )) {
64
- $ this -> objectPersister ->replaceOne ($ object );
63
+ if ($ persister ->handlesObject ($ object )) {
64
+ if ($ this ->indexable ->isObjectIndexable ($ index , $ type , $ object )) {
65
+ $ persister ->replaceOne ($ object );
65
66
} else {
66
- $ this -> objectPersister ->deleteOne ($ object );
67
+ $ persister ->deleteOne ($ object );
67
68
}
68
69
}
69
70
70
71
break ;
71
72
case 'insert ' :
72
- if (false == $ object = $ objectRepository ->find ($ data ['id ' ])) {
73
- $ this -> objectPersister ->deleteById ($ data ['id ' ]);
73
+ if (false == $ object = $ repository ->find ($ data ['id ' ])) {
74
+ $ persister ->deleteById ($ data ['id ' ]);
74
75
75
76
return self ::REJECT ;
76
77
}
77
78
78
- if ($ this -> objectPersister -> handlesObject ($ object ) && $ this ->indexable ->isObjectIndexable ($ indexName , $ typeName , $ object )) {
79
- $ this -> objectPersister ->insertOne ($ object );
79
+ if ($ persister -> handlesObject ($ object ) && $ this ->indexable ->isObjectIndexable ($ index , $ type , $ object )) {
80
+ $ persister ->insertOne ($ object );
80
81
}
81
82
82
83
break ;
83
84
case 'delete ' :
84
- $ this -> objectPersister ->deleteById ($ data ['id ' ]);
85
+ $ persister ->deleteById ($ data ['id ' ]);
85
86
86
87
break ;
87
88
default :
0 commit comments