@@ -26,19 +26,22 @@ public function __construct(CloudTasksClient $client, Request $request, OpenIdVe
26
26
*/
27
27
public function handle ($ task = null )
28
28
{
29
- $ this ->authorizeRequest ();
30
-
31
29
$ task = $ task ?: $ this ->captureTask ();
32
30
33
- $ this ->listenForEvents ();
31
+ $ command = unserialize ($ task ['data ' ]['command ' ]);
32
+ $ connection = $ command ->connection ?? 'cloudtasks ' ;
33
+
34
+ $ this ->authorizeRequest ($ connection );
35
+
36
+ $ this ->listenForEvents ($ connection );
34
37
35
- $ this ->handleTask ($ task );
38
+ $ this ->handleTask ($ connection , $ task );
36
39
}
37
40
38
41
/**
39
42
* @throws CloudTasksException
40
43
*/
41
- public function authorizeRequest ()
44
+ public function authorizeRequest ($ connection )
42
45
{
43
46
if (!$ this ->request ->hasHeader ('Authorization ' )) {
44
47
throw new CloudTasksException ('Missing [Authorization] header ' );
@@ -49,7 +52,7 @@ public function authorizeRequest()
49
52
50
53
$ decodedToken = $ this ->publicKey ->decodeOpenIdToken ($ openIdToken , $ kid );
51
54
52
- $ this ->validateToken ($ decodedToken );
55
+ $ this ->validateToken ($ connection , $ decodedToken );
53
56
}
54
57
55
58
/**
@@ -58,13 +61,13 @@ public function authorizeRequest()
58
61
* @param $openIdToken
59
62
* @throws CloudTasksException
60
63
*/
61
- protected function validateToken ($ openIdToken )
64
+ protected function validateToken ($ connection , $ openIdToken )
62
65
{
63
66
if (!in_array ($ openIdToken ->iss , ['https://accounts.google.com ' , 'accounts.google.com ' ])) {
64
67
throw new CloudTasksException ('The given OpenID token is not valid ' );
65
68
}
66
69
67
- if ($ openIdToken ->aud != Config::handler ()) {
70
+ if ($ openIdToken ->aud != Config::handler ($ connection )) {
68
71
throw new CloudTasksException ('The given OpenID token is not valid ' );
69
72
}
70
73
@@ -93,11 +96,11 @@ private function captureTask()
93
96
return $ task ;
94
97
}
95
98
96
- private function listenForEvents ()
99
+ private function listenForEvents ($ connection )
97
100
{
98
- app ('events ' )->listen (JobFailed::class, function ($ event ) {
101
+ app ('events ' )->listen (JobFailed::class, function ($ event ) use ( $ connection ) {
99
102
app ('queue.failer ' )->log (
100
- ' cloudtasks ' , $ event ->job ->getQueue (),
103
+ $ connection , $ event ->job ->getQueue (),
101
104
$ event ->job ->getRawBody (), $ event ->exception
102
105
);
103
106
});
@@ -107,24 +110,24 @@ private function listenForEvents()
107
110
* @param $task
108
111
* @throws CloudTasksException
109
112
*/
110
- private function handleTask ($ task )
113
+ private function handleTask ($ connection , $ task )
111
114
{
112
115
$ job = new CloudTasksJob ($ task );
113
116
114
117
$ job ->setAttempts (request ()->header ('X-CloudTasks-TaskRetryCount ' ) + 1 );
115
118
$ job ->setQueue (request ()->header ('X-Cloudtasks-Queuename ' ));
116
- $ job ->setMaxTries ($ this ->getQueueMaxTries ($ job ));
119
+ $ job ->setMaxTries ($ this ->getQueueMaxTries ($ connection , $ job ));
117
120
118
121
$ worker = $ this ->getQueueWorker ();
119
122
120
- $ worker ->process (' cloudtasks ' , $ job , new WorkerOptions ());
123
+ $ worker ->process ($ connection , $ job , new WorkerOptions ());
121
124
}
122
125
123
- private function getQueueMaxTries (CloudTasksJob $ job )
126
+ private function getQueueMaxTries ($ connection , CloudTasksJob $ job )
124
127
{
125
128
$ queueName = $ this ->client ->queueName (
126
- Config::project (),
127
- Config::location (),
129
+ Config::project ($ connection ),
130
+ Config::location ($ connection ),
128
131
$ job ->getQueue ()
129
132
);
130
133
0 commit comments